diff --git a/src/application/repositories/BlockRepository.ts b/src/application/repositories/BlockRepository.ts index 2f487fb..7ec745c 100644 --- a/src/application/repositories/BlockRepository.ts +++ b/src/application/repositories/BlockRepository.ts @@ -1,6 +1,6 @@ import Database from 'better-sqlite3'; import { Block } from '../types/Block'; -import { IBlockRepository } from './IBlockRepository'; +import { IBlockRepository } from './Interfaces/IBlockRepository'; export const CREATE_BLOCKS_TABLE_SQL = `CREATE TABLE IF NOT EXISTS blocks (hash BLOB, blockHeight INTEGER PRIMARY KEY, created_at DATETIME DEFAULT CURRENT_TIMESTAMP)`; diff --git a/src/application/repositories/CoinRepository.ts b/src/application/repositories/CoinRepository.ts new file mode 100644 index 0000000..6c1174d --- /dev/null +++ b/src/application/repositories/CoinRepository.ts @@ -0,0 +1,67 @@ +import Database from 'better-sqlite3'; +import { ICoinRepository } from './Interfaces/ICoinRepository'; +import { CoinStatus } from '../types/CoinStatus'; + +export interface CoinRow { + walletId: string; + coinId: Buffer; + parentCoinInfo: Buffer; + puzzleHash: Buffer; + amount: bigint; + syncedHeight: number; + status: CoinStatus; +} + +let setupTable = (db: Database.Database) => { + db.exec(` + CREATE TABLE IF NOT EXISTS coin ( + walletId TEXT, + coinId BLOB, + parentCoinInfo BLOB, + puzzleHash BLOB, + amount TEXT, + syncedHeight INTEGER, + status TEXT CHECK(status IN ('unspent', 'pending', 'spent')), + PRIMARY KEY (walletId, coinId) + ); + CREATE INDEX IF NOT EXISTS idx_coin_walletId ON coin(walletId); + CREATE INDEX IF NOT EXISTS idx_coin_status ON coin(status); + `); +} + +export class CoinRepository implements ICoinRepository { + private db: Database.Database; + + constructor(db: Database.Database) { + this.db = db; + setupTable(db); + } + + upsertCoin(walletId: string, coin: { coinId: Buffer, parentCoinInfo: Buffer, puzzleHash: Buffer, amount: bigint, syncedHeight: number, status: string }) { + this.db.prepare( + `INSERT OR REPLACE INTO coin (walletId, coinId, parentCoinInfo, puzzleHash, amount, syncedHeight, status) VALUES (?, ?, ?, ?, ?, ?, ?)` + ).run( + walletId, + coin.coinId, + coin.parentCoinInfo, + coin.puzzleHash, + coin.amount.toString(), + coin.syncedHeight, + coin.status + ); + } + + getCoins(walletId: string): CoinRow[] { + return this.db.prepare('SELECT * FROM coin WHERE walletId = ?').all(walletId) as CoinRow[]; + } + + getPendingCoins(): CoinRow[] { + return this.db.prepare('SELECT * FROM coin WHERE status = ?').all(CoinStatus.PENDING) as CoinRow[]; + } + + updateCoinStatus(walletId: string, coinId: Buffer, status: CoinStatus, syncedHeight: number) { + this.db.prepare( + `UPDATE coin SET status = ?, syncedHeight = ? WHERE walletId = ? AND coinId = ?` + ).run(status, syncedHeight, walletId, coinId); + } +} diff --git a/src/application/repositories/IBlockRepository.ts b/src/application/repositories/Interfaces/IBlockRepository.ts similarity index 74% rename from src/application/repositories/IBlockRepository.ts rename to src/application/repositories/Interfaces/IBlockRepository.ts index c179bbc..f870d7e 100644 --- a/src/application/repositories/IBlockRepository.ts +++ b/src/application/repositories/Interfaces/IBlockRepository.ts @@ -1,4 +1,4 @@ -import { Block } from '../types/Block'; +import { Block } from '../../types/Block'; export interface IBlockRepository { getLatestBlock(): Promise; diff --git a/src/application/repositories/Interfaces/ICoinRepository.ts b/src/application/repositories/Interfaces/ICoinRepository.ts new file mode 100644 index 0000000..2895bb6 --- /dev/null +++ b/src/application/repositories/Interfaces/ICoinRepository.ts @@ -0,0 +1,9 @@ +import { CoinStatus } from '../../types/CoinStatus'; +import { CoinRow } from '../CoinRepository'; + +export interface ICoinRepository { + upsertCoin(walletId: string, coin: { coinId: Buffer, parentCoinInfo: Buffer, puzzleHash: Buffer, amount: bigint, syncedHeight: number, status: CoinStatus }): void; + getCoins(walletId: string): CoinRow[]; + getPendingCoins(): CoinRow[]; + updateCoinStatus(walletId: string, coinId: Buffer, status: CoinStatus, syncedHeight: number): void; +} diff --git a/src/application/repositories/Interfaces/IWalletRepository.ts b/src/application/repositories/Interfaces/IWalletRepository.ts new file mode 100644 index 0000000..0f7389f --- /dev/null +++ b/src/application/repositories/Interfaces/IWalletRepository.ts @@ -0,0 +1,7 @@ +import { WalletRow } from '../WalletRepository'; + +export interface IWalletRepository { + addWallet(address: string, namespace?: string): void; + updateWalletSync(address: string, synced_to_height: number, synced_to_hash: string): void; + getWallets(): WalletRow[]; +} diff --git a/src/application/repositories/WalletRepository.ts b/src/application/repositories/WalletRepository.ts new file mode 100644 index 0000000..ba42150 --- /dev/null +++ b/src/application/repositories/WalletRepository.ts @@ -0,0 +1,45 @@ +import Database from 'better-sqlite3'; +import { IWalletRepository } from './Interfaces/IWalletRepository'; + +export interface WalletRow { + address: string; + namespace: string; + synced_to_height: number; + synced_to_hash: string; +} + +let setupTable = (db: Database.Database) => { + db.exec(` + CREATE TABLE IF NOT EXISTS wallet ( + address TEXT PRIMARY KEY, + namespace TEXT DEFAULT 'default', + synced_to_height INTEGER, + synced_to_hash TEXT + ); + `); + } + +export class WalletRepository implements IWalletRepository { + private db: Database.Database; + + constructor(db: Database.Database) { + this.db = db; + setupTable(db); + } + + addWallet(address: string, namespace: string = 'default') { + this.db.prepare( + `INSERT OR IGNORE INTO wallet (address, namespace) VALUES (?, ?)` + ).run(address, namespace); + } + + updateWalletSync(address: string, synced_to_height: number, synced_to_hash: string) { + this.db.prepare( + `UPDATE wallet SET synced_to_height = ?, synced_to_hash = ? WHERE address = ?` + ).run(synced_to_height, synced_to_hash, address); + } + + getWallets(): WalletRow[] { + return this.db.prepare('SELECT * FROM wallet').all() as WalletRow[]; + } +} diff --git a/src/application/types/CoinStatus.ts b/src/application/types/CoinStatus.ts new file mode 100644 index 0000000..d48deac --- /dev/null +++ b/src/application/types/CoinStatus.ts @@ -0,0 +1,5 @@ +export enum CoinStatus { + PENDING = 'pending', + UNSPENT = 'unspent', + SPENT = 'spent', +} diff --git a/src/application/workers/CoinIndexer/CoinIndexer.ts b/src/application/workers/CoinIndexer/CoinIndexer.ts new file mode 100644 index 0000000..f447603 --- /dev/null +++ b/src/application/workers/CoinIndexer/CoinIndexer.ts @@ -0,0 +1,96 @@ +import { EventEmitter } from 'events'; +import { spawn, Worker, Thread } from 'threads'; +import { IWorker } from '../IWorker'; +import { + CoinIndexerEventNames, + CoinIndexerEvents, + CoinStateUpdatedEvent, +} from './CoinIndexerEvents'; + +export interface CoinIndexerWorkerApi { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + [key: string]: (...args: any[]) => any; +} + +interface ICoinIndexer extends IWorker { + onCoinStateUpdated(listener: (coinState: CoinStateUpdatedEvent) => void): void; +} + +export class CoinIndexer + extends (EventEmitter as { new (): CoinIndexerEvents }) + implements ICoinIndexer +{ + private worker: import('threads').ModuleThread | null = null; + private started = false; + private restartIntervalId: NodeJS.Timeout | null = null; + private restartIntervalMs: number | null = null; + + async start( + blockchainType: string, + dbPath: string = './coin_indexer.sqlite', + restartIntervalHours?: number, + ): Promise { + await this.startWorker(blockchainType, dbPath); + + if (restartIntervalHours && restartIntervalHours > 0) { + this.restartIntervalMs = restartIntervalHours * 60 * 60 * 1000; + this.restartIntervalId = setInterval(async () => { + await this.restartWorker(blockchainType, dbPath); + }, this.restartIntervalMs); + } + } + + private async startWorker(blockchainType: string, dbPath: string) { + if (this.started) return; + if (!this.worker) { + // Use src worker for tests/dev, dist worker for production + let workerPath: string; + if (process.env.JEST_WORKER_ID !== undefined || process.env.NODE_ENV === 'test') { + workerPath = '../../../../dist/application/workers/CoinIndexer/CoinIndexer.worker.js'; + } else { + workerPath = './CoinIndexer.worker.ts'; + } + this.worker = (await spawn( + new Worker(workerPath), + )) as import('threads').ModuleThread; + } + + this.worker.onCoinStateUpdated().subscribe({ + next: (coinState: CoinStateUpdatedEvent) => { + this.emit(CoinIndexerEventNames.CoinStateUpdated, coinState); + }, + }); + + try { + await this.worker.start(blockchainType, dbPath); + } catch { + await this.restartWorker(blockchainType, dbPath); + } + + this.started = true; + } + + private async restartWorker(blockchainType: string, dbPath: string) { + if (this.worker) { + await this.worker.stop(); + this.started = false; + await Thread.terminate(this.worker); + this.worker = null; + } + await this.startWorker(blockchainType, dbPath); + } + + async stop(): Promise { + if (this.started && this.worker) await this.worker.stop(); + this.started = false; + if (this.worker) await Thread.terminate(this.worker); + if (this.restartIntervalId) { + clearInterval(this.restartIntervalId); + this.restartIntervalId = null; + } + } + + onCoinStateUpdated(listener: (coinState: CoinStateUpdatedEvent) => void): void { + this.on(CoinIndexerEventNames.CoinStateUpdated, listener); + } +} diff --git a/src/application/workers/CoinIndexer/CoinIndexer.worker.logic.ts b/src/application/workers/CoinIndexer/CoinIndexer.worker.logic.ts new file mode 100644 index 0000000..772247a --- /dev/null +++ b/src/application/workers/CoinIndexer/CoinIndexer.worker.logic.ts @@ -0,0 +1,156 @@ +import Database from 'better-sqlite3'; +import { Observable } from 'observable-fns'; +import { CoinStateUpdatedEvent } from './CoinIndexerEvents'; +import { CoinRepository, CoinRow } from '../../repositories/CoinRepository'; +import { WalletRepository, WalletRow } from '../../repositories/WalletRepository'; +import { IBlockchainService } from '../../interfaces/IBlockChainService'; +import { Peer, type Coin } from '@dignetwork/datalayer-driver'; +import { BlockChainType } from '../../types/BlockChain'; +import { TestBlockchainService } from '../../../infrastructure/BlockchainServices/TestBlockchainService'; +import { ChiaBlockchainService } from '../../../infrastructure/BlockchainServices/ChiaBlockchainService'; +import { CoinStatus } from '../../types/CoinStatus'; + +let db: Database.Database | null = null; +let coinRepo: CoinRepository | null = null; +let walletRepo: WalletRepository | null = null; +let started = false; +let intervalId: NodeJS.Timeout | null = null; +let blockchainService: IBlockchainService | null = null; + +let coinStateObservable: Observable | null = null; +let coinStateObserver: ((event: CoinStateUpdatedEvent) => void) | null = null; + +function mapUnspentCoinToDbFields(coin: Coin, walletId: string, syncedHeight: number): CoinRow { + return { + coinId: blockchainService!.getCoinId(coin), + parentCoinInfo: coin.parentCoinInfo, + puzzleHash: coin.puzzleHash, + amount: coin.amount, + syncedHeight, + status: CoinStatus.PENDING, + walletId, + }; +} + +async function sync() { + if (!coinRepo || !walletRepo || !blockchainService) return; + const wallets: WalletRow[] = walletRepo.getWallets(); + let peer: Peer | null = null; // TODO: get from PeerCluster if needed + + for (const wallet of wallets) { + // Find all coins for this wallet that are pending + const pendingCoins = coinRepo + .getCoins(wallet.address) + .filter((c) => c.status === CoinStatus.PENDING); + // Determine the smallest syncedHeight among pending coins, or use wallet.synced_to_height + let fetchFromHeight = wallet.synced_to_height || 0; + let fetchFromHash = Buffer.from(wallet.synced_to_hash) || Buffer.alloc(32); + + if (pendingCoins.length > 0) { + // Find the pending coin with the minimum syncedHeight + const minPendingCoin = pendingCoins.reduce( + (min, c) => (c.syncedHeight < min.syncedHeight ? c : min), + pendingCoins[0], + ); + fetchFromHeight = + wallet.synced_to_height === undefined + ? minPendingCoin.syncedHeight + : Math.min(fetchFromHeight, minPendingCoin.syncedHeight); + // Use the hash from the min height pending coin + fetchFromHash = minPendingCoin.puzzleHash || fetchFromHash; + } + // Fetch unspent coins from blockchain service + const unspent = await blockchainService.listUnspentCoins( + peer!, + Buffer.from(wallet.address, 'hex'), + fetchFromHeight, + fetchFromHash, + ); + // Upsert all returned coins as unspent + const seenCoinIds = new Set(); + for (const coin of unspent.coins) { + const mapped = mapUnspentCoinToDbFields(coin, wallet.address, fetchFromHeight); + coinRepo.upsertCoin(wallet.address, mapped); + seenCoinIds.add(mapped.coinId.toString('hex')); + if (coinStateObserver) { + coinStateObserver({ + walletId: wallet.address, + coinId: mapped.coinId, + status: CoinStatus.UNSPENT, + syncedHeight: fetchFromHeight, + }); + } + } + // Mark as spent any pending coins not present in the response + for (const coin of pendingCoins) { + if (!seenCoinIds.has(coin.coinId.toString('hex'))) { + coinRepo.updateCoinStatus(coin.walletId, coin.coinId, CoinStatus.SPENT, coin.syncedHeight); + if (coinStateObserver) { + coinStateObserver({ + walletId: coin.walletId, + coinId: coin.coinId, + status: CoinStatus.SPENT, + syncedHeight: coin.syncedHeight, + }); + } + } + } + } +} + +export const api = { + async start(_blockchainType: BlockChainType, dbPath: string = './coin_indexer.sqlite') { + if (started) return; + db = new Database(dbPath); + coinRepo = new CoinRepository(db); + walletRepo = new WalletRepository(db); + + switch (_blockchainType) { + case BlockChainType.Test: + blockchainService = new TestBlockchainService(); + break; + case BlockChainType.Chia: + default: + blockchainService = new ChiaBlockchainService(); + break; + } + + await sync(); + + started = true; + intervalId = setInterval(sync, 1000); + }, + stop() { + started = false; + if (intervalId) clearInterval(intervalId); + db = null; + coinRepo = null; + walletRepo = null; + blockchainService = null; + coinStateObservable = null; + coinStateObserver = null; + }, + onCoinStateUpdated() { + if (!coinStateObservable) { + coinStateObservable = new Observable((observer) => { + coinStateObserver = (event: CoinStateUpdatedEvent) => { + observer.next(event); + }; + return () => { + coinStateObserver = null; + }; + }); + } + return coinStateObservable; + }, + __reset() { + if (intervalId) clearInterval(intervalId); + db = null; + coinRepo = null; + walletRepo = null; + blockchainService = null; + started = false; + coinStateObservable = null; + coinStateObserver = null; + }, +}; diff --git a/src/application/workers/CoinIndexer/CoinIndexer.worker.ts b/src/application/workers/CoinIndexer/CoinIndexer.worker.ts new file mode 100644 index 0000000..01e79d7 --- /dev/null +++ b/src/application/workers/CoinIndexer/CoinIndexer.worker.ts @@ -0,0 +1,4 @@ +import { expose } from 'threads/worker'; +import { api } from './CoinIndexer.worker.logic'; + +expose(api); \ No newline at end of file diff --git a/src/application/workers/CoinIndexer/CoinIndexerEvents.ts b/src/application/workers/CoinIndexer/CoinIndexerEvents.ts new file mode 100644 index 0000000..84d05e7 --- /dev/null +++ b/src/application/workers/CoinIndexer/CoinIndexerEvents.ts @@ -0,0 +1,17 @@ +import { CoinStatus } from "../../types/CoinStatus"; + +export enum CoinIndexerEventNames { + CoinStateUpdated = 'coinStateUpdated', +} + +export interface CoinStateUpdatedEvent { + walletId: string; + coinId: Buffer; + status: CoinStatus; + syncedHeight: number; +} + +export interface CoinIndexerEvents { + on(event: CoinIndexerEventNames.CoinStateUpdated, listener: (event: CoinStateUpdatedEvent) => void): this; + emit(event: CoinIndexerEventNames.CoinStateUpdated, eventData: CoinStateUpdatedEvent): boolean; +} \ No newline at end of file diff --git a/src/infrastructure/BlockchainServices/TestBlockchainService.ts b/src/infrastructure/BlockchainServices/TestBlockchainService.ts index 6686c0d..2c9a4bb 100644 --- a/src/infrastructure/BlockchainServices/TestBlockchainService.ts +++ b/src/infrastructure/BlockchainServices/TestBlockchainService.ts @@ -3,6 +3,7 @@ import { IBlockchainService } from "../../application/interfaces/IBlockChainServ import { Block } from "../../application/types/Block"; import Database from 'better-sqlite3'; import type { Coin, Peer, UnspentCoinsResponse } from '@dignetwork/datalayer-driver'; +import { CoinRow } from "../../application/repositories/CoinRepository"; export class TestBlockchainService implements IBlockchainService { private db: Database.Database; @@ -10,6 +11,15 @@ export class TestBlockchainService implements IBlockchainService { constructor() { this.db = new Database('testservice.sqlite'); this.db.exec(`CREATE TABLE IF NOT EXISTS blocks (hash BLOB, blockHeight INTEGER PRIMARY KEY, created_at DATETIME DEFAULT CURRENT_TIMESTAMP)`); + this.db.exec(`CREATE TABLE IF NOT EXISTS coin (coin_id BLOB, parentCoinInfo BLOB, puzzleHash BLOB, amount TEXT, status TEXT, walletId TEXT, height INTEGER)`); + this.db.exec(` + CREATE TABLE IF NOT EXISTS wallet ( + address TEXT PRIMARY KEY, + namespace TEXT DEFAULT 'default', + synced_to_height INTEGER, + synced_to_hash TEXT + ); + `); } async getCurrentBlockchainHeight(): Promise { @@ -33,7 +43,7 @@ export class TestBlockchainService implements IBlockchainService { masterPublicKeyToFirstPuzzleHash(publicKey: Buffer): Buffer { return Buffer.alloc(32, 5); } puzzleHashToAddress(puzzleHash: Buffer, prefix: string): string { return prefix + puzzleHash.toString('hex'); } signMessage(message: Buffer, privateKey: Buffer): Buffer { return Buffer.from('deadbeef', 'hex'); } - getCoinId(coin: Coin): Buffer { return Buffer.from('cafebabe', 'hex'); } + getCoinId(coin: Coin): Buffer { return coin.puzzleHash; } selectCoins(coins: Coin[], amount: bigint): Coin[] { return coins.slice(0, 1); } // New methods for ColdWallet/WalletService @@ -49,8 +59,29 @@ export class TestBlockchainService implements IBlockchainService { previousHeight: number, previousHeaderHash: Buffer ): Promise { - // Dummy: return a fake response - return { coins: [], lastHeight: 0, lastHeaderHash: Buffer.alloc(32) }; + // Query the coins table for coins with puzzleHash and height >= previousHeight + let rows: CoinRow[] = []; + try{ + rows = this.db.prepare( + 'SELECT coin_id, parentCoinInfo, puzzleHash, amount, status, walletId, height FROM coin' + ).all() as CoinRow[]; + } + catch (error) { + console.error(`Error querying unspent coins: ${error}`); + throw new Error(`Failed to query unspent coins: ${error}`); + } + if (!rows) throw new Error(`no data found for puzzleHash ${puzzleHash.toString('hex')} and height >= ${previousHeight}`); + const coins = rows.map((row) => ({ + coinId: row.coinId, + parentCoinInfo: row.parentCoinInfo, + puzzleHash: row.puzzleHash, + amount: BigInt(row.amount), + status: row.status, + walletId: row.walletId, + blockHeight: row.syncedHeight, + })); + // Optionally, you can return lastHeight and lastHeaderHash if needed + return { coins, lastHeight: 0, lastHeaderHash: Buffer.alloc(32) }; } async isCoinSpendable( peer: Peer, diff --git a/test/application/workers/BlockIndexer/BlockRepository.test.ts b/test/application/repositories/BlockRepository.test.ts similarity index 90% rename from test/application/workers/BlockIndexer/BlockRepository.test.ts rename to test/application/repositories/BlockRepository.test.ts index ebdd88d..a55b16d 100644 --- a/test/application/workers/BlockIndexer/BlockRepository.test.ts +++ b/test/application/repositories/BlockRepository.test.ts @@ -1,7 +1,7 @@ import Database from 'better-sqlite3'; -import { BlockRepository } from '../../../../src/application/repositories/BlockRepository'; -import { IBlockRepository } from '../../../../src/application/repositories/IBlockRepository'; +import { BlockRepository } from '../../../src/application/repositories/BlockRepository'; import fs from 'fs'; +import { IBlockRepository } from '../../../src/application/repositories/Interfaces/IBlockRepository'; describe('BlockRepository', () => { let db: Database.Database; @@ -58,7 +58,7 @@ describe('BlockRepository', () => { const dbPath = 'test_blockrepository_createdb.sqlite'; if (fs.existsSync(dbPath)) fs.unlinkSync(dbPath); const Database = require('better-sqlite3'); - const { BlockRepository } = require('../../../../src/application/repositories/BlockRepository'); + const { BlockRepository } = require('../../../src/application/repositories/BlockRepository'); const db = new Database(dbPath); new BlockRepository(db); // Check file exists diff --git a/test/application/repositories/CoinRepository.test.ts b/test/application/repositories/CoinRepository.test.ts new file mode 100644 index 0000000..71da3f0 --- /dev/null +++ b/test/application/repositories/CoinRepository.test.ts @@ -0,0 +1,77 @@ +import Database from 'better-sqlite3'; +import { CoinRepository } from '../../../src/application/repositories/CoinRepository'; +import { ICoinRepository } from '../../../src/application/repositories/Interfaces/ICoinRepository'; +import { CoinStatus } from '../../../src/application/types/CoinStatus'; + +describe('CoinRepository', () => { + const dbPath = ':memory:'; + let db: Database.Database; + let coinRepo: ICoinRepository; + + beforeEach(() => { + db = new Database(dbPath); + coinRepo = new CoinRepository(db); + }); + + afterEach(() => { + db.close(); + }); + + it('should upsert and retrieve coins', () => { + const coin = { + coinId: Buffer.from('aabbcc', 'hex'), + parentCoinInfo: Buffer.from('ddeeff', 'hex'), + puzzleHash: Buffer.from('112233', 'hex'), + amount: BigInt(1000), + syncedHeight: 10, + status: CoinStatus.UNSPENT, + }; + coinRepo.upsertCoin('xch1234', coin); + const coins = coinRepo.getCoins('xch1234'); + expect(coins.length).toBe(1); + expect(Buffer.isBuffer(coins[0].coinId) || typeof coins[0].coinId === 'object').toBe(true); + expect(coins[0].amount).toBe('1000'); // expect string, matches repo logic + expect(coins[0].status).toBe(CoinStatus.UNSPENT); + }); + + it('should update coin status', () => { + const coin = { + coinId: Buffer.from('aabbcc', 'hex'), + parentCoinInfo: Buffer.from('ddeeff', 'hex'), + puzzleHash: Buffer.from('112233', 'hex'), + amount: BigInt(1000), + syncedHeight: 10, + status: CoinStatus.PENDING, + }; + coinRepo.upsertCoin('xch1234', coin); + coinRepo.updateCoinStatus('xch1234', coin.coinId, CoinStatus.SPENT, 11); + const coins = coinRepo.getCoins('xch1234'); + expect(coins[0].status).toBe(CoinStatus.SPENT); + expect(coins[0].syncedHeight).toBe(11); + }); + + it('should get pending coins', () => { + const coin1 = { + coinId: Buffer.from('aabbcc', 'hex'), + parentCoinInfo: Buffer.from('ddeeff', 'hex'), + puzzleHash: Buffer.from('112233', 'hex'), + amount: BigInt(1000), + syncedHeight: 10, + status: CoinStatus.PENDING, + }; + const coin2 = { + coinId: Buffer.from('bbccdd', 'hex'), + parentCoinInfo: Buffer.from('eeff00', 'hex'), + puzzleHash: Buffer.from('223344', 'hex'), + amount: BigInt(2000), + syncedHeight: 12, + status: CoinStatus.UNSPENT, + }; + coinRepo.upsertCoin('xch1234', coin1); + coinRepo.upsertCoin('xch1234', coin2); + const pending = coinRepo.getPendingCoins(); + expect(pending.length).toBe(1); + expect(pending[0].status).toBe(CoinStatus.PENDING); + expect(pending[0].coinId.equals(coin1.coinId)).toBe(true); + }); +}); diff --git a/test/application/repositories/WalletRepository.test.ts b/test/application/repositories/WalletRepository.test.ts new file mode 100644 index 0000000..8132d89 --- /dev/null +++ b/test/application/repositories/WalletRepository.test.ts @@ -0,0 +1,49 @@ +import Database from 'better-sqlite3'; +import { WalletRepository } from '../../../src/application/repositories/WalletRepository'; +import { IWalletRepository } from '../../../src/application/repositories/Interfaces/IWalletRepository'; + +describe('WalletRepository', () => { + const dbPath = ':memory:'; + let db: Database.Database; + let walletRepo: IWalletRepository; + + beforeEach(() => { + db = new Database(dbPath); + walletRepo = new WalletRepository(db); + }); + + afterEach(() => { + db.close(); + }); + + it('should add and retrieve wallets', () => { + walletRepo.addWallet('xch1234'); + const wallets = walletRepo.getWallets(); + expect(wallets.length).toBe(1); + expect(wallets[0].address).toBe('xch1234'); + expect(wallets[0].namespace).toBe('default'); + }); + + it('should add a wallet with a custom namespace', () => { + walletRepo.addWallet('xch5678', 'customns'); + const wallets = walletRepo.getWallets(); + expect(wallets.length).toBe(1); + expect(wallets[0].address).toBe('xch5678'); + expect(wallets[0].namespace).toBe('customns'); + }); + + it('should update wallet sync state', () => { + walletRepo.addWallet('xch1234'); + walletRepo.updateWalletSync('xch1234', 42, 'abc'); + const wallets = walletRepo.getWallets(); + expect(wallets[0].synced_to_height).toBe(42); + expect(wallets[0].synced_to_hash).toBe('abc'); + }); + + it('should not add duplicate wallets', () => { + walletRepo.addWallet('xch1234'); + walletRepo.addWallet('xch1234'); + const wallets = walletRepo.getWallets(); + expect(wallets.length).toBe(1); + }); +}); diff --git a/test/application/types/ColdWallet.test.ts b/test/application/types/ColdWallet.test.ts index bf70ffb..0727b00 100644 --- a/test/application/types/ColdWallet.test.ts +++ b/test/application/types/ColdWallet.test.ts @@ -2,7 +2,7 @@ import { ColdWallet } from '../../../src/application/types/ColdWallet'; import { TestBlockchainService } from '../../../src/infrastructure/BlockchainServices/TestBlockchainService'; const TEST_ADDRESS = 'xch1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqc8249j'; -const TEST_PUZZLE_HASH = Buffer.from('aabbcc', 'hex'); +const TEST_puzzleHash = Buffer.from('aabbcc', 'hex'); const TEST_SIGNATURE = Buffer.from('deadbeef', 'hex'); const TEST_PUBLIC_KEY = Buffer.from('cafebabe', 'hex'); const TEST_MESSAGE = Buffer.from('test message', 'utf-8'); @@ -33,7 +33,7 @@ describe('ColdWallet', () => { }); it('listUnspentCoins should delegate to blockchain and return coins', async () => { - const result = await wallet.listUnspentCoins(mockPeer, TEST_PUZZLE_HASH, 0, Buffer.alloc(32)); + const result = await wallet.listUnspentCoins(mockPeer, TEST_puzzleHash, 0, Buffer.alloc(32)); expect(result).toHaveProperty('coins'); }); diff --git a/test/application/workers/BlockIndexer/BlockIndexer.worker.logic.test.ts b/test/application/workers/BlockIndexer/BlockIndexer.worker.logic.test.ts index 9120ac4..145523b 100644 --- a/test/application/workers/BlockIndexer/BlockIndexer.worker.logic.test.ts +++ b/test/application/workers/BlockIndexer/BlockIndexer.worker.logic.test.ts @@ -3,20 +3,6 @@ import Database from 'better-sqlite3'; import { BlockChainType } from '../../../../src/application/types/BlockChain'; import fs from 'fs'; -// Mock BlockchainService for unit tests -class MockBlockchainService { - private blocks: any[] = []; - constructor(blocks: any[] = []) { - this.blocks = blocks; - } - async getCurrentBlockchainHeight() { - return this.blocks.length; - } - async getBlockchainBlockByHeight(h: number) { - return this.blocks[h - 1] || null; - } -} - describe('BlockIndexer.worker.logic api', () => { const dbPath = 'test_blockindexer_worker_logic.sqlite'; diff --git a/test/application/workers/CoinIndexer/CoinIndexer.integration.test.ts b/test/application/workers/CoinIndexer/CoinIndexer.integration.test.ts new file mode 100644 index 0000000..95eb2a7 --- /dev/null +++ b/test/application/workers/CoinIndexer/CoinIndexer.integration.test.ts @@ -0,0 +1,121 @@ +import { CoinIndexer } from '../../../../src/application/workers/CoinIndexer/CoinIndexer'; +import { BlockChainType } from '../../../../src/application/types/BlockChain'; +import Database from 'better-sqlite3'; +import fs from 'fs'; +import { CoinStatus } from '../../../../src/application/types/CoinStatus'; + +// Use the same DB as TestBlockchainService +const serviceDbPath = 'testservice.sqlite'; +const coinDbPath = 'test_coinindexer_integration.sqlite'; + +describe('CoinIndexer integration', () => { + let coinIndexer: CoinIndexer; + let servicedb: Database.Database; + let db: Database.Database; + beforeAll(() => { + if (fs.existsSync(coinDbPath)) fs.unlinkSync(coinDbPath); + if (fs.existsSync(serviceDbPath)) fs.unlinkSync(serviceDbPath); + + db = new Database(coinDbPath); + servicedb = new Database(serviceDbPath); + + servicedb.exec( + `CREATE TABLE IF NOT EXISTS coin (coin_id BLOB, parentCoinInfo BLOB, puzzleHash BLOB, amount TEXT, status TEXT, walletId TEXT, height INTEGER)`, + ); + + servicedb.exec( + `CREATE TABLE IF NOT EXISTS wallet ( + address TEXT PRIMARY KEY, + namespace TEXT DEFAULT 'default', + synced_to_height INTEGER, + synced_to_hash TEXT + )`, + ); + }); + + beforeEach(async () => { + coinIndexer = new CoinIndexer(); + }); + + afterEach(async () => { + await coinIndexer.stop(); + + servicedb.exec('DELETE FROM coin'); + + db.exec('DELETE FROM coin'); + }); + + function insertWallet( + walletId: string, + synced_to_height: number = 0, + synced_to_hash: string = '', + ) { + db.prepare( + 'INSERT INTO wallet (address, namespace, synced_to_height, synced_to_hash) VALUES (?, ?, ?, ?)', + ).run(walletId, 'default', synced_to_height, synced_to_hash); + } + + function insertCoin({ + coin_id, + parentCoinInfo, + puzzleHash, + amount, + status, + walletId, + height, + }: any) { + servicedb + .prepare( + 'INSERT INTO coin (coin_id, parentCoinInfo, puzzleHash, amount, status, walletId, height) VALUES (?, ?, ?, ?, ?, ?, ?)', + ) + .run(coin_id, parentCoinInfo, puzzleHash, amount.toString(), status, walletId, height); + } + + it('should ingest unspent coins and mark spent coins correctly', async () => { + // Start CoinIndexer + await coinIndexer.start(BlockChainType.Test, coinDbPath); + + // Simulate blockchain coins + const walletId = 'aabbcc'; + insertWallet(walletId, 0, ''); + const coin1 = Buffer.from('01', 'hex'); + const coin2 = Buffer.from('02', 'hex'); + insertCoin({ + coin_id: coin1, + parentCoinInfo: Buffer.from('11', 'hex'), + puzzleHash: coin1, + amount: 100n, + status: CoinStatus.UNSPENT, + walletId, + height: 1, + }); + insertCoin({ + coin_id: coin2, + parentCoinInfo: Buffer.from('33', 'hex'), + puzzleHash: coin2, + amount: 200n, + status: CoinStatus.UNSPENT, + walletId, + height: 2, + }); + + // Wait for sync + await new Promise((res) => setTimeout(res, 1200)); + + const coins = db.prepare('SELECT * FROM coin WHERE walletId = ?').all(walletId); + expect(coins.length).toBe(2); + expect(coins.some((c: any) => Buffer.compare(c.coinId, coin1) === 0)).toBe(true); + expect(coins.some((c: any) => Buffer.compare(c.coinId, coin2) === 0)).toBe(true); + + // Remove coin1 from blockchain (simulate spend) + servicedb.prepare('DELETE FROM coin WHERE coin_id = ?').run(coin1); + await new Promise((res) => setTimeout(res, 1200)); + + // Now coin1 should be marked as spent in CoinIndexer DB + const spent = db + .prepare('SELECT * FROM coin WHERE walletId = ? AND status = ?') + .all(walletId, CoinStatus.SPENT); + expect(spent.length).toBe(1); + expect(Buffer.compare((spent[0] as any).coinId, coin1)).toBe(0); + }, 10000); +}); diff --git a/test/application/workers/CoinIndexer/CoinIndexer.test.ts b/test/application/workers/CoinIndexer/CoinIndexer.test.ts new file mode 100644 index 0000000..08278bd --- /dev/null +++ b/test/application/workers/CoinIndexer/CoinIndexer.test.ts @@ -0,0 +1,143 @@ +import { CoinIndexer } from '../../../../src/application/workers/CoinIndexer/CoinIndexer'; +import { BlockChainType } from '../../../../src/application/types/BlockChain'; +import fs from 'fs'; +import path from 'path'; +import { CoinStatus } from '../../../../src/application/types/CoinStatus'; + +// Mock the worker and DB for async start +jest.mock('threads', () => { + const actual = jest.requireActual('threads'); + return { + ...actual, + spawn: jest.fn(async () => ({ + start: jest.fn(), + stop: jest.fn(), + onCoinStateUpdated: jest.fn(() => ({ + subscribe: jest.fn() + })), + terminate: jest.fn() + })), + Worker: jest.fn(), + Thread: { terminate: jest.fn() } + }; +}); + +jest.mock('better-sqlite3', () => { + return jest.fn().mockImplementation(() => ({ + prepare: jest.fn(() => ({ + get: jest.fn(() => ({ coinId: Buffer.from('01', 'hex'), syncedHeight: 1 })), + run: jest.fn(), + })), + exec: jest.fn(), + close: jest.fn(), + })); +}); + +describe('CoinIndexer async start', () => { + const dbPath = path.join(__dirname, 'test_coin_indexer_async.sqlite'); + let coinIndexer: CoinIndexer; + + beforeEach(() => { + coinIndexer = new CoinIndexer(); + if (fs.existsSync(dbPath)) fs.unlinkSync(dbPath); + }); + + afterEach(async () => { + await coinIndexer.stop(); + if (fs.existsSync(dbPath)) fs.unlinkSync(dbPath); + }); + + it('should start without waiting for worker sync', async () => { + // Should resolve immediately + const startPromise = coinIndexer.start(BlockChainType.Test, dbPath); + expect(startPromise).toBeInstanceOf(Promise); + await startPromise; + }); + + it('should call worker.start and handle events after CoinIndexer.start()', async () => { + const coinIndexer = new CoinIndexer(); + const workerStart = jest.fn(); + const onCoinStateUpdated = jest.fn(() => ({ + subscribe: ({ next }: any) => { + if (typeof next === 'function') { + next({ walletId: 'wallet1', coinId: Buffer.from('01', 'hex'), status: CoinStatus.UNSPENT, syncedHeight: 2 }); + } + return { unsubscribe: jest.fn() }; + } + })); + const mockWorker = { + start: workerStart, + stop: jest.fn(), + onCoinStateUpdated, + }; + const { spawn } = require('threads'); + spawn.mockImplementation(async () => mockWorker); + + const listener = jest.fn(); + coinIndexer.onCoinStateUpdated(listener); + await coinIndexer.start(BlockChainType.Test, dbPath); + + expect(workerStart).toHaveBeenCalled(); + expect(onCoinStateUpdated).toHaveBeenCalled(); + expect(listener).toHaveBeenCalledWith({ walletId: 'wallet1', coinId: Buffer.from('01', 'hex'), status: CoinStatus.UNSPENT, syncedHeight: 2 }); + }); + + it('should call restartWorker, worker.stop, and Thread.terminate at the correct interval', async () => { + jest.useFakeTimers(); + const coinIndexer = new CoinIndexer(); + const workerStart = jest.fn(); + const workerStop = jest.fn(); + const workerTerminate = jest.fn(); + const onCoinStateUpdated = jest.fn(() => ({ subscribe: jest.fn() })); + const mockWorker = { + start: workerStart, + stop: workerStop, + onCoinStateUpdated, + }; + const { spawn, Thread } = require('threads'); + spawn.mockImplementation(async () => mockWorker); + Thread.terminate = workerTerminate; + const restartSpy = jest.spyOn(coinIndexer as any, 'restartWorker'); + + await coinIndexer.start(BlockChainType.Test, dbPath, 1/1800); // 2 seconds for test + expect(workerStart).toHaveBeenCalledTimes(1); + + jest.advanceTimersByTime(2200); + await Promise.resolve(); + + expect(restartSpy).toHaveBeenCalledTimes(1); + expect(workerStop).toHaveBeenCalledTimes(1); + expect(workerTerminate).toHaveBeenCalledTimes(1); + await coinIndexer.stop(); + jest.useRealTimers(); + }); + + it('should not set a timer or call restartWorker if no restartIntervalHours is specified', async () => { + jest.useFakeTimers(); + const coinIndexer = new CoinIndexer(); + const workerStart = jest.fn(); + const workerStop = jest.fn(); + const workerTerminate = jest.fn(); + const onCoinStateUpdated = jest.fn(() => ({ subscribe: jest.fn() })); + const mockWorker = { + start: workerStart, + stop: workerStop, + onCoinStateUpdated, + }; + const { spawn, Thread } = require('threads'); + spawn.mockImplementation(async () => mockWorker); + Thread.terminate = workerTerminate; + const restartSpy = jest.spyOn(coinIndexer as any, 'restartWorker'); + + await coinIndexer.start(BlockChainType.Test, dbPath); // no interval + expect(workerStart).toHaveBeenCalledTimes(1); + jest.advanceTimersByTime(10000); + await Promise.resolve(); + await Promise.resolve(); + expect(restartSpy).not.toHaveBeenCalled(); + expect(workerStop).not.toHaveBeenCalled(); + expect(workerTerminate).not.toHaveBeenCalled(); + await coinIndexer.stop(); + jest.useRealTimers(); + }); +}); diff --git a/test/application/workers/CoinIndexer/CoinIndexer.worker.logic.test.ts b/test/application/workers/CoinIndexer/CoinIndexer.worker.logic.test.ts new file mode 100644 index 0000000..4696a43 --- /dev/null +++ b/test/application/workers/CoinIndexer/CoinIndexer.worker.logic.test.ts @@ -0,0 +1,46 @@ +import Database from 'better-sqlite3'; +import { api as coinIndexerApi } from '../../../../src/application/workers/CoinIndexer/CoinIndexer.worker.logic'; +import { BlockChainType } from '../../../../src/application/types/BlockChain'; +import { existsSync, unlinkSync } from 'fs'; + +const dbPath = 'test_coinindexer_worker_logic.sqlite'; + +describe('CoinIndexer.worker.logic api', () => { + beforeAll(() => { + if (existsSync(dbPath)) unlinkSync(dbPath); + }); + + afterEach(() => { + try { new Database(dbPath).close(); } catch {} + }); + + it('should create the database file after start', async () => { + coinIndexerApi.__reset(); + if (existsSync(dbPath)) unlinkSync(dbPath); + await coinIndexerApi.start(BlockChainType.Test, dbPath); + expect(existsSync(dbPath)).toBe(true); + // Check table exists (should be named 'coin' not 'coins') + const db = new Database(dbPath); + const tables = db.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='coin'").get(); + expect(tables).toBeDefined(); + db.close(); + coinIndexerApi.stop(); + }); + + it('should not start twice', async () => { + coinIndexerApi.__reset(); + await coinIndexerApi.start(BlockChainType.Test, dbPath); + await coinIndexerApi.start(BlockChainType.Test, dbPath); // should not throw + coinIndexerApi.stop(); + }); + + it('should stop and reset', async () => { + coinIndexerApi.__reset(); + await coinIndexerApi.start(BlockChainType.Test, dbPath); + coinIndexerApi.stop(); + coinIndexerApi.__reset(); + // Should be able to start again + await coinIndexerApi.start(BlockChainType.Test, dbPath); + coinIndexerApi.stop(); + }); +});