Skip to content

Commit bb4cb2a

Browse files
committed
fix: handle send timeouts in bes_pipe and bes_backend
1 parent 0f9c7bd commit bb4cb2a

File tree

4 files changed

+129
-50
lines changed

4 files changed

+129
-50
lines changed

pkg/plugin/system/bep/bes_backend.go

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"os"
2727
"strings"
2828
"sync"
29+
"time"
2930

3031
"github.com/fatih/color"
3132
"github.com/golang/protobuf/ptypes/empty"
@@ -176,7 +177,7 @@ func (bb *besBackend) GracefulStop() {
176177
}
177178

178179
// Addr returns the address for the gRPC server. Since the address is determined
179-
// by the OS based on an available port at the time the gRPC server starts, this
180+
// by the OS based on available port at the time the gRPC server starts, this
180181
// method returns the address to be used to construct the `bes_backend` flag
181182
// passed to the `bazel (build|test|run)` commands. The address includes the
182183
// scheme (protocol).
@@ -232,7 +233,7 @@ func (bb *besBackend) PublishLifecycleEvent(
232233
broadCastEvent := func(ctx context.Context, req *buildv1.PublishLifecycleEventRequest) {
233234
eg, ctx := errgroup.WithContext(ctx)
234235
for _, p := range bb.besProxies {
235-
if !p.Healthy() {
236+
if !p.IsHealthy() {
236237
continue
237238
}
238239

@@ -297,7 +298,7 @@ func (bb *besBackend) SendEventsToSubscribers(c <-chan *buildv1.PublishBuildTool
297298
}
298299
}
299300

300-
func (bb *besBackend) setupBesUpstreamBackends(ctx context.Context, optionsparsed *buildeventstream.OptionsParsed) error {
301+
func (bb *besBackend) setupBesUpstreamBackends(ctx context.Context, optionsParsed *buildeventstream.OptionsParsed) error {
301302
defer close(bb.ready)
302303

303304
backends := []string{}
@@ -308,7 +309,7 @@ func (bb *besBackend) setupBesUpstreamBackends(ctx context.Context, optionsparse
308309
hasNoWaitForUpload := false
309310

310311
// Parse backends first to build up that map, then parse headers
311-
for _, arg := range optionsparsed.CmdLine {
312+
for _, arg := range optionsParsed.CmdLine {
312313
if strings.HasPrefix(arg, "--bes_backend=") {
313314
// Always skip our bes_backend to avoid recursive uploads.
314315
if arg == fmt.Sprintf("--bes_backend=%s", bb.Addr()) {
@@ -448,7 +449,7 @@ func (bb *besBackend) PublishBuildToolEventStream(
448449
switch event.Id.Id.(type) {
449450
case *buildeventstream.BuildEventId_OptionsParsed:
450451
// Received options event, setup bes upstream backends based off commandline arguments bazel reported.
451-
// setup upstream backends async to prevent bazel client from waiting for upstream bes connections.
452+
// setup upstream backends async to prevent bazel to waiting for upstream bes connections.
452453
eg.Go(func() error {
453454
err = bb.setupBesUpstreamBackends(egCtx, event.GetOptionsParsed())
454455
if err != nil {
@@ -496,14 +497,14 @@ func (bb *besBackend) PublishBuildToolEventStream(
496497
<-bb.ready
497498
// Goroutines to receive acks from BES proxies
498499
for _, bp := range bb.besProxies {
499-
if !bp.Healthy() {
500+
if !bp.IsHealthy() {
500501
continue
501502
}
502503
proxy := bp // make a copy of the BESProxy before passing into the go-routine below.
503504
eg.Go(func() error {
504505
for {
505506
// If the proxy is not healthy, break out of the loop
506-
if !proxy.Healthy() {
507+
if !proxy.IsHealthy() {
507508
break
508509
}
509510
_, err := proxy.Recv()
@@ -527,19 +528,46 @@ func (bb *besBackend) PublishBuildToolEventStream(
527528
// Goroutine to forward to build event to BES proxies
528529
eg.Go(func() error {
529530
for fwd := range fwdChanRead {
531+
egFwd := errgroup.Group{}
532+
530533
for _, bp := range bb.besProxies {
531-
if !bp.Healthy() {
532-
continue
533-
}
534-
err := bp.Send(fwd)
535-
if err != nil {
536-
// If we fail to send to a proxy then print out an error but don't fail the GRPC call
537-
fmt.Fprintf(os.Stderr, "Error sending build event to %v: %s\n", bp.Host(), err.Error())
538-
}
534+
bp := bp // capture
535+
egFwd.Go(func() error {
536+
if !bp.IsHealthy() {
537+
return nil
538+
}
539+
540+
// Channel for Send result
541+
sendCh := make(chan error, 1)
542+
543+
// Run Send in goroutine
544+
go func() {
545+
err := bp.Send(fwd)
546+
sendCh <- err
547+
}()
548+
549+
// Wait for Send or timeout
550+
select {
551+
case err := <-sendCh:
552+
if err != nil {
553+
fmt.Fprintf(os.Stderr, "Error sending build event to %v: %s\n", bp.Host(), err.Error())
554+
bp.MarkUnhealthy()
555+
}
556+
return nil
557+
case <-time.After(sendTimeout):
558+
fmt.Fprintf(os.Stderr, "Timeout sending build event to %v: marking unhealthy\n", bp.Host())
559+
bp.MarkUnhealthy()
560+
return nil
561+
}
562+
})
563+
}
564+
565+
if err := egFwd.Wait(); err != nil {
566+
// Optionally handle errors from sends, but since we log inside, perhaps no need to propagate
539567
}
540568
}
541569
for _, bp := range bb.besProxies {
542-
if !bp.Healthy() {
570+
if !bp.IsHealthy() {
543571
continue
544572
}
545573
if err := bp.CloseSend(); err != nil {

pkg/plugin/system/bep/bes_backend_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ func TestPublishBuildToolEventStream(t *testing.T) {
500500

501501
besProxy.
502502
EXPECT().
503-
Healthy().
503+
IsHealthy().
504504
Return(true).
505505
Times(5)
506506
besProxy.

pkg/plugin/system/bep/bes_pipe.go

Lines changed: 76 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type BESPipeInterceptor interface {
4949
const besEventGlobalTimeoutDuration = 5 * time.Minute
5050
const besEventThrottleDuration = 50 * time.Millisecond
5151
const gracePeriodDuration = 2 * time.Second
52+
const sendTimeout = 10 * time.Second // Timeout for individual Send calls to detect backed-up receivers
5253

5354
func NewBESPipe(buildId, invocationId string) (BESPipeInterceptor, error) {
5455
return &besPipe{
@@ -72,6 +73,9 @@ type besPipe struct {
7273
besInvocationId string
7374
besProxies []besproxy.BESProxy
7475

76+
// Track whether we have already unlinked the pipe due to backend failure
77+
pipeAborted sync.Once
78+
7579
wg *sync.WaitGroup
7680
}
7781

@@ -101,7 +105,7 @@ func (bb *besPipe) RegisterBesProxy(ctx context.Context, p besproxy.BESProxy) {
101105
go func() {
102106
for {
103107
// If the proxy is not healthy, break out of the loop
104-
if !p.Healthy() {
108+
if !p.IsHealthy() {
105109
break
106110
}
107111
_, err := p.Recv()
@@ -116,6 +120,8 @@ func (bb *besPipe) RegisterBesProxy(ctx context.Context, p besproxy.BESProxy) {
116120
break
117121
}
118122
}
123+
// When the ACK goroutine exits (usually because of error), check if we should abort the pipe
124+
bb.maybeAbortPipeBecauseNoHealthyBackends()
119125
}()
120126
}
121127

@@ -156,32 +162,35 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
156162
bb.wg.Add(1)
157163
go func() {
158164
defer bb.wg.Done()
165+
166+
// If the overall context is cancelled, abort the pipe immediately
167+
go func() {
168+
<-ctx.Done()
169+
bb.maybeAbortPipeBecauseNoHealthyBackends()
170+
}()
171+
159172
conn, err := os.OpenFile(bb.bepBinPath, os.O_RDONLY, os.ModeNamedPipe)
160173
if err != nil {
161174
bb.errorsMutex.Lock()
162-
defer bb.errorsMutex.Unlock()
163175
bb.errors.Insert(fmt.Errorf("failed to accept connection on BES pipe %s: %w", bb.bepBinPath, err))
176+
bb.errorsMutex.Unlock()
164177
return
165178
}
166-
167-
defer func() {
168-
conn.Close()
169-
}()
179+
defer conn.Close()
170180

171181
if err := bb.streamBesEvents(ctx, conn); err != nil {
172182
bb.errorsMutex.Lock()
173-
defer bb.errorsMutex.Unlock()
174183
bb.errors.Insert(fmt.Errorf("failed to stream BES events: %w", err))
184+
bb.errorsMutex.Unlock()
175185
return
176186
}
177187

188+
// Normal completion path
178189
for _, p := range bb.besProxies {
179-
if !p.Healthy() {
190+
if !p.IsHealthy() {
180191
continue
181192
}
182-
183193
bb.sendFinalLifecycleEvents(context.Background(), p)
184-
185194
if err := p.CloseSend(); err != nil {
186195
fmt.Fprintf(os.Stderr, "Error closing build event stream to %v: %s\n", p.Host(), err.Error())
187196
}
@@ -190,25 +199,47 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
190199
return nil
191200
}
192201

202+
// maybeAbortPipeBecauseNoHealthyBackends unlinks the FIFO if all backends are unhealthy.
203+
// This gives Bazel a broken pipe → it aborts the upload and exits.
204+
func (bb *besPipe) maybeAbortPipeBecauseNoHealthyBackends() {
205+
if len(bb.besProxies) == 0 {
206+
return
207+
}
208+
209+
var anyHealthy bool
210+
for _, p := range bb.besProxies {
211+
if p.IsHealthy() {
212+
anyHealthy = true
213+
break
214+
}
215+
}
216+
if anyHealthy {
217+
return
218+
}
219+
220+
bb.pipeAborted.Do(func() {
221+
fmt.Fprintf(os.Stderr, "All BES backends are unhealthy — unlinking pipe %s to unblock Bazel\n", bb.bepBinPath)
222+
if err := syscall.Unlink(bb.bepBinPath); err != nil && !os.IsNotExist(err) {
223+
fmt.Fprintf(os.Stderr, "failed to unlink BES pipe %s: %v\n", bb.bepBinPath, err)
224+
}
225+
})
226+
}
227+
193228
func (bb *besPipe) streamBesEvents(ctx context.Context, conn *os.File) error {
194229
reader := bufio.NewReader(conn)
195230

196231
go func() {
197232
<-ctx.Done()
198-
if err := conn.SetReadDeadline(time.Now().Add(gracePeriodDuration)); err != nil {
199-
fmt.Fprintf(os.Stderr, "failed to set read deadline after context done: %s\n", err.Error())
200-
}
233+
conn.SetReadDeadline(time.Now().Add(gracePeriodDuration))
201234
}()
202235

203-
// Manually manage a sequence ID for the events
204236
seqId := int64(0)
205-
206237
besEventGlobalTimeout := time.After(besEventGlobalTimeoutDuration)
238+
207239
for {
208240
event := buildeventstream.BuildEvent{}
209-
210241
opts := protodelim.UnmarshalOptions{
211-
MaxSize: 32 * 1024 * 1024, // 32 MB max; we have observed 17 MB BES events in the wild
242+
MaxSize: 32 * 1024 * 1024, // 32 MB max
212243
}
213244

214245
if err := opts.UnmarshalFrom(reader, &event); err != nil {
@@ -220,16 +251,13 @@ func (bb *besPipe) streamBesEvents(ctx context.Context, conn *os.File) error {
220251
case <-besEventGlobalTimeout:
221252
return fmt.Errorf("timeout reached while waiting for BES events")
222253
case <-time.After(besEventThrottleDuration):
223-
// throttle the reading of the BES file when no new data is available
224254
continue
225255
}
226256
}
227257
return fmt.Errorf("failed to parse BES event: %w", err)
228258
}
229259

230-
// Reset the global timeout on each received event
231260
besEventGlobalTimeout = time.After(besEventGlobalTimeoutDuration)
232-
233261
seqId++
234262

235263
if err := bb.publishBesEvent(seqId, &event); err != nil {
@@ -241,9 +269,7 @@ func (bb *besPipe) streamBesEvents(ctx context.Context, conn *os.File) error {
241269
}
242270
}
243271

244-
// Clear read deadline
245272
conn.SetReadDeadline(time.Time{})
246-
247273
return nil
248274
}
249275

@@ -269,7 +295,6 @@ func (bb *besPipe) publishBesEvent(seqId int64, event *buildeventstream.BuildEve
269295
return fmt.Errorf("failed to marshal BES event: %w", err)
270296
}
271297

272-
// Wrap the event in the gRPC message
273298
grpcEvent := &buildv1.PublishBuildToolEventStreamRequest{
274299
OrderedBuildEvent: &buildv1.OrderedBuildEvent{
275300
SequenceNumber: seqId,
@@ -285,14 +310,36 @@ func (bb *besPipe) publishBesEvent(seqId int64, event *buildeventstream.BuildEve
285310
}
286311

287312
for _, p := range bb.besProxies {
288-
eg.Go(
289-
func() error {
290-
if err := p.Send(grpcEvent); err != nil {
313+
p := p // capture
314+
eg.Go(func() error {
315+
if !p.IsHealthy() {
316+
return nil
317+
}
318+
319+
// Channel for Send result
320+
sendCh := make(chan error, 1)
321+
322+
// Run Send in goroutine
323+
go func() {
324+
err := p.Send(grpcEvent)
325+
sendCh <- err
326+
}()
327+
328+
// Wait for Send or timeout
329+
select {
330+
case err := <-sendCh:
331+
if err != nil {
291332
fmt.Fprintf(os.Stderr, "Error sending BES event to %v: %s\n", p.Host(), err.Error())
333+
bb.maybeAbortPipeBecauseNoHealthyBackends()
292334
}
293335
return nil
294-
},
295-
)
336+
case <-time.After(sendTimeout):
337+
fmt.Fprintf(os.Stderr, "Timeout sending BES event to %v: marking unhealthy\n", p.Host())
338+
p.MarkUnhealthy()
339+
bb.maybeAbortPipeBecauseNoHealthyBackends()
340+
return nil
341+
}
342+
})
296343
}
297344
}
298345

@@ -305,8 +352,6 @@ func (bb *besPipe) Args() []string {
305352
bb.bepBinPath,
306353
}
307354

308-
// Also add wait_for_upload_complete flag if the bes pipe was explicitly requested.
309-
// NOTE: this is explicitly not the default behavior to avoid breaking changes in bazel6
310355
if os.Getenv("ASPECT_BEP_USE_PIPE") != "" {
311356
args = append(args, "--build_event_binary_file_upload_mode=wait_for_upload_complete")
312357
}
@@ -326,6 +371,6 @@ func (bb *besPipe) Errors() []error {
326371

327372
func (bb *besPipe) GracefulStop() {
328373
bb.wg.Wait()
329-
374+
// Best-effort cleanup
330375
os.Remove(bb.bepBinPath)
331376
}

pkg/plugin/system/besproxy/bes_proxy.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ type BESProxy interface {
3737
PublishBuildToolEventStream(ctx context.Context, opts ...grpc.CallOption) error
3838
PublishLifecycleEvent(ctx context.Context, req *buildv1.PublishLifecycleEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
3939
StreamCreated() bool
40-
Healthy() bool
40+
IsHealthy() bool
41+
MarkUnhealthy()
4142
Recv() (*buildv1.PublishBuildToolEventStreamResponse, error)
4243
Send(req *buildv1.PublishBuildToolEventStreamRequest) error
4344
}
@@ -142,6 +143,11 @@ func (bp *besProxy) trackError(err error) error {
142143
return err
143144
}
144145

145-
func (bp *besProxy) Healthy() bool {
146+
func (bp *besProxy) IsHealthy() bool {
146147
return bp.hadError < 5 && bp.stream != nil
147148
}
149+
150+
func (bp *besProxy) MarkUnhealthy() {
151+
bp.hadError = 5
152+
fmt.Printf("stream to %s is marked unhealthy, taking out of rotation.", bp.host)
153+
}

0 commit comments

Comments
 (0)