From 2b64a8b44dadf71a6874d1538ea4795a32f491ca Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Tue, 14 Oct 2025 08:01:13 +0100 Subject: [PATCH 1/8] fix: prevent concurrent message processing in handleMessage function --- src/logic/mqtt.ts | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/logic/mqtt.ts b/src/logic/mqtt.ts index 6bdd19e..d3a9b2a 100644 --- a/src/logic/mqtt.ts +++ b/src/logic/mqtt.ts @@ -19,6 +19,7 @@ import { verifyPayloadSignature } from "../utils"; import { getLatestTransactionNonce, pruneAndSyncOnchain } from "./sync"; const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain +let isProcessingMessage = false; // Lock to prevent concurrent message processing export function handleUplinks() { const client = connect({ @@ -50,6 +51,15 @@ export function handleUplinks() { } export async function handleMessage(blob: Buffer) { + // Check if another message is already being processed + if (isProcessingMessage) { + console.log("[warn] Message dropped - another message is already being processed"); + return; + } + + // Set lock + isProcessingMessage = true; + try { const message = JSON.parse(blob.toString()); @@ -91,9 +101,6 @@ export async function handleMessage(blob: Buffer) { if (!existingMeter) { const tokenId = Number(await m3terContract.tokenID(`0x${publicKey}`)); - // if (tokenId === 0) { - // throw new Error("Token ID not found for public key: " + publicKey); - // } const latestNonce = await getLatestTransactionNonce(tokenId); @@ -227,5 +234,8 @@ export async function handleMessage(blob: Buffer) { ); } catch (error) { console.error("❌ Error handling MQTT message:", error); + } finally { + // Release lock + isProcessingMessage = false; } } From 26b7e3875d8bb784e0b19580fadd1db645f74401 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Tue, 14 Oct 2025 09:49:19 +0100 Subject: [PATCH 2/8] feat: refactor context and sync logic, add cross-chain revenue functions, and remove deprecated prover logic --- src/logic/context.ts | 21 +++++++- src/logic/mqtt.ts | 15 ++++-- src/logic/{verify.ts => prover.ts} | 0 src/logic/sync.ts | 79 +++++++++++++++++++++++++++++- src/utils.ts | 34 +++++++++++++ 5 files changed, 141 insertions(+), 8 deletions(-) rename src/logic/{verify.ts => prover.ts} (100%) diff --git a/src/logic/context.ts b/src/logic/context.ts index ec3664a..008b13c 100644 --- a/src/logic/context.ts +++ b/src/logic/context.ts @@ -29,7 +29,7 @@ app.listen(port, () => { }); // ETHERS JS CONTRACT CONFIG -const provider = new JsonRpcProvider(process.env.MAINNET_RPC); +export const provider = new JsonRpcProvider(process.env.MAINNET_RPC); export const m3ter = new Contract( process.env.M3TER_CONTRACT_ADDRESS || "0x9C547B649475f1bE81323AefdbcF209C17961D5E", @@ -42,6 +42,23 @@ export const m3ter = new Contract( export const rollup = new Contract( process.env.ROLLUP_CONTRACT_ADDRESS || "0xf8f2d4315DB5db38f3e5c45D0bCd59959c603d9b", - ["function nonce(uint256 tokenId) external view returns (bytes6)"], + ["function nonce(uint256) external view returns (bytes6)"], + provider +); + +export const ccipRevenueReader = new Contract( + process.env.CCIP_REVENUE_READER_ADDRESS || "0x1a1b02d8c67b0fDcf4E379855868DeB470E169cf", + [ + "function read(uint256 tokenId, address target, address verifier) public view returns (uint256)", + "function readCallback(bytes[] memory data, bytes memory) external pure returns (uint256)", + "function verifierCount() external view returns (uint256)", + "function verifiers(uint256) external view returns (string, address)", + ], + provider +); + +export const priceContext = new Contract( + process.env.PRICE_CONTEXT_ADDRESS || "0x0000000000000000000000000000000000000000", + ["function owed(uint256 tokenId) public view returns (uint256)"], provider ); diff --git a/src/logic/mqtt.ts b/src/logic/mqtt.ts index d3a9b2a..1ae7bbd 100644 --- a/src/logic/mqtt.ts +++ b/src/logic/mqtt.ts @@ -13,10 +13,15 @@ import { updateMeterNonce, } from "../store/sqlite"; import { State, TransactionRecord } from "../types"; -import { getProverURL, sendPendingTransactionsToProver } from "./verify"; +import { getProverURL, sendPendingTransactionsToProver } from "./prover"; import { decodePayload } from "./decode"; import { verifyPayloadSignature } from "../utils"; -import { getLatestTransactionNonce, pruneAndSyncOnchain } from "./sync"; +import { + getLatestTransactionNonce, + pruneAndSyncOnchain, + getCrossChainRevenue, + getOwedFromPriceContext, +} from "./sync"; const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain let isProcessingMessage = false; // Lock to prevent concurrent message processing @@ -214,9 +219,9 @@ export async function handleMessage(blob: Buffer) { console.error("Error sending pending transactions to prover:", error); } } - - const state = - decoded.nonce === expectedNonce ? { is_on: true } : { nonce: m3ter.latestNonce, is_on: true }; + const is_on = + (await getCrossChainRevenue(m3ter.tokenId)) >= (await getOwedFromPriceContext(m3ter.tokenId)); + const state = decoded.nonce === expectedNonce ? { is_on } : { nonce: m3ter.latestNonce, is_on }; // TODO: remove the following block after testing // if transaction nonce is 0 and the latest nonce is 0 diff --git a/src/logic/verify.ts b/src/logic/prover.ts similarity index 100% rename from src/logic/verify.ts rename to src/logic/prover.ts diff --git a/src/logic/sync.ts b/src/logic/sync.ts index ade23c9..4cb4469 100644 --- a/src/logic/sync.ts +++ b/src/logic/sync.ts @@ -6,7 +6,14 @@ import { pruneTransactionsBefore, updateMeterNonce, } from "../store/sqlite"; -import { rollup as rollupContract } from "./context"; +import { + provider, + rollup as rollupContract, + ccipRevenueReader as ccipRevenueReaderContract, + priceContext as priceContextContract, +} from "./context"; +import { JsonRpcProvider, Contract } from "ethers"; +import { retry } from "../utils"; export async function pruneAndSyncOnchain(meterIdentifier: number | string): Promise { const meter = @@ -50,3 +57,73 @@ export async function getLatestTransactionNonce(meterIdentifier: number): Promis return latestNonce; } + +// get revenue across suppored chains +export async function getCrossChainRevenue(tokenId: number): Promise { + try { + // Get the number of verifiers + const verifierCount = Number(await retry(() => ccipRevenueReaderContract.verifierCount())); + + let totalRevenue = 0; + + // Iterate through all verifiers and get revenue from each chain + for (let i = 0; i < verifierCount; i++) { + try { + // Get verifier info (ensName, targetContractAddress) + const [ensName, targetAddress] = await retry(() => ccipRevenueReaderContract.verifiers(i)); + + console.log(`[info] Getting revenue from ENS: ${ensName}, target: ${targetAddress}`); + + // Resolve ENS name to get the verifier address + const verifierAddress = await retry(() => provider.resolveName(ensName)); + + if (!verifierAddress) { + console.error(`[error] Failed to resolve ENS name: ${ensName}`); + continue; + } + + console.log(`[info] Resolved ${ensName} to verifier address: ${verifierAddress}`); + + // Get revenue from this specific chain using CCIP read + // Parameters: tokenId, target (L2 contract), verifier (resolved from ENS) + const revenue = await retry(() => + ccipRevenueReaderContract.read(tokenId, targetAddress, verifierAddress, { + enableCcipRead: true, + }) + ); + const revenueAmount = Number(revenue); + + console.log(`[info] Revenue from ${ensName} (${verifierAddress}): ${revenueAmount}`); + totalRevenue += revenueAmount; + } catch (error) { + console.error(`[error] Failed to get revenue from verifier ${i}:`, error); + // Continue with other verifiers even if one fails + } + } + + console.log(`[info] Total cross-chain revenue for token ${tokenId}: ${totalRevenue}`); + return totalRevenue; + } catch (error) { + console.error(`[error] Failed to get cross-chain revenue for token ${tokenId}:`, error); + throw error; + } +} + +// get owed from price context +export async function getOwedFromPriceContext(tokenId: number): Promise { + try { + return await retry(async () => { + console.log(`[info] Getting owed amount for token ${tokenId} from price context`); + + // Call the price context to get the amount the user owes with CCIP read enabled + const owedAmount = await priceContextContract.owed(tokenId); + const owed = Number(owedAmount); + + console.log(`[info] Owed amount for token ${tokenId}: ${owed}`); + return owed; + }); + } catch (error) { + console.error(`[error] Failed to get owed amount for token ${tokenId}:`, error); + throw error; + } +} diff --git a/src/utils.ts b/src/utils.ts index dff5122..d4bdd87 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,6 +1,40 @@ import { TransactionRecord, BatchTransactionPayload } from "./types"; import { createPublicKey, verify } from "crypto"; +/** + * Retries a function up to 5 times with exponential backoff + * @param fn Function to retry + * @param maxRetries Maximum number of retries (default: 5) + * @param baseDelay Base delay in milliseconds (default: 1000) + * @returns Promise that resolves with the function result or rejects with the last error + */ +export async function retry( + fn: () => Promise, + maxRetries: number = 5, + baseDelay: number = 1000 +): Promise { + let lastError: Error; + + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + return await fn(); + } catch (error) { + lastError = error as Error; + + if (attempt === maxRetries) { + throw lastError; + } + + const delay = baseDelay * Math.pow(2, attempt); + console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms...`, error); + + await new Promise(resolve => setTimeout(resolve, delay)); + } + } + + throw lastError!; +} + export function buildBatchPayload(transactions: TransactionRecord[]): BatchTransactionPayload[] { return transactions.map((transaction) => ({ m3ter_id: Number(transaction.identifier), From dcf44ed08c35a9777ce21c6075dcc27f5839ec12 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Tue, 14 Oct 2025 10:20:55 +0100 Subject: [PATCH 3/8] fix: update default addresses for CCIP revenue reader and price context contracts --- src/logic/context.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/logic/context.ts b/src/logic/context.ts index 008b13c..851a640 100644 --- a/src/logic/context.ts +++ b/src/logic/context.ts @@ -47,7 +47,7 @@ export const rollup = new Contract( ); export const ccipRevenueReader = new Contract( - process.env.CCIP_REVENUE_READER_ADDRESS || "0x1a1b02d8c67b0fDcf4E379855868DeB470E169cf", + process.env.CCIP_REVENUE_READER_ADDRESS || "0xD648cdF47e9534B2FCfb18C1E94CA9AAff07BA0E", [ "function read(uint256 tokenId, address target, address verifier) public view returns (uint256)", "function readCallback(bytes[] memory data, bytes memory) external pure returns (uint256)", @@ -58,7 +58,7 @@ export const ccipRevenueReader = new Contract( ); export const priceContext = new Contract( - process.env.PRICE_CONTEXT_ADDRESS || "0x0000000000000000000000000000000000000000", + process.env.PRICE_CONTEXT_ADDRESS || "0xfe77DC466E1cA1Ce1553085921b33A25846d6819", ["function owed(uint256 tokenId) public view returns (uint256)"], provider ); From 8f42c746ec6faa7a2934499c67df2639d87cb946 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Tue, 14 Oct 2025 18:47:26 +0100 Subject: [PATCH 4/8] fix: update default PRICE_CONTEXT_ADDRESS in context configuration --- src/logic/context.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/logic/context.ts b/src/logic/context.ts index 851a640..73e7aa9 100644 --- a/src/logic/context.ts +++ b/src/logic/context.ts @@ -58,7 +58,7 @@ export const ccipRevenueReader = new Contract( ); export const priceContext = new Contract( - process.env.PRICE_CONTEXT_ADDRESS || "0xfe77DC466E1cA1Ce1553085921b33A25846d6819", + process.env.PRICE_CONTEXT_ADDRESS || "0x803ec9176182B863FD2cD69CF8bC68b1aB1C7b0A", ["function owed(uint256 tokenId) public view returns (uint256)"], provider ); From 55084ba2bc18691ca9a879bea8185b150e34c1fa Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Tue, 14 Oct 2025 19:28:11 +0100 Subject: [PATCH 5/8] ... --- src/logic/context.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/logic/context.ts b/src/logic/context.ts index 73e7aa9..9ddecaa 100644 --- a/src/logic/context.ts +++ b/src/logic/context.ts @@ -58,7 +58,7 @@ export const ccipRevenueReader = new Contract( ); export const priceContext = new Contract( - process.env.PRICE_CONTEXT_ADDRESS || "0x803ec9176182B863FD2cD69CF8bC68b1aB1C7b0A", + process.env.PRICE_CONTEXT_ADDRESS || "0xc6D5Ff8E80F4Ee511Db4bCf6a0BcEbF9f41aAA32", ["function owed(uint256 tokenId) public view returns (uint256)"], provider ); From c17b786751dacaf80859dbef42dc65dbdd86b689 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Wed, 15 Oct 2025 12:42:31 +0100 Subject: [PATCH 6/8] fix: improve host configuration by using local IPv4 address as fallback --- src/logic/mqtt.ts | 4 ++-- src/utils.ts | 27 +++++++++++++++++++++------ 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/src/logic/mqtt.ts b/src/logic/mqtt.ts index 1ae7bbd..76b625f 100644 --- a/src/logic/mqtt.ts +++ b/src/logic/mqtt.ts @@ -15,7 +15,7 @@ import { import { State, TransactionRecord } from "../types"; import { getProverURL, sendPendingTransactionsToProver } from "./prover"; import { decodePayload } from "./decode"; -import { verifyPayloadSignature } from "../utils"; +import { getLocalIPv4, verifyPayloadSignature } from "../utils"; import { getLatestTransactionNonce, pruneAndSyncOnchain, @@ -28,7 +28,7 @@ let isProcessingMessage = false; // Lock to prevent concurrent message processin export function handleUplinks() { const client = connect({ - host: process.env.CHIRPSTACK_HOST, + host: process.env.CHIRPSTACK_HOST ?? getLocalIPv4(), port: 1883, clean: true, connectTimeout: 9000, diff --git a/src/utils.ts b/src/utils.ts index d4bdd87..5e0f205 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,5 +1,6 @@ import { TransactionRecord, BatchTransactionPayload } from "./types"; import { createPublicKey, verify } from "crypto"; +import os from "os"; /** * Retries a function up to 5 times with exponential backoff @@ -14,24 +15,24 @@ export async function retry( baseDelay: number = 1000 ): Promise { let lastError: Error; - + for (let attempt = 0; attempt <= maxRetries; attempt++) { try { return await fn(); } catch (error) { lastError = error as Error; - + if (attempt === maxRetries) { throw lastError; } - + const delay = baseDelay * Math.pow(2, attempt); console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms...`, error); - - await new Promise(resolve => setTimeout(resolve, delay)); + + await new Promise((resolve) => setTimeout(resolve, delay)); } } - + throw lastError!; } @@ -66,3 +67,17 @@ export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer): return false; } } + +export function getLocalIPv4() { + const nets = os.networkInterfaces(); + for (const network of Object.values(nets)) { + if (network) { + for (const iface of network) { + if (iface.family === "IPv4" && !iface.internal) { + return iface.address; + } + } + } + } + return "127.0.0.1"; +} From 3a92f743bba621620009187b12aa348147931112 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Wed, 15 Oct 2025 12:45:33 +0100 Subject: [PATCH 7/8] fix: simplify CHIRPSTACK_HOST assignment and enhance connection logging --- src/logic/mqtt.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/logic/mqtt.ts b/src/logic/mqtt.ts index 76b625f..c7a33fb 100644 --- a/src/logic/mqtt.ts +++ b/src/logic/mqtt.ts @@ -23,12 +23,13 @@ import { getOwedFromPriceContext, } from "./sync"; +const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST || getLocalIPv4(); const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain let isProcessingMessage = false; // Lock to prevent concurrent message processing export function handleUplinks() { const client = connect({ - host: process.env.CHIRPSTACK_HOST ?? getLocalIPv4(), + host: CHIRPSTACK_HOST, port: 1883, clean: true, connectTimeout: 9000, @@ -37,7 +38,7 @@ export function handleUplinks() { client.on("connect", () => { client.subscribe(`application/${process.env.APPLICATION_ID}/device/+/event/up`, () => { - console.log("\nConnected & Subscribed\n"); + console.log(`\nConnected & Subscribed to CHIRPSTACK_HOST: ${CHIRPSTACK_HOST}\n`); }); }); From 4ada477aaca8b2cc073771a8c3c4af94ad6e292c Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Wed, 15 Oct 2025 12:49:10 +0100 Subject: [PATCH 8/8] ... --- src/utils.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/utils.ts b/src/utils.ts index 5e0f205..90b8c23 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -70,6 +70,7 @@ export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer): export function getLocalIPv4() { const nets = os.networkInterfaces(); + console.log(nets); for (const network of Object.values(nets)) { if (network) { for (const iface of network) {