Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"author": "Godwin",
"license": "MIT",
"dependencies": {
"@ardrive/turbo-sdk": "^1.25.0",
"@ardrive/turbo-sdk": "^1.30.0",
"@chirpstack/chirpstack-api": "^4.7.0",
"@grpc/grpc-js": "^1.10.3",
"arweave": "^1.15.7",
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ handleUplinks();
// Initialize database tables and jobs
setupDatabase();

console.log("[server]: Server is starting...", process.env); // todo: remove

app.get("/", async (req: Request, res: Response) => {
const m3ters = getAllMeterRecords();
res.render("index", { m3ters });
Expand Down
24 changes: 11 additions & 13 deletions src/logic/arweave.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import { TurboFactory } from "@ardrive/turbo-sdk";
import { ArweaveSigner, TurboFactory } from "@ardrive/turbo-sdk";
import { Readable } from "stream";
import Arweave from "arweave";
import type { DecodedPayload, M3terPayload } from "../types";
import type { DecodedPayload } from "../types";

export async function interact(
m3terId: number,
payload: M3terPayload,
decoded: DecodedPayload
) {
export async function interact(m3terId: number, decoded: DecodedPayload) {
// encode transaction into standard format (payload[0])
// format: nonce | energy | signature | voltage | device_id | longitude | latitude
const transactionHex = payload[0];
const transactionHex = decoded.buf;

const arweave = Arweave.init({
host: "arweave.net",
Expand All @@ -19,19 +15,21 @@ export async function interact(
});

const key = await arweave.wallets.generate();
const turbo = TurboFactory.authenticated({ privateKey: key });
const signer = new ArweaveSigner(key);
const turbo = TurboFactory.authenticated({ signer });

const contractLabel = process.env.CONTRACT_LABEL || "M3ters";

const byteLength = Buffer.byteLength(transactionHex, "utf8");
const byteLength = transactionHex.length;

return await turbo.uploadFile({
fileStreamFactory: () => Readable.from(Buffer.from(transactionHex, "utf8")),
fileStreamFactory: () => Readable.from([transactionHex.toString("hex")], { encoding: "utf8" }),
fileSizeFactory: () => byteLength,
dataItemOpts: {
paidBy: await arweave.wallets.jwkToAddress(key),
tags: [
{ name: "Contract-Label", value: contractLabel },
{ name: "Contract-Use", value: "M3tering Protocol" },
{ name: "Contract-Use", value: "M3tering Protocol Test" },
{ name: "Content-Type", value: "text/plain" },
{ name: "M3ter-ID", value: m3terId.toString() },
{ name: "Timestamp", value: Date.now().toString() },
Expand All @@ -41,7 +39,7 @@ export async function interact(
{ name: "Voltage", value: decoded.extensions?.voltage?.toString() ?? "" },
{ name: "Device-ID", value: decoded.extensions?.deviceId?.toString() ?? "" },
{ name: "Longitude", value: decoded.extensions?.longitude?.toString() ?? "" },
{ name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" }
{ name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" },
],
},
});
Expand Down
5 changes: 2 additions & 3 deletions src/logic/decode.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import type { DecodedPayload } from "../types";

export function decodePayload(hex: string) {
const buf = Buffer.from(hex, "hex");

export function decodePayload(buf: Buffer) {
if (buf.length < 72) {
throw new Error("Payload too short. Must be at least 72 bytes");
}
Expand Down Expand Up @@ -46,5 +44,6 @@ export function decodePayload(hex: string) {
energy: energyKWh,
signature,
extensions: ext,
buf,
};
}
52 changes: 43 additions & 9 deletions src/logic/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ export function handleUplinks() {
});
});

client.on("error", (err) => {
console.error("Connection error: ", err);
client.end();
});

client.on("reconnect", () => {
console.log("Reconnecting...");
});

client.on("message", async (_, blob) => {
return await handleMessage(blob);
});
Expand All @@ -30,47 +39,72 @@ export function handleUplinks() {
async function handleMessage(blob: Buffer) {
try {
const message = JSON.parse(blob.toString());
const payload = JSON.parse(Buffer.from(message["data"], "base64").toString());
// encode transaction into standard format (payload[0])

console.log("[info] Received uplink from device:", JSON.stringify(message));

const payload = Buffer.from(message["data"], "hex");
// encode transaction into standard format (payload is hex string)
// format: nonce | energy | signature | voltage | device_id | longitude | latitude
const transactionHex = payload[0];
const publicKey = payload[1];
const transactionHex = payload;
const decoded = decodePayload(transactionHex);
const publicKey = decoded.extensions.deviceId;

console.log("[info] Decoded payload:", decoded);

if (!publicKey) {
throw new Error("Invalid Public Key");
}

const m3ter = getMeterByPublicKey(publicKey ?? "");

if (!m3ter) {
console.error("Meter not found for public key:", publicKey);
return;
}

console.log("[info] Received blob for meter", m3ter?.tokenId, "nonce", m3ter?.latestNonce + 1);

const decoded = decodePayload(transactionHex);
console.log(
"[info] Received blob for meter",
m3ter?.tokenId,
"expected nonce:",
m3ter?.latestNonce + 1,
"got:",
decoded.nonce
);

// if device nonce is correct
const expectedNonce = m3ter.latestNonce + 1;

let state;
if (decoded.nonce === expectedNonce) {
state = { is_on: true };

console.log("[info] Nonce is valid:", decoded.nonce);
// Upload to arweave
await interact(m3ter.tokenId, payload, decoded);
await interact(m3ter.tokenId, decoded);

console.log("[info] Uploaded transaction to Arweave for meter", m3ter.tokenId);

// save transaction to local store
const transactionRecord = {
nonce: decoded.nonce,
verified: false,
identifier: m3ter.tokenId.toString(),
receivedAt: Date.now(),
raw: transactionHex,
raw: transactionHex.toString("hex"),
} as TransactionRecord;

try {
insertTransaction(transactionRecord);

console.log("[info] Inserted transaction record:", transactionRecord);
} catch (error) {
console.error("Error inserting transaction:", error);
}

updateMeterNonce(publicKey, expectedNonce);

console.log("[info] Updated meter nonce to:", expectedNonce);

try {
// send pending transactions to prover node
const proverURL = await getProverURL();
Expand Down
2 changes: 1 addition & 1 deletion src/logic/verify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { rollup } from "./context";
import { getUnverifiedTransactionRecords } from "../store/sqlite";
import { buildBatchPayload } from "../utils";

const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3tering.com";
const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing";

// Prover node structure
export interface ProverNode {
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ export interface DecodedPayload {
longitude?: number;
latitude?: number;
};
buf: Buffer;
}