Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/pkg/cli/client/byoc/gcp/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ func (s *ServerStream[T]) Follow(start time.Time) (iter.Seq2[*T, error], error)
yield(nil, err)
return
}
if entry == nil {
continue // empty Recv response (heartbeat/suppression), keep looping
}
resps, err := s.parseAndFilter(entry)
if err != nil {
yield(nil, err)
Expand Down
71 changes: 71 additions & 0 deletions src/pkg/cli/client/byoc/gcp/stream_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package gcp

import (
"context"
"iter"
"strconv"
"testing"
"time"

"cloud.google.com/go/logging/apiv2/loggingpb"
"github.com/DefangLabs/defang/src/pkg/clouds/gcp"
Expand Down Expand Up @@ -179,3 +181,72 @@ func TestServerStream_Start(t *testing.T) {
})
}
}

// TestServerStream_Follow_SkipsNilEntries checks that Follow() ignores the nil
// entries a tailer hands back (GCP heartbeat or suppression-info responses) and
// still delivers every real log entry, in order and without error.
func TestServerStream_Follow_SkipsNilEntries(t *testing.T) {
	ctx, cancel := context.WithCancel(t.Context())
	t.Cleanup(cancel)

	// instanceId is set so the parser does not hit a nil-pointer dereference
	// when Resource is absent.
	labels := map[string]string{"defang-service": "svc", "instanceId": "inst1"}
	textEntry := func(text string) *loggingpb.LogEntry {
		return &loggingpb.LogEntry{
			Payload:   &loggingpb.LogEntry_TextPayload{TextPayload: text},
			Labels:    labels,
			Timestamp: timestamppb.Now(),
		}
	}

	// The "cancel" entry is a sentinel: once it is yielded the test cancels the
	// context so the Follow loop exits cleanly rather than blocking forever.
	tailed := []*loggingpb.LogEntry{
		nil, // heartbeat — must be skipped
		textEntry("real log"),
		nil, // suppression info — must be skipped
		textEntry("cancel"),
	}

	client := &MockGcpLogsClient{
		lister: &MockGcpLoggingLister{},
		tailer: &MockGcpLoggingTailer{
			MockGcpLoggingLister: MockGcpLoggingLister{logEntries: tailed},
		},
	}

	readName := func(entry *defangv1.TailResponse) string { return entry.Service }
	writeName := func(entry *defangv1.TailResponse, name string) *defangv1.TailResponse {
		entry.Service = name
		return entry
	}
	restorer := getServiceNameRestorer([]string{"svc"}, gcp.SafeLabelValue, readName, writeName)

	stream := NewServerStream(ctx, client, getLogEntryParser(ctx, client), restorer)
	stream.query = NewLogQuery(client.GetProjectID())

	// zero start → skip listing, go straight to tail
	seq, err := stream.Follow(time.Time{})
	assert.NoError(t, err)

	var got []string
	for resp, err := range seq {
		// assert.NoError reports the failure and returns false on a non-nil error.
		if !assert.NoError(t, err) {
			break
		}
		text := resp.Entries[0].Message
		got = append(got, text)
		if text == "cancel" {
			cancel()
		}
	}

	assert.Equal(t, []string{"real log", "cancel"}, got,
		"Follow() should skip nil tailer entries and yield real entries")
}
9 changes: 8 additions & 1 deletion src/pkg/cli/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"iter"

