Skip to content

Commit 2dbe083

Browse files
committed
Move a lot of logic to the Batch module
1 parent f5bf560 commit 2dbe083

File tree

13 files changed

+446
-347
lines changed

13 files changed

+446
-347
lines changed

codegenerator/cli/npm/envio/src/Batch.res

Lines changed: 158 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
1-
type progressedChain = {
2-
chainId: int,
1+
open Belt
2+
3+
type chainAfterBatch = {
34
batchSize: int,
45
progressBlockNumber: int,
56
totalEventsProcessed: int,
7+
fetchState: FetchState.t,
8+
dcsToStore: option<array<FetchState.indexingContract>>,
9+
isProgressAtHeadWhenBatchCreated: bool,
10+
}
11+
12+
type chainBeforeBatch = {
13+
fetchState: FetchState.t,
14+
progressBlockNumber: int,
15+
sourceBlockNumber: int,
16+
totalEventsProcessed: int,
617
}
718

819
type t = {
920
items: array<Internal.item>,
10-
progressedChains: array<progressedChain>,
11-
updatedFetchStates: ChainMap.t<FetchState.t>,
12-
dcsToStoreByChainId: dict<array<FetchState.indexingContract>>,
13-
creationTimeMs: int,
21+
progressedChainsById: dict<chainAfterBatch>,
1422
}
1523

1624
/**
@@ -21,7 +29,7 @@ let getOrderedNextChain = (fetchStates: ChainMap.t<FetchState.t>, ~batchSizePerC
2129
let earliestChainTimestamp = ref(0)
2230
let chainKeys = fetchStates->ChainMap.keys
2331
for idx in 0 to chainKeys->Array.length - 1 {
24-
let chain = chainKeys->Array.get(idx)
32+
let chain = chainKeys->Array.getUnsafe(idx)
2533
let fetchState = fetchStates->ChainMap.get(chain)
2634
if fetchState->FetchState.isActivelyIndexing {
2735
let timestamp = fetchState->FetchState.getTimestampAt(
@@ -74,14 +82,116 @@ let hasMultichainReadyItem = (
7482
}
7583
}
7684

77-
let prepareOrderedBatch = (
78-
~batchSizeTarget,
79-
~fetchStates: ChainMap.t<FetchState.t>,
80-
~mutBatchSizePerChain: dict<int>,
81-
) => {
85+
let getProgressedChainsById = {
86+
let getChainAfterBatchIfProgressed = (
87+
~chainBeforeBatch: chainBeforeBatch,
88+
~updatedFetchState,
89+
~batchSize,
90+
~dcsToStore,
91+
) => {
92+
let nextProgressBlockNumber = updatedFetchState->FetchState.getProgressBlockNumber
93+
94+
// The check is sufficient, since we guarantee to include a full block in a batch
95+
// Also, this might be true even if batchSize is 0,
96+
// eg when indexing at the head and chain doesn't have items in a block
97+
if chainBeforeBatch.progressBlockNumber < nextProgressBlockNumber {
98+
Some(
99+
(
100+
{
101+
batchSize,
102+
progressBlockNumber: nextProgressBlockNumber,
103+
totalEventsProcessed: chainBeforeBatch.totalEventsProcessed + batchSize,
104+
dcsToStore,
105+
fetchState: updatedFetchState,
106+
isProgressAtHeadWhenBatchCreated: nextProgressBlockNumber >=
107+
chainBeforeBatch.sourceBlockNumber,
108+
}: chainAfterBatch
109+
),
110+
)
111+
} else {
112+
None
113+
}
114+
}
115+
116+
(~chainsBeforeBatch: ChainMap.t<chainBeforeBatch>, ~batchSizePerChain: dict<int>) => {
117+
let progressedChainsById = Js.Dict.empty()
118+
119+
// Needed to:
120+
// - Recalculate the computed queue sizes
121+
// - Accumulate registered dynamic contracts to store in the db
122+
// - Trigger onBlock pointer update
123+
chainsBeforeBatch
124+
->ChainMap.values
125+
->Array.forEachU(chainBeforeBatch => {
126+
let fetchState = chainBeforeBatch.fetchState
127+
switch switch batchSizePerChain->Utils.Dict.dangerouslyGetNonOption(
128+
fetchState.chainId->Int.toString,
129+
) {
130+
| Some(batchSize) =>
131+
let leftItems = fetchState.buffer->Js.Array2.sliceFrom(batchSize)
132+
switch fetchState.dcsToStore {
133+
| [] =>
134+
getChainAfterBatchIfProgressed(
135+
~chainBeforeBatch,
136+
~batchSize,
137+
~dcsToStore=None,
138+
~updatedFetchState=fetchState->FetchState.updateInternal(~mutItems=leftItems),
139+
)
140+
141+
| dcs => {
142+
let leftDcsToStore = []
143+
let batchDcs = []
144+
let updatedFetchState =
145+
fetchState->FetchState.updateInternal(~mutItems=leftItems, ~dcsToStore=leftDcsToStore)
146+
let nextProgressBlockNumber = updatedFetchState->FetchState.getProgressBlockNumber
147+
148+
dcs->Array.forEach(dc => {
149+
// Important: This should be a registering block number.
150+
// This works for now since dc.startBlock is a registering block number.
151+
if dc.startBlock <= nextProgressBlockNumber {
152+
batchDcs->Array.push(dc)
153+
} else {
154+
// Mutate the array we passed to the updateInternal beforehand
155+
leftDcsToStore->Array.push(dc)
156+
}
157+
})
158+
159+
getChainAfterBatchIfProgressed(
160+
~chainBeforeBatch,
161+
~batchSize,
162+
~dcsToStore=Some(batchDcs),
163+
~updatedFetchState,
164+
)
165+
}
166+
}
167+
// Chains without items in the batch are still checked, since they may progress with a batch size of 0
168+
| None =>
169+
getChainAfterBatchIfProgressed(
170+
~chainBeforeBatch,
171+
~batchSize=0,
172+
~dcsToStore=None,
173+
~updatedFetchState=chainBeforeBatch.fetchState,
174+
)
175+
} {
176+
| Some(progressedChain) =>
177+
progressedChainsById->Utils.Dict.setByInt(
178+
chainBeforeBatch.fetchState.chainId,
179+
progressedChain,
180+
)
181+
| None => ()
182+
}
183+
})
184+
185+
progressedChainsById
186+
}
187+
}
188+
189+
let prepareOrderedBatch = (~chainsBeforeBatch: ChainMap.t<chainBeforeBatch>, ~batchSizeTarget) => {
82190
let batchSize = ref(0)
83191
let isFinished = ref(false)
192+
let mutBatchSizePerChain = Js.Dict.empty()
84193
let items = []
194+
let fetchStates = chainsBeforeBatch->ChainMap.map(chainBeforeBatch => chainBeforeBatch.fetchState)
85195

86196
while batchSize.contents < batchSizeTarget && !isFinished.contents {
87197
switch fetchStates->getOrderedNextChain(~batchSizePerChain=mutBatchSizePerChain) {
@@ -116,23 +226,30 @@ let prepareOrderedBatch = (
116226
}
117227
}
118228

119-
items
229+
{
230+
items,
231+
progressedChainsById: getProgressedChainsById(
232+
~chainsBeforeBatch,
233+
~batchSizePerChain=mutBatchSizePerChain,
234+
),
235+
}
120236
}
121237

122238
let prepareUnorderedBatch = (
239+
~chainsBeforeBatch: ChainMap.t<chainBeforeBatch>,
123240
~batchSizeTarget,
124-
~fetchStates: ChainMap.t<FetchState.t>,
125-
~mutBatchSizePerChain: dict<int>,
126241
) => {
127242
let preparedFetchStates =
128-
fetchStates
243+
chainsBeforeBatch
129244
->ChainMap.values
245+
->Js.Array2.map(chainBeforeBatch => chainBeforeBatch.fetchState)
130246
->FetchState.filterAndSortForUnorderedBatch(~batchSizeTarget)
131247

132248
let chainIdx = ref(0)
133249
let preparedNumber = preparedFetchStates->Array.length
134250
let batchSize = ref(0)
135251

252+
let mutBatchSizePerChain = Js.Dict.empty()
136253
let items = []
137254

138255
// Accumulate items for all actively indexing chains
@@ -156,5 +273,28 @@ let prepareUnorderedBatch = (
156273
chainIdx := chainIdx.contents + 1
157274
}
158275

159-
items
160-
}
276+
{
277+
items,
278+
progressedChainsById: getProgressedChainsById(
279+
~chainsBeforeBatch,
280+
~batchSizePerChain=mutBatchSizePerChain,
281+
),
282+
}
283+
}
284+
285+
let make = (
286+
~chainsBeforeBatch: ChainMap.t<chainBeforeBatch>,
287+
~multichain: InternalConfig.multichain,
288+
~batchSizeTarget,
289+
) => {
290+
if (
291+
switch multichain {
292+
| Unordered => true
293+
| Ordered => chainsBeforeBatch->ChainMap.size === 1
294+
}
295+
) {
296+
prepareUnorderedBatch(~chainsBeforeBatch, ~batchSizeTarget)
297+
} else {
298+
prepareOrderedBatch(~chainsBeforeBatch, ~batchSizeTarget)
299+
}
300+
}

codegenerator/cli/npm/envio/src/FetchState.res

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,15 @@ let mergeIntoPartition = (p: partition, ~target: partition, ~maxAddrInPartition)
9292

9393
let allowedAddressesNumber = ref(maxAddrInPartition)
9494

95-
target.addressesByContractName->Utils.Dict.forEachWithKey((contractName, addresses) => {
95+
target.addressesByContractName->Utils.Dict.forEachWithKey((addresses, contractName) => {
9696
allowedAddressesNumber := allowedAddressesNumber.contents - addresses->Array.length
9797
mergedAddresses->Js.Dict.set(contractName, addresses)
9898
})
9999

100100
// Start with putting all addresses to the merging dict
101101
// And if they exceed the limit, start removing from the merging dict
102102
// and putting into the rest dict
103-
p.addressesByContractName->Utils.Dict.forEachWithKey((contractName, addresses) => {
103+
p.addressesByContractName->Utils.Dict.forEachWithKey((addresses, contractName) => {
104104
allowedAddressesNumber := allowedAddressesNumber.contents - addresses->Array.length
105105
switch mergedAddresses->Utils.Dict.dangerouslyGetNonOption(contractName) {
106106
| Some(targetAddresses) =>
@@ -112,7 +112,7 @@ let mergeIntoPartition = (p: partition, ~target: partition, ~maxAddrInPartition)
112112
let rest = if allowedAddressesNumber.contents < 0 {
113113
let restAddresses = Js.Dict.empty()
114114

115-
mergedAddresses->Utils.Dict.forEachWithKey((contractName, addresses) => {
115+
mergedAddresses->Utils.Dict.forEachWithKey((addresses, contractName) => {
116116
if allowedAddressesNumber.contents === 0 {
117117
()
118118
} else if addresses->Array.length <= -allowedAddressesNumber.contents {
@@ -1153,7 +1153,7 @@ let rollbackPartition = (
11531153
})
11541154
| {addressesByContractName} =>
11551155
let rollbackedAddressesByContractName = Js.Dict.empty()
1156-
addressesByContractName->Utils.Dict.forEachWithKey((contractName, addresses) => {
1156+
addressesByContractName->Utils.Dict.forEachWithKey((addresses, contractName) => {
11571157
let keptAddresses =
11581158
addresses->Array.keep(address => !(addressesToRemove->Utils.Set.has(address)))
11591159
if keptAddresses->Array.length > 0 {

codegenerator/cli/npm/envio/src/PgStorage.res

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ let chunkArray = (arr: array<'a>, ~chunkSize) => {
325325
let removeInvalidUtf8InPlace = entities =>
326326
entities->Js.Array2.forEach(item => {
327327
let dict = item->(Utils.magic: 'a => dict<unknown>)
328-
dict->Utils.Dict.forEachWithKey((key, value) => {
328+
dict->Utils.Dict.forEachWithKey((value, key) => {
329329
if value->Js.typeof === "string" {
330330
let value = value->(Utils.magic: unknown => string)
331331
// We mutate here, since we don't care

codegenerator/cli/npm/envio/src/Utils.res

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -97,30 +97,66 @@ module Dict = {
9797
@val
9898
external mergeInPlace: (dict<'a>, dict<'a>) => dict<'a> = "Object.assign"
9999

100-
let map = (dict, fn) => {
101-
let newDict = Js.Dict.empty()
102-
let keys = dict->Js.Dict.keys
103-
for idx in 0 to keys->Js.Array2.length - 1 {
104-
let key = keys->Js.Array2.unsafe_get(idx)
105-
newDict->Js.Dict.set(key, fn(dict->Js.Dict.unsafeGet(key)))
100+
// Use %raw to support for..in, which is ~10% faster than .forEach
101+
let mapValues: (dict<'a>, 'a => 'b) => dict<'b> = %raw(`(dict, f) => {
102+
var target = {}, i;
103+
for (i in dict) {
104+
target[i] = f(dict[i]);
106105
}
107-
newDict
108-
}
106+
return target;
107+
}`)
109108

110-
let forEach = (dict, fn) => {
111-
let keys = dict->Js.Dict.keys
112-
for idx in 0 to keys->Js.Array2.length - 1 {
113-
fn(dict->Js.Dict.unsafeGet(keys->Js.Array2.unsafe_get(idx)))
109+
// Use %raw to support for..in, which is ~10% faster than .forEach
110+
let filterMapValues: (dict<'a>, 'a => option<'b>) => dict<'b> = %raw(`(dict, f) => {
111+
var target = {}, i, v;
112+
for (i in dict) {
113+
v = f(dict[i]);
114+
if (v !== undefined) {
115+
target[i] = v;
116+
}
114117
}
115-
}
118+
return target;
119+
}`)
116120

117-
let forEachWithKey = (dict, fn) => {
118-
let keys = dict->Js.Dict.keys
119-
for idx in 0 to keys->Js.Array2.length - 1 {
120-
let key = keys->Js.Array2.unsafe_get(idx)
121-
fn(key, dict->Js.Dict.unsafeGet(key))
121+
// Use %raw to support for..in, which is ~10% faster than .forEach
122+
let mapValuesToArray: (dict<'a>, 'a => 'b) => array<'b> = %raw(`(dict, f) => {
123+
var target = [], i;
124+
for (i in dict) {
125+
target.push(f(dict[i]));
122126
}
123-
}
127+
return target;
128+
}`)
129+
130+
// Use %raw to support for..in, which is ~10% faster than .forEach
131+
let forEach: (dict<'a>, 'a => unit) => unit = %raw(`(dict, f) => {
132+
for (var i in dict) {
133+
f(dict[i]);
134+
}
135+
}`)
136+
137+
// Use %raw to support for..in, which is ~10% faster than .forEach
138+
let forEachWithKey: (dict<'a>, ('a, string) => unit) => unit = %raw(`(dict, f) => {
139+
for (var i in dict) {
140+
f(dict[i], i);
141+
}
142+
}`)
143+
144+
// Use %raw to support for..in, which is ~10% faster than Object.keys
145+
let size: dict<'a> => int = %raw(`(dict) => {
146+
var size = 0, i;
147+
for (i in dict) {
148+
size++;
149+
}
150+
return size;
151+
}`)
152+
153+
// Use %raw to support for..in, which is 2x faster than Object.keys
154+
let isEmpty: dict<'a> => bool = %raw(`(dict) => {
155+
for (var _ in dict) {
156+
return false
157+
}
158+
return true
159+
}`)
124160

125161
let deleteInPlace: (dict<'a>, string) => unit = %raw(`(dict, key) => {
126162
delete dict[key];
@@ -135,8 +171,6 @@ module Dict = {
135171

136172
let shallowCopy: dict<'a> => dict<'a> = %raw(`(dict) => ({...dict})`)
137173

138-
let size = dict => dict->Js.Dict.keys->Js.Array2.length
139-
140174
@set_index
141175
external setByInt: (dict<'a>, int, 'a) => unit = ""
142176

0 commit comments

Comments
 (0)