Skip to content

Commit be04d90

Browse files
committed
fix: handle send timeouts in bes_pipe and bes_backend
1 parent 0f9c7bd commit be04d90

File tree

4 files changed

+119
-21
lines changed

4 files changed

+119
-21
lines changed

pkg/plugin/system/bep/bes_backend.go

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"os"
2727
"strings"
2828
"sync"
29+
"time"
2930

3031
"github.com/fatih/color"
3132
"github.com/golang/protobuf/ptypes/empty"
@@ -527,15 +528,42 @@ func (bb *besBackend) PublishBuildToolEventStream(
527528
// Goroutine to forward to build event to BES proxies
528529
eg.Go(func() error {
529530
for fwd := range fwdChanRead {
531+
egFwd := errgroup.Group{}
532+
530533
for _, bp := range bb.besProxies {
531-
if !bp.Healthy() {
532-
continue
533-
}
534-
err := bp.Send(fwd)
535-
if err != nil {
536-
// If we fail to send to a proxy then print out an error but don't fail the GRPC call
537-
fmt.Fprintf(os.Stderr, "Error sending build event to %v: %s\n", bp.Host(), err.Error())
538-
}
534+
bp := bp // capture
535+
egFwd.Go(func() error {
536+
if !bp.Healthy() {
537+
return nil
538+
}
539+
540+
// Channel for Send result
541+
sendCh := make(chan error, 1)
542+
543+
// Run Send in goroutine
544+
go func() {
545+
err := bp.Send(fwd)
546+
sendCh <- err
547+
}()
548+
549+
// Wait for Send or timeout
550+
select {
551+
case err := <-sendCh:
552+
if err != nil {
553+
fmt.Fprintf(os.Stderr, "Error sending build event to %v: %s\n", bp.Host(), err.Error())
554+
bp.MarkUnhealthy()
555+
}
556+
return nil
557+
case <-time.After(besSendTimeout):
558+
fmt.Fprintf(os.Stderr, "Timeout sending build event to %v: marking unhealthy\n", bp.Host())
559+
bp.MarkUnhealthy()
560+
return nil
561+
}
562+
})
563+
}
564+
565+
if err := egFwd.Wait(); err != nil {
566+
// Optionally handle errors from sends, but since we log inside, perhaps no need to propagate
539567
}
540568
}
541569
for _, bp := range bb.besProxies {

pkg/plugin/system/bep/bes_pipe.go

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type BESPipeInterceptor interface {
4949
const besEventGlobalTimeoutDuration = 5 * time.Minute
5050
const besEventThrottleDuration = 50 * time.Millisecond
5151
const gracePeriodDuration = 2 * time.Second
52+
const besSendTimeout = 1 * time.Minute
5253

5354
func NewBESPipe(buildId, invocationId string) (BESPipeInterceptor, error) {
5455
return &besPipe{
@@ -72,6 +73,9 @@ type besPipe struct {
7273
besInvocationId string
7374
besProxies []besproxy.BESProxy
7475

76+
// Track whether we have already unlinked the pipe due to backend failure
77+
pipeAborted sync.Once
78+
7579
wg *sync.WaitGroup
7680
}
7781

@@ -116,6 +120,8 @@ func (bb *besPipe) RegisterBesProxy(ctx context.Context, p besproxy.BESProxy) {
116120
break
117121
}
118122
}
123+
// When the ACK goroutine exits (usually because of error), check if we should abort the pipe
124+
bb.maybeAbortPipeBecauseNoHealthyBackends()
119125
}()
120126
}
121127

@@ -156,17 +162,21 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
156162
bb.wg.Add(1)
157163
go func() {
158164
defer bb.wg.Done()
165+
166+
// If the overall context is cancelled, abort the pipe immediately
167+
go func() {
168+
<-ctx.Done()
169+
bb.maybeAbortPipeBecauseNoHealthyBackends()
170+
}()
171+
159172
conn, err := os.OpenFile(bb.bepBinPath, os.O_RDONLY, os.ModeNamedPipe)
160173
if err != nil {
161174
bb.errorsMutex.Lock()
162175
defer bb.errorsMutex.Unlock()
163176
bb.errors.Insert(fmt.Errorf("failed to accept connection on BES pipe %s: %w", bb.bepBinPath, err))
164177
return
165178
}
166-
167-
defer func() {
168-
conn.Close()
169-
}()
179+
defer conn.Close()
170180

171181
if err := bb.streamBesEvents(ctx, conn); err != nil {
172182
bb.errorsMutex.Lock()
@@ -175,6 +185,7 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
175185
return
176186
}
177187

188+
// Normal completion path
178189
for _, p := range bb.besProxies {
179190
if !p.Healthy() {
180191
continue
@@ -190,6 +201,32 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
190201
return nil
191202
}
192203

204+
// maybeAbortPipeBecauseNoHealthyBackends unlinks the FIFO if all backends are unhealthy.
205+
// This gives Bazel a broken pipe → it aborts the upload and exits.
206+
func (bb *besPipe) maybeAbortPipeBecauseNoHealthyBackends() {
207+
if len(bb.besProxies) == 0 {
208+
return
209+
}
210+
211+
var anyHealthy bool
212+
for _, p := range bb.besProxies {
213+
if p.Healthy() {
214+
anyHealthy = true
215+
break
216+
}
217+
}
218+
if anyHealthy {
219+
return
220+
}
221+
222+
bb.pipeAborted.Do(func() {
223+
fmt.Fprintf(os.Stderr, "All BES backends are unhealthy — unlinking pipe %s\n", bb.bepBinPath)
224+
if err := syscall.Unlink(bb.bepBinPath); err != nil && !os.IsNotExist(err) {
225+
fmt.Fprintf(os.Stderr, "failed to unlink BES pipe %s: %v\n", bb.bepBinPath, err)
226+
}
227+
})
228+
}
229+
193230
func (bb *besPipe) streamBesEvents(ctx context.Context, conn *os.File) error {
194231
reader := bufio.NewReader(conn)
195232

@@ -204,6 +241,7 @@ func (bb *besPipe) streamBesEvents(ctx context.Context, conn *os.File) error {
204241
seqId := int64(0)
205242

206243
besEventGlobalTimeout := time.After(besEventGlobalTimeoutDuration)
244+
207245
for {
208246
event := buildeventstream.BuildEvent{}
209247

@@ -285,14 +323,35 @@ func (bb *besPipe) publishBesEvent(seqId int64, event *buildeventstream.BuildEve
285323
}
286324

287325
for _, p := range bb.besProxies {
288-
eg.Go(
289-
func() error {
290-
if err := p.Send(grpcEvent); err != nil {
291-
fmt.Fprintf(os.Stderr, "Error sending BES event to %v: %s\n", p.Host(), err.Error())
326+
p := p // capture
327+
eg.Go(func() error {
328+
if !p.Healthy() {
329+
return nil
330+
}
331+
332+
// Channel for Send result
333+
sendCh := make(chan error, 1)
334+
335+
// Run Send in goroutine
336+
go func() {
337+
err := p.Send(grpcEvent)
338+
sendCh <- err
339+
}()
340+
341+
// Wait for Send or timeout
342+
select {
343+
case err := <-sendCh:
344+
if err != nil {
345+
p.MarkUnhealthy()
346+
bb.maybeAbortPipeBecauseNoHealthyBackends()
292347
}
293348
return nil
294-
},
295-
)
349+
case <-time.After(besSendTimeout):
350+
p.MarkUnhealthy()
351+
bb.maybeAbortPipeBecauseNoHealthyBackends()
352+
return nil
353+
}
354+
})
296355
}
297356
}
298357

pkg/plugin/system/besproxy/bes_proxy.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
buildv1 "google.golang.org/genproto/googleapis/devtools/build/v1"
2828
)
2929

30+
const maxStreamErrors = 5
31+
3032
// BESProxy implements a Build Event Protocol backend to be passed to the
3133
// `bazel build` command so that the Aspect plugins can register as subscribers
3234
// to the build events.
@@ -38,6 +40,7 @@ type BESProxy interface {
3840
PublishLifecycleEvent(ctx context.Context, req *buildv1.PublishLifecycleEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
3941
StreamCreated() bool
4042
Healthy() bool
43+
MarkUnhealthy()
4144
Recv() (*buildv1.PublishBuildToolEventStreamResponse, error)
4245
Send(req *buildv1.PublishBuildToolEventStreamRequest) error
4346
}
@@ -135,13 +138,18 @@ func (bp *besProxy) CloseSend() error {
135138
func (bp *besProxy) trackError(err error) error {
136139
if err != nil {
137140
bp.hadError++
138-
if bp.hadError == 5 {
141+
if bp.hadError == maxStreamErrors {
139142
fmt.Printf("stream to %s is marked unhealthy, taking out of rotation.", bp.host)
140143
}
141144
}
142145
return err
143146
}
144147

145148
func (bp *besProxy) Healthy() bool {
146-
return bp.hadError < 5 && bp.stream != nil
149+
return bp.hadError < maxStreamErrors && bp.stream != nil
150+
}
151+
152+
func (bp *besProxy) MarkUnhealthy() {
153+
bp.hadError = maxStreamErrors
154+
fmt.Printf("stream to %s is marked unhealthy, taking out of rotation.", bp.host)
147155
}

pkg/plugin/system/system.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,9 @@ func (ps *pluginSystem) BESPluginInterceptor() interceptors.Interceptor {
204204
}
205205

206206
usePipe := os.Getenv("ASPECT_BEP_USE_PIPE") != ""
207+
if forceBesBackend {
208+
fmt.Fprintf(os.Stderr, "Using Aspect BES pipe\n")
209+
}
207210

208211
return ps.createBesInterceptor(ctx, cmd, args, usePipe, next)
209212
}
@@ -290,7 +293,7 @@ func (ps *pluginSystem) createBesInterceptor(ctx context.Context, cmd *cobra.Com
290293

291294
if os.Getenv("ASPECT_BEP_WRITE_LAST_VIA_PIPE") != "" {
292295
newArgs, lastBackend := removeLastBesBackend(args)
293-
296+
fmt.Fprintf(os.Stderr, "Forwarding BES stream to %s\n", lastBackend)
294297
besProxy := besproxy.NewBesProxy(lastBackend, map[string]string{})
295298
if err := besProxy.Connect(); err != nil {
296299
fmt.Fprintf(os.Stderr, "Failed to connect to build event stream backend %s: %s", lastBackend, err.Error())

0 commit comments

Comments
 (0)