|
1 | 1 | package builder
|
2 | 2 |
|
3 | 3 | import (
|
| 4 | + "context" |
4 | 5 | "errors"
|
| 6 | + "golang.org/x/time/rate" |
5 | 7 | "math/big"
|
6 | 8 | _ "os"
|
7 | 9 | "sync"
|
@@ -44,66 +46,53 @@ type IBuilder interface {
|
44 | 46 | }
|
45 | 47 |
|
46 | 48 | type Builder struct {
|
47 |
| - ds flashbotsextra.IDatabaseService |
48 |
| - beaconClient IBeaconClient |
49 |
| - relay IRelay |
50 |
| - eth IEthereumService |
51 |
| - resubmitter Resubmitter |
52 |
| - blockSubmissionRateLimiter *BlockSubmissionRateLimiter |
53 |
| - builderSecretKey *bls.SecretKey |
54 |
| - builderPublicKey boostTypes.PublicKey |
55 |
| - builderSigningDomain boostTypes.Domain |
56 |
| - |
57 |
| - bestMu sync.Mutex |
58 |
| - bestAttrs BuilderPayloadAttributes |
59 |
| - bestBlockProfit *big.Int |
| 49 | + ds flashbotsextra.IDatabaseService |
| 50 | + relay IRelay |
| 51 | + eth IEthereumService |
| 52 | + builderSecretKey *bls.SecretKey |
| 53 | + builderPublicKey boostTypes.PublicKey |
| 54 | + builderSigningDomain boostTypes.Domain |
| 55 | + |
| 56 | + limiter *rate.Limiter |
| 57 | + |
| 58 | + slotMu sync.Mutex |
| 59 | + slot uint64 |
| 60 | + slotAttrs []BuilderPayloadAttributes |
| 61 | + slotCtx context.Context |
| 62 | + slotCtxCancel context.CancelFunc |
60 | 63 | }
|
61 | 64 |
|
62 |
| -func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, bc IBeaconClient, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder { |
| 65 | +func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder { |
63 | 66 | pkBytes := bls.PublicKeyFromSecretKey(sk).Compress()
|
64 | 67 | pk := boostTypes.PublicKey{}
|
65 | 68 | pk.FromSlice(pkBytes)
|
66 | 69 |
|
| 70 | + slotCtx, slotCtxCancel := context.WithCancel(context.Background()) |
67 | 71 | return &Builder{
|
68 |
| - ds: ds, |
69 |
| - beaconClient: bc, |
70 |
| - relay: relay, |
71 |
| - eth: eth, |
72 |
| - resubmitter: Resubmitter{}, |
73 |
| - blockSubmissionRateLimiter: NewBlockSubmissionRateLimiter(), |
74 |
| - builderSecretKey: sk, |
75 |
| - builderPublicKey: pk, |
76 |
| - |
| 72 | + ds: ds, |
| 73 | + relay: relay, |
| 74 | + eth: eth, |
| 75 | + builderSecretKey: sk, |
| 76 | + builderPublicKey: pk, |
77 | 77 | builderSigningDomain: builderSigningDomain,
|
78 |
| - bestBlockProfit: big.NewInt(0), |
| 78 | + |
| 79 | + limiter: rate.NewLimiter(rate.Every(time.Second), 1), |
| 80 | + slot: 0, |
| 81 | + slotCtx: slotCtx, |
| 82 | + slotCtxCancel: slotCtxCancel, |
79 | 83 | }
|
80 | 84 | }
|
81 | 85 |
|
82 | 86 | func (b *Builder) Start() error {
|
83 |
| - b.blockSubmissionRateLimiter.Start() |
84 | 87 | return nil
|
85 | 88 | }
|
86 | 89 |
|
87 | 90 | func (b *Builder) Stop() error {
|
88 |
| - b.blockSubmissionRateLimiter.Stop() |
89 | 91 | return nil
|
90 | 92 | }
|
91 | 93 |
|
92 | 94 | func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBundle, proposerPubkey boostTypes.PublicKey, proposerFeeRecipient boostTypes.Address, attrs *BuilderPayloadAttributes) error {
|
93 |
| - b.bestMu.Lock() |
94 |
| - defer b.bestMu.Unlock() |
95 |
| - |
96 |
| - // Do not submit blocks that don't improve the profit |
97 |
| - if b.bestAttrs != *attrs { |
98 |
| - b.bestAttrs = *attrs |
99 |
| - b.bestBlockProfit.SetInt64(0) |
100 |
| - } else { |
101 |
| - if block.Profit.Cmp(b.bestBlockProfit) <= 0 { |
102 |
| - log.Info("Ignoring block that is not improving the profit") |
103 |
| - return nil |
104 |
| - } |
105 |
| - } |
106 |
| - |
| 95 | + start := time.Now() |
107 | 96 | executableData := beacon.BlockToExecutableData(block)
|
108 | 97 | payload, err := executableDataToExecutionPayload(executableData)
|
109 | 98 | if err != nil {
|
@@ -152,9 +141,8 @@ func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBun
|
152 | 141 | log.Info("could submit block", "bundles", len(bundles))
|
153 | 142 | }
|
154 | 143 |
|
155 |
| - log.Info("submitted block", "header", block.Header(), "bid", blockBidMsg) |
| 144 | + log.Info("submitted block", "header", block.Header(), "bid", blockBidMsg, "time", time.Since(start)) |
156 | 145 |
|
157 |
| - b.bestBlockProfit.Set(block.Profit) |
158 | 146 | return nil
|
159 | 147 | }
|
160 | 148 |
|
@@ -188,29 +176,100 @@ func (b *Builder) OnPayloadAttribute(attrs *BuilderPayloadAttributes) error {
|
188 | 176 | return errors.New("parent block not found in blocktree")
|
189 | 177 | }
|
190 | 178 |
|
191 |
| - blockHook := func(block *types.Block, bundles []types.SimulatedBundle) { |
192 |
| - select { |
193 |
| - case shouldSubmit := <-b.blockSubmissionRateLimiter.Limit(block): |
194 |
| - if !shouldSubmit { |
195 |
| - log.Info("Block rate limited", "blochHash", block.Hash()) |
196 |
| - return |
| 179 | + b.slotMu.Lock() |
| 180 | + defer b.slotMu.Unlock() |
| 181 | + |
| 182 | + if b.slot != attrs.Slot { |
| 183 | + if b.slotCtxCancel != nil { |
| 184 | + b.slotCtxCancel() |
| 185 | + } |
| 186 | + |
| 187 | + slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second) |
| 188 | + b.slot = attrs.Slot |
| 189 | + b.slotAttrs = nil |
| 190 | + b.slotCtx = slotCtx |
| 191 | + b.slotCtxCancel = slotCtxCancel |
| 192 | + } |
| 193 | + |
| 194 | + for _, currentAttrs := range b.slotAttrs { |
| 195 | + if *attrs == currentAttrs { |
| 196 | + log.Debug("ignoring known payload attribute", "slot", attrs.Slot, "hash", attrs.HeadHash) |
| 197 | + return nil |
| 198 | + } |
| 199 | + } |
| 200 | + b.slotAttrs = append(b.slotAttrs, *attrs) |
| 201 | + |
| 202 | + go b.runBuildingJob(b.slotCtx, proposerPubkey, vd.FeeRecipient, attrs) |
| 203 | + return nil |
| 204 | +} |
| 205 | + |
| 206 | +func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTypes.PublicKey, feeRecipient boostTypes.Address, attrs *BuilderPayloadAttributes) { |
| 207 | + ctx, cancel := context.WithTimeout(slotCtx, 12*time.Second) |
| 208 | + defer cancel() |
| 209 | + |
| 210 | + // Submission queue for the given payload attributes |
| 211 | + // multiple jobs can run for different attributes fot the given slot |
| 212 | + // 1. When new block is ready we check if its profit is higher than profit of last best block |
| 213 | + // if it is we set queueBest* to values of the new block and notify queueSignal channel. |
| 214 | + // 2. Submission goroutine waits for queueSignal and submits queueBest* if its more valuable than |
| 215 | + // queueLastSubmittedProfit keeping queueLastSubmittedProfit to be the profit of the last submission. |
| 216 | + // Submission goroutine is globally rate limited to have fixed rate of submissions for all jobs. |
| 217 | + var ( |
| 218 | + queueSignal = make(chan struct{}, 1) |
| 219 | + |
| 220 | + queueMu sync.Mutex |
| 221 | + queueLastSubmittedProfit = new(big.Int) |
| 222 | + queueBestProfit = new(big.Int) |
| 223 | + queueBestBlock *types.Block |
| 224 | + queueBestBundles []types.SimulatedBundle |
| 225 | + ) |
| 226 | + |
| 227 | + log.Debug("runBuildingJob", "slot", attrs.Slot, "parent", attrs.HeadHash) |
| 228 | + |
| 229 | + submitBestBlock := func() { |
| 230 | + queueMu.Lock() |
| 231 | + if queueLastSubmittedProfit.Cmp(queueBestProfit) < 0 { |
| 232 | + err := b.onSealedBlock(queueBestBlock, queueBestBundles, proposerPubkey, feeRecipient, attrs) |
| 233 | + if err != nil { |
| 234 | + log.Error("could not run sealed block hook", "err", err) |
| 235 | + } else { |
| 236 | + queueLastSubmittedProfit.Set(queueBestProfit) |
197 | 237 | }
|
198 |
| - case <-time.After(200 * time.Millisecond): |
199 |
| - log.Info("Block rate limit timeout, submitting the block anyway") |
200 | 238 | }
|
| 239 | + queueMu.Unlock() |
| 240 | + } |
201 | 241 |
|
202 |
| - err := b.onSealedBlock(block, bundles, proposerPubkey, vd.FeeRecipient, attrs) |
203 |
| - if err != nil { |
204 |
| - log.Error("could not run sealed block hook", "err", err) |
| 242 | + // Empties queue, submits the best block for current job with rate limit (global for all jobs) |
| 243 | + go runResubmitLoop(ctx, b.limiter, queueSignal, submitBestBlock) |
| 244 | + |
| 245 | + // Populates queue with submissions that increase block profit |
| 246 | + blockHook := func(block *types.Block, bundles []types.SimulatedBundle) { |
| 247 | + if ctx.Err() != nil { |
| 248 | + return |
| 249 | + } |
| 250 | + |
| 251 | + queueMu.Lock() |
| 252 | + defer queueMu.Unlock() |
| 253 | + if block.Profit.Cmp(queueBestProfit) > 0 { |
| 254 | + queueBestBlock = block |
| 255 | + queueBestBundles = bundles |
| 256 | + queueBestProfit.Set(block.Profit) |
| 257 | + |
| 258 | + select { |
| 259 | + case queueSignal <- struct{}{}: |
| 260 | + default: |
| 261 | + } |
205 | 262 | }
|
206 | 263 | }
|
207 | 264 |
|
208 |
| - firstBlockResult := b.resubmitter.newTask(12*time.Second, time.Second, func() error { |
209 |
| - log.Info("Resubmitting build job") |
210 |
| - return b.eth.BuildBlock(attrs, blockHook) |
| 265 | + // resubmits block builder requests every second |
| 266 | + runRetryLoop(ctx, time.Second, func() { |
| 267 | + log.Debug("retrying BuildBlock", "slot", attrs.Slot, "parent", attrs.HeadHash) |
| 268 | + err := b.eth.BuildBlock(attrs, blockHook) |
| 269 | + if err != nil { |
| 270 | + log.Warn("Failed to build block", "err", err) |
| 271 | + } |
211 | 272 | })
|
212 |
| - |
213 |
| - return firstBlockResult |
214 | 273 | }
|
215 | 274 |
|
216 | 275 | func executableDataToExecutionPayload(data *beacon.ExecutableDataV1) (*boostTypes.ExecutionPayload, error) {
|
|
0 commit comments