Skip to content

Commit 1c2ca56

Browse files
Updates for long range sync (#1202)
* test prod run * prod tests * fixes * fixes * draft migration * test fixes * fixes * fixes * scripts * progress * fixes * cleanup * further cleanup * use config for batch size
1 parent 2c37cb6 commit 1c2ca56

14 files changed

+450
-199
lines changed

relayer/cmd/generate_beacon_data.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func generateBeaconCheckpointCmd() *cobra.Command {
5656
}
5757

5858
cmd.Flags().String("config", "/tmp/snowbridge/beacon-relay.json", "Path to the beacon relay config")
59+
cmd.Flags().Uint64("finalized-slot", 0, "Optional finalized slot to create checkpoint at")
5960
cmd.Flags().Bool("export-json", false, "Export Json")
6061

6162
return cmd
@@ -101,6 +102,7 @@ func generateBeaconCheckpoint(cmd *cobra.Command, _ []string) error {
101102
if err != nil {
102103
return err
103104
}
105+
finalizedSlot, _ := cmd.Flags().GetUint64("finalized-slot")
104106

105107
viper.SetConfigFile(config)
106108

@@ -122,7 +124,13 @@ func generateBeaconCheckpoint(cmd *cobra.Command, _ []string) error {
122124
client := api.NewBeaconClient(conf.Source.Beacon.Endpoint, conf.Source.Beacon.StateEndpoint)
123125
s := syncer.New(client, &store, p)
124126

125-
checkPointScale, err := s.GetCheckpoint()
127+
var checkPointScale scale.BeaconCheckpoint
128+
if finalizedSlot == 0 {
129+
checkPointScale, err = s.GetCheckpoint()
130+
} else {
131+
checkPointScale, err = s.GetCheckpointAtSlot(finalizedSlot)
132+
}
133+
126134
if err != nil {
127135
return fmt.Errorf("get initial sync: %w", err)
128136
}

relayer/cmd/import_beacon_state.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ func importBeaconState(cmd *cobra.Command, _ []string) error {
105105
attestedSlot := attestedState.GetSlot()
106106
finalizedSlot := finalizedState.GetSlot()
107107

108+
err = syncer.ValidatePair(finalizedSlot, attestedSlot, attestedState)
109+
if err != nil {
110+
return fmt.Errorf("state pair validation failed: %w", err)
111+
}
112+
108113
err = store.WriteEntry(attestedSlot, finalizedSlot, attestedData, finalizedData)
109114
if err != nil {
110115
return fmt.Errorf("write beacon store entry: %w", err)

relayer/contracts/beefy_client.go

Lines changed: 136 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

relayer/contracts/gateway.go

Lines changed: 1 addition & 145 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

relayer/relays/beacon/header/header.go

Lines changed: 60 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,27 @@ import (
2323

2424
var ErrFinalizedHeaderUnchanged = errors.New("finalized header unchanged")
2525
var ErrFinalizedHeaderNotImported = errors.New("finalized header not imported")
26+
var ErrInterimHeaderNotImported = errors.New("interim finalized header not imported")
2627
var ErrSyncCommitteeNotImported = errors.New("sync committee not imported")
2728
var ErrSyncCommitteeLatency = errors.New("sync committee latency found")
2829
var ErrExecutionHeaderNotImported = errors.New("execution header not imported")
2930
var ErrBeaconHeaderNotFinalized = errors.New("beacon header not finalized")
3031

3132
type Header struct {
32-
cache *cache.BeaconCache
33-
writer parachain.ChainWriter
34-
syncer *syncer.Syncer
35-
protocol *protocol.Protocol
33+
cache *cache.BeaconCache
34+
writer parachain.ChainWriter
35+
syncer *syncer.Syncer
36+
protocol *protocol.Protocol
37+
batchCallSize uint64
3638
}
3739

38-
func New(writer parachain.ChainWriter, client api.BeaconAPI, setting config.SpecSettings, store store.BeaconStore, protocol *protocol.Protocol) Header {
40+
func New(writer parachain.ChainWriter, client api.BeaconAPI, setting config.SpecSettings, store store.BeaconStore, protocol *protocol.Protocol, batchCallSize uint64) Header {
3941
return Header{
40-
cache: cache.New(setting.SlotsInEpoch, setting.EpochsPerSyncCommitteePeriod),
41-
writer: writer,
42-
syncer: syncer.New(client, store, protocol),
43-
protocol: protocol,
42+
cache: cache.New(setting.SlotsInEpoch, setting.EpochsPerSyncCommitteePeriod),
43+
writer: writer,
44+
syncer: syncer.New(client, store, protocol),
45+
protocol: protocol,
46+
batchCallSize: batchCallSize,
4447
}
4548
}
4649

@@ -70,6 +73,7 @@ func (h *Header) Sync(ctx context.Context, eg *errgroup.Group) error {
7073
// Special handling here for the initial checkpoint to sync the next sync committee which is not included in initial
7174
// checkpoint.
7275
if h.isInitialSyncPeriod() {
76+
log.Info("syncing next sync committee for initial checkpoint")
7377
err = h.SyncCommitteePeriodUpdate(ctx, latestSyncedPeriod)
7478
if err != nil {
7579
return fmt.Errorf("sync next committee for initial sync period: %w", err)
@@ -135,16 +139,27 @@ func (h *Header) SyncCommitteePeriodUpdate(ctx context.Context, period uint64) e
135139
// finalized header
136140
if uint64(update.Payload.FinalizedHeader.Slot) > h.cache.Finalized.LastSyncedSlot {
137141
diff := uint64(update.Payload.FinalizedHeader.Slot) - h.cache.Finalized.LastSyncedSlot
138-
log.WithFields(log.Fields{"diff": diff, "last_finalized_slot": h.cache.Finalized.LastSyncedSlot, "new_finalized_slot": uint64(update.Payload.FinalizedHeader.Slot)}).Info("checking max latency")
139-
if diff > h.protocol.Settings.SlotsInEpoch*h.protocol.Settings.EpochsPerSyncCommitteePeriod {
140-
log.Info("syncing an interim update")
141-
err = h.syncInterimFinalizedUpdate(ctx, h.cache.Finalized.LastSyncedSlot, uint64(update.Payload.FinalizedHeader.Slot))
142+
minSlot := h.cache.Finalized.LastSyncedSlot
143+
for diff > h.protocol.Settings.SlotsInEpoch*h.protocol.Settings.EpochsPerSyncCommitteePeriod {
144+
log.WithFields(log.Fields{
145+
"diff": diff,
146+
"last_finalized_slot": h.cache.Finalized.LastSyncedSlot,
147+
"new_finalized_slot": uint64(update.Payload.FinalizedHeader.Slot),
148+
}).Info("interim update required")
149+
150+
interimUpdate, err := h.syncInterimFinalizedUpdate(ctx, minSlot, uint64(update.Payload.FinalizedHeader.Slot))
142151
if err != nil {
143152
return fmt.Errorf("sync interim finalized header update: %w", err)
144153
}
154+
155+
diff = uint64(update.Payload.FinalizedHeader.Slot) - uint64(interimUpdate.Payload.FinalizedHeader.Slot)
156+
minSlot = uint64(update.Payload.FinalizedHeader.Slot) + h.protocol.Settings.SlotsInEpoch
157+
log.WithFields(log.Fields{
158+
"new_diff": diff,
159+
"interim_finalized_slot": uint64(interimUpdate.Payload.FinalizedHeader.Slot),
160+
"new_finalized_slot": uint64(update.Payload.FinalizedHeader.Slot),
161+
}).Info("interim update synced successfully")
145162
}
146-
} else {
147-
log.Info("interim update not required")
148163
}
149164

150165
log.WithFields(log.Fields{
@@ -290,6 +305,7 @@ func (h *Header) SyncExecutionHeaders(ctx context.Context) error {
290305
for currentSlot <= toSlot {
291306
log.WithFields(log.Fields{
292307
"currentSlot": currentSlot,
308+
"remaining": toSlot - currentSlot,
293309
}).Info("fetching next header at slot")
294310

295311
var nextHeaderUpdate scale.HeaderUpdatePayload
@@ -307,42 +323,57 @@ func (h *Header) SyncExecutionHeaders(ctx context.Context) error {
307323

308324
headersToSync = append(headersToSync, headerUpdate)
309325
// last slot to be synced, sync headers
310-
if currentSlot >= toSlot {
326+
if currentSlot%h.batchCallSize == 0 || currentSlot >= toSlot {
327+
slotsToSync := []uint64{}
328+
for _, h := range headersToSync {
329+
slotsToSync = append(slotsToSync, uint64(h.Header.Slot))
330+
}
331+
log.WithFields(log.Fields{
332+
"slotsToSync": slotsToSync,
333+
}).Info("syncing batch of headers")
311334
err = h.batchSyncHeaders(ctx, headersToSync)
312335
if err != nil {
313336
return fmt.Errorf("batch sync headers failed: %w", err)
314337
}
338+
339+
// waiting for all batch calls to be executed on chain
340+
err = h.waitingForBatchCallFinished(slotsToSync[len(slotsToSync)-1])
341+
if err != nil {
342+
return err
343+
}
344+
345+
headersToSync = []scale.HeaderUpdatePayload{}
315346
}
316347
headerUpdate = nextHeaderUpdate
317348
currentSlot = uint64(headerUpdate.Header.Slot)
318349
}
319-
// waiting for all batch calls to be executed on chain
320-
err = h.waitingForBatchCallFinished(toSlot)
321-
if err != nil {
322-
return err
323-
}
324-
h.cache.SetLastSyncedExecutionSlot(toSlot)
350+
325351
return nil
326352
}
327353

328-
func (h *Header) syncInterimFinalizedUpdate(ctx context.Context, lastSyncedSlot, newCheckpointSlot uint64) error {
354+
func (h *Header) syncInterimFinalizedUpdate(ctx context.Context, lastSyncedSlot, newCheckpointSlot uint64) (scale.Update, error) {
355+
currentPeriod := h.protocol.ComputeSyncPeriodAtSlot(lastSyncedSlot)
356+
329357
// Calculate the range that the interim finalized header update may be in
330358
minSlot := newCheckpointSlot - h.protocol.SlotsPerHistoricalRoot
331-
maxSlot := lastSyncedSlot + h.protocol.SlotsPerHistoricalRoot
359+
maxSlot := ((currentPeriod + 1) * h.protocol.SlotsPerHistoricalRoot) - h.protocol.Settings.SlotsInEpoch // just before the new sync committee boundary
332360

333361
finalizedUpdate, err := h.syncer.GetFinalizedUpdateAtAttestedSlot(minSlot, maxSlot, false)
334362
if err != nil {
335-
return fmt.Errorf("get interim checkpoint to update chain (last synced slot %d, new slot: %d): %w", lastSyncedSlot, newCheckpointSlot, err)
363+
return scale.Update{}, fmt.Errorf("get interim checkpoint to update chain (last synced slot %d, new slot: %d): %w", lastSyncedSlot, newCheckpointSlot, err)
336364
}
337365

338366
log.WithField("slot", finalizedUpdate.Payload.FinalizedHeader.Slot).Info("syncing an interim update to on-chain")
339367

340368
err = h.updateFinalizedHeaderOnchain(ctx, finalizedUpdate)
341-
if err != nil {
342-
return fmt.Errorf("update interim finalized header on-chain: %w", err)
369+
switch {
370+
case errors.Is(err, ErrFinalizedHeaderNotImported):
371+
return scale.Update{}, ErrInterimHeaderNotImported
372+
case err != nil:
373+
return scale.Update{}, fmt.Errorf("update interim finalized header on-chain: %w", err)
343374
}
344375

345-
return nil
376+
return finalizedUpdate, nil
346377
}
347378

348379
func (h *Header) syncLaggingSyncCommitteePeriods(ctx context.Context, latestSyncedPeriod, currentSyncPeriod uint64) error {
@@ -515,6 +546,7 @@ func (h *Header) waitingForBatchCallFinished(toSlot uint64) error {
515546
if err != nil {
516547
return fmt.Errorf("fetch last execution hash: %w", err)
517548
}
549+
h.cache.SetLastSyncedExecutionSlot(executionHeaderState.BeaconSlot)
518550
if executionHeaderState.BeaconSlot == toSlot {
519551
batchCallFinished = true
520552
break

relayer/relays/beacon/header/header_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,11 @@ func TestSyncInterimFinalizedUpdate_WithDataFromAPI(t *testing.T) {
6565
settings,
6666
&beaconStore,
6767
p,
68+
8,
6869
)
6970

7071
// Find a checkpoint for a slot that is just out of the on-chain synced finalized header block roots range
71-
err = h.syncInterimFinalizedUpdate(context.Background(), 4563072, 4571360)
72+
_, err = h.syncInterimFinalizedUpdate(context.Background(), 4563072, 4571360)
7273
require.NoError(t, err)
7374
}
7475

@@ -128,10 +129,11 @@ func TestSyncInterimFinalizedUpdate_WithDataFromStore(t *testing.T) {
128129
settings,
129130
&beaconStore,
130131
p,
132+
8,
131133
)
132134

133135
// Find a checkpoint for a slot that is just out of the on-chain synced finalized header block roots range
134-
err = h.syncInterimFinalizedUpdate(context.Background(), 4563072, 4571360)
136+
_, err = h.syncInterimFinalizedUpdate(context.Background(), 4563072, 4571360)
135137
require.NoError(t, err)
136138
}
137139

@@ -193,10 +195,11 @@ func TestSyncInterimFinalizedUpdate_WithDataFromStoreWithDifferentBlocks(t *test
193195
settings,
194196
&beaconStore,
195197
p,
198+
8,
196199
)
197200

198201
// Find a checkpoint for a slot that is just out of the on-chain synced finalized header block roots range
199-
err = h.syncInterimFinalizedUpdate(context.Background(), 4563072, 4571360)
202+
_, err = h.syncInterimFinalizedUpdate(context.Background(), 4563072, 4571360)
200203
require.NoError(t, err)
201204
}
202205

@@ -238,10 +241,11 @@ func TestSyncInterimFinalizedUpdate_BeaconStateNotAvailableInAPIAndStore(t *test
238241
settings,
239242
&beaconStore,
240243
p,
244+
8,
241245
)
242246

243247
// Find a checkpoint for a slot that is just out of the on-chain synced finalized header block roots range
244-
err = h.syncInterimFinalizedUpdate(context.Background(), 4570722, 4578922)
248+
_, err = h.syncInterimFinalizedUpdate(context.Background(), 4570722, 4578922)
245249
require.Error(t, err)
246250
}
247251

@@ -276,9 +280,10 @@ func TestSyncInterimFinalizedUpdate_NoValidBlocksFound(t *testing.T) {
276280
settings,
277281
&beaconStore,
278282
p,
283+
8,
279284
)
280285

281286
// Find a checkpoint for a slot that is just out of the on-chain synced finalized header block roots range
282-
err = h.syncInterimFinalizedUpdate(context.Background(), 4570722, 4578922)
287+
_, err = h.syncInterimFinalizedUpdate(context.Background(), 4570722, 4578922)
283288
require.Errorf(t, err, "cannot find blocks at boundaries")
284289
}

0 commit comments

Comments
 (0)