Skip to content

Commit 1180e79

Browse files
committed
Updated blockindexer to use eventemitter and added structure for the coin indexer
Signed-off-by: Robert Gogete <[email protected]>
1 parent 6d95fb7 commit 1180e79

File tree

8 files changed

+297
-31
lines changed

8 files changed

+297
-31
lines changed

src/application/workers/BlockIndexer/BlockIndexer.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
import { EventEmitter } from 'events';
21
import { spawn, Worker, Thread } from 'threads';
32
import { Block } from '../../types/Block';
43
import { IWorker } from '../IWorker';
5-
import { BlockIndexerEventNames, BlockIndexerEvents } from './BlockIndexerEvents';
4+
import { BlockIndexerEvents } from './BlockIndexerEvents';
65

76
interface BlockIndexerWorkerApi {
87
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -13,7 +12,7 @@ interface IBlockIndexer extends IWorker {
1312
onBlockIngested(listener: (block: Block) => void): void;
1413
}
1514

16-
export class BlockIndexer extends (EventEmitter as { new(): BlockIndexerEvents }) implements IBlockIndexer {
15+
export class BlockIndexer extends BlockIndexerEvents implements IBlockIndexer {
1716
private worker: import('threads').ModuleThread<BlockIndexerWorkerApi> | null = null;
1817
private started = false;
1918
private restartIntervalId: NodeJS.Timeout | null = null;
@@ -48,7 +47,7 @@ export class BlockIndexer extends (EventEmitter as { new(): BlockIndexerEvents }
4847
}
4948

5049
this.worker.onBlockIngested().subscribe((block: Block) => {
51-
this.emit(BlockIndexerEventNames.BlockIngested, block);
50+
this.emitBlockIngested(block);
5251
});
5352

5453
try {
@@ -81,7 +80,7 @@ export class BlockIndexer extends (EventEmitter as { new(): BlockIndexerEvents }
8180
}
8281
}
8382

84-
onBlockIngested(listener: (block: Block) => void) {
85-
this.on(BlockIndexerEventNames.BlockIngested, listener);
83+
onBlockIngested(listener: (block: Block) => void): this {
84+
return super.onBlockIngested(listener);
8685
}
8786
}

src/application/workers/BlockIndexer/BlockIndexer.worker.logic.ts

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,19 @@
11
import Database from 'better-sqlite3';
2-
import { Observable } from 'observable-fns';
32
import { BlockChainType } from '../../types/BlockChain';
43
import { ChiaBlockchainService } from '../../../infrastructure/BlockchainServices/ChiaBlockchainService';
54
import { Block } from '../../types/Block';
65
import { TestBlockchainService } from '../../../infrastructure/BlockchainServices/TestBlockchainService';
76
import { CREATE_BLOCKS_TABLE_SQL } from '../../repositories/BlockRepository';
87
import { IBlockchainService } from '../../interfaces/IBlockChainService';
9-
8+
import { BlockIndexerEvents, BlockIndexerEventNames } from './BlockIndexerEvents';
109

1110
let db: Database.Database | null = null;
1211
let intervalId: NodeJS.Timeout | null = null;
1312
let started = false;
1413

15-
let blockObservable: Observable<Block> | null = null;
16-
let blockObserver: ((block: Block) => void) | null = null;
14+
const eventEmitter = new BlockIndexerEvents();
1715

1816
let blockHeight = 0;
19-
2017
let blockchainService: IBlockchainService;
2118

2219
async function syncToBlockchainHeight() {
@@ -35,9 +32,7 @@ async function syncToBlockchainHeight() {
3532
block.hash,
3633
block.blockHeight,
3734
);
38-
if (blockObserver) {
39-
blockObserver(block);
40-
}
35+
eventEmitter.emitBlockIngested(block);
4136
}
4237
blockHeight = h;
4338
}
@@ -74,18 +69,9 @@ export const api = {
7469
if (intervalId) clearInterval(intervalId);
7570
started = false;
7671
},
77-
onBlockIngested() {
78-
if (!blockObservable) {
79-
blockObservable = new Observable<Block>((observer) => {
80-
blockObserver = (block: Block) => {
81-
observer.next(block);
82-
};
83-
return () => {
84-
blockObserver = null;
85-
};
86-
});
87-
}
88-
return blockObservable;
72+
onBlockIngested(listener: (block: Block) => void) {
73+
eventEmitter.onBlockIngested(listener);
74+
return () => eventEmitter.off(BlockIndexerEventNames.BlockIngested, listener);
8975
},
9076
// For testing: reset all state
9177
__reset() {
@@ -94,7 +80,5 @@ export const api = {
9480
db = null;
9581
intervalId = null;
9682
started = false;
97-
blockObservable = null;
98-
blockObserver = null;
9983
},
10084
};
Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
import { EventEmitter } from 'events';
12
import { Block } from "../../types/Block";
23

34
export const enum BlockIndexerEventNames {
45
BlockIngested = 'hashGenerated',
56
}
67

7-
export interface BlockIndexerEvents {
8-
on(event: BlockIndexerEventNames.BlockIngested, listener: (block: Block) => void): this;
9-
emit(event: BlockIndexerEventNames.BlockIngested, block: Block): boolean;
8+
export class BlockIndexerEvents extends EventEmitter {
9+
emitBlockIngested(block: Block): boolean {
10+
return this.emit(BlockIndexerEventNames.BlockIngested, block);
11+
}
12+
13+
onBlockIngested(listener: (block: Block) => void): this {
14+
return this.on(BlockIndexerEventNames.BlockIngested, listener);
15+
}
1016
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import { EventEmitter } from 'events';
2+
import { spawn, Worker, Thread } from 'threads';
3+
import { IWorker } from '../IWorker';
4+
import {
5+
CoinIndexerEventNames,
6+
CoinStateUpdatedEvent,
7+
} from './CoinIndexerEvents';
8+
9+
export interface CoinIndexerWorkerApi {
10+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
11+
[key: string]: (...args: any[]) => any;
12+
}
13+
14+
interface ICoinIndexer extends IWorker {
15+
onCoinStateUpdated(listener: (coinState: CoinStateUpdatedEvent) => void): void;
16+
}
17+
18+
export class CoinIndexer
19+
extends EventEmitter
20+
implements ICoinIndexer
21+
{
22+
private worker: import('threads').ModuleThread<CoinIndexerWorkerApi> | null = null;
23+
private started = false;
24+
private restartIntervalId: NodeJS.Timeout | null = null;
25+
private restartIntervalMs: number | null = null;
26+
27+
async start(
28+
blockchainType: string,
29+
dbPath: string = './coin_indexer.sqlite',
30+
restartIntervalHours?: number,
31+
): Promise<void> {
32+
await this.startWorker(blockchainType, dbPath);
33+
34+
if (restartIntervalHours && restartIntervalHours > 0) {
35+
this.restartIntervalMs = restartIntervalHours * 60 * 60 * 1000;
36+
this.restartIntervalId = setInterval(async () => {
37+
await this.restartWorker(blockchainType, dbPath);
38+
}, this.restartIntervalMs);
39+
}
40+
}
41+
42+
private async startWorker(blockchainType: string, dbPath: string) {
43+
if (this.started) return;
44+
if (!this.worker) {
45+
// Use src worker for tests/dev, dist worker for production
46+
let workerPath: string;
47+
if (process.env.JEST_WORKER_ID !== undefined || process.env.NODE_ENV === 'test') {
48+
workerPath = '../../../../dist/application/workers/CoinIndexer/CoinIndexer.worker.js';
49+
} else {
50+
workerPath = './CoinIndexer.worker.ts';
51+
}
52+
this.worker = (await spawn(
53+
new Worker(workerPath),
54+
)) as import('threads').ModuleThread<CoinIndexerWorkerApi>;
55+
}
56+
57+
this.worker.onCoinStateUpdated().subscribe((coinState: CoinStateUpdatedEvent) => {
58+
this.emit(CoinIndexerEventNames.CoinStateUpdated, coinState);
59+
});
60+
61+
try {
62+
await this.worker.start(blockchainType, dbPath);
63+
} catch {
64+
await this.restartWorker(blockchainType, dbPath);
65+
}
66+
67+
this.started = true;
68+
}
69+
70+
private async restartWorker(blockchainType: string, dbPath: string) {
71+
if (this.worker) {
72+
await this.worker.stop();
73+
this.started = false;
74+
await Thread.terminate(this.worker);
75+
this.worker = null;
76+
}
77+
await this.startWorker(blockchainType, dbPath);
78+
}
79+
80+
async stop(): Promise<void> {
81+
if (this.started && this.worker) await this.worker.stop();
82+
this.started = false;
83+
if (this.worker) await Thread.terminate(this.worker);
84+
if (this.restartIntervalId) {
85+
clearInterval(this.restartIntervalId);
86+
this.restartIntervalId = null;
87+
}
88+
}
89+
90+
onCoinStateUpdated(listener: (coinState: CoinStateUpdatedEvent) => void): void {
91+
this.on(CoinIndexerEventNames.CoinStateUpdated, listener);
92+
}
93+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import Database from 'better-sqlite3';
2+
import { CoinIndexerEvents, CoinStateUpdatedEvent } from './CoinIndexerEvents';
3+
4+
let db: Database.Database | null = null;
5+
let events: CoinIndexerEvents | null = null;
6+
let started = false;
7+
8+
function setupTables() {
9+
db!.exec(`
10+
CREATE TABLE IF NOT EXISTS wallet (
11+
address TEXT PRIMARY KEY,
12+
namespace TEXT DEFAULT 'default',
13+
synced_to_height INTEGER,
14+
synced_to_hash TEXT
15+
);
16+
CREATE TABLE IF NOT EXISTS coin (
17+
wallet_id TEXT,
18+
coinId BLOB,
19+
parent_coin_info BLOB,
20+
puzzle_hash BLOB,
21+
amount TEXT,
22+
synced_height INTEGER,
23+
status TEXT CHECK(status IN ('unspent', 'pending', 'spent')),
24+
PRIMARY KEY (wallet_id, coinId)
25+
);
26+
CREATE INDEX IF NOT EXISTS idx_coin_wallet_id ON coin(wallet_id);
27+
CREATE INDEX IF NOT EXISTS idx_coin_status ON coin(status);
28+
`);
29+
}
30+
31+
export const api = {
32+
start(_blockchainType: string, dbPath: string = './coin_indexer.sqlite') {
33+
if (started) return;
34+
db = new Database(dbPath);
35+
events = new CoinIndexerEvents();
36+
setupTables();
37+
started = true;
38+
},
39+
stop() {
40+
started = false;
41+
db = null;
42+
events = null;
43+
},
44+
addWallet(address: string, namespace: string = 'default') {
45+
db!.prepare(
46+
`INSERT OR IGNORE INTO wallet (address, namespace) VALUES (?, ?)`
47+
).run(address, namespace);
48+
},
49+
updateWalletSync(address: string, synced_to_height: number, synced_to_hash: string) {
50+
db!.prepare(
51+
`UPDATE wallet SET synced_to_height = ?, synced_to_hash = ? WHERE address = ?`
52+
).run(synced_to_height, synced_to_hash, address);
53+
},
54+
upsertCoin(wallet_id: string, coin: { coinId: Buffer, parent_coin_info: Buffer, puzzle_hash: Buffer, amount: bigint, synced_height: number, status: string }) {
55+
db!.prepare(
56+
`INSERT OR REPLACE INTO coin (wallet_id, coinId, parent_coin_info, puzzle_hash, amount, synced_height, status) VALUES (?, ?, ?, ?, ?, ?, ?)`
57+
).run(
58+
wallet_id,
59+
coin.coinId,
60+
coin.parent_coin_info,
61+
coin.puzzle_hash,
62+
coin.amount.toString(),
63+
coin.synced_height,
64+
coin.status
65+
);
66+
// Emit event for coin state update
67+
if (events) {
68+
const event: CoinStateUpdatedEvent = {
69+
wallet_id,
70+
coinId: coin.coinId,
71+
status: coin.status as 'unspent' | 'pending' | 'spent',
72+
synced_height: coin.synced_height,
73+
};
74+
events.emitCoinStateUpdated(event);
75+
}
76+
},
77+
getWallets() {
78+
return db!.prepare('SELECT * FROM wallet').all();
79+
},
80+
getCoins(wallet_id: string) {
81+
return db!.prepare('SELECT * FROM coin WHERE wallet_id = ?').all(wallet_id);
82+
},
83+
onCoinStateUpdated(listener: (event: CoinStateUpdatedEvent) => void) {
84+
if (!events) throw new Error('CoinIndexerEvents not initialized');
85+
events.onCoinStateUpdated(listener);
86+
},
87+
__reset() {
88+
db = null;
89+
events = null;
90+
started = false;
91+
},
92+
};
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
import { expose } from 'threads/worker';
2+
import { api } from './CoinIndexer.worker.logic';
3+
4+
expose(api);
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { EventEmitter } from 'events';
2+
3+
export enum CoinIndexerEventNames {
4+
CoinStateUpdated = 'CoinStateUpdated',
5+
}
6+
7+
export interface CoinStateUpdatedEvent {
8+
wallet_id: string;
9+
coinId: Buffer;
10+
status: 'unspent' | 'pending' | 'spent';
11+
synced_height: number;
12+
}
13+
14+
export class CoinIndexerEvents extends EventEmitter {
15+
emitCoinStateUpdated(event: CoinStateUpdatedEvent) {
16+
this.emit(CoinIndexerEventNames.CoinStateUpdated, event);
17+
}
18+
onCoinStateUpdated(listener: (event: CoinStateUpdatedEvent) => void) {
19+
this.on(CoinIndexerEventNames.CoinStateUpdated, listener);
20+
}
21+
}

0 commit comments

Comments
 (0)