diff --git a/pkg/plugin/system/bep/bes_backend.go b/pkg/plugin/system/bep/bes_backend.go
index 00d2b231..4911644f 100644
--- a/pkg/plugin/system/bep/bes_backend.go
+++ b/pkg/plugin/system/bep/bes_backend.go
@@ -26,6 +26,7 @@ import (
 	"os"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/fatih/color"
 	"github.com/golang/protobuf/ptypes/empty"
@@ -527,15 +528,47 @@ func (bb *besBackend) PublishBuildToolEventStream(
 	// Goroutine to forward to build event to BES proxies
 	eg.Go(func() error {
 		for fwd := range fwdChanRead {
+			egFwd := errgroup.Group{}
+
 			for _, bp := range bb.besProxies {
-				if !bp.Healthy() {
-					continue
-				}
-				err := bp.Send(fwd)
-				if err != nil {
-					// If we fail to send to a proxy then print out an error but don't fail the GRPC call
-					fmt.Fprintf(os.Stderr, "Error sending build event to %v: %s\n", bp.Host(), err.Error())
-				}
+				bp := bp // capture
+				egFwd.Go(func() error {
+					if !bp.Healthy() {
+						return nil
+					}
+
+					// Channel for Send result
+					sendCh := make(chan error, 1)
+
+					// Run Send in goroutine
+					go func() {
+						err := bp.Send(fwd)
+						sendCh <- err
+					}()
+
+					// Explicit timer, stopped on return, so a fast Send does not
+					// leave a pending besSendTimeout timer behind for every event
+					timer := time.NewTimer(besSendTimeout)
+					defer timer.Stop()
+
+					// Wait for Send or timeout
+					select {
+					case err := <-sendCh:
+						if err != nil {
+							fmt.Fprintf(os.Stderr, "Error sending build event to %v: %s\n", bp.Host(), err.Error())
+							bp.MarkUnhealthy()
+						}
+						return nil
+					case <-timer.C:
+						fmt.Fprintf(os.Stderr, "Timeout sending build event to %v: marking unhealthy\n", bp.Host())
+						bp.MarkUnhealthy()
+						return nil
+					}
+				})
 			}
+
+			// Send failures are logged per proxy and never returned, so Wait
+			// only blocks until every proxy has handled this event
+			_ = egFwd.Wait()
 		}
 		for _, bp := range bb.besProxies {
diff --git a/pkg/plugin/system/bep/bes_pipe.go b/pkg/plugin/system/bep/bes_pipe.go
index f66f3072..a8aaa303 100644
--- a/pkg/plugin/system/bep/bes_pipe.go
+++ b/pkg/plugin/system/bep/bes_pipe.go
@@ -49,6 +49,7 @@ type BESPipeInterceptor interface {
 const besEventGlobalTimeoutDuration = 5 * time.Minute
 const besEventThrottleDuration = 50 * time.Millisecond
 const gracePeriodDuration = 2 * time.Second
+const besSendTimeout = 1 * time.Minute
 
 func NewBESPipe(buildId, invocationId string) (BESPipeInterceptor, error) {
 	return &besPipe{
@@ -72,6 +73,9 @@ type besPipe struct {
 	besInvocationId string
 	besProxies      []besproxy.BESProxy
 
+	// Track whether we have already unlinked the pipe due to backend failure
+	pipeAborted sync.Once
+
 	wg *sync.WaitGroup
 }
 
@@ -116,6 +120,8 @@ func (bb *besPipe) RegisterBesProxy(ctx context.Context, p besproxy.BESProxy) {
 				break
 			}
 		}
+		// When the ACK goroutine exits (usually because of error), check if we should abort the pipe
+		bb.maybeAbortPipeBecauseNoHealthyBackends()
 	}()
 }
 
@@ -156,6 +162,20 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
 	bb.wg.Add(1)
 	go func() {
 		defer bb.wg.Done()
+
+		// On context cancellation, unlink the pipe if no backend is healthy.
+		// watchDone ends the watcher when this worker returns, so it cannot
+		// outlive the worker when ctx is never cancelled.
+		watchDone := make(chan struct{})
+		defer close(watchDone)
+		go func() {
+			select {
+			case <-ctx.Done():
+				bb.maybeAbortPipeBecauseNoHealthyBackends()
+			case <-watchDone:
+			}
+		}()
+
 		conn, err := os.OpenFile(bb.bepBinPath, os.O_RDONLY, os.ModeNamedPipe)
 		if err != nil {
 			bb.errorsMutex.Lock()
@@ -163,10 +183,7 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
 			bb.errors.Insert(fmt.Errorf("failed to accept connection on BES pipe %s: %w", bb.bepBinPath, err))
 			return
 		}
-
-		defer func() {
-			conn.Close()
-		}()
+		defer conn.Close()
 
 		if err := bb.streamBesEvents(ctx, conn); err != nil {
 			bb.errorsMutex.Lock()
@@ -175,6 +192,7 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
 			return
 		}
 
+		// Normal completion path
 		for _, p := range bb.besProxies {
 			if !p.Healthy() {
 				continue
@@ -190,6 +208,36 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
 	return nil
 }
 
+// maybeAbortPipeBecauseNoHealthyBackends unlinks the FIFO at bepBinPath once
+// every registered backend reports unhealthy. It does nothing while any
+// backend is healthy or when no backend is registered, and unlinks at most once.
+// NOTE(review): unlinking only removes the FIFO's name; a writer that already
+// has it open sees a broken pipe only after the read end is closed. Confirm
+// this is enough to make Bazel abort the upload.
+func (bb *besPipe) maybeAbortPipeBecauseNoHealthyBackends() {
+	if len(bb.besProxies) == 0 {
+		return
+	}
+
+	var anyHealthy bool
+	for _, p := range bb.besProxies {
+		if p.Healthy() {
+			anyHealthy = true
+			break
+		}
+	}
+	if anyHealthy {
+		return
+	}
+
+	bb.pipeAborted.Do(func() {
+		fmt.Fprintf(os.Stderr, "All BES backends are unhealthy — unlinking pipe %s\n", bb.bepBinPath)
+		if err := syscall.Unlink(bb.bepBinPath); err != nil && !os.IsNotExist(err) {
+			fmt.Fprintf(os.Stderr, "failed to unlink BES pipe %s: %v\n", bb.bepBinPath, err)
+		}
+	})
+}
+
 func (bb *besPipe) streamBesEvents(ctx context.Context, conn *os.File) error {
 	reader := bufio.NewReader(conn)
 
@@ -285,14 +333,42 @@ func (bb *besPipe) publishBesEvent(seqId int64, event *buildeventstream.BuildEve
 	}
 
 	for _, p := range bb.besProxies {
-		eg.Go(
-			func() error {
-				if err := p.Send(grpcEvent); err != nil {
+		p := p // capture
+		eg.Go(func() error {
+			if !p.Healthy() {
+				return nil
+			}
+
+			// Channel for Send result
+			sendCh := make(chan error, 1)
+
+			// Run Send in goroutine
+			go func() {
+				err := p.Send(grpcEvent)
+				sendCh <- err
+			}()
+
+			// Explicit timer, stopped on return, so a fast Send does not
+			// leave a pending besSendTimeout timer behind for every event
+			timer := time.NewTimer(besSendTimeout)
+			defer timer.Stop()
+
+			// Wait for Send or timeout
+			select {
+			case err := <-sendCh:
+				if err != nil {
 					fmt.Fprintf(os.Stderr, "Error sending BES event to %v: %s\n", p.Host(), err.Error())
+					p.MarkUnhealthy()
+					bb.maybeAbortPipeBecauseNoHealthyBackends()
 				}
 				return nil
-			},
-		)
+			case <-timer.C:
+				fmt.Fprintf(os.Stderr, "Timeout sending BES event to %v: marking unhealthy\n", p.Host())
+				p.MarkUnhealthy()
+				bb.maybeAbortPipeBecauseNoHealthyBackends()
+				return nil
+			}
+		})
 	}
 }
 
diff --git a/pkg/plugin/system/besproxy/bes_proxy.go b/pkg/plugin/system/besproxy/bes_proxy.go
index 7420ecd0..b2b8ca90 100644
--- a/pkg/plugin/system/besproxy/bes_proxy.go
+++ b/pkg/plugin/system/besproxy/bes_proxy.go
@@ -27,6 +27,8 @@ import (
 	buildv1 "google.golang.org/genproto/googleapis/devtools/build/v1"
 )
 
+const maxStreamErrors = 5
+
 // BESProxy implements a Build Event Protocol backend to be passed to the
 // `bazel build` command so that the Aspect plugins can register as subscribers
 // to the build events.
@@ -38,6 +40,7 @@ type BESProxy interface {
 	PublishLifecycleEvent(ctx context.Context, req *buildv1.PublishLifecycleEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
 	StreamCreated() bool
 	Healthy() bool
+	MarkUnhealthy()
 	Recv() (*buildv1.PublishBuildToolEventStreamResponse, error)
 	Send(req *buildv1.PublishBuildToolEventStreamRequest) error
 }
@@ -135,7 +138,7 @@ func (bp *besProxy) CloseSend() error {
 func (bp *besProxy) trackError(err error) error {
 	if err != nil {
 		bp.hadError++
-		if bp.hadError == 5 {
+		if bp.hadError == maxStreamErrors {
 			fmt.Printf("stream to %s is marked unhealthy, taking out of rotation.", bp.host)
 		}
 	}
@@ -143,5 +146,12 @@ func (bp *besProxy) trackError(err error) error {
 }
 
 func (bp *besProxy) Healthy() bool {
-	return bp.hadError < 5 && bp.stream != nil
+	return bp.hadError < maxStreamErrors && bp.stream != nil
+}
+
+// MarkUnhealthy takes the proxy out of rotation immediately by raising its
+// error count to maxStreamErrors, so Healthy reports false from then on.
+func (bp *besProxy) MarkUnhealthy() {
+	bp.hadError = maxStreamErrors
+	fmt.Printf("stream to %s is marked unhealthy, taking out of rotation.\n", bp.host)
 }
diff --git a/pkg/plugin/system/system.go b/pkg/plugin/system/system.go
index e7f876d4..cb884b17 100644
--- a/pkg/plugin/system/system.go
+++ b/pkg/plugin/system/system.go
@@ -204,6 +204,9 @@ func (ps *pluginSystem) BESPluginInterceptor() interceptors.Interceptor {
 		}
 
 		usePipe := os.Getenv("ASPECT_BEP_USE_PIPE") != ""
+		if usePipe {
+			fmt.Fprintf(os.Stderr, "Using BES pipe\n")
+		}
 		return ps.createBesInterceptor(ctx, cmd, args, usePipe, next)
 	}
 }
@@ -290,7 +293,7 @@ func (ps *pluginSystem) createBesInterceptor(ctx context.Context, cmd *cobra.Com
 
 	if os.Getenv("ASPECT_BEP_WRITE_LAST_VIA_PIPE") != "" {
 		newArgs, lastBackend := removeLastBesBackend(args)
-
+		fmt.Fprintf(os.Stderr, "Forwarding BES stream to %s\n", lastBackend)
 		besProxy := besproxy.NewBesProxy(lastBackend, map[string]string{})
 		if err := besProxy.Connect(); err != nil {
 			fmt.Fprintf(os.Stderr, "Failed to connect to build event stream backend %s: %s", lastBackend, err.Error())