Skip to content

Commit 0962c65

Browse files
committed
fix: randomize block refresh to try to reduce overhead, dilate block retry time
1 parent 2ada4b5 commit 0962c65

File tree

4 files changed

+10
-6
lines changed

4 files changed

+10
-6
lines changed

codex/blockexchange/engine/engine.nim

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import std/sets
1212
import std/options
1313
import std/algorithm
1414
import std/sugar
15+
import std/random
1516

1617
import pkg/chronos
1718
import pkg/libp2p/[cid, switch, multihash, multicodec]
@@ -199,7 +200,6 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr
199200

200201
# In dynamic swarms, staleness will dominate latency.
201202
if peer.lastRefresh < self.pendingBlocks.lastInclusion or peer.isKnowledgeStale:
202-
trace "Refreshing block knowledge for peer", peer = peer.id
203203
peer.refreshRequested()
204204
# TODO: optimize this by keeping track of what was sent and sending deltas.
205205
# This should allow us to run much more frequent refreshes, and be way more
@@ -269,8 +269,9 @@ proc downloadInternal(
269269

270270
# We now wait for a bit and then retry. If the handle gets completed in the
271271
# meantime (because the presence handler might have requested the block and
272-
# received it in the meantime), we are done.
273-
await handle or sleepAsync(self.pendingBlocks.retryInterval)
272+
# received it in the meantime), we are done. Retry delays are randomized
273+
# so we don't get all block loops spinning at the same time.
274+
await handle or sleepAsync(secs(rand(self.pendingBlocks.retryInterval.secs)))
274275
if handle.finished:
275276
break
276277
# If we still don't have the block, we'll go for another cycle.
@@ -484,6 +485,9 @@ proc cancelBlocks(
484485
# If so, schedules a cancellation.
485486
scheduledCancellations[peerCtx.id] = intersection
486487

488+
if scheduledCancellations.len == 0:
489+
return
490+
487491
let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId](
488492
toSeq(scheduledCancellations.pairs).map(dispatchCancellations)
489493
)

codex/blockexchange/engine/pendingblocks.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ declareGauge(
3434

3535
const
3636
DefaultBlockRetries* = 3000
37-
DefaultRetryInterval* = 1.seconds
37+
DefaultRetryInterval* = 10.seconds
3838

3939
type
4040
RetriesExhaustedError* = object of CatchableError

codex/blockexchange/peers/peerctxstore.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
7878
proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
7979
var res: PeersForBlock = (@[], @[])
8080
for peer in self:
81-
if address in peer.peerHave:
81+
if address in peer:
8282
res.with.add(peer)
8383
else:
8484
res.without.add(peer)

tests/codex/blockexchange/engine/testblockexc.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,4 +213,4 @@ asyncchecksuite "NetworkStore - dissemination":
213213
await nodes.linearTopology()
214214

215215
let downloads = nodes.mapIt(downloadDataset(it, dataset))
216-
await allFuturesThrowing(downloads).wait(20.seconds)
216+
await allFuturesThrowing(downloads).wait(30.seconds)

0 commit comments

Comments
 (0)