Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions pkg/plugin/system/bep/bes_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"strings"
"sync"
"time"

"github.com/fatih/color"
"github.com/golang/protobuf/ptypes/empty"
Expand Down Expand Up @@ -527,15 +528,42 @@ func (bb *besBackend) PublishBuildToolEventStream(
// Goroutine to forward to build event to BES proxies
eg.Go(func() error {
for fwd := range fwdChanRead {
egFwd := errgroup.Group{}

for _, bp := range bb.besProxies {
if !bp.Healthy() {
continue
}
err := bp.Send(fwd)
if err != nil {
// If we fail to send to a proxy then print out an error but don't fail the GRPC call
fmt.Fprintf(os.Stderr, "Error sending build event to %v: %s\n", bp.Host(), err.Error())
}
bp := bp // capture
egFwd.Go(func() error {
if !bp.Healthy() {
return nil
}

// Channel for Send result
sendCh := make(chan error, 1)

// Run Send in goroutine
go func() {
err := bp.Send(fwd)
sendCh <- err
}()

// Wait for Send or timeout
select {
case err := <-sendCh:
if err != nil {
fmt.Fprintf(os.Stderr, "Error sending build event to %v: %s\n", bp.Host(), err.Error())
bp.MarkUnhealthy()
}
return nil
case <-time.After(besSendTimeout):
fmt.Fprintf(os.Stderr, "Timeout sending build event to %v: marking unhealthy\n", bp.Host())
bp.MarkUnhealthy()
return nil
}
})
}

if err := egFwd.Wait(); err != nil {
// Optionally handle errors from sends, but since we log inside, perhaps no need to propagate
}
}
for _, bp := range bb.besProxies {
Expand Down
78 changes: 68 additions & 10 deletions pkg/plugin/system/bep/bes_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type BESPipeInterceptor interface {
const besEventGlobalTimeoutDuration = 5 * time.Minute
const besEventThrottleDuration = 50 * time.Millisecond
const gracePeriodDuration = 2 * time.Second
const besSendTimeout = 1 * time.Minute

func NewBESPipe(buildId, invocationId string) (BESPipeInterceptor, error) {
return &besPipe{
Expand All @@ -72,6 +73,9 @@ type besPipe struct {
besInvocationId string
besProxies []besproxy.BESProxy

// Track whether we have already unlinked the pipe due to backend failure
pipeAborted sync.Once

wg *sync.WaitGroup
}

Expand Down Expand Up @@ -116,6 +120,8 @@ func (bb *besPipe) RegisterBesProxy(ctx context.Context, p besproxy.BESProxy) {
break
}
}
// When the ACK goroutine exits (usually because of error), check if we should abort the pipe
bb.maybeAbortPipeBecauseNoHealthyBackends()
}()
}

Expand Down Expand Up @@ -156,17 +162,21 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
bb.wg.Add(1)
go func() {
defer bb.wg.Done()

// If the overall context is cancelled, abort the pipe immediately
go func() {
<-ctx.Done()
bb.maybeAbortPipeBecauseNoHealthyBackends()
}()

conn, err := os.OpenFile(bb.bepBinPath, os.O_RDONLY, os.ModeNamedPipe)
if err != nil {
bb.errorsMutex.Lock()
defer bb.errorsMutex.Unlock()
bb.errors.Insert(fmt.Errorf("failed to accept connection on BES pipe %s: %w", bb.bepBinPath, err))
return
}

defer func() {
conn.Close()
}()
defer conn.Close()

if err := bb.streamBesEvents(ctx, conn); err != nil {
bb.errorsMutex.Lock()
Expand All @@ -175,6 +185,7 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
return
}

// Normal completion path
for _, p := range bb.besProxies {
if !p.Healthy() {
continue
Expand All @@ -190,6 +201,32 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
return nil
}

// maybeAbortPipeBecauseNoHealthyBackends unlinks the FIFO if all backends are unhealthy.
// This gives Bazel a broken pipe → it aborts the upload and exits.
func (bb *besPipe) maybeAbortPipeBecauseNoHealthyBackends() {
if len(bb.besProxies) == 0 {
return
}

var anyHealthy bool
for _, p := range bb.besProxies {
if p.Healthy() {
anyHealthy = true
break
}
}
if anyHealthy {
return
}

bb.pipeAborted.Do(func() {
fmt.Fprintf(os.Stderr, "All BES backends are unhealthy — unlinking pipe %s\n", bb.bepBinPath)
if err := syscall.Unlink(bb.bepBinPath); err != nil && !os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "failed to unlink BES pipe %s: %v\n", bb.bepBinPath, err)
}
})
}

func (bb *besPipe) streamBesEvents(ctx context.Context, conn *os.File) error {
reader := bufio.NewReader(conn)

Expand Down Expand Up @@ -285,14 +322,35 @@ func (bb *besPipe) publishBesEvent(seqId int64, event *buildeventstream.BuildEve
}

for _, p := range bb.besProxies {
eg.Go(
func() error {
if err := p.Send(grpcEvent); err != nil {
fmt.Fprintf(os.Stderr, "Error sending BES event to %v: %s\n", p.Host(), err.Error())
p := p // capture
eg.Go(func() error {
if !p.Healthy() {
return nil
}

// Channel for Send result
sendCh := make(chan error, 1)

// Run Send in goroutine
go func() {
err := p.Send(grpcEvent)
sendCh <- err
}()

// Wait for Send or timeout
select {
case err := <-sendCh:
if err != nil {
p.MarkUnhealthy()
bb.maybeAbortPipeBecauseNoHealthyBackends()
}
return nil
},
)
case <-time.After(besSendTimeout):
p.MarkUnhealthy()
bb.maybeAbortPipeBecauseNoHealthyBackends()
return nil
}
})
}
}

Expand Down
12 changes: 10 additions & 2 deletions pkg/plugin/system/besproxy/bes_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
buildv1 "google.golang.org/genproto/googleapis/devtools/build/v1"
)

const maxStreamErrors = 5

// BESProxy implements a Build Event Protocol backend to be passed to the
// `bazel build` command so that the Aspect plugins can register as subscribers
// to the build events.
Expand All @@ -38,6 +40,7 @@ type BESProxy interface {
PublishLifecycleEvent(ctx context.Context, req *buildv1.PublishLifecycleEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
StreamCreated() bool
Healthy() bool
MarkUnhealthy()
Recv() (*buildv1.PublishBuildToolEventStreamResponse, error)
Send(req *buildv1.PublishBuildToolEventStreamRequest) error
}
Expand Down Expand Up @@ -135,13 +138,18 @@ func (bp *besProxy) CloseSend() error {
func (bp *besProxy) trackError(err error) error {
if err != nil {
bp.hadError++
if bp.hadError == 5 {
if bp.hadError == maxStreamErrors {
fmt.Printf("stream to %s is marked unhealthy, taking out of rotation.", bp.host)
}
}
return err
}

func (bp *besProxy) Healthy() bool {
return bp.hadError < 5 && bp.stream != nil
return bp.hadError < maxStreamErrors && bp.stream != nil
}

func (bp *besProxy) MarkUnhealthy() {
bp.hadError = maxStreamErrors
fmt.Printf("stream to %s is marked unhealthy, taking out of rotation.", bp.host)
}
5 changes: 4 additions & 1 deletion pkg/plugin/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ func (ps *pluginSystem) BESPluginInterceptor() interceptors.Interceptor {
}

usePipe := os.Getenv("ASPECT_BEP_USE_PIPE") != ""
if forceBesBackend {
fmt.Fprintf(os.Stderr, "Using BES pipe\n")
}

return ps.createBesInterceptor(ctx, cmd, args, usePipe, next)
}
Expand Down Expand Up @@ -290,7 +293,7 @@ func (ps *pluginSystem) createBesInterceptor(ctx context.Context, cmd *cobra.Com

if os.Getenv("ASPECT_BEP_WRITE_LAST_VIA_PIPE") != "" {
newArgs, lastBackend := removeLastBesBackend(args)

fmt.Fprintf(os.Stderr, "Forwarding BES stream to %s\n", lastBackend)
besProxy := besproxy.NewBesProxy(lastBackend, map[string]string{})
if err := besProxy.Connect(); err != nil {
fmt.Fprintf(os.Stderr, "Failed to connect to build event stream backend %s: %s", lastBackend, err.Error())
Expand Down