Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/pkg/cli/client/byoc/aws/byoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ func (b *ByocAws) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (ite
if err != nil {
return nil, AnnotateAwsError(err)
}
logSeq = cw.Flatten(cdSeq)
logSeq = pkg.Flatten(cdSeq)
// No need to filter events by etag because we only show logs from the specified task ID
} else {
logSeq, err = b.queryOrTailLogs(ctx, cwClient, req)
Expand Down Expand Up @@ -785,7 +785,7 @@ func (b *ByocAws) queryOrTailLogs(ctx context.Context, cwClient cw.LogsClient, r
if err != nil {
return nil, err
}
return cw.Flatten(logSeq), nil
return pkg.Flatten(logSeq), nil
} else {
logSeq, err := cw.QueryLogGroups(
ctx,
Expand Down
3 changes: 2 additions & 1 deletion src/pkg/cli/client/byoc/aws/byoc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"testing"
"time"

"github.com/DefangLabs/defang/src/pkg"
"github.com/DefangLabs/defang/src/pkg/cli/client/byoc"
"github.com/DefangLabs/defang/src/pkg/clouds/aws"
"github.com/DefangLabs/defang/src/pkg/clouds/aws/cw"
Expand Down Expand Up @@ -693,7 +694,7 @@ func TestQueryCdLogs(t *testing.T) {
require.NoError(t, err)

// Flatten and collect
logSeq := cw.Flatten(batchSeq)
logSeq := pkg.Flatten(batchSeq)
events := collectEvents(t, logSeq)
assert.Len(t, events, tt.wantCount)
})
Expand Down
8 changes: 4 additions & 4 deletions src/pkg/cli/client/byoc/gcp/byoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,17 +586,17 @@ func (b *ByocGcp) Subscribe(ctx context.Context, req *defangv1.SubscribeRequest)

now := time.Now()
subscribeStream.query.AddSince(now) // Do no query historical events
return subscribeStream.Follow(now)
return subscribeStream.Follow(ctx, now)
}

func (b *ByocGcp) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (iter.Seq2[*defangv1.TailResponse, error], error) {
logStream := b.getLogStream(ctx, b.driver, req)
if req.Follow {
return logStream.Follow(req.Since.AsTime())
return logStream.Follow(ctx, req.Since.AsTime())
} else if req.Since.IsValid() {
return logStream.Head(req.Limit), nil
return logStream.Head(ctx, req.Limit), nil
}
return logStream.Tail(req.Limit), nil
return logStream.Tail(ctx, req.Limit), nil
}

func (b *ByocGcp) getLogStream(ctx context.Context, gcpLogsClient GcpLogsClient, req *defangv1.TailRequest) *LogStream {
Expand Down
56 changes: 16 additions & 40 deletions src/pkg/cli/client/byoc/gcp/byoc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/base64"
"errors"
"io"
"iter"
"os"
"testing"
"time"
Expand Down Expand Up @@ -51,16 +51,24 @@ func TestSetUpCD(t *testing.T) {
}

type MockGcpLogsClient struct {
lister gcp.Lister
tailer gcp.Tailer
listEntries []*loggingpb.LogEntry
tailEntries []*loggingpb.LogEntry
}

func (m MockGcpLogsClient) ListLogEntries(ctx context.Context, query string, order gcp.Order) (gcp.Lister, error) {
return m.lister, nil
func mockEntryIter(entries []*loggingpb.LogEntry) iter.Seq2[[]*loggingpb.LogEntry, error] {
return func(yield func([]*loggingpb.LogEntry, error) bool) {
if !yield(entries, nil) {
return
}
}
}

func (m MockGcpLogsClient) NewTailer(ctx context.Context) (gcp.Tailer, error) {
return m.tailer, nil
func (m MockGcpLogsClient) ListLogEntries(ctx context.Context, query string, order gcp.Order) (iter.Seq2[[]*loggingpb.LogEntry, error], error) {
return mockEntryIter(m.listEntries), nil
}

func (m MockGcpLogsClient) TailLogEntries(ctx context.Context, query string) (iter.Seq2[[]*loggingpb.LogEntry, error], error) {
return mockEntryIter(m.tailEntries), nil
}
func (m MockGcpLogsClient) GetExecutionEnv(ctx context.Context, executionName string) (map[string]string, error) {
return nil, nil
Expand All @@ -77,35 +85,6 @@ func (m MockGcpLogsClient) GetBuildInfo(ctx context.Context, buildId string) (*g
}, nil
}

type MockGcpLoggingLister struct {
logEntries []*loggingpb.LogEntry
}

func (m *MockGcpLoggingLister) Next() (*loggingpb.LogEntry, error) {
if len(m.logEntries) > 0 {
entry := m.logEntries[0]
m.logEntries = m.logEntries[1:]
return entry, nil
}
return nil, io.EOF
}

type MockGcpLoggingTailer struct {
MockGcpLoggingLister
}

func (m *MockGcpLoggingTailer) Close() error {
return nil
}

func (m *MockGcpLoggingTailer) Start(ctx context.Context, query string) error {
return nil
}

func (m *MockGcpLoggingTailer) Next(ctx context.Context) (*loggingpb.LogEntry, error) {
return m.MockGcpLoggingLister.Next()
}

func TestGetLogStream(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -175,10 +154,7 @@ func TestGetLogStream(t *testing.T) {
b := NewByocProvider(ctx, "testTenantID", "")
b.cdExecution = tt.cdExecution

driver := &MockGcpLogsClient{
lister: &MockGcpLoggingLister{},
tailer: &MockGcpLoggingTailer{},
}
driver := &MockGcpLogsClient{}

logStream := b.getLogStream(ctx, driver, tt.req)

Expand Down
128 changes: 62 additions & 66 deletions src/pkg/cli/client/byoc/gcp/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,22 @@ type LogParser[T any] func(*loggingpb.LogEntry) ([]*T, error)
type LogFilter[T any] func(entry T) T

type GcpLogsClient interface {
ListLogEntries(ctx context.Context, query string, order gcp.Order) (gcp.Lister, error)
NewTailer(ctx context.Context) (gcp.Tailer, error)
ListLogEntries(ctx context.Context, query string, order gcp.Order) (iter.Seq2[[]*loggingpb.LogEntry, error], error)
TailLogEntries(ctx context.Context, query string) (iter.Seq2[[]*loggingpb.LogEntry, error], error)
GetExecutionEnv(ctx context.Context, executionName string) (map[string]string, error)
GetProjectID() gcp.ProjectId
GetBuildInfo(ctx context.Context, buildId string) (*gcp.BuildTag, error)
}

type ServerStream[T any] struct {
ctx context.Context
gcpLogsClient GcpLogsClient
parse LogParser[T]
filters []LogFilter[*T]
query *Query
}

func NewServerStream[T any](ctx context.Context, gcpLogsClient GcpLogsClient, parse LogParser[T], filters ...LogFilter[*T]) *ServerStream[T] {
func NewServerStream[T any](gcpLogsClient GcpLogsClient, parse LogParser[T], filters ...LogFilter[*T]) *ServerStream[T] {
return &ServerStream[T]{
ctx: ctx,
gcpLogsClient: gcpLogsClient,
parse: parse,
filters: filters,
Expand All @@ -63,90 +61,87 @@ func isContextCanceledError(err error) bool {
}

// Follow returns an iterator that queries historical logs then tails live logs.
func (s *ServerStream[T]) Follow(start time.Time) (iter.Seq2[*T, error], error) {
tailer, err := s.gcpLogsClient.NewTailer(s.ctx)
func (s *ServerStream[T]) Follow(ctx context.Context, start time.Time) (iter.Seq2[*T, error], error) {
query := s.query.GetQuery()
shouldList := !start.IsZero() && start.Unix() > 0 && time.Since(start) > 10*time.Millisecond
// Establish tail connection eagerly so the server starts buffering entries while we list historical logs
tailIter, err := s.gcpLogsClient.TailLogEntries(ctx, query)
if err != nil {
return nil, err
}
query := s.query.GetQuery()
shouldList := !start.IsZero() && start.Unix() > 0 && time.Since(start) > 10*time.Millisecond
term.Debugf("Query and tail logs since %v with query: \n%v", start, query)
return func(yield func(*T, error) bool) {
defer tailer.Close()
// Only query older logs if start time is more than 10ms ago
if shouldList {
lister, err := s.gcpLogsClient.ListLogEntries(s.ctx, query, gcp.OrderAscending)
listIter, err := s.gcpLogsClient.ListLogEntries(ctx, query, gcp.OrderAscending)
if err != nil {
yield(nil, err)
return
}
if !s.yieldList(yield, lister, 0) {
if !s.yieldList(yield, listIter, 0) {
return
}
}

// Start tailing logs after all older logs are processed
if err := tailer.Start(s.ctx, query); err != nil {
yield(nil, err)
return
}
for {
entry, err := tailer.Next(s.ctx)
// Tail live logs after all older logs are processed
for entries, err := range tailIter {
if err != nil {
if context.Cause(s.ctx) == io.EOF || errors.Is(err, io.EOF) {
if context.Cause(ctx) == io.EOF || errors.Is(err, io.EOF) {
return
}
if isContextCanceledError(err) {
if cause := context.Cause(s.ctx); cause != nil {
if cause := context.Cause(ctx); cause != nil {
yield(nil, cause)
}
return
}
yield(nil, err)
return
}
resps, err := s.parseAndFilter(entry)
if err != nil {
yield(nil, err)
return
}
for _, resp := range resps {
if !yield(resp, nil) {
for _, entry := range entries {
resps, err := s.parseAndFilter(entry)
if err != nil {
yield(nil, err)
return
}
for _, resp := range resps {
if !yield(resp, nil) {
return
}
}
}
}
}, nil
}

// Head returns an iterator that queries logs in ascending order.
func (s *ServerStream[T]) Head(limit int32) iter.Seq2[*T, error] {
func (s *ServerStream[T]) Head(ctx context.Context, limit int32) iter.Seq2[*T, error] {
query := s.query.GetQuery()
term.Debugf("Query logs with query: \n%v", query)
return func(yield func(*T, error) bool) {
lister, err := s.gcpLogsClient.ListLogEntries(s.ctx, query, gcp.OrderAscending)
listIter, err := s.gcpLogsClient.ListLogEntries(ctx, query, gcp.OrderAscending)
if err != nil {
yield(nil, err)
return
}
s.yieldList(yield, lister, limit)
s.yieldList(yield, listIter, limit)
}
}

// Tail returns an iterator that queries logs in descending order, reversing if a limit is set.
func (s *ServerStream[T]) Tail(limit int32) iter.Seq2[*T, error] {
func (s *ServerStream[T]) Tail(ctx context.Context, limit int32) iter.Seq2[*T, error] {
query := s.query.GetQuery()
term.Debugf("Query logs with query: \n%v", query)
return func(yield func(*T, error) bool) {
lister, err := s.gcpLogsClient.ListLogEntries(s.ctx, query, gcp.OrderDescending)
listIter, err := s.gcpLogsClient.ListLogEntries(ctx, query, gcp.OrderDescending)
if err != nil {
yield(nil, err)
return
}
if limit == 0 {
s.yieldList(yield, lister, 0)
s.yieldList(yield, listIter, 0)
} else {
buffer, err := s.listToBuffer(lister, limit)
buffer, err := s.listToBuffer(listIter, limit)
if err != nil {
yield(nil, err)
return
Expand All @@ -161,51 +156,52 @@ func (s *ServerStream[T]) Tail(limit int32) iter.Seq2[*T, error] {
}
}

// yieldList yields items from lister to yield. Returns true if iteration completed
// (EOF or limit reached), false if the consumer stopped or an error was yielded.
func (s *ServerStream[T]) yieldList(yield func(*T, error) bool, lister gcp.Lister, limit int32) bool {
// yieldList yields items from entries to yield. Returns true if iteration completed
// (end of entries or limit reached), false if the consumer stopped or an error was yielded.
func (s *ServerStream[T]) yieldList(yield func(*T, error) bool, seq iter.Seq2[[]*loggingpb.LogEntry, error], limit int32) bool {
count := int32(0)
for {
if limit > 0 && count >= limit {
return true
}
entry, err := lister.Next()
for entries, err := range seq {
if err != nil {
if errors.Is(err, io.EOF) {
return true
}
yield(nil, err)
return false
}
resps, err := s.parseAndFilter(entry)
if err != nil {
yield(nil, err)
return false
}
for _, resp := range resps {
count++
if !yield(resp, nil) {
for _, entry := range entries {
resps, err := s.parseAndFilter(entry)
if err != nil {
yield(nil, err)
return false
}
for _, resp := range resps {
count++
if !yield(resp, nil) {
return false
}
if limit > 0 && count >= limit {
return true
}
}
}
}
return true
}

func (s *ServerStream[T]) listToBuffer(lister gcp.Lister, limit int32) ([]*T, error) {
func (s *ServerStream[T]) listToBuffer(seq iter.Seq2[[]*loggingpb.LogEntry, error], limit int32) ([]*T, error) {
buffer := make([]*T, 0, limit)
for range limit {
entry, err := lister.Next()
for entries, err := range seq {
if err != nil {
if errors.Is(err, io.EOF) {
return buffer, nil
}
return nil, err
}
resps, err := s.parseAndFilter(entry)
if err != nil {
return nil, err
for _, entry := range entries {
resps, err := s.parseAndFilter(entry)
if err != nil {
return nil, err
}
buffer = append(buffer, resps...)
if len(buffer) >= int(limit) {
buffer = buffer[:limit]
return buffer, nil
}
}
buffer = append(buffer, resps...)
}
return buffer, nil
}
Expand Down Expand Up @@ -251,7 +247,7 @@ func NewLogStream(ctx context.Context, gcpLogsClient GcpLogsClient, services []s
return entry
})

ss := NewServerStream(ctx, gcpLogsClient, getLogEntryParser(ctx, gcpLogsClient), restoreServiceName)
ss := NewServerStream(gcpLogsClient, getLogEntryParser(ctx, gcpLogsClient), restoreServiceName)
ss.query = NewLogQuery(gcpLogsClient.GetProjectID())
return &LogStream{ServerStream: ss}
}
Expand Down Expand Up @@ -312,7 +308,7 @@ func NewSubscribeStream(ctx context.Context, driver GcpLogsClient, waitForCD boo
}),
)

ss := NewServerStream(ctx, driver, getActivityParser(ctx, driver, waitForCD, etag), filters...)
ss := NewServerStream(driver, getActivityParser(ctx, driver, waitForCD, etag), filters...)
ss.query = NewSubscribeQuery()
return &SubscribeStream{ServerStream: ss}
}
Expand Down
Loading
Loading