Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions src/logic/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ app.listen(port, () => {
});

// ETHERS JS CONTRACT CONFIG
const provider = new JsonRpcProvider(process.env.MAINNET_RPC);
export const provider = new JsonRpcProvider(process.env.MAINNET_RPC);

export const m3ter = new Contract(
process.env.M3TER_CONTRACT_ADDRESS || "0x9C547B649475f1bE81323AefdbcF209C17961D5E",
Expand All @@ -42,6 +42,23 @@ export const m3ter = new Contract(

export const rollup = new Contract(
process.env.ROLLUP_CONTRACT_ADDRESS || "0xf8f2d4315DB5db38f3e5c45D0bCd59959c603d9b",
["function nonce(uint256 tokenId) external view returns (bytes6)"],
["function nonce(uint256) external view returns (bytes6)"],
provider
);

export const ccipRevenueReader = new Contract(
process.env.CCIP_REVENUE_READER_ADDRESS || "0xD648cdF47e9534B2FCfb18C1E94CA9AAff07BA0E",
[
"function read(uint256 tokenId, address target, address verifier) public view returns (uint256)",
"function readCallback(bytes[] memory data, bytes memory) external pure returns (uint256)",
"function verifierCount() external view returns (uint256)",
"function verifiers(uint256) external view returns (string, address)",
],
provider
);

export const priceContext = new Contract(
process.env.PRICE_CONTEXT_ADDRESS || "0xc6D5Ff8E80F4Ee511Db4bCf6a0BcEbF9f41aAA32",
["function owed(uint256 tokenId) public view returns (uint256)"],
provider
);
38 changes: 27 additions & 11 deletions src/logic/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,23 @@ import {
updateMeterNonce,
} from "../store/sqlite";
import { State, TransactionRecord } from "../types";
import { getProverURL, sendPendingTransactionsToProver } from "./verify";
import { getProverURL, sendPendingTransactionsToProver } from "./prover";
import { decodePayload } from "./decode";
import { verifyPayloadSignature } from "../utils";
import { getLatestTransactionNonce, pruneAndSyncOnchain } from "./sync";
import { getLocalIPv4, verifyPayloadSignature } from "../utils";
import {
getLatestTransactionNonce,
pruneAndSyncOnchain,
getCrossChainRevenue,
getOwedFromPriceContext,
} from "./sync";

const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST || getLocalIPv4();
const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain
let isProcessingMessage = false; // Lock to prevent concurrent message processing

export function handleUplinks() {
const client = connect({
host: process.env.CHIRPSTACK_HOST,
host: CHIRPSTACK_HOST,
port: 1883,
clean: true,
connectTimeout: 9000,
Expand All @@ -31,7 +38,7 @@ export function handleUplinks() {

client.on("connect", () => {
client.subscribe(`application/${process.env.APPLICATION_ID}/device/+/event/up`, () => {
console.log("\nConnected & Subscribed\n");
console.log(`\nConnected & Subscribed to CHIRPSTACK_HOST: ${CHIRPSTACK_HOST}\n`);
});
});

Expand All @@ -50,6 +57,15 @@ export function handleUplinks() {
}

export async function handleMessage(blob: Buffer) {
// Check if another message is already being processed
if (isProcessingMessage) {
console.log("[warn] Message dropped - another message is already being processed");
return;
}

// Set lock
isProcessingMessage = true;

try {
const message = JSON.parse(blob.toString());

Expand Down Expand Up @@ -91,9 +107,6 @@ export async function handleMessage(blob: Buffer) {

if (!existingMeter) {
const tokenId = Number(await m3terContract.tokenID(`0x${publicKey}`));
// if (tokenId === 0) {
// throw new Error("Token ID not found for public key: " + publicKey);
// }

const latestNonce = await getLatestTransactionNonce(tokenId);

Expand Down Expand Up @@ -207,9 +220,9 @@ export async function handleMessage(blob: Buffer) {
console.error("Error sending pending transactions to prover:", error);
}
}

const state =
decoded.nonce === expectedNonce ? { is_on: true } : { nonce: m3ter.latestNonce, is_on: true };
const is_on =
(await getCrossChainRevenue(m3ter.tokenId)) >= (await getOwedFromPriceContext(m3ter.tokenId));
const state = decoded.nonce === expectedNonce ? { is_on } : { nonce: m3ter.latestNonce, is_on };

// TODO: remove the following block after testing
// if transaction nonce is 0 and the latest nonce is 0
Expand All @@ -227,5 +240,8 @@ export async function handleMessage(blob: Buffer) {
);
} catch (error) {
console.error("❌ Error handling MQTT message:", error);
} finally {
// Release lock
isProcessingMessage = false;
}
}
File renamed without changes.
79 changes: 78 additions & 1 deletion src/logic/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ import {
pruneTransactionsBefore,
updateMeterNonce,
} from "../store/sqlite";
import { rollup as rollupContract } from "./context";
import {
provider,
rollup as rollupContract,
ccipRevenueReader as ccipRevenueReaderContract,
priceContext as priceContextContract,
} from "./context";
import { JsonRpcProvider, Contract } from "ethers";
import { retry } from "../utils";

export async function pruneAndSyncOnchain(meterIdentifier: number | string): Promise<number> {
const meter =
Expand Down Expand Up @@ -50,3 +57,73 @@ export async function getLatestTransactionNonce(meterIdentifier: number): Promis

return latestNonce;
}

// get revenue across suppored chains
export async function getCrossChainRevenue(tokenId: number): Promise<number> {
try {
// Get the number of verifiers
const verifierCount = Number(await retry(() => ccipRevenueReaderContract.verifierCount()));

let totalRevenue = 0;

// Iterate through all verifiers and get revenue from each chain
for (let i = 0; i < verifierCount; i++) {
try {
// Get verifier info (ensName, targetContractAddress)
const [ensName, targetAddress] = await retry(() => ccipRevenueReaderContract.verifiers(i));

console.log(`[info] Getting revenue from ENS: ${ensName}, target: ${targetAddress}`);

// Resolve ENS name to get the verifier address
const verifierAddress = await retry(() => provider.resolveName(ensName));

if (!verifierAddress) {
console.error(`[error] Failed to resolve ENS name: ${ensName}`);
continue;
}

console.log(`[info] Resolved ${ensName} to verifier address: ${verifierAddress}`);

// Get revenue from this specific chain using CCIP read
// Parameters: tokenId, target (L2 contract), verifier (resolved from ENS)
const revenue = await retry(() =>
ccipRevenueReaderContract.read(tokenId, targetAddress, verifierAddress, {
enableCcipRead: true,
})
);
const revenueAmount = Number(revenue);

console.log(`[info] Revenue from ${ensName} (${verifierAddress}): ${revenueAmount}`);
totalRevenue += revenueAmount;
} catch (error) {
console.error(`[error] Failed to get revenue from verifier ${i}:`, error);
// Continue with other verifiers even if one fails
}
}

console.log(`[info] Total cross-chain revenue for token ${tokenId}: ${totalRevenue}`);
return totalRevenue;
} catch (error) {
console.error(`[error] Failed to get cross-chain revenue for token ${tokenId}:`, error);
throw error;
}
}

// get owed from price context
export async function getOwedFromPriceContext(tokenId: number): Promise<number> {
try {
return await retry(async () => {
console.log(`[info] Getting owed amount for token ${tokenId} from price context`);

// Call the price context to get the amount the user owes with CCIP read enabled
const owedAmount = await priceContextContract.owed(tokenId);
const owed = Number(owedAmount);

console.log(`[info] Owed amount for token ${tokenId}: ${owed}`);
return owed;
});
} catch (error) {
console.error(`[error] Failed to get owed amount for token ${tokenId}:`, error);
throw error;
}
}
50 changes: 50 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,40 @@
import { TransactionRecord, BatchTransactionPayload } from "./types";
import { createPublicKey, verify } from "crypto";
import os from "os";

/**
* Retries a function up to 5 times with exponential backoff
* @param fn Function to retry
* @param maxRetries Maximum number of retries (default: 5)
* @param baseDelay Base delay in milliseconds (default: 1000)
* @returns Promise that resolves with the function result or rejects with the last error
*/
export async function retry<T>(
fn: () => Promise<T>,
maxRetries: number = 5,
baseDelay: number = 1000
): Promise<T> {
let lastError: Error;

for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error as Error;

if (attempt === maxRetries) {
throw lastError;
}

const delay = baseDelay * Math.pow(2, attempt);
console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms...`, error);

await new Promise((resolve) => setTimeout(resolve, delay));
}
}

throw lastError!;
}

export function buildBatchPayload(transactions: TransactionRecord[]): BatchTransactionPayload[] {
return transactions.map((transaction) => ({
Expand Down Expand Up @@ -32,3 +67,18 @@ export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer):
return false;
}
}

export function getLocalIPv4() {
const nets = os.networkInterfaces();
console.log(nets);
for (const network of Object.values(nets)) {
if (network) {
for (const iface of network) {
if (iface.family === "IPv4" && !iface.internal) {
return iface.address;
}
}
}
}
return "127.0.0.1";
}