"connectrpc.com/connect"
"github.com/DefangLabs/defang/src/pkg/cli/client"
"github.com/DefangLabs/defang/src/pkg/term"
"github.com/DefangLabs/defang/src/pkg/types"
Expand Down Expand Up @@ -52,8 +53,14 @@ func WaitServiceState(
return serviceStates, nil
}
if err != nil {
// Reconnect on transient errors
// Reconnect on transient errors (including ResourceExhausted — quota resets within
// a minute and DelayBeforeRetry backs off exponentially up to 1 minute).
if isTransientError(err) {
if connect.CodeOf(err) == connect.CodeResourceExhausted {
term.Warnf("quota exceeded; will retry subscribe stream after backoff: %v", err)
} else {
term.Debugf("WaitServiceState: transient error, reconnecting subscribe stream: %v", err)
}
if err := provider.DelayBeforeRetry(ctx); err != nil {
return serviceStates, err
}
Expand Down
7 changes: 7 additions & 0 deletions src/pkg/cli/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/DefangLabs/defang/src/pkg/cli/client"
"github.com/DefangLabs/defang/src/pkg/types"
defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// mockSubscribeProvider mocks the provider for Subscribe.
Expand Down Expand Up @@ -283,6 +285,11 @@ func TestWaitServiceStateStreamReceive(t *testing.T) {
err: connect.NewError(connect.CodeInternal, errors.New("internal error")),
expectRetry: true,
},
{
name: "stream receive returns resource exhausted error and retry to connect",
err: status.Error(codes.ResourceExhausted, "quota exceeded"),
expectRetry: true,
},
}

for _, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion src/pkg/cli/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func isTransientError(err error) bool {

// GCP grpc transient errors
if st, ok := status.FromError(err); ok {
transientCodes := []codes.Code{codes.Unavailable, codes.Internal}
transientCodes := []codes.Code{codes.Unavailable, codes.Internal, codes.ResourceExhausted}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ResourceExhausted is only transient for some resources, i.e., log tail/queries can be retried. So I wonder if we should simply add the special case outside of the call to isTransientError.

Copy link
Copy Markdown
Member Author

@jordanstephens jordanstephens Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isTransientError is exclusively used in log monitoring code. I think it's appropriately placed, so that WaitServiceState, receiveLogs, and WaitForCdTaskExit can take advantage of it.

Would you prefer to rename it to something like isTransientLoggingError?

if slices.Contains(transientCodes, st.Code()) {
return true
}
Expand Down
31 changes: 31 additions & 0 deletions src/pkg/cli/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,41 @@ import (
defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1"
cwTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"

"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

// TestIsTransientError is a table-driven check of which errors
// isTransientError reports as retryable, covering nil, io.EOF, connect errors
// and gRPC status errors.
func TestIsTransientError(t *testing.T) {
	type testCase struct {
		name string
		err  error
		want bool
	}

	cases := []testCase{
		{name: "nil", err: nil, want: false},
		{name: "eof", err: io.EOF, want: false},
		{name: "connect unavailable", err: connect.NewError(connect.CodeUnavailable, errors.New("unavailable")), want: true},
		{name: "connect internal non-wire", err: connect.NewError(connect.CodeInternal, errors.New("internal")), want: true},
		{name: "connect permission denied", err: connect.NewError(connect.CodePermissionDenied, errors.New("denied")), want: false},
		{name: "grpc unavailable", err: mustGRPCStatus(codes.Unavailable, "unavailable"), want: true},
		{name: "grpc internal", err: mustGRPCStatus(codes.Internal, "internal"), want: true},
		{name: "grpc resource exhausted", err: mustGRPCStatus(codes.ResourceExhausted, "quota exceeded"), want: true},
		{name: "grpc permission denied", err: mustGRPCStatus(codes.PermissionDenied, "denied"), want: false},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := isTransientError(tc.err)
			if got == tc.want {
				return
			}
			t.Errorf("isTransientError(%v) = %v, want %v", tc.err, got, tc.want)
		})
	}
}

// mustGRPCStatus builds a gRPC status error carrying the given code and
// message, for use as a test fixture.
func mustGRPCStatus(code codes.Code, msg string) error {
	st := grpcstatus.New(code, msg)
	return st.Err()
}

func TestIsProgressDot(t *testing.T) {
tests := []struct {
name string
Expand Down
3 changes: 1 addition & 2 deletions src/pkg/cli/waitForCdTaskExit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/DefangLabs/defang/src/pkg/cli/client"
"github.com/DefangLabs/defang/src/pkg/term"
)

var pollDuration = 2 * time.Second
Expand All @@ -20,7 +19,7 @@ func WaitForCdTaskExit(ctx context.Context, provider client.Provider) error {
select {
case <-ticker.C:
done, err := provider.GetDeploymentStatus(ctx)
term.Debugf("Polled CD task status: done=%v, err=%v", done, err)
// term.Debugf("Polled CD task status: done=%v, err=%v", done, err)
if err != nil {
// End condition: EOF indicates that the task has completed successfully
if errors.Is(err, io.EOF) {
Expand Down
4 changes: 3 additions & 1 deletion src/pkg/clouds/gcp/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ func (t *gcpLoggingTailer) Next(ctx context.Context) (*loggingpb.LogEntry, error
}
t.cache = resp.GetEntries()
if len(t.cache) == 0 {
return nil, errors.New("no log entries found")
// GCP may send empty responses (heartbeats, suppression info); return nil
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code would be improved by #1951

// so the caller can continue looping without treating this as an error.
return nil, nil
}
}

Expand Down
89 changes: 89 additions & 0 deletions src/pkg/clouds/gcp/logging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package gcp

import (
"context"
"io"
"testing"

"cloud.google.com/go/logging/apiv2/loggingpb"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// mockTailLogEntriesClient is a scripted stand-in for
// loggingpb.LoggingServiceV2_TailLogEntriesClient, used to unit test
// gcpLoggingTailer.Next().
//
// Recv hands out the queued responses in order. Once the queue is drained it
// returns err if one is set, otherwise io.EOF. Every other method is a no-op.
type mockTailLogEntriesClient struct {
	responses []*loggingpb.TailLogEntriesResponse // served front-to-back by Recv
	err       error                               // returned by Recv once responses is empty; nil means io.EOF
}

// Send accepts and discards the request.
func (m *mockTailLogEntriesClient) Send(*loggingpb.TailLogEntriesRequest) error { return nil }

// Recv pops and returns the next queued response. When no responses remain it
// returns m.err if non-nil, and io.EOF otherwise.
func (m *mockTailLogEntriesClient) Recv() (*loggingpb.TailLogEntriesResponse, error) {
	if len(m.responses) == 0 {
		if m.err != nil {
			return nil, m.err
		}
		return nil, io.EOF
	}
	resp := m.responses[0]
	m.responses = m.responses[1:]
	return resp, nil
}

// The remaining methods exist only to satisfy grpc.ClientStream.

func (m *mockTailLogEntriesClient) Header() (metadata.MD, error) { return nil, nil }
func (m *mockTailLogEntriesClient) Trailer() metadata.MD         { return nil }
func (m *mockTailLogEntriesClient) CloseSend() error             { return nil }
func (m *mockTailLogEntriesClient) Context() context.Context     { return context.Background() }
func (m *mockTailLogEntriesClient) SendMsg(any) error            { return nil }
func (m *mockTailLogEntriesClient) RecvMsg(any) error            { return nil }

// Compile-time checks: the mock must satisfy both the generic gRPC stream
// interface and the tail-specific client interface it claims to implement.
var _ grpc.ClientStream = (*mockTailLogEntriesClient)(nil)
var _ loggingpb.LoggingServiceV2_TailLogEntriesClient = (*mockTailLogEntriesClient)(nil)

// TestGcpLoggingTailerNext_EmptyResponse verifies that a response carrying no
// entries (heartbeat or suppression info) makes Next() return (nil, nil), so
// the caller can keep looping instead of treating it as an error.
func TestGcpLoggingTailerNext_EmptyResponse(t *testing.T) {
	stub := &mockTailLogEntriesClient{
		responses: []*loggingpb.TailLogEntriesResponse{
			{Entries: nil}, // empty — heartbeat
		},
	}
	tailer := &gcpLoggingTailer{tleClient: stub}

	got, err := tailer.Next(context.Background())
	switch {
	case err != nil:
		t.Fatalf("Next() error = %v, want nil", err)
	case got != nil:
		t.Fatalf("Next() entry = %v, want nil", got)
	}
}

func TestGcpLoggingTailerNext_WithEntries(t *testing.T) {
// A response with entries should return the first entry and cache the rest.
entries := []*loggingpb.LogEntry{
{InsertId: "entry1"},
{InsertId: "entry2"},
}
client := &mockTailLogEntriesClient{
responses: []*loggingpb.TailLogEntriesResponse{
{Entries: entries},
},
}
tailer := &gcpLoggingTailer{tleClient: client}

entry, err := tailer.Next(context.Background())
if err != nil {
t.Fatalf("Next() error = %v, want nil", err)
}
if entry == nil || entry.InsertId != "entry1" {
t.Fatalf("Next() entry = %v, want entry1", entry)
}

// Second call should return cached entry without calling Recv again.
entry, err = tailer.Next(context.Background())
if err != nil {
t.Fatalf("Next() error = %v, want nil", err)
}
if entry == nil || entry.InsertId != "entry2" {
t.Fatalf("Next() entry = %v, want entry2", entry)
}
}
Loading