Skip to content

Commit

Permalink
ADD: mempool processor & push notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
Overtorment committed Jul 18, 2020
1 parent baaae67 commit 432c450
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 8 deletions.
1 change: 1 addition & 0 deletions Procfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
web: npm start
worker: npm run worker-blockprocessor
worker2: npm run worker-sender
worker3: npm run worker-processmempool
26 changes: 25 additions & 1 deletion openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ openapi: 3.0.0
info:
title: GroundControl push server API
description: Push notifications server for BlueWallet
version: 0.0.5
version: 0.0.6
servers:
- url: http://localhost:3001
paths:
Expand Down Expand Up @@ -97,10 +97,12 @@ components:
enum:
- 1
- 2
- 3
description: >
type:
* `1` - Your lightning invoice was paid
* `2` - New transaction to one of your addresses
* `3` - New unconfirmed transaction to one of your addresses
"token":
type: "string"
"os":
Expand Down Expand Up @@ -154,3 +156,25 @@ components:
txid:
type: "string"
description: txid of the transaction where this address is one of the outputs

PushNotificationOnchainAddressGotUnconfirmedTransaction:
allOf: # Combines PushNotificationBase and the inline model
- $ref: "#/components/schemas/PushNotificationBase"
- type: object
required:
- address
- sat
- txid
properties:
type:
type: "integer"
enum: [3]
sat:
type: "integer"
description: amount of satoshis
address:
type: "string"
description: user's onchain address that has incoming transaction
txid:
type: "string"
description: txid of the transaction where this address is one of the outputs
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "groundcontrol",
"version": "0.0.7",
"version": "0.0.8",
"description": "GroundControl push server API",
"devDependencies": {
"prettier": "2.0.5"
Expand All @@ -24,6 +24,7 @@
"lint": "npx prettier --write .",
"start": "ts-node src/index.ts",
"worker-blockprocessor": "ts-node src/worker-blockprocessor.ts",
"worker-processmempool": "ts-node src/worker-processmempool.ts",
"worker-sender": "ts-node src/worker-sender.ts"
}
}
36 changes: 34 additions & 2 deletions src/class/GroundControlToMajorTom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,38 @@ const http2 = require("http2");
* @see https://firebase.google.com/docs/cloud-messaging/http-server-ref
*/
export class GroundControlToMajorTom {
static async pushOnchainAddressGotUnconfirmedTransaction(
serverKey: string,
apnsPem: string,
pushNotification: Components.Schemas.PushNotificationOnchainAddressGotUnconfirmedTransaction
): Promise<[object, object]> {
const fcmPayload = {
data: {},
notification: {
title: "New unconfirmed transaction",
body: "You received new transfer on " + pushNotification.address,
badge: pushNotification.badge,
tag: pushNotification.txid,
},
};

const apnsPayload = {
aps: {
badge: pushNotification.badge,
alert: {
title: "New unconfirmed transaction",
body: "You received new transfer on " + pushNotification.address,
},
sound: "default",
},
data: {},
};

if (pushNotification.os === "android") return GroundControlToMajorTom._pushToFcm(serverKey, pushNotification.token, fcmPayload, pushNotification);
if (pushNotification.os === "ios")
return GroundControlToMajorTom._pushToApns(apnsPem, pushNotification.token, apnsPayload, pushNotification, pushNotification.txid);
}

static async pushOnchainAddressWasPaid(
serverKey: string,
apnsPem: string,
Expand All @@ -26,7 +58,7 @@ export class GroundControlToMajorTom {
data: {},
notification: {
title: "+" + pushNotification.sat + " sats",
body: "Your received new transfer on " + pushNotification.address,
body: "You received new transfer on " + pushNotification.address,
badge: pushNotification.badge,
tag: pushNotification.txid,
},
Expand All @@ -37,7 +69,7 @@ export class GroundControlToMajorTom {
badge: pushNotification.badge,
alert: {
title: "+" + pushNotification.sat + " sats",
body: "Your received new transfer on " + pushNotification.address,
body: "You received new transfer on " + pushNotification.address,
},
sound: "default",
},
Expand Down
24 changes: 23 additions & 1 deletion src/openapi/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ declare namespace Components {
* type:
* * `1` - Your lightning invoice was paid
* * `2` - New transaction to one of your addresses
* * `3` - New unconfirmed transaction to one of your addresses
*
*/
type: 1 | 2;
type: 1 | 2 | 3;
token: string;
os: "android" | "ios";
badge?: number;
Expand Down Expand Up @@ -78,6 +79,27 @@ declare namespace Components {
*/
txid: string;
}
/**
* payload for push notification delivered to phone
*/
export interface PushNotificationOnchainAddressGotUnconfirmedTransaction {
type: 3;
token: string;
os: "android" | "ios";
badge?: number;
/**
* amount of satoshis
*/
sat: number;
/**
* user's onchain address that has incoming transaction
*/
address: string;
/**
* txid of the transaction where this address is one of the outputs
*/
txid: string;
}
export interface ServerInfo {
name?: string;
description?: string;
Expand Down
4 changes: 2 additions & 2 deletions src/worker-blockprocessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ async function processBlock(blockNum, sendQueueRepository: Repository<SendQueue>
}

console.warn(addresses.length, "addresses paid in block");
allPotentialPushPayloadsArray.push({ address: "bc1qaemfnglf928kd9ma2jzdypk333au6ctu7h7led", txid: "666", sat: 1488, type: 2, token: "", os: "ios" }); // debug fixme
addresses.push("bc1qaemfnglf928kd9ma2jzdypk333au6ctu7h7led"); // debug fixme
// allPotentialPushPayloadsArray.push({ address: "bc1qaemfnglf928kd9ma2jzdypk333au6ctu7h7led", txid: "666", sat: 1488, type: 2, token: "", os: "ios" }); // debug fixme
// addresses.push("bc1qaemfnglf928kd9ma2jzdypk333au6ctu7h7led"); // debug fixme

const query = getRepository(TokenToAddress).createQueryBuilder().where("address IN (:...address)", { address: addresses });

Expand Down
151 changes: 151 additions & 0 deletions src/worker-processmempool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import "./openapi/api";
import "reflect-metadata";
import { createConnection, getRepository, Repository } from "typeorm";
import { TokenToAddress } from "./entity/TokenToAddress";
import { SendQueue } from "./entity/SendQueue";
import { KeyValue } from "./entity/KeyValue";
require("dotenv").config();
const url = require("url");
let jayson = require("jayson/promise");
let rpc = url.parse(process.env.BITCOIN_RPC);
let client = jayson.client.http(rpc);

let processedTxids = {};
const parsed = url.parse(process.env.JAWSDB_MARIA_URL);
if (!process.env.JAWSDB_MARIA_URL || !process.env.BITCOIN_RPC) {
console.error("not all env variables set");
process.exit();
}

process
.on("unhandledRejection", (reason, p) => {
console.error(reason, "Unhandled Rejection at Promise", p);
process.exit(1);
})
.on("uncaughtException", (err) => {
console.error(err, "Uncaught Exception thrown");
process.exit(1);
});

let sendQueueRepository: Repository<SendQueue>;

async function processMempool() {
console.log("cached txids=", Object.keys(processedTxids).length);
const responseGetrawmempool = await client.request("getrawmempool", []);
console.log(responseGetrawmempool.result.length, "txs in mempool");

let addresses: string[] = [];
let allPotentialPushPayloadsArray: Components.Schemas.PushNotificationOnchainAddressGotUnconfirmedTransaction[] = [];

let rpcBatch = [];
const batchSize = 100;
let countTxidsProcessed = 0;
for (const txid of responseGetrawmempool.result) {
countTxidsProcessed++;
if (!txid) continue;
if (!processedTxids[txid]) rpcBatch.push(client.request("getrawtransaction", [txid, true], undefined, false));
if (rpcBatch.length >= batchSize || countTxidsProcessed === responseGetrawmempool.result.length) {
const startBatch = +new Date();
// got enough txids lets batch fetch them from bitcoind rpc
const responses = await client.request(rpcBatch);
for (const response of responses) {
if (response.result && response.result.vout) {
for (const output of response.result.vout) {
if (output.scriptPubKey && output.scriptPubKey.addresses) {
for (const address of output.scriptPubKey.addresses) {
addresses.push(address);
processedTxids[response.result.txid] = true;
const payload: Components.Schemas.PushNotificationOnchainAddressGotUnconfirmedTransaction = {
address,
txid: response.result.txid,
sat: Math.floor(output.value * 100000000),
type: 3,
token: "",
os: "ios",
};
allPotentialPushPayloadsArray.push(payload);
}
}
}
}
}

// allPotentialPushPayloadsArray.push({ address: "bc1qaemfnglf928kd9ma2jzdypk333au6ctu7h7led", txid: "666", sat: 1488, type: 3, token: "", os: "ios" }); // debug fixme
// addresses.push("bc1qaemfnglf928kd9ma2jzdypk333au6ctu7h7led"); // debug fixme

if (addresses.length === 0) {
allPotentialPushPayloadsArray = [];
addresses = [];
rpcBatch = [];
continue;
}

// fetching found addresses from db:
const query = getRepository(TokenToAddress).createQueryBuilder().where("address IN (:...address)", { address: addresses });
for (const t2a of await query.getMany()) {
// found all addresses that we are tracking on behalf of our users. now,
// iterating all addresses in a block to see if there is a match.
// we could only iterate tracked addresses, but that would imply deduplication which is not good (for example,
// in a single block user could get several incoming payments to different owned addresses)
// cycle in cycle is less than optimal, but we can live with that for now
for (let payload of allPotentialPushPayloadsArray) {
if (t2a.address === payload.address) {
console.log("enqueueing", payload);
payload.os = t2a.os === "android" ? "android" : "ios"; // hacky
payload.token = t2a.token;
payload.type = 3;
await sendQueueRepository.save({
data: JSON.stringify(payload),
});
}
}
}

allPotentialPushPayloadsArray = [];
addresses = [];
rpcBatch = [];

const endBatch = +new Date();
// process.stdout.write('.');
console.log("batch took", (endBatch - startBatch) / 1000, "sec");
}
}
}

createConnection({
type: "mariadb",
host: parsed.hostname,
port: parsed.port,
username: parsed.auth.split(":")[0],
password: parsed.auth.split(":")[1],
database: parsed.path.replace("/", ""),
synchronize: true,
logging: false,
entities: ["src/entity/**/*.ts"],
migrations: ["src/migration/**/*.ts"],
subscribers: ["src/subscriber/**/*.ts"],
cli: {
entitiesDir: "src/entity",
migrationsDir: "src/migration",
subscribersDir: "src/subscriber",
},
})
.then(async (connection) => {
// start worker
console.log("running");

sendQueueRepository = getRepository(SendQueue);

while (1) {
const start = +new Date();
try {
await processMempool();
} catch (error) {
console.log(error);
}
const end = +new Date();
console.log("processing mempool took", (end - start) / 1000, "sec");
console.log("-----------------------");
}
})
.catch((error) => console.log(error));
7 changes: 7 additions & 0 deletions src/worker-sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ createConnection({
await GroundControlToMajorTom.pushOnchainAddressWasPaid(serverKey, apnsPem, payload);
await sendQueueRepository.remove(record);
break;
case 3:
payload = <Components.Schemas.PushNotificationOnchainAddressGotUnconfirmedTransaction>payload;
console.warn("pushing to token", payload.token, payload.os);
payload.badge = 1;
await GroundControlToMajorTom.pushOnchainAddressGotUnconfirmedTransaction(serverKey, apnsPem, payload);
await sendQueueRepository.remove(record);
break;
case 1:
// TODO, currently handled in web request
break;
Expand Down

0 comments on commit 432c450

Please sign in to comment.