diff --git a/.gitignore b/.gitignore index d03d680..195e462 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ emulate.ts test-database.ts script.*.ts +script.*.js # Database files *.db diff --git a/src/logic/arweave.ts b/src/logic/arweave.ts index 59f4bff..a75f0d8 100644 --- a/src/logic/arweave.ts +++ b/src/logic/arweave.ts @@ -20,10 +20,10 @@ export async function interact(m3terId: number, decoded: DecodedPayload) { const contractLabel = process.env.CONTRACT_LABEL || "M3ters"; - const byteLength = transactionHex.length; + const byteLength = Buffer.byteLength(transactionHex.toString("hex"), "utf8"); return await turbo.uploadFile({ - fileStreamFactory: () => Readable.from([transactionHex.toString("hex")], { encoding: "utf8" }), + fileStreamFactory: () => Readable.from(transactionHex.toString("hex"), { encoding: "utf8" }), fileSizeFactory: () => byteLength, dataItemOpts: { paidBy: await arweave.wallets.jwkToAddress(key), @@ -42,5 +42,19 @@ export async function interact(m3terId: number, decoded: DecodedPayload) { { name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" }, ], }, + events: { + onUploadProgress: (progress) => { + console.log("[arweave] Upload progress:", progress); + }, + onError: (error) => { + console.error("[arweave] Upload error:", error); + }, + onSuccess(event) { + console.log("[arweave] Upload successful! Transaction ID:", event); + }, + onUploadSuccess(event) { + console.log("[arweave] Upload completed! Transaction ID:", event); + }, + }, }); } diff --git a/src/logic/context.ts b/src/logic/context.ts index 407287e..9b01fa0 100644 --- a/src/logic/context.ts +++ b/src/logic/context.ts @@ -32,7 +32,7 @@ app.listen(port, () => { const provider = new JsonRpcProvider(process.env.MAINNET_RPC); export const m3ter = new Contract( - "0x7c6FEF064603B91bE9d739fE981c28Fd82a6D62b", // "0x40a36C0eF29A49D1B1c1fA45fab63762f8FC423F", + "0x7c6FEF064603B91bE9d739fE981c28Fd82a6D62b", // "0x40a36C0eF29A49D1B1c1fA45fab63762f8FC423F" [ "function publicKey(uint256) view returns (bytes32)", "function tokenID(bytes32) view returns (uint256)", diff --git a/src/logic/mqtt.ts b/src/logic/mqtt.ts index 139dbab..b8fb62b 100644 --- a/src/logic/mqtt.ts +++ b/src/logic/mqtt.ts @@ -15,6 +15,7 @@ import { import { State, TransactionRecord } from "../types"; import { getProverURL, sendPendingTransactionsToProver } from "./verify"; import { decodePayload } from "./decode"; +import { verifyPayloadSignature } from "../utils"; export function handleUplinks() { const client = connect({ @@ -57,10 +58,31 @@ export async function handleMessage(blob: Buffer) { const transactionHex = payload; const decoded = decodePayload(transactionHex); let publicKey = decoded.extensions.deviceId; + let payloadHadPublicKey = !!publicKey; console.log("[info] Decoded payload:", decoded); - if (publicKey) { + if (!publicKey) { + // try to find public key by DevEui + const devEui = message["deviceInfo"]["devEui"]; + const meterByDevEui = getMeterByDevEui(devEui); + + if (!meterByDevEui) { + throw new Error("Device EUI not associated with any meter: " + devEui); + } + + publicKey = meterByDevEui.publicKey.replace("0x", ""); + } + + // verify transaction signature + const isValid = verifyPayloadSignature(transactionHex, Buffer.from(publicKey!, "hex")); + if (!isValid) { + throw new Error("Invalid transaction signature for meter with public key: " + publicKey); + } + + console.log("[info] Verified signature"); + + if (payloadHadPublicKey) { // save public key with device EUI mapping if not already saved const existingMeter = getMeterByPublicKey(`0x${publicKey}`); @@ -86,16 +108,6 @@ export async function handleMessage(blob: Buffer) { updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]); console.log("[info] Updated meter with DevEui:", existingMeter.tokenId); } - } else { - // try to find meter by DevEui - const devEui = message["deviceInfo"]["devEui"]; - const meterByDevEui = getMeterByDevEui(devEui); - - if (!meterByDevEui) { - throw new Error("Device EUI not associated with any meter: " + devEui); - } - - publicKey = meterByDevEui.publicKey.replace("0x", ""); } const m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; @@ -118,6 +130,7 @@ export async function handleMessage(blob: Buffer) { if (decoded.nonce === expectedNonce) { console.log("[info] Nonce is valid:", decoded.nonce); + // Upload to arweave await interact(m3ter.tokenId, decoded); @@ -126,8 +139,7 @@ export async function handleMessage(blob: Buffer) { // save transaction to local store const transactionRecord = { nonce: decoded.nonce, - verified: false, - identifier: m3ter.tokenId.toString(), + identifier: m3ter.tokenId, receivedAt: Date.now(), raw: transactionHex.toString("hex"), } as TransactionRecord; @@ -140,7 +152,7 @@ export async function handleMessage(blob: Buffer) { console.error("Error inserting transaction:", error); } - updateMeterNonce(publicKey, expectedNonce); + updateMeterNonce(`0x${publicKey}`, expectedNonce); console.log("[info] Updated meter nonce to:", expectedNonce); @@ -158,10 +170,18 @@ export async function handleMessage(blob: Buffer) { } const state = - decoded.nonce === m3ter.latestNonce + 1 || decoded.nonce === 0 + decoded.nonce === m3ter.latestNonce + 1 || (decoded.nonce === 0 && m3ter.latestNonce === 0) ? { is_on: true } : { nonce: m3ter.latestNonce, is_on: true }; + // TODO: remove the following block after testing + // if transaction nonce is 0 and the latest nonce is 0 + // update the latest nonce to 1, respond with 1 + if (decoded.nonce === 0 && m3ter.latestNonce === 0) { + updateMeterNonce(`0x${publicKey}`, 1); + state.nonce = 1; + } + console.log("[info] Enqueuing state:", state); enqueue( @@ -169,6 +189,6 @@ export async function handleMessage(blob: Buffer) { encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0) ); } catch (error) { - console.log(error); + console.error("❌ Error handling MQTT message:", error); } } diff --git a/src/logic/sync.ts b/src/logic/sync.ts new file mode 100644 index 0000000..0759050 --- /dev/null +++ b/src/logic/sync.ts @@ -0,0 +1,21 @@ +import { getAllMeterRecords, pruneTransactionsBefore, updateMeterNonce } from "../store/sqlite"; +import { rollup as rollupContract } from "./context"; + +export async function pruneAndSyncWithBlockchain() { + // Get all meter records from the local database + const meters = getAllMeterRecords(); + + for (const meter of meters) { + const { publicKey, latestNonce } = meter; + + // Check the latest nonce on the blockchain + const blockchainNonce = Number(await rollupContract.nonce(meter.tokenId)); + + if (blockchainNonce > latestNonce) { + // If the blockchain nonce is greater, update the local record + updateMeterNonce(publicKey, blockchainNonce); + // prune transactions with nonce less than or equal to blockchainNonce + pruneTransactionsBefore(meter.tokenId, blockchainNonce); + } + } +} diff --git a/src/logic/verify.ts b/src/logic/verify.ts index 972f86b..713c0bc 100644 --- a/src/logic/verify.ts +++ b/src/logic/verify.ts @@ -1,7 +1,7 @@ import { BatchTransactionPayload, TransactionRecord } from "../types"; import { rollup } from "./context"; -import { getUnverifiedTransactionRecords } from "../store/sqlite"; import { buildBatchPayload } from "../utils"; +import { getAllTransactionRecords } from "../store/sqlite"; const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing"; @@ -130,7 +130,7 @@ export async function getProverURL(): Promise { } export async function sendPendingTransactionsToProver(proverURL: string) { - const pendingTransactions = getUnverifiedTransactionRecords(); + const pendingTransactions = getAllTransactionRecords(); if (!proverURL) { console.error("No active prover node available"); @@ -139,5 +139,8 @@ export async function sendPendingTransactionsToProver(proverURL: string) { const requestPayload = buildBatchPayload(pendingTransactions); + console.log("[info] Sending", requestPayload.length, "transactions to prover at", proverURL); + console.log("[info] Request payload:", requestPayload); + return await sendTransactionsToProver(proverURL, requestPayload); } diff --git a/src/store/sqlite.ts b/src/store/sqlite.ts index 5a0ad33..aff9c7a 100644 --- a/src/store/sqlite.ts +++ b/src/store/sqlite.ts @@ -16,9 +16,6 @@ let updateMeterDevEuiQuery: DatabaseStatementType; // transaction queries let createTransactionQuery: DatabaseStatementType; let getTransactionByNonceQuery: DatabaseStatementType; -let getUnverifiedTransactionRecordsQuery: DatabaseStatementType; -let markTransactionAsVerifiedQuery: DatabaseStatementType; -let deleteVerifiedTransactionRecordsQuery: DatabaseStatementType; /** * setup database @@ -53,9 +50,8 @@ function initializeTransactionsTable() { return db.exec(` CREATE TABLE IF NOT EXISTS transactions ( nonce INTEGER, - identifier TEXT, + identifier INTEGER, -- Meter token ID receivedAt INTEGER, - verified BOOLEAN DEFAULT FALSE, raw TEXT, UNIQUE(nonce, identifier) @@ -117,25 +113,13 @@ function prepareQueries() { // transaction queries createTransactionQuery = db.prepare(` - INSERT INTO transactions (nonce, identifier, verified, receivedAt, raw) - VALUES (@nonce, @identifier, @verified, @receivedAt, @raw) + INSERT INTO transactions (nonce, identifier, receivedAt, raw) + VALUES (@nonce, @identifier, @receivedAt, @raw) `); getTransactionByNonceQuery = db.prepare(` SELECT * FROM transactions WHERE nonce = ? AND identifier = ? `); - - getUnverifiedTransactionRecordsQuery = db.prepare(` - SELECT * FROM transactions WHERE verified = FALSE - `); - - markTransactionAsVerifiedQuery = db.prepare(` - UPDATE transactions SET verified = TRUE WHERE nonce = ? - `); - - deleteVerifiedTransactionRecordsQuery = db.prepare(` - DELETE FROM transactions WHERE verified = TRUE - `); } // Meter management functions @@ -249,7 +233,6 @@ export function insertTransaction(transactionData: TransactionRecord): void { throw new Error(`Transaction with nonce ${transactionData.nonce} already exists`); } - transactionData.verified = +Boolean(transactionData.verified) as 0 | 1; // Ensure verified is set createTransactionQuery.run(transactionData); } catch (err: any) { console.error("Failed to insert transaction:", err); @@ -257,50 +240,25 @@ export function insertTransaction(transactionData: TransactionRecord): void { } } -export function getTransactionByNonce(nonce: number): TransactionRecord | null { +export function getAllTransactionRecords(): TransactionRecord[] { try { - const result = getTransactionByNonceQuery.get(nonce) as TransactionRecord | undefined; - return result || null; - } catch (err: any) { - console.error("Failed to get transaction by nonce:", err); - return null; - } -} - -// Transaction verification functions -export function getUnverifiedTransactionRecords(): TransactionRecord[] { - try { - const results = getUnverifiedTransactionRecordsQuery.all() as TransactionRecord[]; + const results = db.prepare(`SELECT * FROM transactions`).all() as TransactionRecord[]; return results; } catch (err: any) { - console.error("Failed to get unverified transactions:", err); + console.error("Failed to get all transactions:", err); return []; } } -export function markTransactionAsVerified(nonce: number): boolean { - try { - const result = markTransactionAsVerifiedQuery.run(nonce); - const updated = result.changes > 0; - if (!updated) { - console.log("Transaction not found for verification:", { - nonce, - }); - } - return updated; - } catch (err: any) { - console.error("Failed to mark transaction as verified:", err); - return false; - } -} - -export function deleteVerifiedTransactionRecords(): number { +export function pruneTransactionsBefore(nonce: number, meterNumber: number) { try { - const result = deleteVerifiedTransactionRecordsQuery.run(); - const deletedCount = result.changes; - return deletedCount; + const result = db + .prepare(`DELETE FROM transactions WHERE identifier = ? AND nonce < ?`) + .run(meterNumber, nonce); + console.log( + `Pruned ${result.changes} transactions for meter ${meterNumber} with nonce < ${nonce}` + ); } catch (err: any) { - console.error("Failed to delete verified transactions:", err); - return 0; + console.error("Failed to prune transactions:", err); } } diff --git a/src/types.ts b/src/types.ts index 2023423..0439b23 100644 --- a/src/types.ts +++ b/src/types.ts @@ -9,8 +9,7 @@ export interface MeterRecord { // transaction database record export interface TransactionRecord { nonce: number; - identifier: string; - verified: boolean | 0 | 1; // Optional field to indicate if the transaction is verified + identifier: number; // Meter token ID receivedAt: number; raw: string; // Raw transaction data in hex format } @@ -34,11 +33,6 @@ export interface State { token_id: number; } -export interface M3terPayload { - 0: string; // encoded format: nonce | energy | signature | voltage | device_id | longitude | latitude - 1: string; -} - export interface DecodedPayload { nonce: number; energy: number; diff --git a/src/utils.ts b/src/utils.ts index abd88bd..0d95e3a 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,4 +1,5 @@ import { TransactionRecord, BatchTransactionPayload } from "./types"; +import { createPublicKey, verify } from "crypto"; export function buildBatchPayload(transactions: TransactionRecord[]): BatchTransactionPayload[] { return transactions.map((transaction) => ({ @@ -6,3 +7,28 @@ export function buildBatchPayload(transactions: TransactionRecord[]): BatchTrans message: transaction.raw, })); } + +export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer): boolean { + try { + const message = transaction.subarray(0, 8); + const signature = transaction.subarray(8, 72); + + // Wrap raw key in SPKI DER + const spkiPrefix = Buffer.from("302a300506032b6570032100", "hex"); + const derKey = Buffer.concat([spkiPrefix, rawPubKey]); + + const publicKey = createPublicKey({ + key: derKey, + format: "der", + type: "spki", + }); + + // Verify + const ok = verify(null, message, publicKey, signature); + + return ok; + } catch (error) { + console.error("Error verifying signature:", error); + return false; + } +} diff --git a/tests/store/sqlite.test.ts b/tests/store/sqlite.test.ts index e2e0d13..de23145 100644 --- a/tests/store/sqlite.test.ts +++ b/tests/store/sqlite.test.ts @@ -5,13 +5,12 @@ import setupDatabase, { getAllMeterRecords, deleteMeterByPublicKey, updateMeterNonce, - getUnverifiedTransactionRecords, - markTransactionAsVerified, - deleteVerifiedTransactionRecords, insertTransaction, deleteDatabase, getMeterByDevEui, updateMeterDevEui, + getAllTransactionRecords, + pruneTransactionsBefore, } from "../../src/store/sqlite"; beforeEach(() => { @@ -25,7 +24,7 @@ afterEach(() => { it("should have no meters and transactions", () => { const meters = getAllMeterRecords(); - const transactions = getUnverifiedTransactionRecords(); + const transactions = getAllTransactionRecords(); expect(meters).toHaveLength(0); expect(transactions).toHaveLength(0); }); @@ -83,7 +82,6 @@ it("should get meter by device EUI", () => { expect(retrievedMeter).toEqual(meterData); }); - it("should delete meter", () => { const meterData = { publicKey: "test_public_key", @@ -133,43 +131,38 @@ it("should update meter devEui", () => { it("should insert transaction", () => { const transactionData = { nonce: 1, - identifier: "0", // meter token ID + identifier: 0, // meter token ID receivedAt: Date.now(), raw: "", - verified: false, }; insertTransaction(transactionData); - const transactions = getUnverifiedTransactionRecords(); + const transactions = getAllTransactionRecords(); expect(transactions).toHaveLength(1); - expect(transactions[0]).toEqual({ ...transactionData, verified: 0 }); -}); - -it("should mark transaction as verified", () => { - const transactionData = { - nonce: 1, - identifier: 0, // meter token ID - receivedAt: Date.now(), - }; - - markTransactionAsVerified(transactionData.nonce); - const verifiedTransactions = getUnverifiedTransactionRecords(); - expect(verifiedTransactions).toHaveLength(0); + expect(transactions[0]).toEqual({ ...transactionData }); }); -it("should delete verified transactions", () => { - const transactionData = { - nonce: 1, - identifier: "0", // meter token ID - verified: true, - receivedAt: Date.now(), - raw: "", - }; - insertTransaction(transactionData); - - const deleted = deleteVerifiedTransactionRecords(); - expect(deleted).toBe(1); - - const transactions = getUnverifiedTransactionRecords(); - expect(transactions).toHaveLength(0); +it("should prune transactions before a given nonce for a specific meter", () => { + const meterTokenId = 1; + const nonceToPrune = 5; + + // Insert some transactions for the meter + for (let i = 0; i < 10; i++) { + insertTransaction({ + nonce: i, + identifier: meterTokenId, + receivedAt: Date.now(), + raw: "", + }); + } + + let transactions = getAllTransactionRecords(); + expect(transactions).toHaveLength(10); + + // Prune transactions + pruneTransactionsBefore(nonceToPrune, meterTokenId); + + transactions = getAllTransactionRecords(); + expect(transactions).toHaveLength(5); + expect(transactions.every((tx) => tx.nonce >= nonceToPrune)).toBe(true); });