Skip to content

[WIP] Coin Indexer #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion src/application/repositories/BlockRepository.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Database from 'better-sqlite3';
import { Block } from '../types/Block';
import { IBlockRepository } from './IBlockRepository';
import { IBlockRepository } from './Interfaces/IBlockRepository';

export const CREATE_BLOCKS_TABLE_SQL = `CREATE TABLE IF NOT EXISTS blocks (hash BLOB, blockHeight INTEGER PRIMARY KEY, created_at DATETIME DEFAULT CURRENT_TIMESTAMP)`;

Expand Down
67 changes: 67 additions & 0 deletions src/application/repositories/CoinRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import Database from 'better-sqlite3';
import { ICoinRepository } from './Interfaces/ICoinRepository';
import { CoinStatus } from '../types/CoinStatus';

export interface CoinRow {
walletId: string;
coinId: Buffer;
parentCoinInfo: Buffer;
puzzleHash: Buffer;
amount: bigint;
syncedHeight: number;
status: CoinStatus;
}

let setupTable = (db: Database.Database) => {
db.exec(`
CREATE TABLE IF NOT EXISTS coin (
walletId TEXT,
coinId BLOB,
parentCoinInfo BLOB,
puzzleHash BLOB,
amount TEXT,
syncedHeight INTEGER,
status TEXT CHECK(status IN ('unspent', 'pending', 'spent')),
PRIMARY KEY (walletId, coinId)
);
CREATE INDEX IF NOT EXISTS idx_coin_walletId ON coin(walletId);
CREATE INDEX IF NOT EXISTS idx_coin_status ON coin(status);
`);
}

