Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 77 additions & 1 deletion jobs/workers.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,87 @@
import { Worker } from 'bullmq'
import { redisBullMQ } from 'redis/clientInstance'
import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index'
import { clearBalanceCache } from 'redis/balanceCache'
import { clearDashboardCache } from 'redis/dashboardCache'
import { clearPaymentCacheForAddress } from 'redis/paymentCache'
import { fetchAllAddresses } from 'services/addressService'
import { cleanupExpiredClientPayments } from 'services/clientPaymentService'
import { multiBlockchainClient } from 'services/chronikService'
import { connectAllTransactionsToPrices } from 'services/transactionService'
import { cleanupExpiredClientPayments } from 'services/clientPaymentService'
import { fetchAllUsers } from 'services/userService'

import * as priceService from 'services/priceService'

const ADDRESS_INVALIDATION_BATCH_SIZE = 100

interface AddressCacheInvalidationResult {
paymentFailures: number
balanceFailures: number
}

const invalidateAddressCaches = async (
address: string
): Promise<AddressCacheInvalidationResult> => {
let paymentFailures = 0
let balanceFailures = 0
try {
await clearPaymentCacheForAddress(address)
} catch {
paymentFailures = 1
}
try {
await clearBalanceCache(address)
} catch {
balanceFailures = 1
}
return { paymentFailures, balanceFailures }
}

/**
* Drop Redis payment-week, balance, and dashboard caches after bulk blockchain sync.
* Rebuild is lazy on the next API request.
*/
async function invalidateCachesAfterBlockchainSync (): Promise<void> {
console.log('[CACHE]: Invalidating caches after blockchain sync...')
const addresses = await fetchAllAddresses()
let paymentCacheFailures = 0
let balanceCacheFailures = 0
for (let i = 0; i < addresses.length; i += ADDRESS_INVALIDATION_BATCH_SIZE) {
const batch = addresses.slice(i, i + ADDRESS_INVALIDATION_BATCH_SIZE)
const results = await Promise.all(
batch.map(async (a) => await invalidateAddressCaches(a.address))
)
for (const r of results) {
paymentCacheFailures += r.paymentFailures
balanceCacheFailures += r.balanceFailures
}
}
const users = await fetchAllUsers()
let dashboardCacheFailures = 0
await Promise.all(
users.map(async (u) => {
try {
await clearDashboardCache(u.id)
} catch {
dashboardCacheFailures += 1
}
})
)
const totalFailures =
paymentCacheFailures + balanceCacheFailures + dashboardCacheFailures
if (totalFailures > 0) {
console.warn(
`[CACHE]: Cache invalidation completed with ${totalFailures} failure(s) ` +
`(payment: ${paymentCacheFailures}, balance: ${balanceCacheFailures}, ` +
`dashboard: ${dashboardCacheFailures}). DB sync already succeeded.`
)
}
console.log(
`[CACHE]: Invalidated payment/balance caches for ${addresses.length} addresses ` +
`and dashboard caches for ${users.length} users.`
)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

export const syncCurrentPricesWorker = async (queueName: string): Promise<void> => {
const worker = new Worker(
queueName,
Expand Down Expand Up @@ -39,6 +114,7 @@ export const syncBlockchainAndPricesWorker = async (queueName: string, onComplet
await priceService.syncPastDaysNewerPrices()
await multiBlockchainClient.syncMissedTransactions()
await connectAllTransactionsToPrices()
await invalidateCachesAfterBlockchainSync()
},
{
connection: redisBullMQ,
Expand Down
10 changes: 7 additions & 3 deletions redis/clientInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ class RedisMocked {
}

scanStream (opt: ScanStreamOptions): ScanStream {
return new ScanStream({
const stream = new ScanStream({
...opt,
command: '',
redis: {}
command: 'scan',
redis: this as unknown as IORedis
})
process.nextTick(() => {
stream.emit('end')
})
return stream
}

pipeline (commands?: unknown[][]): any {
Expand Down
25 changes: 24 additions & 1 deletion redis/paymentCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,19 @@ export const getPaymentList = async (userId: string): Promise<Payment[]> => {
}

const getCachedWeekKeysForAddress = async (addressString: string): Promise<string[]> => {
return await redis.keys(`${addressString}:payments:*`)
const pattern = `${addressString}:payments:*`
const keys: string[] = []
const stream = redis.scanStream({
match: pattern,
count: 100
})
return await new Promise<string[]>((resolve, reject) => {
stream.on('data', (batch: string[]) => {
keys.push(...batch)
})
stream.on('end', () => resolve(keys))
stream.on('error', reject)
})
}

export const getCachedWeekKeysForUser = async (userId: string): Promise<string[]> => {
Expand Down Expand Up @@ -280,6 +292,17 @@ export const clearRecentAddressCache = async (addressString: string, timestamps:
)
}

/** Remove all week-grouped payment keys for an address (forces rebuild from DB). */
export const clearPaymentCacheForAddress = async (addressString: string): Promise<void> => {
const weekKeys = await getCachedWeekKeysForAddress(addressString)
if (weekKeys.length === 0) {
return
}
await Promise.all(
weekKeys.map(async (key) => await redis.del(key, () => {}))
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

export const initPaymentCache = async (address: Address): Promise<boolean> => {
const cachedKeys = await getCachedWeekKeysForAddress(address.address)
if (cachedKeys.length === 0) {
Expand Down
Loading
Loading