diff --git a/jobs/workers.ts b/jobs/workers.ts index 99899820..25ec1a71 100644 --- a/jobs/workers.ts +++ b/jobs/workers.ts @@ -1,12 +1,87 @@ import { Worker } from 'bullmq' import { redisBullMQ } from 'redis/clientInstance' import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index' +import { clearBalanceCache } from 'redis/balanceCache' +import { clearDashboardCache } from 'redis/dashboardCache' +import { clearPaymentCacheForAddress } from 'redis/paymentCache' +import { fetchAllAddresses } from 'services/addressService' +import { cleanupExpiredClientPayments } from 'services/clientPaymentService' import { multiBlockchainClient } from 'services/chronikService' import { connectAllTransactionsToPrices } from 'services/transactionService' -import { cleanupExpiredClientPayments } from 'services/clientPaymentService' +import { fetchAllUsers } from 'services/userService' import * as priceService from 'services/priceService' +const ADDRESS_INVALIDATION_BATCH_SIZE = 100 + +interface AddressCacheInvalidationResult { + paymentFailures: number + balanceFailures: number +} + +const invalidateAddressCaches = async ( + address: string +): Promise => { + let paymentFailures = 0 + let balanceFailures = 0 + try { + await clearPaymentCacheForAddress(address) + } catch { + paymentFailures = 1 + } + try { + await clearBalanceCache(address) + } catch { + balanceFailures = 1 + } + return { paymentFailures, balanceFailures } +} + +/** + * Drop Redis payment-week, balance, and dashboard caches after bulk blockchain sync. + * Rebuild is lazy on the next API request. + */ +async function invalidateCachesAfterBlockchainSync (): Promise { + console.log('[CACHE]: Invalidating caches after blockchain sync...') + const addresses = await fetchAllAddresses() + let paymentCacheFailures = 0 + let balanceCacheFailures = 0 + for (let i = 0; i < addresses.length; i += ADDRESS_INVALIDATION_BATCH_SIZE) { + const batch = addresses.slice(i, i + ADDRESS_INVALIDATION_BATCH_SIZE) + const results = await Promise.all( + batch.map(async (a) => await invalidateAddressCaches(a.address)) + ) + for (const r of results) { + paymentCacheFailures += r.paymentFailures + balanceCacheFailures += r.balanceFailures + } + } + const users = await fetchAllUsers() + let dashboardCacheFailures = 0 + await Promise.all( + users.map(async (u) => { + try { + await clearDashboardCache(u.id) + } catch { + dashboardCacheFailures += 1 + } + }) + ) + const totalFailures = + paymentCacheFailures + balanceCacheFailures + dashboardCacheFailures + if (totalFailures > 0) { + console.warn( + `[CACHE]: Cache invalidation completed with ${totalFailures} failure(s) ` + + `(payment: ${paymentCacheFailures}, balance: ${balanceCacheFailures}, ` + + `dashboard: ${dashboardCacheFailures}). DB sync already succeeded.` + ) + } + console.log( + `[CACHE]: Invalidated payment/balance caches for ${addresses.length} addresses ` + + `and dashboard caches for ${users.length} users.` + ) +} + export const syncCurrentPricesWorker = async (queueName: string): Promise => { const worker = new Worker( queueName, @@ -39,6 +114,7 @@ export const syncBlockchainAndPricesWorker = async (queueName: string, onComplet await priceService.syncPastDaysNewerPrices() await multiBlockchainClient.syncMissedTransactions() await connectAllTransactionsToPrices() + await invalidateCachesAfterBlockchainSync() }, { connection: redisBullMQ, diff --git a/redis/clientInstance.ts b/redis/clientInstance.ts index 9f472a31..148f11e8 100755 --- a/redis/clientInstance.ts +++ b/redis/clientInstance.ts @@ -18,11 +18,15 @@ class RedisMocked { } scanStream (opt: ScanStreamOptions): ScanStream { - return new ScanStream({ + const stream = new ScanStream({ ...opt, - command: '', - redis: {} + command: 'scan', + redis: this as unknown as IORedis }) + process.nextTick(() => { + stream.emit('end') + }) + return stream } pipeline (commands?: unknown[][]): any { diff --git a/redis/paymentCache.ts b/redis/paymentCache.ts index ac6de862..ad18d9f6 100755 --- a/redis/paymentCache.ts +++ b/redis/paymentCache.ts @@ -39,7 +39,19 @@ export const getPaymentList = async (userId: string): Promise => { } const getCachedWeekKeysForAddress = async (addressString: string): Promise => { - return await redis.keys(`${addressString}:payments:*`) + const pattern = `${addressString}:payments:*` + const keys: string[] = [] + const stream = redis.scanStream({ + match: pattern, + count: 100 + }) + return await new Promise((resolve, reject) => { + stream.on('data', (batch: string[]) => { + keys.push(...batch) + }) + stream.on('end', () => resolve(keys)) + stream.on('error', reject) + }) } export const getCachedWeekKeysForUser = async (userId: string): Promise => { @@ -280,6 +292,17 @@ export const clearRecentAddressCache = async (addressString: string, timestamps: ) } +/** Remove all week-grouped payment keys for an address (forces rebuild from DB). */ +export const clearPaymentCacheForAddress = async (addressString: string): Promise => { + const weekKeys = await getCachedWeekKeysForAddress(addressString) + if (weekKeys.length === 0) { + return + } + await Promise.all( + weekKeys.map(async (key) => await redis.del(key, () => {})) + ) +} + export const initPaymentCache = async (address: Address): Promise => { const cachedKeys = await getCachedWeekKeysForAddress(address.address) if (cachedKeys.length === 0) { diff --git a/services/chronikService.ts b/services/chronikService.ts index b3e96938..6f50b92d 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -7,6 +7,9 @@ import prisma from 'prisma-local/clientInstance' import { TransactionWithAddressAndPrices, createManyTransactions, + createManyTransactionsForSync, + filterRowsNeedingCreateMany, + SyncPersistedTransaction, deleteTransactions, fetchUnconfirmedTransactions, markTransactionsOrphaned, @@ -112,7 +115,11 @@ export function getNullDataScriptData (outputScript: string): OpReturnData | nul } interface ChronikTxWithAddress { tx: Tx, address: Address } + +type FetchBatchPhase = 'tx-drain' | 'drain-complete' | 'addresses-synced' + interface FetchedTxsBatch { + phase: FetchBatchPhase chronikTxs: ChronikTxWithAddress[] addressesSynced: string[] } @@ -335,9 +342,62 @@ export class ChronikBlockchainClient { `(addressConcurrency=${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY}, pageConcurrency=1).` ) - let chronikTxs: ChronikTxWithAddress[] = [] + const chronikTxs: ChronikTxWithAddress[] = [] const completedAddresses: string[] = [] + let producersPaused = false + const producerPauseWaiters: Array<() => void> = [] + + const waitWhilePaused = async (): Promise => { + // producersPaused is cleared by resumeProducers during drain cycles + // eslint-disable-next-line no-unmodified-loop-condition + while (producersPaused) { + await new Promise(resolve => { + producerPauseWaiters.push(resolve) + }) + } + } + + const resumeProducers = (): void => { + producersPaused = false + const waiters = producerPauseWaiters.splice(0) + for (const resolve of waiters) { + resolve() + } + } + + async function * runDrainCycle (): AsyncGenerator { + producersPaused = true + + while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { + const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE) + yield { + phase: 'tx-drain', + chronikTxs: chronikTxsSlice, + addressesSynced: [] + } + } + + if (chronikTxs.length > 0) { + const remaining = chronikTxs.splice(0) + yield { + phase: 'tx-drain', + chronikTxs: remaining, + addressesSynced: [] + } + } + + yield { + phase: 'drain-complete', + chronikTxs: [], + addressesSynced: [] + } + + // Consumer processes drain-complete before this runs (for await next()) + producersPaused = false + resumeProducers() + } + // Worker pool: maintain exactly INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY active workers const activeWorkers = new Set>() let nextAddressIndex = 0 @@ -354,6 +414,8 @@ export class ChronikBlockchainClient { try { while (!hasReachedStoppingCondition) { + await waitWhilePaused() + const pageIndex = nextBurstBasePageIndex let pageTxs: Tx[] = [] @@ -438,23 +500,29 @@ export class ChronikBlockchainClient { // Poll and yield batches while workers are active while (activeWorkers.size > 0 || chronikTxs.length > 0) { - // Yield batches if buffer is large enough. Make sure to drain until there - // are not enough transactions to fill the batch. - while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { - const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE) - yield { chronikTxs: chronikTxsSlice, addressesSynced: [] } + if (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { + yield * runDrainCycle() + continue } // If no active workers, yield any remaining transactions (even if < batch size) if (activeWorkers.size === 0 && chronikTxs.length > 0) { const remaining = chronikTxs.splice(0) - yield { chronikTxs: remaining, addressesSynced: [] } + yield { + phase: 'tx-drain', + chronikTxs: remaining, + addressesSynced: [] + } } - // Yield completed addresses if any - if (completedAddresses.length > 0) { + // Yield completed addresses if any (not during an active drain pause) + if (completedAddresses.length > 0 && !producersPaused) { const completed = completedAddresses.splice(0) - yield { chronikTxs: [], addressesSynced: completed } + yield { + phase: 'addresses-synced', + chronikTxs: [], + addressesSynced: completed + } } // If no active workers and no more transactions, break @@ -475,10 +543,15 @@ export class ChronikBlockchainClient { } // Final TX flush after all addresses processed - if (chronikTxs.length > 0) { - const remaining = chronikTxs - chronikTxs = [] - yield { chronikTxs: remaining, addressesSynced: [] } + if (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { + yield * runDrainCycle() + } else if (chronikTxs.length > 0) { + const remaining = chronikTxs.splice(0) + yield { + phase: 'tx-drain', + chronikTxs: remaining, + addressesSynced: [] + } } } @@ -782,6 +855,41 @@ export class ChronikBlockchainClient { } } + private broadcastIncomingTxFromSyncRow ( + addressString: string, + chronikTx: Tx, + createdTx: SyncPersistedTransaction, + opReturn: string + ): BroadcastTxData { + const broadcastTxData: BroadcastTxData = {} as BroadcastTxData + broadcastTxData.address = addressString + broadcastTxData.messageType = 'NewTx' + const inputAddresses = this.getSortedInputAddresses(chronikTx) + const outputAddresses = this.getSortedOutputAddresses(chronikTx) + const stubTx = { + hash: createdTx.hash, + amount: createdTx.amount, + confirmed: createdTx.confirmed, + opReturn, + timestamp: createdTx.timestamp, + address: { address: addressString }, + prices: [], + inputs: [] + } as unknown as TransactionWithAddressAndPrices + const newSimplifiedTransaction = getSimplifiedTrasaction( + stubTx, + inputAddresses, + outputAddresses + ) + broadcastTxData.txs = [newSimplifiedTransaction] + try { + this.wsEndpoint.emit(SOCKET_MESSAGES.TXS_BROADCAST, broadcastTxData) + } catch (err: any) { + console.error(RESPONSE_MESSAGES.COULD_NOT_BROADCAST_TX_TO_WS_SERVER_500.message, err.stack) + } + return broadcastTxData + } + private broadcastIncomingTx (addressString: string, chronikTx: Tx, createdTx: TransactionWithAddressAndPrices): BroadcastTxData { const broadcastTxData: BroadcastTxData = {} as BroadcastTxData broadcastTxData.address = addressString @@ -877,30 +985,41 @@ export class ChronikBlockchainClient { runTriggers: boolean ): Promise { const rows = commitTuples.map(p => p.row) - const createdTxs = await createManyTransactions(rows) - console.log(`${this.CHRONIK_MSG_PREFIX} committed — created=${createdTxs.length}/${commitTuples.length}`) + const rowsToUpsert = await filterRowsNeedingCreateMany(rows) + const syncResult = await createManyTransactionsForSync(rowsToUpsert) + console.log( + `${this.CHRONIK_MSG_PREFIX} committed — created=${syncResult.insertedCount}/` + + `${commitTuples.length}` + ) - const createdForProd = createdTxs.filter(t => productionAddressesIds.includes(t.addressId)) + const createdForProd = syncResult.inserted.filter(t => + productionAddressesIds.includes(t.addressId) + ) if (createdForProd.length > 0) { - await appendTxsToFile(createdForProd as unknown as Prisma.TransactionCreateManyInput[]) + await appendTxsToFile(createdForProd) } - if (createdTxs.length > 0) { + if (runTriggers && syncResult.inserted.length > 0) { const triggerBatch: BroadcastTxData[] = [] - for (const createdTx of createdTxs) { - const tuple = commitTuples.find(t => t.row.hash === createdTx.hash) + for (const createdTx of syncResult.inserted) { + const tuple = commitTuples.find( + t => t.row.hash === createdTx.hash && t.row.addressId === createdTx.addressId + ) if (tuple == null) { continue } - const bd = this.broadcastIncomingTx(createdTx.address.address, tuple.raw, createdTx) + const opReturn = typeof tuple.row.opReturn === 'string' ? tuple.row.opReturn : '' + const bd = this.broadcastIncomingTxFromSyncRow( + tuple.addressString, + tuple.raw, + createdTx, + opReturn + ) triggerBatch.push(bd) } - if (runTriggers && triggerBatch.length > 0) { + if (triggerBatch.length > 0) { await executeTriggersBatch(triggerBatch, this.networkId) } - - // Release memory - createdTxs.length = 0 triggerBatch.length = 0 } @@ -967,12 +1086,29 @@ export class ChronikBlockchainClient { console.log(`${pfx} will fetch batches of ${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY} addresses from chronik`) for await (const batch of this.fetchLatestTxsForAddresses(addresses)) { - if (batch.addressesSynced.length > 0) { + if (batch.phase === 'addresses-synced') { // marcador de slice => desmarca syncing await setSyncingBatch(batch.addressesSynced, false) continue } + if (batch.phase === 'drain-complete') { + if (toCommit.length > 0) { + const remainder = toCommit.splice(0) + await this.commitTransactionsBatch( + remainder, + productionAddressesIds, + runTriggers + ) + remainder.length = 0 + } + continue + } + + if (batch.chronikTxs.length === 0) { + continue + } + const involvedAddrIds = new Set(batch.chronikTxs.map(({ address }) => address.id)) try { diff --git a/services/transactionService.ts b/services/transactionService.ts index 859d7d08..7978e5b4 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -565,14 +565,106 @@ export async function connectAllTransactionsToPrices (): Promise { console.log('[PRICES] Finished connecting txs to prices.') } -export async function createManyTransactions ( +interface ExistingTxSnapshot { + confirmed: boolean + timestamp: number + orphaned: boolean +} + +const txSnapshotKey = (hash: string, addressId: string): string => + `${hash}:${addressId}` + +const rowNeedsUpsert = ( + row: Prisma.TransactionUncheckedCreateInput, + existing: ExistingTxSnapshot +): boolean => { + const confirmed = row.confirmed ?? false + const timestamp = row.timestamp + const orphaned = row.orphaned ?? false + return ( + existing.confirmed !== confirmed || + existing.timestamp !== timestamp || + existing.orphaned !== orphaned + ) +} + +/** Minimal row returned from bulk sync persist (no prices / paybuttons / cache). */ +export interface SyncPersistedTransaction { + id: string + hash: string + addressId: string + amount: Prisma.Decimal + timestamp: number + confirmed: boolean +} + +export interface CreateManyTransactionsSyncResult { + insertedCount: number + inserted: SyncPersistedTransaction[] +} + +interface PersistManyTransactionRowsResult { + inserted: SyncPersistedTransaction[] + updated: SyncPersistedTransaction[] + updatedCount: number +} + +const syncPersistedTxSelect = { + id: true, + hash: true, + addressId: true, + amount: true, + timestamp: true, + confirmed: true +} as const + +/** + * Cheap dedupe before createManyTransactions: returns only new rows or rows + * whose confirmed, timestamp, or orphaned may have changed. + */ +export async function filterRowsNeedingCreateMany ( transactionsData: Prisma.TransactionUncheckedCreateInput[] -): Promise { +): Promise { if (transactionsData.length === 0) { return [] } - // Extract flat transaction data and separate inputs/outputs + const existingTxs = await prisma.transaction.findMany({ + where: { + OR: transactionsData.map(tx => ({ + hash: tx.hash, + addressId: tx.addressId + })) + }, + select: { + hash: true, + addressId: true, + confirmed: true, + timestamp: true, + orphaned: true + } + }) + + const existingMap = new Map() + for (const tx of existingTxs) { + existingMap.set(txSnapshotKey(tx.hash, tx.addressId), tx) + } + + return transactionsData.filter(row => { + const existing = existingMap.get(txSnapshotKey(row.hash, row.addressId)) + if (existing == null) { + return true + } + return rowNeedsUpsert(row, existing) + }) +} + +/** + * Insert or update transactions and inputs only (no prices, cache, or heavy includes). + */ +async function persistManyTransactionRows ( + transactionsData: Prisma.TransactionUncheckedCreateInput[] +): Promise { const flatTxData = transactionsData.map(tx => ({ hash: tx.hash, amount: tx.amount, @@ -593,17 +685,17 @@ export async function createManyTransactions ( } }) - const insertedTransactions: TransactionWithNetwork[] = [] - const updatedTransactions: TransactionWithNetwork[] = [] + const inserted: SyncPersistedTransaction[] = [] + const updated: SyncPersistedTransaction[] = [] + let updatedCount = 0 await prisma.$transaction( - async (prisma) => { - // 1. Query existing transactions in one go - const existingTxs = await prisma.transaction.findMany({ + async (tx) => { + const existingTxs = await tx.transaction.findMany({ where: { - OR: flatTxData.map(tx => ({ - hash: tx.hash, - addressId: tx.addressId + OR: flatTxData.map(row => ({ + hash: row.hash, + addressId: row.addressId })) }, select: { @@ -616,13 +708,11 @@ export async function createManyTransactions ( } }) - // Create a map for quick lookup const existingMap = new Map() - for (const tx of existingTxs) { - existingMap.set(`${tx.hash}:${tx.addressId}`, tx) + for (const row of existingTxs) { + existingMap.set(`${row.hash}:${row.addressId}`, row) } - // 2. Split into new and existing transactions const newTxs: typeof flatTxData = [] const newTxsInputs: typeof txInputs = [] const toUpdate: Array<{ @@ -633,119 +723,109 @@ export async function createManyTransactions ( }> = [] for (let i = 0; i < flatTxData.length; i++) { - const tx = flatTxData[i] - const key = `${tx.hash}:${tx.addressId}` + const row = flatTxData[i] + const key = `${row.hash}:${row.addressId}` const existing = existingMap.get(key) if (existing != null) { - // Check if confirmed, timestamp, or orphaned changed. These are the - // only fields that can be changed after the transaction is created. - // This is to avoid updating the transaction with the same data. - const confirmedChanged = existing.confirmed !== tx.confirmed - const timestampChanged = existing.timestamp !== tx.timestamp - const orphanedChanged = existing.orphaned !== tx.orphaned + const confirmedChanged = existing.confirmed !== row.confirmed + const timestampChanged = existing.timestamp !== row.timestamp + const orphanedChanged = existing.orphaned !== row.orphaned if (confirmedChanged || timestampChanged || orphanedChanged) { toUpdate.push({ id: existing.id, - confirmed: tx.confirmed, - timestamp: tx.timestamp, - orphaned: tx.orphaned + confirmed: row.confirmed, + timestamp: row.timestamp, + orphaned: row.orphaned }) } } else { - newTxs.push(tx) + newTxs.push(row) newTxsInputs.push(txInputs[i]) } } - // 3. Create new transactions using createMany (bulk operation) if (newTxs.length > 0) { - // Create all transactions at once (without inputs/outputs) - await prisma.transaction.createMany({ + await tx.transaction.createMany({ data: newTxs, skipDuplicates: true }) - // Query back the created transactions to get their IDs - const createdTxs = await prisma.transaction.findMany({ + const createdTxs = await tx.transaction.findMany({ where: { - OR: newTxs.map(tx => ({ - hash: tx.hash, - addressId: tx.addressId + OR: newTxs.map(row => ({ + hash: row.hash, + addressId: row.addressId })) }, - select: { - id: true, - hash: true, - addressId: true, - address: { - select: { - networkId: true - } - } - } + select: syncPersistedTxSelect }) - // Create a map to match transactions with their inputs const txMap = new Map() for (let i = 0; i < newTxs.length; i++) { - const tx = newTxs[i] - const created = createdTxs.find(ct => ct.hash === tx.hash && ct.addressId === tx.addressId) + const row = newTxs[i] + const created = createdTxs.find( + ct => ct.hash === row.hash && ct.addressId === row.addressId + ) if (created != null) { - txMap.set(`${tx.hash}:${tx.addressId}`, { - tx: created as any, + txMap.set(`${row.hash}:${row.addressId}`, { + tx: created, inputs: newTxsInputs[i].inputs }) } } - // Create all inputs at once - const allInputs: Array<{ transactionId: string, address: string, index: number, amount: Prisma.Decimal }> = [] - for (const [, { tx, inputs }] of txMap) { + const allInputs: Array<{ + transactionId: string + address: string + index: number + amount: Prisma.Decimal + }> = [] + for (const [, { tx: createdTx, inputs }] of txMap) { for (const input of inputs) { allInputs.push({ - transactionId: tx.id, + transactionId: createdTx.id, address: input.address, index: input.index, - amount: input.amount instanceof Prisma.Decimal ? input.amount : new Prisma.Decimal(input.amount as string | number) + amount: input.amount instanceof Prisma.Decimal + ? input.amount + : new Prisma.Decimal(input.amount as string | number) }) } } if (allInputs.length > 0) { - await prisma.transactionInput.createMany({ + await tx.transactionInput.createMany({ data: allInputs, skipDuplicates: true }) } - // Fetch the full transactions with includes for return value - const fullTxs = await prisma.transaction.findMany({ - where: { - id: { in: createdTxs.map(t => t.id) } - }, - include: includeNetwork - }) - - insertedTransactions.push(...fullTxs) + for (const createdTx of createdTxs) { + inserted.push(createdTx) + } } - // 4. Update existing transactions that changed if (toUpdate.length > 0) { - const updatePromises = toUpdate.map(async update => - await prisma.transaction.update({ - where: { id: update.id }, - data: { - confirmed: update.confirmed, - timestamp: update.timestamp, - orphaned: update.orphaned - }, - include: includeNetwork - }) + await Promise.all( + toUpdate.map(async update => + await tx.transaction.update({ + where: { id: update.id }, + data: { + confirmed: update.confirmed, + timestamp: update.timestamp, + orphaned: update.orphaned + } + }) + ) ) - const updated = await Promise.all(updatePromises) - updatedTransactions.push(...updated) + const updatedTxs = await tx.transaction.findMany({ + where: { id: { in: toUpdate.map(u => u.id) } }, + select: syncPersistedTxSelect + }) + updated.push(...updatedTxs) + updatedCount = updatedTxs.length } }, { @@ -753,9 +833,55 @@ export async function createManyTransactions ( } ) - // 5. Connect prices only for newly created transactions - await connectTransactionsListToPrices(insertedTransactions) - const txsWithPaybuttonsAndPrices = await fetchTransactionsWithPaybuttonsAndPricesForIdList(insertedTransactions.map((tx) => tx.id)) + return { inserted, updated, updatedCount } +} + +/** + * Bulk sync path: persist txs + inputs only. Prices, Redis cache, and paybutton + * graphs are deferred to connectAllTransactionsToPrices after the sync job. + */ +export async function createManyTransactionsForSync ( + transactionsData: Prisma.TransactionUncheckedCreateInput[] +): Promise { + if (transactionsData.length === 0) { + return { insertedCount: 0, inserted: [] } + } + + const { inserted } = await persistManyTransactionRows(transactionsData) + return { + insertedCount: inserted.length, + inserted + } +} + +export async function createManyTransactions ( + transactionsData: Prisma.TransactionUncheckedCreateInput[] +): Promise { + if (transactionsData.length === 0) { + return [] + } + + const { inserted, updated } = await persistManyTransactionRows(transactionsData) + const persistedIds = [ + ...inserted.map(t => t.id), + ...updated.map(t => t.id) + ] + + if (persistedIds.length === 0) { + return [] + } + + const persistedTransactions = await prisma.transaction.findMany({ + where: { + id: { in: persistedIds } + }, + include: includeNetwork + }) + + await connectTransactionsListToPrices(persistedTransactions) + const txsWithPaybuttonsAndPrices = await fetchTransactionsWithPaybuttonsAndPricesForIdList( + persistedIds + ) void CacheSet.txsCreation(txsWithPaybuttonsAndPrices) diff --git a/tests/unittests/chronikService.test.ts b/tests/unittests/chronikService.test.ts index a8d9440f..b6e989bf 100644 --- a/tests/unittests/chronikService.test.ts +++ b/tests/unittests/chronikService.test.ts @@ -61,6 +61,11 @@ jest.mock('../../services/addressService', () => ({ jest.mock('../../services/transactionService', () => ({ createManyTransactions: jest.fn(), + createManyTransactionsForSync: jest.fn(async () => ({ + insertedCount: 0, + inserted: [] + })), + filterRowsNeedingCreateMany: jest.fn(async (rows: unknown[]) => rows), deleteTransactions: jest.fn(), fetchUnconfirmedTransactions: jest.fn(), upsertTransaction: jest.fn(),