export class CoinRepository implements ICoinRepository {
private db: Database.Database;

constructor(db: Database.Database) {
this.db = db;
setupTable(db);
}

upsertCoin(walletId: string, coin: { coinId: Buffer, parentCoinInfo: Buffer, puzzleHash: Buffer, amount: bigint, syncedHeight: number, status: string }) {
this.db.prepare(
`INSERT OR REPLACE INTO coin (walletId, coinId, parentCoinInfo, puzzleHash, amount, syncedHeight, status) VALUES (?, ?, ?, ?, ?, ?, ?)`
).run(
walletId,
coin.coinId,
coin.parentCoinInfo,
coin.puzzleHash,
coin.amount.toString(),
coin.syncedHeight,
coin.status
);
}

getCoins(walletId: string): CoinRow[] {
return this.db.prepare('SELECT * FROM coin WHERE walletId = ?').all(walletId) as CoinRow[];
}

getPendingCoins(): CoinRow[] {
return this.db.prepare('SELECT * FROM coin WHERE status = ?').all(CoinStatus.PENDING) as CoinRow[];
}

updateCoinStatus(walletId: string, coinId: Buffer, status: CoinStatus, syncedHeight: number) {
this.db.prepare(
`UPDATE coin SET status = ?, syncedHeight = ? WHERE walletId = ? AND coinId = ?`
).run(status, syncedHeight, walletId, coinId);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Block } from '../types/Block';
import { Block } from '../../types/Block';

export interface IBlockRepository {
getLatestBlock(): Promise<Block>;
Expand Down
9 changes: 9 additions & 0 deletions src/application/repositories/Interfaces/ICoinRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { CoinStatus } from '../../types/CoinStatus';
import { CoinRow } from '../CoinRepository';

export interface ICoinRepository {
upsertCoin(walletId: string, coin: { coinId: Buffer, parentCoinInfo: Buffer, puzzleHash: Buffer, amount: bigint, syncedHeight: number, status: CoinStatus }): void;
getCoins(walletId: string): CoinRow[];
getPendingCoins(): CoinRow[];
updateCoinStatus(walletId: string, coinId: Buffer, status: CoinStatus, syncedHeight: number): void;
}
7 changes: 7 additions & 0 deletions src/application/repositories/Interfaces/IWalletRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { WalletRow } from '../WalletRepository';

export interface IWalletRepository {
addWallet(address: string, namespace?: string): void;
updateWalletSync(address: string, synced_to_height: number, synced_to_hash: string): void;
getWallets(): WalletRow[];
}
45 changes: 45 additions & 0 deletions src/application/repositories/WalletRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import Database from 'better-sqlite3';
import { IWalletRepository } from './Interfaces/IWalletRepository';

export interface WalletRow {
address: string;
namespace: string;
synced_to_height: number;
synced_to_hash: string;
}

let setupTable = (db: Database.Database) => {
db.exec(`
CREATE TABLE IF NOT EXISTS wallet (
address TEXT PRIMARY KEY,
namespace TEXT DEFAULT 'default',
synced_to_height INTEGER,
synced_to_hash TEXT
);
`);
}

export class WalletRepository implements IWalletRepository {
private db: Database.Database;

constructor(db: Database.Database) {
this.db = db;
setupTable(db);
}

addWallet(address: string, namespace: string = 'default') {
this.db.prepare(
`INSERT OR IGNORE INTO wallet (address, namespace) VALUES (?, ?)`
).run(address, namespace);
}

updateWalletSync(address: string, synced_to_height: number, synced_to_hash: string) {
this.db.prepare(
`UPDATE wallet SET synced_to_height = ?, synced_to_hash = ? WHERE address = ?`
).run(synced_to_height, synced_to_hash, address);
}

getWallets(): WalletRow[] {
return this.db.prepare('SELECT * FROM wallet').all() as WalletRow[];
}
}
5 changes: 5 additions & 0 deletions src/application/types/CoinStatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export enum CoinStatus {
PENDING = 'pending',
UNSPENT = 'unspent',
SPENT = 'spent',
}
96 changes: 96 additions & 0 deletions src/application/workers/CoinIndexer/CoinIndexer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { EventEmitter } from 'events';
import { spawn, Worker, Thread } from 'threads';
import { IWorker } from '../IWorker';
import {
CoinIndexerEventNames,
CoinIndexerEvents,
CoinStateUpdatedEvent,
} from './CoinIndexerEvents';

export interface CoinIndexerWorkerApi {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: (...args: any[]) => any;
}

interface ICoinIndexer extends IWorker {
onCoinStateUpdated(listener: (coinState: CoinStateUpdatedEvent) => void): void;
}

export class CoinIndexer
extends (EventEmitter as { new (): CoinIndexerEvents })
implements ICoinIndexer
{
private worker: import('threads').ModuleThread<CoinIndexerWorkerApi> | null = null;
private started = false;
private restartIntervalId: NodeJS.Timeout | null = null;
private restartIntervalMs: number | null = null;

async start(
blockchainType: string,
dbPath: string = './coin_indexer.sqlite',
restartIntervalHours?: number,
): Promise<void> {
await this.startWorker(blockchainType, dbPath);

if (restartIntervalHours && restartIntervalHours > 0) {
this.restartIntervalMs = restartIntervalHours * 60 * 60 * 1000;
this.restartIntervalId = setInterval(async () => {
await this.restartWorker(blockchainType, dbPath);
}, this.restartIntervalMs);
}
}

private async startWorker(blockchainType: string, dbPath: string) {
if (this.started) return;
if (!this.worker) {
// Use src worker for tests/dev, dist worker for production
let workerPath: string;
if (process.env.JEST_WORKER_ID !== undefined || process.env.NODE_ENV === 'test') {
workerPath = '../../../../dist/application/workers/CoinIndexer/CoinIndexer.worker.js';
} else {
workerPath = './CoinIndexer.worker.ts';
}
this.worker = (await spawn(
new Worker(workerPath),
)) as import('threads').ModuleThread<CoinIndexerWorkerApi>;
}

this.worker.onCoinStateUpdated().subscribe({
next: (coinState: CoinStateUpdatedEvent) => {
this.emit(CoinIndexerEventNames.CoinStateUpdated, coinState);
},
});

try {
await this.worker.start(blockchainType, dbPath);
} catch {
await this.restartWorker(blockchainType, dbPath);
}

this.started = true;
}

private async restartWorker(blockchainType: string, dbPath: string) {
if (this.worker) {
await this.worker.stop();
this.started = false;
await Thread.terminate(this.worker);
this.worker = null;
}
await this.startWorker(blockchainType, dbPath);
}

async stop(): Promise<void> {
if (this.started && this.worker) await this.worker.stop();
this.started = false;
if (this.worker) await Thread.terminate(this.worker);
if (this.restartIntervalId) {
clearInterval(this.restartIntervalId);
this.restartIntervalId = null;
}
}

onCoinStateUpdated(listener: (coinState: CoinStateUpdatedEvent) => void): void {
this.on(CoinIndexerEventNames.CoinStateUpdated, listener);
}
}
Loading
Loading