Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,6 @@ export const CLIENT_PAYMENT_EXPIRATION_TIME = (7) * (24 * 60 * 60 * 1000) // (nu

// Enough for eCash IFP when created
export const MAX_TXS_PER_ADDRESS = 250000

// Will look for this many days before to check if there are gaps in prices
export const N_DAYS_LOOK_FOR_PRICE_GAPS = 30
8 changes: 6 additions & 2 deletions services/chronikService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ export class ChronikBlockchainClient {
}
}
} else if (msg.msgType === 'TX_CONFIRMED') {
if (this.isAlreadyBeingProcessed(msg.txid, true)) return
try {
const transaction = await this.fetchTxWithRetry(msg.txid)
const addressesWithTransactions = await this.getAddressesForTransaction(transaction)
Expand All @@ -731,6 +732,8 @@ export class ChronikBlockchainClient {
await markTransactionsOrphaned(msg.txid)
} else {
console.error(`${this.CHRONIK_MSG_PREFIX}: confirmed tx handler failed for ${msg.txid}`, e)
const { [msg.txid]: _, ...rest } = this.lastProcessedMessages.confirmed
this.lastProcessedMessages.confirmed = rest
}
}
} else if (msg.msgType === 'TX_ADDED_TO_MEMPOOL') {
Expand Down Expand Up @@ -803,8 +806,9 @@ export class ChronikBlockchainClient {
const pageSize = 200
let blockPageTxs = (await this.chronik.blockTxs(blockHash, page, pageSize)).txs
let blockTxsToSync: Tx[] = []
while (blockPageTxs.length > 0 && blockTxsToSync.length !== this.confirmedTxsHashesFromLastBlock.length) {
const thisBlockTxsToSync = blockPageTxs.filter(tx => this.confirmedTxsHashesFromLastBlock.includes(tx.txid))
const confirmedTxHashes = new Set(this.confirmedTxsHashesFromLastBlock)
while (blockPageTxs.length > 0 && blockTxsToSync.length < confirmedTxHashes.size) {
const thisBlockTxsToSync = blockPageTxs.filter(tx => confirmedTxHashes.has(tx.txid))
blockTxsToSync = [...blockTxsToSync, ...thisBlockTxsToSync]
page += 1
blockPageTxs = (await this.chronik.blockTxs(blockHash, page, pageSize)).txs
Expand Down
103 changes: 82 additions & 21 deletions services/priceService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import axios from 'axios'
import { Prisma, Price } from '@prisma/client'
import config from 'config'
import prisma from 'prisma-local/clientInstance'
import { PRICE_API_TIMEOUT, PRICE_API_MAX_RETRIES, PRICE_API_DATE_FORMAT, RESPONSE_MESSAGES, NETWORK_TICKERS, XEC_NETWORK_ID, BCH_NETWORK_ID, USD_QUOTE_ID, CAD_QUOTE_ID, N_OF_QUOTES, HUMAN_READABLE_DATE_FORMAT } from 'constants/index'
import { PRICE_API_TIMEOUT, PRICE_API_MAX_RETRIES, PRICE_API_DATE_FORMAT, RESPONSE_MESSAGES, NETWORK_TICKERS, XEC_NETWORK_ID, BCH_NETWORK_ID, USD_QUOTE_ID, CAD_QUOTE_ID, N_OF_QUOTES, N_DAYS_LOOK_FOR_PRICE_GAPS } from 'constants/index'
import { validatePriceAPIUrlAndToken, validateNetworkTicker } from 'utils/validators'
import moment from 'moment'

Expand Down Expand Up @@ -154,44 +154,105 @@ export async function getAllPricesByNetworkTicker (
}

export async function syncPastDaysNewerPrices (): Promise<void> {
console.log('[PRICES] Syncing prices...')
const lastPrice = await prisma.price.findFirst({
orderBy: [{ timestamp: 'desc' }],
select: { timestamp: true }
console.log(`[PRICES] Syncing missing prices, including gaps on the last ${N_DAYS_LOOK_FOR_PRICE_GAPS} days...`)

const today = moment.utc().startOf('day')
const windowStart = moment.utc().subtract(N_DAYS_LOOK_FOR_PRICE_GAPS, 'days').startOf('day')

const existingPrices = await prisma.price.findMany({
where: {
timestamp: {
gte: windowStart.unix(),
lte: today.unix()
},
quoteId: USD_QUOTE_ID
},
select: { timestamp: true, networkId: true }
})
if (lastPrice === null) throw new Error('No prices found, initial database seed did not complete successfully')

const lastDateInDB = moment.unix(lastPrice.timestamp)
const date = moment().startOf('day')
const daysToRetrieve: string[] = []
const xecTimestamps = new Set(
existingPrices.filter(p => p.networkId === XEC_NETWORK_ID).map(p => p.timestamp)
)
const bchTimestamps = new Set(
existingPrices.filter(p => p.networkId === BCH_NETWORK_ID).map(p => p.timestamp)
)

const expectedDays: Array<{ formatted: string, timestamp: number }> = []
const cursor = today.clone()
while (cursor.isSameOrAfter(windowStart)) {
expectedDays.push({ formatted: cursor.format(PRICE_API_DATE_FORMAT), timestamp: cursor.unix() })
cursor.subtract(1, 'day')
}

const missingXECDays = expectedDays.filter(d => !xecTimestamps.has(d.timestamp))
const missingBCHDays = expectedDays.filter(d => !bchTimestamps.has(d.timestamp))
Comment on lines +162 to +188
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Gap detection only checks USD; CAD gaps from a partial prior upsert will be missed.

upsertPricesForNetworkId upserts USD and CAD as two separate, non-transactional Prisma calls (see lines 32–68). If the USD upsert succeeds but CAD throws (network blip, timeout, etc.), the row set ends up with USD-only for that day. This new gap-filler queries existing rows with quoteId: USD_QUOTE_ID only, so any USD-without-CAD day will be considered "already present" and will never be backfilled.

Two reasonable options:

  1. Wrap the USD+CAD upserts in prisma.$transaction(...) so they always succeed/fail together (preferred — also fixes the underlying invariant).
  2. Detect gaps based on (timestamp, networkId) having both quotes present, e.g. group by timestamp+networkId and require count = N_OF_QUOTES.
🛡️ Option 1: make the dual upsert atomic (root-cause fix)
 export async function upsertPricesForNetworkId (responseData: IResponseData, networkId: number, timestamp: number): Promise<void> {
   try {
-    await prisma.price.upsert({
-      where: { ... USD ... },
-      ...
-    })
-
-    await prisma.price.upsert({
-      where: { ... CAD ... },
-      ...
-    })
+    await prisma.$transaction([
+      prisma.price.upsert({
+        where: {
+          Price_timestamp_quoteId_networkId_unique_constraint: { quoteId: USD_QUOTE_ID, networkId, timestamp }
+        },
+        create: { quoteId: USD_QUOTE_ID, networkId, timestamp, value: new Prisma.Decimal(responseData.Price_in_USD) },
+        update: { value: new Prisma.Decimal(responseData.Price_in_USD) }
+      }),
+      prisma.price.upsert({
+        where: {
+          Price_timestamp_quoteId_networkId_unique_constraint: { quoteId: CAD_QUOTE_ID, networkId, timestamp }
+        },
+        create: { quoteId: CAD_QUOTE_ID, networkId, timestamp, value: new Prisma.Decimal(responseData.Price_in_CAD) },
+        update: { value: new Prisma.Decimal(responseData.Price_in_CAD) }
+      })
+    ])
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@services/priceService.ts` around lines 162 - 188, The gap detection is only
querying rows with quoteId: USD_QUOTE_ID so days where USD upsert succeeded but
CAD failed will be treated as present; fix by making the dual-quote upsert
atomic in upsertPricesForNetworkId: wrap the two Prisma upserts (USD and CAD)
inside a single prisma.$transaction(...) so both succeed or both rollback (use
CAD_QUOTE_ID and USD_QUOTE_ID inside the transaction), and keep the
gap-detection logic (existingPrices / xecTimestamps / bchTimestamps) unchanged;
alternatively, if you prefer detection-side fix, change the query that builds
existingPrices to group by timestamp+networkId and require count === N_OF_QUOTES
so only fully-populated (both quotes) days are considered present.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bot is correct on that one. You could run the same prisma query for both quote ids and intersect them for finding the gaps, so you will override any price with a missing quote as well.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fabcien, great suggestion — querying both quote IDs and intersecting gives you the tightest gap definition: a day is only "complete" if both USD and CAD rows exist. Here is a concrete implementation:

// Fetch rows for both quote IDs in a single query
const existingPrices = await prisma.price.findMany({
  where: {
    timestamp: {
      gte: windowStart.unix(),
      lte: today.unix()
    },
    quoteId: { in: [USD_QUOTE_ID, CAD_QUOTE_ID] }   // ← both quotes
  },
  select: { timestamp: true, networkId: true, quoteId: true }
})

// Count distinct quotes per (networkId, timestamp)
const xecQuoteCounts = new Map<number, number>()
const bchQuoteCounts = new Map<number, number>()

for (const p of existingPrices) {
  if (p.networkId === XEC_NETWORK_ID) {
    xecQuoteCounts.set(p.timestamp, (xecQuoteCounts.get(p.timestamp) ?? 0) + 1)
  } else if (p.networkId === BCH_NETWORK_ID) {
    bchQuoteCounts.set(p.timestamp, (bchQuoteCounts.get(p.timestamp) ?? 0) + 1)
  }
}

// A day is only "present" when all N_OF_QUOTES quotes exist
const xecTimestamps = new Set(
  [...xecQuoteCounts.entries()]
    .filter(([, count]) => count >= N_OF_QUOTES)
    .map(([ts]) => ts)
)
const bchTimestamps = new Set(
  [...bchQuoteCounts.entries()]
    .filter(([, count]) => count >= N_OF_QUOTES)
    .map(([ts]) => ts)
)

The rest of the function (missingXECDays, missingBCHDays, fetch + upsert logic) stays unchanged. Any day where one quote succeeded and the other failed will have a count of 1, fall below N_OF_QUOTES, and be treated as a gap — triggering a full re-upsert of both quotes.


console.log(`[PRICES] Last price found is for ${lastDateInDB.format(HUMAN_READABLE_DATE_FORMAT)}.`)
while (date.isAfter(lastDateInDB)) {
daysToRetrieve.push(date.format(PRICE_API_DATE_FORMAT))
date.add(-1, 'day')
const totalMissing = missingXECDays.length + missingBCHDays.length
if (totalMissing === 0) {
console.log(`[PRICES] No missing prices found in the last ${N_DAYS_LOOK_FOR_PRICE_GAPS} days.`)
return
}
console.log(`[PRICES] Will try to retrieve ${daysToRetrieve.length} prices.`)

const allXECPrices = await getAllPricesByNetworkTicker(NETWORK_TICKERS.ecash, false)
const allBCHPrices = await getAllPricesByNetworkTicker(NETWORK_TICKERS.bitcoincash, false)
console.log(`[PRICES] Found ${missingXECDays.length} missing XEC days and ${missingBCHDays.length} missing BCH days. Fetching from API...`)

const failedDays: string[] = []

const allXECPrices = missingXECDays.length > 0 ? await getAllPricesByNetworkTicker(NETWORK_TICKERS.ecash, false) : null
const allBCHPrices = missingBCHDays.length > 0 ? await getAllPricesByNetworkTicker(NETWORK_TICKERS.bitcoincash, false) : null

const xecBulkDays = new Set(allXECPrices?.map(p => p.day) ?? [])
const bchBulkDays = new Set(allBCHPrices?.map(p => p.day) ?? [])

if (allXECPrices !== null) {
const missingDaySet = new Set(missingXECDays.map(d => d.formatted))
await Promise.all(
allXECPrices
.filter(p => daysToRetrieve.includes(p.day))
.map(async price => await upsertPricesForNetworkId(price, XEC_NETWORK_ID, moment(price.day).unix()))
.filter(p => missingDaySet.has(p.day))
.map(async price => await upsertPricesForNetworkId(price, XEC_NETWORK_ID, moment.utc(price.day).unix()))
)
}

if (allBCHPrices !== null) {
const missingDaySet = new Set(missingBCHDays.map(d => d.formatted))
await Promise.all(
allBCHPrices
.filter(p => daysToRetrieve.includes(p.day))
.map(async price => await upsertPricesForNetworkId(price, BCH_NETWORK_ID, moment(price.day).unix()))
.filter(p => missingDaySet.has(p.day))
.map(async price => await upsertPricesForNetworkId(price, BCH_NETWORK_ID, moment.utc(price.day).unix()))
)
}

const xecStillMissing = missingXECDays.filter(d => !xecBulkDays.has(d.formatted))
const bchStillMissing = missingBCHDays.filter(d => !bchBulkDays.has(d.formatted))

for (const day of xecStillMissing) {
const price = await withRetries(
async () => await getPriceForDayAndNetworkTicker(moment.utc(day.formatted), NETWORK_TICKERS.ecash),
{ throwOnFailure: false, context: { day: day.formatted, network: 'XEC' } }
)
if (price !== null) {
await upsertPricesForNetworkId(price, XEC_NETWORK_ID, day.timestamp)
} else {
failedDays.push(`XEC ${day.formatted}`)
}
}

console.log('[PRICES] All past prices have been synced.')
for (const day of bchStillMissing) {
const price = await withRetries(
async () => await getPriceForDayAndNetworkTicker(moment.utc(day.formatted), NETWORK_TICKERS.bitcoincash),
{ throwOnFailure: false, context: { day: day.formatted, network: 'BCH' } }
)
if (price !== null) {
await upsertPricesForNetworkId(price, BCH_NETWORK_ID, day.timestamp)
} else {
failedDays.push(`BCH ${day.formatted}`)
}
}

if (failedDays.length > 0) {
console.warn(`[PRICES] Could not fetch prices for: ${failedDays.join(', ')}`)
} else {
console.log('[PRICES] All missing prices have been synced.')
}
}

export async function syncCurrentPrices (): Promise<void> {
Expand Down
Loading