Skip to content

Refactor code for init state creation #493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -487,28 +487,28 @@ export class Indexer implements IndexerInterface {
await this.triggerIndexingOnEvent(event, extraData);
}

async processBlock (blockProgress: BlockProgress): Promise<void> {
console.time('time:indexer#processBlock-init_state');
async preEventsBlockProcessing (blockProgress: BlockProgress): Promise<void> {
console.time('time:indexer#preEventsBlockProcessing-init_state');
// Call a function to create initial state for contracts.
await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber);
console.timeEnd('time:indexer#processBlock-init_state');
console.timeEnd('time:indexer#preEventsBlockProcessing-init_state');
{{#if (subgraphPath)}}

this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress);
{{/if}}
}

{{#if (subgraphPath)}}
async processBlockAfterEvents (blockHash: string, blockNumber: number, extraData: ExtraEventData): Promise<void> {
console.time('time:indexer#processBlockAfterEvents-mapping_code');
async postEventsBlockProcessing (blockHash: string, blockNumber: number, extraData: ExtraEventData): Promise<void> {
console.time('time:indexer#postEventsBlockProcessing-mapping_code');
// Call subgraph handler for block.
await this._graphWatcher.handleBlock(blockHash, blockNumber, extraData);
console.timeEnd('time:indexer#processBlockAfterEvents-mapping_code');
console.timeEnd('time:indexer#postEventsBlockProcessing-mapping_code');

console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state');
console.time('time:indexer#postEventsBlockProcessing-dump_subgraph_state');
// Persist subgraph state to the DB.
await this.dumpSubgraphState(blockHash);
console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state');
console.timeEnd('time:indexer#postEventsBlockProcessing-dump_subgraph_state');
}

{{/if}}
Expand Down
2 changes: 1 addition & 1 deletion packages/graph-node/test/utils/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ export class Indexer implements IndexerInterface {
return undefined;
}

async processBlock (blockProgress: BlockProgressInterface): Promise<void> {
async preEventsBlockProcessing (blockProgress: BlockProgressInterface): Promise<void> {
return undefined;
}

Expand Down
23 changes: 18 additions & 5 deletions packages/util/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,25 @@ export const processBatchEvents = async (
subgraphEventsOrder: boolean;
}
): Promise<boolean> => {
let dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[];
let updatedDbEvents: EventInterface[];
let isNewContractWatched = false;
let { block: dbBlock } = data;

// Perform any operations before processing events for this block
// (if this block hasn't already been processed)
if (!dbBlock.isComplete) {
await indexer.preEventsBlockProcessing(data.block);
}

if (subgraphEventsOrder) {
({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
} else {
({ dbBlock, updatedDbEvents } = await _processEvents(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
}

if (indexer.processBlockAfterEvents) {
if (!dbBlock.isComplete) {
await indexer.processBlockAfterEvents(dbBlock.blockHash, dbBlock.blockNumber, data);
}
// Perform any operations after processing events for this block
if (indexer.postEventsBlockProcessing && !dbBlock.isComplete) {
await indexer.postEventsBlockProcessing(dbBlock.blockHash, dbBlock.blockNumber, data);
}

dbBlock.isComplete = true;
Expand Down Expand Up @@ -361,6 +367,13 @@ const _processEventsInSubgraphOrder = async (
}
}

if (isNewContractWatched) {
// Create init states for newly watched contracts
// (needs to be done before we start processsing their events)
assert(indexer.createInit);
await indexer.createInit(block.blockHash, block.blockNumber);
}

console.time('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events');
// In the end process events of newly watched contracts
for (const updatedDbEvent of updatedDbEvents) {
Expand Down
3 changes: 0 additions & 3 deletions packages/util/src/index-block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ export const indexBlock = async (
blockProgress = partialblockProgress as BlockProgressInterface;
}

assert(indexer.processBlock);
await indexer.processBlock(blockProgress);

await processBatchEvents(
indexer,
{
Expand Down
59 changes: 46 additions & 13 deletions packages/util/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,22 @@ export class Indexer {
return blocks;
}

async getBlockWithFullBlock (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<{ block: DeepPartial<BlockProgressInterface>, fullBlock: EthFullBlock }> {
const [fullBlock] = await this.getBlocks(blockFilter);
assert(fullBlock);

const block = {
...fullBlock,
blockTimestamp: Number(fullBlock.timestamp),
blockNumber: Number(fullBlock.blockNumber)
};

return {
block: block as DeepPartial<BlockProgressInterface>,
fullBlock
};
}

async getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined> {
return this._db.getBlockProgress(blockHash);
}
Expand Down Expand Up @@ -394,6 +410,17 @@ export class Indexer {

const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);

// Create a set of starting blocks of watched contracts in range [fromBlock, toBlock]
// TODO: Optimize (avoid template contracts)
const watchedBlockNumbers = Object.keys(this._watchedContracts).reduce((acc: Set<number>, address: string) => {
const startBlock = this._watchedContracts[address].startingBlock;
if (startBlock >= fromBlock && startBlock <= toBlock) {
acc.add(startBlock);
}

return acc;
}, new Set());

const { logs } = await this._ethClient.getLogsForBlockRange({
fromBlock,
toBlock,
Expand All @@ -410,19 +437,13 @@ export class Indexer {
// Fetch blocks with transactions for the logs returned
console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-fetch-blocks-txs-${fromBlock}-${toBlock}`);
const blocksPromises = Array.from(blockLogsMap.keys()).map(async (blockHash) => {
const [fullBlock] = await this._ethClient.getFullBlocks({ blockHash });
assert(fullBlock);
const { block, fullBlock } = await this.getBlockWithFullBlock({ blockHash });

const block = {
...fullBlock,
blockTimestamp: Number(fullBlock.timestamp),
blockNumber: Number(fullBlock.blockNumber)
};
// Remove this block from watchedBlockNumbers set as it's already fetched
assert(block.blockNumber);
watchedBlockNumbers.delete(block.blockNumber);

return {
block: block as DeepPartial<BlockProgressInterface>,
fullBlock
};
return { block, fullBlock };
});

const ethFullTxPromises = txHashes.map(async txHash => {
Expand All @@ -432,6 +453,19 @@ export class Indexer {
const blocks = await Promise.all(blocksPromises);
const ethFullTxs = await Promise.all(ethFullTxPromises);

// Fetch starting blocks for watched contracts
const watchedBlocks = await Promise.all(
Array.from(watchedBlockNumbers).map(async (blockNumber) => this.getBlockWithFullBlock({ blockNumber }))
);

// Merge and sort the two block lists
const sortedBlocks = [...blocks, ...watchedBlocks].sort((b1, b2) => {
assert(b1.block.blockNumber);
assert(b2.block.blockNumber);

return b1.block.blockNumber - b2.block.blockNumber;
});

const ethFullTxsMap = ethFullTxs.reduce((acc: Map<string, EthFullTransaction>, ethFullTx) => {
acc.set(ethFullTx.ethTransactionCidByTxHash.txHash, ethFullTx);
return acc;
Expand All @@ -441,7 +475,7 @@ export class Indexer {

// Map db ready events according to blockhash
console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-db-save-blocks-events-${fromBlock}-${toBlock}`);
const blockWithDbEventsPromises = blocks.map(async ({ block, fullBlock }) => {
const blockWithDbEventsPromises = sortedBlocks.map(async ({ block, fullBlock }) => {
const blockHash = block.blockHash;
assert(blockHash);
const logs = blockLogsMap.get(blockHash) || [];
Expand All @@ -464,7 +498,6 @@ export class Indexer {
blockProgress,
ethFullBlock: fullBlock,
ethFullTransactions: blockEthFullTxs,
block,
events: []
};
});
Expand Down
4 changes: 0 additions & 4 deletions packages/util/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -605,10 +605,6 @@ export class JobRunner {
});
}

if (!blockProgress.isComplete) {
await this._indexer.processBlock(blockProgress);
}

// Push job to event processing queue.
// Block with all events processed or no events will not be processed again due to check in _processEvents.
const eventsProcessingJob: EventsJobData = {
Expand Down
4 changes: 2 additions & 2 deletions packages/util/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ export interface IndexerInterface {
getRelationsMap?: () => Map<any, { [key: string]: any }>
processInitialState: (contractAddress: string, blockHash: string) => Promise<any>
processStateCheckpoint: (contractAddress: string, blockHash: string) => Promise<boolean>
processBlock: (blockProgres: BlockProgressInterface) => Promise<void>
processBlockAfterEvents?: (blockHash: string, blockNumber: number, data: ExtraEventData) => Promise<void>
preEventsBlockProcessing: (blockProgres: BlockProgressInterface) => Promise<void>
postEventsBlockProcessing?: (blockHash: string, blockNumber: number, data: ExtraEventData) => Promise<void>
processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void>
processCheckpoint (blockHash: string): Promise<void>
processCLICheckpoint (contractAddress: string, blockHash?: string): Promise<string | undefined>
Expand Down