diff --git a/packages/api/schema.graphql b/packages/api/schema.graphql index 959e90281..8e381c0e9 100644 --- a/packages/api/schema.graphql +++ b/packages/api/schema.graphql @@ -60,6 +60,7 @@ type Query { epochs(marketId: Int): [EpochType!]! indexCandles(address: String!, chainId: Int!, epochId: String!, from: Int!, interval: Int!, to: Int!): [CandleType!]! indexPriceAtTime(address: String!, chainId: Int!, epochId: String!, timestamp: Int!): CandleType + legacyMarketCandles(address: String!, chainId: Int!, epochId: String!, from: Int!, interval: Int!, to: Int!): [CandleType!]! market(address: String!, chainId: Int!): MarketType marketCandles(address: String!, chainId: Int!, epochId: String!, from: Int!, interval: Int!, to: Int!): [CandleType!]! markets: [MarketType!]! diff --git a/packages/api/src/graphql/resolvers/CandleResolver.ts b/packages/api/src/graphql/resolvers/CandleResolver.ts index cb25513a7..d5b5fc037 100644 --- a/packages/api/src/graphql/resolvers/CandleResolver.ts +++ b/packages/api/src/graphql/resolvers/CandleResolver.ts @@ -575,6 +575,40 @@ export class CandleResolver { @Arg('from', () => Int) from: number, @Arg('to', () => Int) to: number, @Arg('interval', () => Int) interval: number + ): Promise { + const resourcePerformanceManager = ResourcePerformanceManager.getInstance(); + const resourcePerformance = + resourcePerformanceManager.getResourcePerformanceFromChainAndAddress( + chainId, + address + ); + + if (!resourcePerformance) { + throw new Error( + `Resource performance not initialized for ${chainId}-${address}` + ); + } + + const prices = await resourcePerformance.getMarketPrices( + from, + to, + interval, + chainId, + address, + epochId + ); + + return prices; + } + + @Query(() => [CandleType]) + async legacyMarketCandles( + @Arg('chainId', () => Int) chainId: number, + @Arg('address', () => String) address: string, + @Arg('epochId', () => String) epochId: string, + @Arg('from', () => Int) from: number, + @Arg('to', () => Int) to: number, + @Arg('interval', () => Int) interval: number ): Promise { try { const market = await dataSource.getRepository(Market).findOne({ diff --git a/packages/api/src/migrations/1741722842647-migration.ts b/packages/api/src/migrations/1741722842647-migration.ts index b350430ae..2d3581ab7 100644 --- a/packages/api/src/migrations/1741722842647-migration.ts +++ b/packages/api/src/migrations/1741722842647-migration.ts @@ -1,38 +1,89 @@ -import { MigrationInterface, QueryRunner } from "typeorm"; +import { MigrationInterface, QueryRunner } from 'typeorm'; export class Migration1741722842647 implements MigrationInterface { - name = 'Migration1741722842647' + name = 'Migration1741722842647'; - public async up(queryRunner: QueryRunner): Promise { - await queryRunner.query(`CREATE INDEX "IDX_02755ce1b56a981eef76c0b59b" ON "epoch" ("marketId") `); - await queryRunner.query(`CREATE INDEX "IDX_f89ec06faf22da268399ae6a9b" ON "epoch" ("epochId") `); - await queryRunner.query(`CREATE INDEX "IDX_187fa56af532560ce204719ea3" ON "resource_price" ("resourceId") `); - await queryRunner.query(`CREATE INDEX "IDX_5bbe200849d138539d19b7caa6" ON "resource_price" ("blockNumber") `); - await queryRunner.query(`CREATE INDEX "IDX_a369700ab879af9ef6061c6dbe" ON "resource_price" ("timestamp") `); - await queryRunner.query(`CREATE INDEX "IDX_82453de75cd894e19c42844e70" ON "resource" ("slug") `); - await queryRunner.query(`CREATE INDEX "IDX_58232d6050e212b4a0f7eb02da" ON "market" ("address") `); - await queryRunner.query(`CREATE INDEX "IDX_33f985ce349688238dfeb8560e" ON "market" ("chainId") `); - await queryRunner.query(`CREATE INDEX "IDX_5430e2d7fe1df2bcada2c12deb" ON "event" ("blockNumber") `); - await queryRunner.query(`CREATE INDEX "IDX_2c15918ff289396205521c5f3c" ON "event" ("timestamp") `); - await queryRunner.query(`CREATE INDEX "IDX_a9346cdd1ea1e53a6b87e409ad" ON "market_price" ("timestamp") `); - await queryRunner.query(`CREATE INDEX "IDX_1ebf6f07652ca11d9f4618b64a" ON "collateral_transfer" ("transactionHash") `); - await queryRunner.query(`CREATE INDEX "IDX_927edd2b828777f0052366195e" ON "position" ("positionId") `); - } - - public async down(queryRunner: QueryRunner): Promise { - await queryRunner.query(`DROP INDEX "public"."IDX_927edd2b828777f0052366195e"`); - await queryRunner.query(`DROP INDEX "public"."IDX_1ebf6f07652ca11d9f4618b64a"`); - await queryRunner.query(`DROP INDEX "public"."IDX_a9346cdd1ea1e53a6b87e409ad"`); - await queryRunner.query(`DROP INDEX "public"."IDX_2c15918ff289396205521c5f3c"`); - await queryRunner.query(`DROP INDEX "public"."IDX_5430e2d7fe1df2bcada2c12deb"`); - await queryRunner.query(`DROP INDEX "public"."IDX_33f985ce349688238dfeb8560e"`); - await queryRunner.query(`DROP INDEX "public"."IDX_58232d6050e212b4a0f7eb02da"`); - await queryRunner.query(`DROP INDEX "public"."IDX_82453de75cd894e19c42844e70"`); - await queryRunner.query(`DROP INDEX "public"."IDX_a369700ab879af9ef6061c6dbe"`); - await queryRunner.query(`DROP INDEX "public"."IDX_5bbe200849d138539d19b7caa6"`); - await queryRunner.query(`DROP INDEX "public"."IDX_187fa56af532560ce204719ea3"`); - await queryRunner.query(`DROP INDEX "public"."IDX_f89ec06faf22da268399ae6a9b"`); - await queryRunner.query(`DROP INDEX "public"."IDX_02755ce1b56a981eef76c0b59b"`); - } + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE INDEX "IDX_02755ce1b56a981eef76c0b59b" ON "epoch" ("marketId") ` + ); + await queryRunner.query( + `CREATE INDEX "IDX_f89ec06faf22da268399ae6a9b" ON "epoch" ("epochId") ` + ); + await queryRunner.query( + `CREATE INDEX "IDX_187fa56af532560ce204719ea3" ON "resource_price" ("resourceId") ` + ); + await queryRunner.query( + `CREATE INDEX "IDX_5bbe200849d138539d19b7caa6" ON "resource_price" ("blockNumber") ` + ); + await queryRunner.query( + `CREATE INDEX "IDX_a369700ab879af9ef6061c6dbe" ON "resource_price" ("timestamp") ` + ); + await queryRunner.query( + `CREATE INDEX "IDX_82453de75cd894e19c42844e70" ON "resource" ("slug") ` + ); + await queryRunner.query( + `CREATE INDEX "IDX_58232d6050e212b4a0f7eb02da" ON "market" ("address") ` + ); + await queryRunner.query( + `CREATE INDEX "IDX_33f985ce349688238dfeb8560e" ON "market" ("chainId") ` + ); + await queryRunner.query( + `CREATE INDEX "IDX_5430e2d7fe1df2bcada2c12deb" ON "event" ("blockNumber") ` + ); + await queryRunner.query( + `CREATE INDEX "IDX_2c15918ff289396205521c5f3c" ON "event" ("timestamp") ` + ); + await queryRunner.query( + `CREATE INDEX "IDX_a9346cdd1ea1e53a6b87e409ad" ON "market_price" ("timestamp") ` + ); + await queryRunner.query( + `CREATE INDEX "IDX_1ebf6f07652ca11d9f4618b64a" ON "collateral_transfer" ("transactionHash") ` + ); + await queryRunner.query( + `CREATE INDEX "IDX_927edd2b828777f0052366195e" ON "position" ("positionId") ` + ); + } + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `DROP INDEX "public"."IDX_927edd2b828777f0052366195e"` + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_1ebf6f07652ca11d9f4618b64a"` + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_a9346cdd1ea1e53a6b87e409ad"` + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_2c15918ff289396205521c5f3c"` + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_5430e2d7fe1df2bcada2c12deb"` + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_33f985ce349688238dfeb8560e"` + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_58232d6050e212b4a0f7eb02da"` + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_82453de75cd894e19c42844e70"` + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_a369700ab879af9ef6061c6dbe"` + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_5bbe200849d138539d19b7caa6"` + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_187fa56af532560ce204719ea3"` + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_f89ec06faf22da268399ae6a9b"` + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_02755ce1b56a981eef76c0b59b"` + ); + } } diff --git a/packages/api/src/performance/helper.ts b/packages/api/src/performance/helper.ts index af7e0f176..a04b3f18b 100644 --- a/packages/api/src/performance/helper.ts +++ b/packages/api/src/performance/helper.ts @@ -2,9 +2,12 @@ import { IntervalStore } from './types'; import * as fs from 'fs'; import * as path from 'path'; +const FILE_VERSION = 1; + export async function saveStorageToFile( storage: IntervalStore, - latestTimestamp: number, + latestResourceTimestamp: number, + latestMarketTimestamp: number, resourceSlug: string, resourceName: string, sectionName: string @@ -33,7 +36,9 @@ export async function saveStorageToFile( filename, JSON.stringify( { - latestTimestamp, + fileVersion: FILE_VERSION, + latestResourceTimestamp, + latestMarketTimestamp, store: storage, }, (key, value) => (typeof value === 'bigint' ? value.toString() : value), @@ -53,7 +58,8 @@ export async function loadStorageFromFile( sectionName: string ): Promise< | { - latestTimestamp: number; + latestResourceTimestamp: number; + latestMarketTimestamp: number; store: IntervalStore; } | undefined @@ -79,30 +85,46 @@ export async function loadStorageFromFile( return undefined; } - const fileContent = await fs.promises.readFile(filename, 'utf-8'); - const storage = JSON.parse(fileContent, (key, value) => { - // Convert string numbers that might be bigints back to bigint - if (typeof value === 'string' && /^\d+$/.test(value)) { - try { - return BigInt(value); - } catch { - return value; + try { + const fileContent = await fs.promises.readFile(filename, 'utf-8'); + const storage = JSON.parse(fileContent, (key, value) => { + // Convert string numbers that might be bigints back to bigint + if (typeof value === 'string' && /^\d+$/.test(value)) { + try { + return BigInt(value); + } catch { + return value; + } } + return value; + }) as { + fileVersion: number; + latestResourceTimestamp: number; + latestMarketTimestamp: number; + store: IntervalStore; + }; + console.timeEnd( + ` ResourcePerformance - processResourceData.${resourceName}.${sectionName}.loadStorage` + ); + console.log(` ResourcePerformance - -> Loaded storage from ${filename}`); + if (storage.fileVersion !== FILE_VERSION) { + console.log( + `!! Storage file ${filename} has an unsupported version -${storage.fileVersion}-. Expected -${FILE_VERSION}-` + ); + return undefined; } - return value; - }) as { - latestTimestamp: number; - store: IntervalStore; - }; - - console.timeEnd( - ` ResourcePerformance - processResourceData.${resourceName}.${sectionName}.loadStorage` - ); - console.log(` ResourcePerformance - -> Loaded storage from ${filename}`); - return { - latestTimestamp: storage.latestTimestamp, - store: storage.store, - }; + return { + latestResourceTimestamp: storage.latestResourceTimestamp, + latestMarketTimestamp: storage.latestMarketTimestamp, + store: storage.store, + }; + } catch (error) { + console.error(`!! Error loading storage from ${filename}: ${error}`); + console.timeEnd( + ` ResourcePerformance - processResourceData.${resourceName}.${sectionName}.loadStorage` + ); + return undefined; + } } export async function clearStorageFiles(): Promise { diff --git a/packages/api/src/performance/resourcePerformance.ts b/packages/api/src/performance/resourcePerformance.ts index 17025d69e..c0e04d231 100644 --- a/packages/api/src/performance/resourcePerformance.ts +++ b/packages/api/src/performance/resourcePerformance.ts @@ -1,13 +1,19 @@ import { Resource } from 'src/models/Resource'; import { epochRepository, + marketPriceRepository, marketRepository, resourcePriceRepository, } from 'src/db'; import { ResourcePrice } from 'src/models/ResourcePrice'; import { Market } from 'src/models/Market'; import { Epoch } from 'src/models/Epoch'; -import { CandleData, StorageData, TrailingAvgData } from './types'; +import { + CandleData, + MarketPriceData, + StorageData, + TrailingAvgData, +} from './types'; import { MoreThan } from 'typeorm'; import { @@ -35,7 +41,8 @@ export class ResourcePerformance { private epochs: Epoch[]; private intervals: number[]; private trailingAvgTime: number; - private lastTimestampProcessed: number = 0; + private lastResourceTimestampProcessed: number = 0; + private lastMarketTimestampProcessed: number = 0; // Persistent storage. The main storage for the resource performance data and where all the data is pulled when required private persistentStorage: StorageData = {}; @@ -75,6 +82,17 @@ export class ResourcePerformance { trailingAvgData: TrailingAvgData[]; }; }; + marketProcessData: { + [interval: number]: { + [epochId: string]: { + open: bigint; + high: bigint; + low: bigint; + close: bigint; + nextTimestamp: number; + }; + }; + }; } = { dbResourcePrices: [], dbResourcePricesLength: 0, @@ -83,6 +101,7 @@ export class ResourcePerformance { indexProcessData: {}, resourceProcessData: {}, trailingAvgProcessData: {}, + marketProcessData: {}, }; constructor( @@ -106,17 +125,15 @@ export class ResourcePerformance { resourceStore: { data: [], metadata: [], - pointers: {}, trailingAvgData: [], }, trailingAvgStore: { data: [], metadata: [], - pointers: {}, trailingAvgData: [], }, indexStore: {}, - // marketStore: {}, + marketStore: {}, }; this.runtime.resourceProcessData[interval] = { open: 0n, @@ -144,25 +161,40 @@ export class ResourcePerformance { this.persistentStorage = restoredStorage.store; - this.lastTimestampProcessed = restoredStorage.latestTimestamp; + this.lastResourceTimestampProcessed = + restoredStorage.latestResourceTimestamp; + this.lastMarketTimestampProcessed = restoredStorage.latestMarketTimestamp; await this.pullMarketsAndEpochs(false); - await this.processResourceData(this.lastTimestampProcessed); + await this.processResourceData( + this.lastResourceTimestampProcessed, + this.lastMarketTimestampProcessed + ); } - private async processResourceData(initialTimestamp?: number) { + private async processResourceData( + initialResourceTimestamp?: number, + initialMarketTimestamp?: number + ) { if (this.runtime.processingResourceItems) { throw new Error('Resource prices are already being processed'); } console.time( - ` ResourcePerformance.processResourceData.${this.resource.slug}.total (${initialTimestamp})` + ` ResourcePerformance.processResourceData.${this.resource.slug}.total (${initialResourceTimestamp})` ); this.runtime.processingResourceItems = true; - const dbResourcePrices = await this.pullResourcePrices(initialTimestamp); - if (dbResourcePrices.length === 0) { + const dbResourcePrices = await this.pullResourcePrices( + initialResourceTimestamp + ); + const dbMarketPrices = await this.pullMarketPrices(initialMarketTimestamp); + + if (dbResourcePrices.length === 0 && dbMarketPrices.length === 0) { + console.timeEnd( + ` ResourcePerformance.processResourceData.${this.resource.slug}.total (${initialResourceTimestamp})` + ); this.runtime.processingResourceItems = false; return; } @@ -190,13 +222,35 @@ export class ResourcePerformance { this.runtime.currentIdx++; } + // Process all market prices + let marketIdx = 0; + const dbMarketPricesLength = dbMarketPrices.length; + while (marketIdx < dbMarketPricesLength) { + const item = dbMarketPrices[marketIdx]; + for (const interval of this.intervals) { + this.processMarketPriceData( + item, + marketIdx, + interval, + dbMarketPricesLength === marketIdx + 1 + ); + } + marketIdx++; + } + // Update the last timestamp processed if (this.runtime.dbResourcePricesLength > 0) { - this.lastTimestampProcessed = + this.lastResourceTimestampProcessed = this.runtime.dbResourcePrices[ this.runtime.dbResourcePricesLength - 1 ].timestamp; } + + if (dbMarketPricesLength > 0) { + this.lastMarketTimestampProcessed = + dbMarketPrices[dbMarketPricesLength - 1].timestamp; + } + console.timeEnd( ` ResourcePerformance.processResourceData.${this.resource.slug}.process` ); @@ -211,7 +265,7 @@ export class ResourcePerformance { ); console.timeEnd( - ` ResourcePerformance.processResourceData.${this.resource.slug}.total (${initialTimestamp})` + ` ResourcePerformance.processResourceData.${this.resource.slug}.total (${initialResourceTimestamp})` ); this.runtime.processingResourceItems = false; } @@ -251,6 +305,45 @@ export class ResourcePerformance { return dbResourcePrices; } + private async pullMarketPrices(initialTimestamp?: number) { + // Build the query based on whether we have an initialTimestamp + console.time( + ` ResourcePerformance.processResourceData.${this.resource.slug}.find.marketPrices` + ); + + const dbMarketPrices = await marketPriceRepository + .createQueryBuilder('marketPrice') + .leftJoinAndSelect('marketPrice.transaction', 'transaction') + .leftJoinAndSelect('transaction.event', 'event') + .leftJoinAndSelect('event.market', 'market') + .leftJoinAndSelect('market.resource', 'resource') + .leftJoinAndSelect('transaction.position', 'position') + .leftJoinAndSelect('position.epoch', 'epoch') + .where('resource.id = :resourceId', { resourceId: this.resource.id }) + .andWhere('CAST(marketPrice.timestamp AS bigint) > :from', { + from: initialTimestamp?.toString() ?? '0', + }) + .orderBy('marketPrice.timestamp', 'ASC') + .getMany(); + + const reducedDbMarketPrices = dbMarketPrices.map((item) => ({ + value: item.value, + timestamp: Number(item.timestamp), + epoch: item.transaction.position.epoch.id, + })); + + console.timeEnd( + ` ResourcePerformance.processResourceData.${this.resource.slug}.find.marketPrices` + ); + + console.log( + ` ResourcePerformance.processResourceData.${this.resource.slug}.find.marketPrices.length`, + dbMarketPrices.length + ); + + return reducedDbMarketPrices; + } + private async pullMarketsAndEpochs(onlyIfMissing: boolean = true) { // Find markets if not already loaded // Notice: doing it everytime since we don't know if a new market was added @@ -293,7 +386,6 @@ export class ResourcePerformance { this.runtime.dbResourcePrices = dbResourcePrices; this.runtime.dbResourcePricesLength = dbResourcePrices.length; this.runtime.currentIdx = 0; - this.runtime.processingResourceItems = false; // Reset processing data structures // We don't need complex initialization anymore since our processing methods @@ -301,6 +393,7 @@ export class ResourcePerformance { this.runtime.indexProcessData = {}; this.runtime.resourceProcessData = {}; this.runtime.trailingAvgProcessData = {}; + this.runtime.marketProcessData = {}; // Initialize with empty objects for each interval for (const interval of this.intervals) { @@ -326,6 +419,7 @@ export class ResourcePerformance { }; this.runtime.indexProcessData[interval] = {}; + this.runtime.marketProcessData[interval] = {}; for (const epoch of this.epochs) { this.runtime.indexProcessData[interval][epoch.id] = { @@ -333,13 +427,21 @@ export class ResourcePerformance { feePaid: 0n, nextTimestamp: 0, }; + this.runtime.marketProcessData[interval][epoch.id] = { + open: 0n, + high: 0n, + low: 0n, + close: 0n, + nextTimestamp: 0, + }; } } } private async persistStorage() { const storage = this.persistentStorage; - const lastTimestampProcessed = this.lastTimestampProcessed; + const lastResourceTimestampProcessed = this.lastResourceTimestampProcessed; + const lastMarketTimestampProcessed = this.lastMarketTimestampProcessed; const resourceSlug = this.resource.slug; const resourceName = this.resource.name; @@ -347,7 +449,8 @@ export class ResourcePerformance { // Interval resource store await saveStorageToFile( storage[interval], - lastTimestampProcessed, + lastResourceTimestampProcessed, + lastMarketTimestampProcessed, resourceSlug, resourceName, interval.toString() @@ -357,7 +460,8 @@ export class ResourcePerformance { private async restorePersistedStorage(): Promise< | { - latestTimestamp: number; + latestResourceTimestamp: number; + latestMarketTimestamp: number; store: StorageData; } | undefined @@ -365,7 +469,8 @@ export class ResourcePerformance { const resourceSlug = this.resource.slug; const resourceName = this.resource.name; const restoredStorage: StorageData = {}; - let latestTimestamp = 0; + let latestResourceTimestamp = 0; + let latestMarketTimestamp = 0; for (const interval of this.intervals) { const storageInterval = await loadStorageFromFile( resourceSlug, @@ -376,24 +481,17 @@ export class ResourcePerformance { return undefined; } restoredStorage[interval] = storageInterval.store; - latestTimestamp = storageInterval.latestTimestamp; - // const indexStoreMetadata = - // restoredStorage[interval].indexStore['1'].metadata; - // const lastMetadata = indexStoreMetadata[indexStoreMetadata.length - 1]; + latestResourceTimestamp = storageInterval.latestResourceTimestamp; + latestMarketTimestamp = storageInterval.latestMarketTimestamp; } return { - latestTimestamp, + latestResourceTimestamp, + latestMarketTimestamp, store: restoredStorage, }; } - // async backfillMarketPrices() { - // } - - // getMarketPrices(from: number, to: number, interval: number) { - // } - private processResourcePriceData( item: ResourcePrice, currentIdx: number, @@ -438,8 +536,6 @@ export class ResourcePerformance { used: 0n, feePaid: 0n, }); - - resourceStore.pointers[item.timestamp] = resourceStore.data.length - 1; } } @@ -502,8 +598,6 @@ export class ResourcePerformance { used: 0n, feePaid: 0n, }); - - resourceStore.pointers[item.timestamp] = resourceStore.data.length - 1; } } else { // Update the current interval @@ -554,7 +648,6 @@ export class ResourcePerformance { this.persistentStorage[interval].indexStore[epoch.id] = { data: [], metadata: [], - pointers: {}, trailingAvgData: [], // Unused in index store }; } @@ -598,8 +691,6 @@ export class ResourcePerformance { used: ripd.used, feePaid: ripd.feePaid, }); - - piStore.pointers[item.timestamp] = piStore.data.length - 1; } } @@ -684,8 +775,6 @@ export class ResourcePerformance { used: ripd.used, feePaid: ripd.feePaid, }); - - piStore.pointers[item.timestamp] = piStore.data.length - 1; } } } @@ -773,8 +862,6 @@ export class ResourcePerformance { if (!ptStore.trailingAvgData) { ptStore.trailingAvgData = []; } - - ptStore.pointers[item.timestamp] = ptStore.data.length - 1; } } @@ -785,23 +872,6 @@ export class ResourcePerformance { const isLastItem = currentIdx === this.runtime.dbResourcePricesLength - 1; const isNewInterval = item.timestamp >= rtpd.nextTimestamp; - // if (interval == 300 && epoch.id == 2) { - // const avgPrice = ripd.used > 0n ? ripd.feePaid / ripd.used : 0n; - // console.log( - // 'LLL 18 ', - // interval, - // epoch.id, - // item.timestamp, - // ripd.used.toString(), - // ripd.feePaid.toString(), - // avgPrice.toString(), - // item.used.toString(), - // item.feePaid.toString(), - // (ripd.used + BigInt(item.used)).toString(), - // (ripd.feePaid + BigInt(item.feePaid)).toString() - // ); - // } - // Include the new item in accumulators and the runtime trailing avg data rtpd.used += BigInt(item.used); rtpd.feePaid += BigInt(item.feePaid); @@ -882,8 +952,6 @@ export class ResourcePerformance { used: rtpd.used, feePaid: rtpd.feePaid, }); - - ptStore.pointers[item.timestamp] = ptStore.data.length - 1; } } } @@ -905,6 +973,138 @@ export class ResourcePerformance { } } + private processMarketPriceData( + item: MarketPriceData, + currentIdx: number, + interval: number, + isLastItem: boolean + ) { + for (const epoch of this.epochs) { + if (epoch.id != item.epoch) { + continue; + } + + const epochStartTime = epoch.startTimestamp; + const epochEndTime = epoch.endTimestamp; + // Skip data points that are not in the epoch + if ( + !epochStartTime || + item.timestamp < epochStartTime || + (epochEndTime && item.timestamp > epochEndTime) + ) { + continue; + } + const itemValueBn = BigInt(item.value); + + const rmpd = this.runtime.marketProcessData[interval][epoch.id]; + + if (!rmpd.nextTimestamp) { + rmpd.nextTimestamp = this.startOfNextInterval(item.timestamp, interval); + rmpd.open = itemValueBn; + rmpd.high = itemValueBn; + rmpd.low = itemValueBn; + rmpd.close = itemValueBn; + + if (!this.persistentStorage[interval].marketStore[epoch.id]) { + this.persistentStorage[interval].marketStore[epoch.id] = { + data: [], + metadata: [], + trailingAvgData: [], + }; + } + + const pmStore = this.persistentStorage[interval].marketStore[epoch.id]; + + // Create a placeholder in the store + const itemStartTime = this.startOfCurrentInterval( + item.timestamp, + interval + ); + + // Check if we already have an item for this interval + const lastStoreIndex = + pmStore.data.length > 0 ? pmStore.data.length - 1 : undefined; + + // Get cached data from the latest stored item + if (lastStoreIndex !== undefined) { + const previousData = pmStore.data[lastStoreIndex]; + rmpd.open = BigInt(previousData.open); + rmpd.high = BigInt(previousData.high); + rmpd.low = BigInt(previousData.low); + rmpd.close = BigInt(previousData.close); + } + + const isLastStoredItem = + lastStoreIndex !== undefined + ? pmStore.data[lastStoreIndex].timestamp == itemStartTime + : false; + + if (!isLastStoredItem) { + pmStore.data.push({ + timestamp: itemStartTime, + open: rmpd.close.toString(), // open is the previous close + high: maxBigInt(rmpd.high, itemValueBn).toString(), + low: minBigInt(rmpd.low, itemValueBn).toString(), + close: itemValueBn.toString(), + }); + } + } + + // Get the current placeholder index (the last item in the store) + const pmStore = this.persistentStorage[interval].marketStore[epoch.id]; + const currentPlaceholderIndex = pmStore.data.length - 1; + + const isNewInterval = item.timestamp >= rmpd.nextTimestamp; + const isEndOfEpoch = epochEndTime && item.timestamp > epochEndTime; + + rmpd.open = rmpd.open === 0n ? itemValueBn : rmpd.open; + if (isNewInterval || isLastItem || isEndOfEpoch) { + // Finalize the current interval + pmStore.data[currentPlaceholderIndex] = { + timestamp: pmStore.data[currentPlaceholderIndex].timestamp, + open: rmpd.open.toString(), + high: maxBigInt(rmpd.high, itemValueBn).toString(), + low: minBigInt(rmpd.low, itemValueBn).toString(), + close: itemValueBn.toString(), + }; + // Prepare the next interval + rmpd.nextTimestamp = this.startOfNextInterval(item.timestamp, interval); + rmpd.open = itemValueBn; + rmpd.high = itemValueBn; + rmpd.low = itemValueBn; + rmpd.close = itemValueBn; + + // Create a placeholder for the next interval + const itemStartTime = this.startOfCurrentInterval( + item.timestamp, + interval + ); + + // Check if we already have an item for this interval + const existingIndex = pmStore.data.findIndex( + (d) => + d.timestamp >= itemStartTime && d.timestamp < rmpd.nextTimestamp + ); + + if (existingIndex === -1 && !isEndOfEpoch) { + // Create a new placeholder + pmStore.data.push({ + timestamp: itemStartTime, + open: item.value, + high: item.value, + low: item.value, + close: item.value, + }); + } + } else { + // Update the current interval min/max values + rmpd.high = maxBigInt(rmpd.high, itemValueBn); + rmpd.low = minBigInt(rmpd.low, itemValueBn); + rmpd.close = itemValueBn; + } + } + } + private getEpochId(chainId: number, address: string, epoch: string) { const theEpoch = this.epochs.find( (e) => @@ -962,6 +1162,33 @@ export class ResourcePerformance { ); } + async getMarketPrices( + from: number, + to: number, + interval: number, + chainId: number, + address: string, + epoch: string + ) { + this.checkInterval(interval); + const epochId = this.getEpochId(chainId, address, epoch); + if (!this.persistentStorage[interval].marketStore[epochId]) { + return []; + } + + const prices = await this.getPricesFromArray( + this.persistentStorage[interval].marketStore[epochId].data, + from, + to, + interval, + false + ); + + const filledPrices = this.fillMissingCandles(prices, from, to, interval); + + return filledPrices; + } + getMarketFromChainAndAddress(chainId: number, address: string) { return this.markets.find( (m) => @@ -995,7 +1222,10 @@ export class ResourcePerformance { ); } else { // Process new data starting from the last timestamp we processed - await this.processResourceData(this.lastTimestampProcessed); + await this.processResourceData( + this.lastResourceTimestampProcessed, + this.lastMarketTimestampProcessed + ); } } } @@ -1005,25 +1235,19 @@ export class ResourcePerformance { from: number, to: number, interval: number, - fillMissing: boolean = true + fillInitialDatapoints: boolean = true ) { if (prices.length === 0) { return []; } - const timeWindow: { - from: number; - to: number; - } = { - from: this.startOfCurrentInterval(from, interval), - to: this.startOfCurrentInterval(to, interval), - }; + const timeWindow = this.getTimeWindow(from, to, interval); // Check if we need to process new data for this requested time range await this.updateStoreIfNeeded(prices, timeWindow.to); // If there are no prices or window starts before first price, add zero entries - if (fillMissing && timeWindow.from < prices[0].timestamp) { + if (fillInitialDatapoints && timeWindow.from < prices[0].timestamp) { const zeroEntries = []; for (let t = timeWindow.from; t < prices[0].timestamp; t += interval) { zeroEntries.push({ @@ -1063,6 +1287,88 @@ export class ResourcePerformance { })); } + private fillMissingCandles( + prices: CandleData[], + from: number, + to: number, + interval: number + ) { + const timeWindow = this.getTimeWindow(from, to, interval); + + const outputEntries = []; + for (let t = timeWindow.from; t < timeWindow.to; t += interval) { + outputEntries.push({ + timestamp: t, + open: '0', + high: '0', + low: '0', + close: '0', + }); + } + let outputIdx = 0; + let pricesIdx = 0; + let lastClose = '0'; + const pricesLength = prices.length; + if (pricesLength === 0) { + return outputEntries; + } + + let nextPriceItemTimestamp = prices[pricesIdx].timestamp; + + while (outputIdx < outputEntries.length) { + if (outputEntries[outputIdx].timestamp < nextPriceItemTimestamp) { + outputEntries[outputIdx].close = lastClose; + outputEntries[outputIdx].high = lastClose; + outputEntries[outputIdx].low = lastClose; + outputEntries[outputIdx].open = lastClose; + + outputIdx++; + continue; + } + + if (outputEntries[outputIdx].timestamp == nextPriceItemTimestamp) { + outputEntries[outputIdx] = prices[pricesIdx]; + lastClose = prices[pricesIdx].close; + pricesIdx++; + nextPriceItemTimestamp = + pricesIdx < pricesLength + ? prices[pricesIdx].timestamp + : timeWindow.to + 1; // set it in the future if not more prices to fall in the first if for the next loops + + outputIdx++; + continue; + } + + if (outputEntries[outputIdx].timestamp > nextPriceItemTimestamp) { + // pick the last known price first + let lastKnownPrice = prices[pricesIdx].close; + + // then move the prices the price in the prices array + while ( + nextPriceItemTimestamp < outputEntries[outputIdx].timestamp || + pricesIdx < pricesLength + ) { + nextPriceItemTimestamp = prices[pricesIdx].timestamp; + lastKnownPrice = prices[pricesIdx].close; + pricesIdx++; + } + + if (nextPriceItemTimestamp === outputEntries[outputIdx].timestamp) { + outputEntries[outputIdx] = prices[pricesIdx]; + } else { + outputEntries[outputIdx].close = lastKnownPrice; + outputEntries[outputIdx].high = lastKnownPrice; + outputEntries[outputIdx].low = lastKnownPrice; + outputEntries[outputIdx].open = lastKnownPrice; + } + + outputIdx++; + } + } + + return outputEntries; + } + startOfCurrentInterval( timestamp: number, interval: number | undefined = undefined @@ -1082,4 +1388,11 @@ export class ResourcePerformance { } return (Math.floor(timestamp / interval) + 1) * interval; } + + private getTimeWindow(from: number, to: number, interval: number) { + return { + from: this.startOfCurrentInterval(from, interval), + to: this.startOfCurrentInterval(to, interval), + }; + } } diff --git a/packages/api/src/performance/types.ts b/packages/api/src/performance/types.ts index 61a946ded..b9505c6e5 100644 --- a/packages/api/src/performance/types.ts +++ b/packages/api/src/performance/types.ts @@ -12,6 +12,12 @@ export type TrailingAvgData = { feePaid: string; }; +export type MarketPriceData = { + value: string; + timestamp: number; + epoch: number; +}; + export type CandleMetadata = { used: bigint; feePaid: bigint; @@ -22,9 +28,6 @@ export type CandleMetadata = { export type IndexStore = { data: CandleData[]; metadata: CandleMetadata[]; - pointers: { - [closestTimestamp: number]: number; - }; trailingAvgData: TrailingAvgData[]; }; @@ -34,9 +37,9 @@ export type IntervalStore = { indexStore: { [epoch: string]: IndexStore; }; - // marketStore: { - // [market: string]: IndexStore; - // }; + marketStore: { + [epoch: string]: IndexStore; + }; }; export type StorageData = {