Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#test scripts
emulate.ts
test-database.ts
script.*.ts

# Database files
*.db
Expand Down
8 changes: 4 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import "dotenv/config";
import { handleUplinks } from "./logic/mqtt";
import { Request, Response } from "express";
import { app, m3ter } from "./logic/context";
import { app, m3ter, rollup } from "./logic/context";
import setupDatabase, {
getAllMeterRecords,
saveMeter,
Expand All @@ -13,8 +13,6 @@ handleUplinks();
// Initialize database tables and jobs
setupDatabase();

console.log("[server]: Server is starting...", process.env); // todo: remove

app.get("/", async (req: Request, res: Response) => {
const m3ters = getAllMeterRecords();
res.render("index", { m3ters });
Expand All @@ -25,10 +23,12 @@ app.post("/", async (req: Request, res: Response) => {
try {
const tokenId = (await req.body).tokenId;
const publicKey = await m3ter.publicKey(tokenId);
const latestNonce = await rollup.nonce(tokenId);
saveMeter({
publicKey,
tokenId,
latestNonce: 0, // Initialize latestNonce to 0
latestNonce: Number(latestNonce),
devEui: (await req.body).devEui ?? null,
});
} catch (err) {
console.error(err);
Expand Down
7 changes: 5 additions & 2 deletions src/logic/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ app.listen(port, () => {
const provider = new JsonRpcProvider(process.env.MAINNET_RPC);

export const m3ter = new Contract(
"0x40a36C0eF29A49D1B1c1fA45fab63762f8FC423F",
["function publicKey(uint256) view returns (bytes32)"],
"0x7c6FEF064603B91bE9d739fE981c28Fd82a6D62b", // "0x40a36C0eF29A49D1B1c1fA45fab63762f8FC423F",
[
"function publicKey(uint256) view returns (bytes32)",
"function tokenID(bytes32) view returns (uint256)",
],
provider
);

Expand Down
76 changes: 59 additions & 17 deletions src/logic/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@ import { connect } from "mqtt";
import { enqueue } from "./grpc";
import { interact } from "./arweave";
import { encode } from "./encode";
import { getMeterByPublicKey, insertTransaction, updateMeterNonce } from "../store/sqlite";
import { m3ter as m3terContract, rollup as rollupContract } from "./context";
import {
getAllMeterRecords,
getMeterByDevEui,
getMeterByPublicKey,
insertTransaction,
saveMeter,
updateMeterDevEui,
updateMeterNonce,
} from "../store/sqlite";
import { State, TransactionRecord } from "../types";
import { getProverURL, sendPendingTransactionsToProver } from "./verify";
import { decodePayload } from "./decode";
Expand Down Expand Up @@ -36,30 +45,63 @@ export function handleUplinks() {
});
}

async function handleMessage(blob: Buffer) {
export async function handleMessage(blob: Buffer) {
try {
const message = JSON.parse(blob.toString());

console.log("[info] Received uplink from device:", JSON.stringify(message));

const payload = Buffer.from(message["data"], "hex");
const payload = Buffer.from(message["data"], "base64");
// encode transaction into standard format (payload is hex string)
// format: nonce | energy | signature | voltage | device_id | longitude | latitude
const transactionHex = payload;
const decoded = decodePayload(transactionHex);
const publicKey = decoded.extensions.deviceId;
let publicKey = decoded.extensions.deviceId;

console.log("[info] Decoded payload:", decoded);

if (!publicKey) {
throw new Error("Invalid Public Key");
if (publicKey) {
// save public key with device EUI mapping if not already saved
const existingMeter = getMeterByPublicKey(`0x${publicKey}`);

if (!existingMeter) {
const tokenId = Number(await m3terContract.tokenID(`0x${publicKey}`));
if (tokenId === 0) {
throw new Error("Token ID not found for public key: " + publicKey);
}

const latestNonce = Number(await rollupContract.nonce(tokenId));

// save new meter with devEui
const newMeter = {
publicKey: `0x${publicKey}`,
devEui: message["deviceInfo"]["devEui"],
tokenId,
latestNonce,
};
saveMeter(newMeter);
console.log("[info] Saved new meter:", newMeter);
} else if (existingMeter && !existingMeter.devEui) {
// update existing meter with devEui if not already set
updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]);
console.log("[info] Updated meter with DevEui:", existingMeter.tokenId);
}
} else {
// try to find meter by DevEui
const devEui = message["deviceInfo"]["devEui"];
const meterByDevEui = getMeterByDevEui(devEui);

if (!meterByDevEui) {
throw new Error("Device EUI not associated with any meter: " + devEui);
}

publicKey = meterByDevEui.publicKey.replace("0x", "");
}

const m3ter = getMeterByPublicKey(publicKey ?? "");
const m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null;

if (!m3ter) {
console.error("Meter not found for public key:", publicKey);
return;
throw new Error("Meter not found for public key: " + publicKey);
}

console.log(
Expand All @@ -74,10 +116,7 @@ async function handleMessage(blob: Buffer) {
// if device nonce is correct
const expectedNonce = m3ter.latestNonce + 1;

let state;
if (decoded.nonce === expectedNonce) {
state = { is_on: true };

console.log("[info] Nonce is valid:", decoded.nonce);
// Upload to arweave
await interact(m3ter.tokenId, decoded);
Expand Down Expand Up @@ -118,13 +157,16 @@ async function handleMessage(blob: Buffer) {
}
}

const state =
decoded.nonce === m3ter.latestNonce + 1 || decoded.nonce === 0
? { is_on: true }
: { nonce: m3ter.latestNonce, is_on: true };

console.log("[info] Enqueuing state:", state);

enqueue(
message["deviceInfo"]["devEui"],
encode(
(state ? state : { nonce: expectedNonce, is_on: true }) as State,
decoded.extensions.latitude ?? 0,
decoded.extensions.longitude ?? 0
)
encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0)
);
} catch (error) {
console.log(error);
Expand Down
57 changes: 49 additions & 8 deletions src/store/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import { MeterRecord, TransactionRecord } from "../types";
let db: DatabaseType;
let insertMeterQuery: DatabaseStatementType;
let getMeterByPublicKeyQuery: DatabaseStatementType;
let getMeterByDevEuiQuery: DatabaseStatementType;
let getMeterByTokenIdQuery: DatabaseStatementType;
let getAllMetersQuery: DatabaseStatementType;
let deleteMeterByPublicKeyQuery: DatabaseStatementType;
let updateMeterNonceQuery: DatabaseStatementType;
let updateMeterDevEuiQuery: DatabaseStatementType;
// transaction queries
let createTransactionQuery: DatabaseStatementType;
let getTransactionByNonceQuery: DatabaseStatementType;
Expand Down Expand Up @@ -68,8 +70,11 @@ function initializeMetersTable() {
return db.exec(`
CREATE TABLE IF NOT EXISTS meters (
publicKey TEXT,
devEui TEXT,
tokenId INTEGER,
latestNonce INTEGER DEFAULT -1
latestNonce INTEGER DEFAULT -1,

UNIQUE(publicKey, tokenId)
)
`);
}
Expand All @@ -78,20 +83,24 @@ function initializeMetersTable() {
function prepareQueries() {
// meter queries
insertMeterQuery = db.prepare(`
INSERT OR REPLACE INTO meters (publicKey, tokenId, latestNonce)
VALUES (@publicKey, @tokenId, @latestNonce)
INSERT OR REPLACE INTO meters (publicKey, devEui, tokenId, latestNonce)
VALUES (@publicKey, @devEui, @tokenId, @latestNonce)
`);

getMeterByPublicKeyQuery = db.prepare(`
SELECT publicKey, tokenId, latestNonce FROM meters WHERE publicKey = ?
SELECT publicKey, devEui, tokenId, latestNonce FROM meters WHERE publicKey = ?
`);

getMeterByDevEuiQuery = db.prepare(`
SELECT publicKey, devEui, tokenId, latestNonce FROM meters WHERE devEui = ?
`);

getMeterByTokenIdQuery = db.prepare(`
SELECT publicKey, tokenId, latestNonce FROM meters WHERE tokenId = ?
SELECT publicKey, devEui, tokenId, latestNonce FROM meters WHERE tokenId = ?
`);

getAllMetersQuery = db.prepare(`
SELECT publicKey, tokenId, latestNonce FROM meters
SELECT publicKey, devEui, tokenId, latestNonce FROM meters
`);

deleteMeterByPublicKeyQuery = db.prepare(`
Expand All @@ -102,6 +111,10 @@ function prepareQueries() {
UPDATE meters SET latestNonce = ? WHERE publicKey = ?
`);

updateMeterDevEuiQuery = db.prepare(`
UPDATE meters SET devEui = ? WHERE publicKey = ?
`);

// transaction queries
createTransactionQuery = db.prepare(`
INSERT INTO transactions (nonce, identifier, verified, receivedAt, raw)
Expand Down Expand Up @@ -134,6 +147,7 @@ export function saveMeter(meterData: MeterRecord): void {
publicKey: meterData.publicKey,
tokenId: meterData.tokenId,
latestNonce: meterData.latestNonce,
devEui: meterData.devEui ?? null,
});
} catch (err: any) {
console.error("Failed to save meter:", err);
Expand All @@ -151,6 +165,16 @@ export function getMeterByPublicKey(publicKey: string): MeterRecord | null {
}
}

export function getMeterByDevEui(devEui: string): MeterRecord | null {
try {
const result = getMeterByDevEuiQuery.get(devEui) as MeterRecord | undefined;
return result || null;
} catch (err: any) {
console.error("Failed to get meter by DevEui:", err);
return null;
}
}

export function getMeterByTokenId(tokenId: string): MeterRecord | null {
try {
const result = getMeterByTokenIdQuery.get(tokenId) as MeterRecord | undefined;
Expand Down Expand Up @@ -199,15 +223,32 @@ export function updateMeterNonce(publicKey: string, nonce: number): boolean {
}
}

export function updateMeterDevEui(publicKey: string, devEui: string): boolean {
try {
const result = updateMeterDevEuiQuery.run(devEui, publicKey);
const updated = result.changes > 0;
if (!updated) {
console.log("Meter not found for DevEui update:", { publicKey });
}
return updated;
} catch (err: any) {
console.error("Failed to update meter DevEui:", err);
return false;
}
}

// Transaction insertion function
export function insertTransaction(transactionData: TransactionRecord): void {
try {
const existingTransaction = getTransactionByNonceQuery.get(transactionData.nonce, transactionData.identifier) as TransactionRecord | undefined;
const existingTransaction = getTransactionByNonceQuery.get(
transactionData.nonce,
transactionData.identifier
) as TransactionRecord | undefined;

if (existingTransaction) {
throw new Error(`Transaction with nonce ${transactionData.nonce} already exists`);
}

transactionData.verified = +Boolean(transactionData.verified) as 0 | 1; // Ensure verified is set
createTransactionQuery.run(transactionData);
} catch (err: any) {
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Meter interface for database operations
export interface MeterRecord {
publicKey: string;
devEui: string | null; // Optional field for device EUI
tokenId: number;
latestNonce: number; // Optional field for tracking latest nonce
}
Expand Down
Loading