Skip to content

Commit 55b0da6

Browse files
committed
Turn reorg checkpoints into general checkpoints
1 parent 6408d01 commit 55b0da6

File tree

9 files changed

+160
-36
lines changed

9 files changed

+160
-36
lines changed

codegenerator/cli/npm/envio/src/Batch.res

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,28 @@
1+
open Belt
2+
3+
type batchCheckpoint = {
4+
checkpointId: bigint,
5+
chainId: int,
6+
blockNumber: int,
7+
// Might be empty if we are not in reorg threshold
8+
// or rollback on reorg is disabled
9+
blockHash?: string,
10+
// Might be empty if we it's a reorg guard block
11+
items?: array<Internal.item>,
12+
}
13+
14+
type chainBeforeBatch = {
15+
chainId: int,
16+
fetchState: FetchState.t,
17+
blocks: ChainBlocks.t,
18+
totalEventsProcessed: int,
19+
}
20+
21+
type mutChainAcc = {
22+
mutable batchSize: int,
23+
mutable lastCheckpoint: batchCheckpoint,
24+
}
25+
126
type progressedChain = {
227
chainId: int,
328
batchSize: int,
@@ -6,11 +31,12 @@ type progressedChain = {
631
}
732

833
type t = {
9-
items: array<Internal.item>,
34+
checkpoints: array<batchCheckpoint>,
1035
progressedChains: array<progressedChain>,
11-
updatedFetchStates: ChainMap.t<FetchState.t>,
12-
dcsToStoreByChainId: dict<array<FetchState.indexingContract>>,
13-
creationTimeMs: int,
36+
totalBatchSize: int,
37+
// updatedFetchStates: ChainMap.t<FetchState.t>,
38+
// dcsToStoreByChainId: dict<array<FetchState.indexingContract>>,
39+
// creationTimeMs: int,
1440
}
1541

1642
/**
@@ -21,7 +47,7 @@ let getOrderedNextChain = (fetchStates: ChainMap.t<FetchState.t>, ~batchSizePerC
2147
let earliestChainTimestamp = ref(0)
2248
let chainKeys = fetchStates->ChainMap.keys
2349
for idx in 0 to chainKeys->Array.length - 1 {
24-
let chain = chainKeys->Array.get(idx)
50+
let chain = chainKeys->Array.getUnsafe(idx)
2551
let fetchState = fetchStates->ChainMap.get(chain)
2652
if fetchState->FetchState.isActivelyIndexing {
2753
let timestamp = fetchState->FetchState.getTimestampAt(
@@ -120,41 +146,59 @@ let prepareOrderedBatch = (
120146
}
121147

122148
let prepareUnorderedBatch = (
149+
~chains: array<chainBeforeBatch>,
123150
~batchSizeTarget,
124-
~fetchStates: ChainMap.t<FetchState.t>,
125-
~mutBatchSizePerChain: dict<int>,
126-
) => {
151+
~progressCheckpointId,
152+
): t => {
153+
let progressCheckpointId = ref(progressCheckpointId)
154+
let accPerChain = Js.Dict.empty()
155+
127156
let preparedFetchStates =
128-
fetchStates
129-
->ChainMap.values
157+
chains
158+
->Array.map(chain => chain.fetchState)
130159
->FetchState.filterAndSortForUnorderedBatch(~batchSizeTarget)
131160

132161
let chainIdx = ref(0)
133162
let preparedNumber = preparedFetchStates->Array.length
134-
let batchSize = ref(0)
163+
let totalBatchSize = ref(0)
135164

136165
let items = []
137166

138167
// Accumulate items for all actively indexing chains
139168
// the way to group as many items from a single chain as possible
140169
// This way the loaders optimisations will hit more often
141-
while batchSize.contents < batchSizeTarget && chainIdx.contents < preparedNumber {
170+
while totalBatchSize.contents < batchSizeTarget && chainIdx.contents < preparedNumber {
142171
let fetchState = preparedFetchStates->Js.Array2.unsafe_get(chainIdx.contents)
172+
let chainAcc = switch accPerChain->Utils.Dict.dangerouslyGetByIntNonOption(fetchState.chainId) {
173+
| Some(chainAcc) => chainAcc
174+
| None =>
175+
let acc = {
176+
batchSize: 0,
177+
lastCheckpoint: %raw(`null`),
178+
}
179+
accPerChain->Utils.Dict.setByInt(fetchState.chainId, acc)
180+
acc
181+
}
182+
143183
let chainBatchSize =
144184
fetchState->FetchState.getReadyItemsCount(
145-
~targetSize=batchSizeTarget - batchSize.contents,
185+
~targetSize=batchSizeTarget - totalBatchSize.contents,
146186
~fromItem=0,
147187
)
148188
if chainBatchSize > 0 {
149189
for idx in 0 to chainBatchSize - 1 {
150190
items->Js.Array2.push(fetchState.buffer->Belt.Array.getUnsafe(idx))->ignore
151191
}
152-
batchSize := batchSize.contents + chainBatchSize
153-
mutBatchSizePerChain->Utils.Dict.setByInt(fetchState.chainId, chainBatchSize)
192+
totalBatchSize := totalBatchSize.contents + chainBatchSize
193+
chainAcc.batchSize = chainBatchSize
154194
}
155195

156196
chainIdx := chainIdx.contents + 1
157197
}
158198

159-
items
199+
{
200+
totalBatchSize: totalBatchSize.contents,
201+
progressedChains: [],
202+
checkpoints: [],
203+
}
160204
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
open Belt
2+
3+
type t = {
4+
shouldRollbackOnReorg: bool,
5+
lastBlockScannedHashes: ReorgDetection.LastBlockScannedHashes.t,
6+
}
7+
8+
let make = (
9+
~chainId,
10+
~maxReorgDepth,
11+
~shouldRollbackOnReorg,
12+
~reorgCheckpoints: array<InternalTable.Checkpoints.t>,
13+
) => {
14+
{
15+
shouldRollbackOnReorg,
16+
lastBlockScannedHashes: reorgCheckpoints
17+
->Array.keepMapU(reorgCheckpoint => {
18+
if reorgCheckpoint.chainId === chainId {
19+
Some({
20+
ReorgDetection.blockNumber: reorgCheckpoint.blockNumber,
21+
blockHash: reorgCheckpoint.blockHash,
22+
})
23+
} else {
24+
None
25+
}
26+
})
27+
->ReorgDetection.LastBlockScannedHashes.makeWithData(~maxReorgDepth),
28+
}
29+
}
30+
31+
let registerReorgGuard = (
32+
chainBlocks: t,
33+
~reorgGuard: ReorgDetection.reorgGuard,
34+
~currentBlockHeight: int,
35+
) => {
36+
let (updatedLastBlockScannedHashes, reorgResult: ReorgDetection.reorgResult) =
37+
chainBlocks.lastBlockScannedHashes->ReorgDetection.LastBlockScannedHashes.registerReorgGuard(
38+
~reorgGuard,
39+
~currentBlockHeight,
40+
~shouldRollbackOnReorg=chainBlocks.shouldRollbackOnReorg,
41+
)
42+
(
43+
{
44+
...chainBlocks,
45+
lastBlockScannedHashes: updatedLastBlockScannedHashes,
46+
},
47+
reorgResult,
48+
)
49+
}

codegenerator/cli/npm/envio/src/Persistence.res

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type initialState = {
1717
cleanRun: bool,
1818
cache: dict<effectCacheRecord>,
1919
chains: array<InternalTable.Chains.t>,
20+
reorgCheckpoints: array<InternalTable.Checkpoints.t>,
2021
}
2122

2223
type operator = [#">" | #"="]

codegenerator/cli/npm/envio/src/PgStorage.res

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ let makeInitializeTransaction = (
6565
let generalTables = [
6666
InternalTable.Chains.table,
6767
InternalTable.PersistedState.table,
68-
InternalTable.ReorgCheckpoints.table,
68+
InternalTable.Checkpoints.table,
6969
InternalTable.RawEvents.table,
7070
]
7171

@@ -707,6 +707,7 @@ let make = (
707707
{
708708
cleanRun: true,
709709
cache,
710+
reorgCheckpoints: [],
710711
chains: chainConfigs->Js.Array2.map(InternalTable.Chains.initialFromConfig),
711712
}
712713
}
@@ -895,17 +896,23 @@ let make = (
895896
}
896897

897898
let resumeInitialState = async (): Persistence.initialState => {
898-
let (cache, chains) = await Promise.all2((
899+
let (cache, chains, reorgCheckpoints) = await Promise.all3((
899900
restoreEffectCache(~withUpload=false),
900901
sql
901902
->Postgres.unsafe(
902903
makeLoadAllQuery(~pgSchema, ~tableName=InternalTable.Chains.table.tableName),
903904
)
904905
->(Utils.magic: promise<array<unknown>> => promise<array<InternalTable.Chains.t>>),
906+
sql
907+
->Postgres.unsafe(
908+
makeLoadAllQuery(~pgSchema, ~tableName=InternalTable.Checkpoints.table.tableName),
909+
)
910+
->(Utils.magic: promise<array<unknown>> => promise<array<InternalTable.Checkpoints.t>>),
905911
))
906912

907913
{
908914
cleanRun: false,
915+
reorgCheckpoints,
909916
cache,
910917
chains,
911918
}

codegenerator/cli/npm/envio/src/db/InternalTable.res

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,13 @@ WHERE "id" = $1;`
206206
Promise.all(promises)
207207
}
208208

209-
let setProgressedChains = (sql, ~pgSchema, ~progressedChains: array<Batch.progressedChain>) => {
209+
type progressedChain = {
210+
chainId: int,
211+
progressBlockNumber: int,
212+
totalEventsProcessed: int,
213+
}
214+
215+
let setProgressedChains = (sql, ~pgSchema, ~progressedChains: array<progressedChain>) => {
210216
let query = makeProgressFieldsUpdateQuery(~pgSchema)
211217

212218
let promises = []
@@ -259,24 +265,27 @@ module PersistedState = {
259265
)
260266
}
261267

262-
module ReorgCheckpoints = {
268+
module Checkpoints = {
263269
type t = {
264270
id: bigint,
265271
@as("chain_id")
266272
chainId: int,
267273
@as("block_number")
268274
blockNumber: int,
269275
@as("block_hash")
270-
blockHash: string,
276+
blockHash: Js.null<string>,
277+
@as("events_processed")
278+
eventsProcessed: int,
271279
}
272280

273281
let table = mkTable(
274-
"envio_reorg_checkpoints",
282+
"envio_checkpoints",
275283
~fields=[
276284
mkField("id", Numeric, ~fieldSchema=S.bigint, ~isPrimaryKey),
277285
mkField("chain_id", Integer, ~fieldSchema=S.int),
278286
mkField("block_number", Integer, ~fieldSchema=S.int),
279-
mkField("block_hash", Text, ~fieldSchema=S.string),
287+
mkField("block_hash", Text, ~fieldSchema=S.null(S.string), ~isNullable),
288+
mkField("events_processed", Integer, ~fieldSchema=S.int),
280289
],
281290
)
282291
}

codegenerator/cli/npm/envio/src/db/Table.res

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ type derived
55
@unboxed
66
type fieldType =
77
| @as("INTEGER") Integer
8+
| @as("BIGINT") BigInt
89
| @as("BOOLEAN") Boolean
910
| @as("NUMERIC") Numeric
1011
| @as("DOUBLE PRECISION") DoublePrecision

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type t = {
2121
firstEventBlockNumber: option<int>,
2222
numEventsProcessed: int,
2323
numBatchesFetched: int,
24+
blocks: ChainBlocks.t,
2425
//An optional list of filters to apply on event queries
2526
//Used for reorgs and restarts
2627
processingFilters: option<array<processingFilter>>,
@@ -42,6 +43,7 @@ let make = (
4243
~numBatchesFetched,
4344
~isInReorgThreshold,
4445
~maxReorgDepth,
46+
~reorgCheckpoints,
4547
): t => {
4648
// We don't need the router itself, but only validation logic,
4749
// since now event router is created for selection of events
@@ -202,6 +204,12 @@ let make = (
202204
timestampCaughtUpToHeadOrEndblock,
203205
numEventsProcessed,
204206
numBatchesFetched,
207+
blocks: ChainBlocks.make(
208+
~chainId=chainConfig.id,
209+
~maxReorgDepth,
210+
~reorgCheckpoints,
211+
~shouldRollbackOnReorg=config->Config.shouldRollbackOnReorg,
212+
),
205213
processingFilters: None,
206214
}
207215
}
@@ -224,6 +232,7 @@ let makeFromConfig = (chainConfig: InternalConfig.chain, ~config, ~targetBufferS
224232
~dynamicContracts=[],
225233
~isInReorgThreshold=false,
226234
~maxReorgDepth=chainConfig.maxReorgDepth,
235+
~reorgCheckpoints=[],
227236
)
228237
}
229238

@@ -233,6 +242,7 @@ let makeFromConfig = (chainConfig: InternalConfig.chain, ~config, ~targetBufferS
233242
let makeFromDbState = async (
234243
chainConfig: InternalConfig.chain,
235244
~resumedChainState: InternalTable.Chains.t,
245+
~reorgCheckpoints: array<InternalTable.Checkpoints.t>,
236246
~isInReorgThreshold,
237247
~config,
238248
~targetBufferSize,
@@ -271,6 +281,7 @@ let makeFromDbState = async (
271281
~logger,
272282
~targetBufferSize,
273283
~isInReorgThreshold,
284+
~reorgCheckpoints,
274285
)
275286
}
276287

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@ open Belt
33
type t = {
44
chainFetchers: ChainMap.t<ChainFetcher.t>,
55
multichain: InternalConfig.multichain,
6+
// This is an in-memory pointer for checkpoints
7+
// Checkpiont references a block in the order it was processed
8+
// On restart we should use the last checkpoint id used for entity history
9+
// or it's fine to reset it to 0 when there's no history yet
10+
progressCheckpointId: bigint,
611
isInReorgThreshold: bool,
712
}
813

@@ -25,6 +30,7 @@ let makeFromConfig = (~config: Config.t): t => {
2530
{
2631
chainFetchers,
2732
multichain: config.multichain,
33+
progressCheckpointId: 0n,
2834
isInReorgThreshold: false,
2935
}
3036
}
@@ -65,6 +71,7 @@ let makeFromDbState = async (~initialState: Persistence.initialState, ~config: C
6571
await chainConfig->ChainFetcher.makeFromDbState(
6672
~resumedChainState,
6773
~isInReorgThreshold,
74+
~reorgCheckpoints=initialState.reorgCheckpoints,
6875
~targetBufferSize,
6976
~config,
7077
),
@@ -78,6 +85,7 @@ let makeFromDbState = async (~initialState: Persistence.initialState, ~config: C
7885
multichain: config.multichain,
7986
chainFetchers,
8087
isInReorgThreshold,
88+
progressCheckpointId: 0n, // FIXME: get from initialState
8189
}
8290
}
8391

0 commit comments

Comments
 (0)