diff --git a/cmd/serve_grpc.go b/cmd/serve_grpc.go index 1c1189b..dceec6b 100644 --- a/cmd/serve_grpc.go +++ b/cmd/serve_grpc.go @@ -48,7 +48,19 @@ func withServeGrpcCmd() func(*viper.Viper) *cobra.Command { } zap.L().Info("listening for grpc requests", zap.String("addr", addr)) - evrys := grpc.NewEvrysService(zap.L()) + // this will fail to validate right now and will not work! + evrys, err := grpc.NewEvrysService(grpc.EvrysServiceConfig{ + EventStore: nil, + Log: zap.L(), + }) + if err != nil { + zap.L().Fatal( + "unexpected error when initializing evrys service", + zap.String("addr", addr), + zap.Error(err), + ) + return + } err = evrys.Serve(cmd.Context(), ls) if err != nil && !errors.Is(err, grpc.ErrServerStopped) { zap.L().Fatal( diff --git a/eventstore/errors.go b/eventstore/errors.go index 08f63da..fd2fecd 100644 --- a/eventstore/errors.go +++ b/eventstore/errors.go @@ -82,6 +82,34 @@ func (p *PutError) Unwrap() error { return p.Err } +// QueryError defines an error when putting data into a database +type QueryError struct { + Source string + QueriedType string + Err error + IsNotFound bool +} + +// NewQueryError creates a new PutError +func NewQueryError(source, queriedType string, isNotFound bool, err error) *QueryError { + return &QueryError{ + Source: source, + QueriedType: queriedType, + Err: err, + IsNotFound: isNotFound, + } +} + +// Error returns a string form of the error and implements the error interface +func (p *QueryError) Error() string { + return fmt.Sprintf("failed to put %s into %s. NotFound: %v. Error: %s", p.QueriedType, p.Source, p.IsNotFound, p.Err) +} + +// Unwrap returns the inner error, making it compatible with errors.Unwrap +func (p *QueryError) Unwrap() error { + return p.Err +} + // InvalidValidationError Alias for validator package validator.InvalidValidationError var InvalidValidationError = validator.InvalidValidationError{} diff --git a/eventstore/mongo.go b/eventstore/mongo.go index e70be09..98fedda 100644 --- a/eventstore/mongo.go +++ b/eventstore/mongo.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/cloudevents/sdk-go/v2/event" "github.com/go-playground/validator/v10" @@ -157,3 +158,78 @@ func (m *Mongo) Append(ctx context.Context, event *event.Event) error { return nil } + +func (m *Mongo) GetByID(ctx context.Context, id string) (*event.Event, error) { + m.logger.Info( + "attempting to lookup event by id", + zap.String("id", id), + ) + + coll := m.client.Database(m.config.Database).Collection(m.config.Collection) + + filter := bson.D{{Key: "id", Value: id}} + result := coll.FindOne(ctx, filter) + if result.Err() != nil { + if errors.Is(result.Err(), mongo.ErrNoDocuments) { + m.logger.Error( + "event with id does not exist", + zap.String("id", id), + zap.Error(result.Err()), + ) + return nil, NewQueryError("mongo", "*event.Event", true, result.Err()) + } + m.logger.Error( + "failed to lookup event by id", + zap.String("id", id), + zap.Error(result.Err()), + ) + return nil, NewQueryError("mongo", "*event.Event", false, result.Err()) + } + + var b bson.D + err := result.Decode(&b) + if err != nil { + return nil, err + } + + // remove mongo id field since cloud events doesn't like it + // it will always be the first index + b, err = removeElement(b, 0) + if err != nil { + return nil, err + } + + jsb, err := bson.MarshalExtJSON(b, false, false) + if err != nil { + return nil, err + } + + ev := event.New() + err = ev.UnmarshalJSON(jsb) + if err != nil { + return nil, err + } + + return &ev, nil +} + +func (m *Mongo) EventsBefore(ctx context.Context, until time.Time) (events []*event.Event, cursor any, err error) { + return nil, nil, nil +} + +func removeElement[T any](s []T, i int) ([]T, error) { + // s is [1,2,3,4,5,6], i is 2 + + // perform bounds checking first to prevent a panic! + if i >= len(s) || i < 0 { + return nil, fmt.Errorf("index is out of range. index is %d with slice length %d", i, len(s)) + } + + // copy first element (1) to index `i`. At this point, + // `s` will be [1,2,1,4,5,6] + s[i] = s[0] + // Remove the first element from the slice by truncating it + // This way, `s` will now include all the elements from index 1 + // until the element at the last index + return s[1:], nil +} diff --git a/eventstore/mongo_test.go b/eventstore/mongo_test.go index 56ef693..60410c6 100644 --- a/eventstore/mongo_test.go +++ b/eventstore/mongo_test.go @@ -272,4 +272,23 @@ func TestMongoIntegration(t *testing.T) { } req.True(idCheck && specVersionCheck && sourceCheck && typeCheck && subjectCheck && dataContentTypeCheck && timeCheck && dataCheck, "all values have not been verified") + + // validated its there + + // test single lookup + lookupEv, err := mongoImpl.GetByID(ctx, id) + req.NoError(err, "should not fail") + req.NotNil(lookupEv) + req.Equal(id, lookupEv.ID(), "ids should match") + req.Equal("1.0", lookupEv.SpecVersion(), "spec version should match") + req.Equal("mongo_test", lookupEv.Source(), "source not expected value") + req.Equal("test", lookupEv.Type(), "type not expected value") + req.Equal("test", lookupEv.Subject(), "subject not expected value") + req.Equal(curTime, lookupEv.Time(), "time is not equal") + var m map[string]interface{} + err = lookupEv.DataAs(&m) + req.NoError(err, "should be able to convert") + value, ok := m["hello"] + req.True(ok, "key 'hello' should exist") + req.Equal("world", value, "value does not match expected") } diff --git a/eventstore/store.go b/eventstore/store.go index 0ae8ab9..97217e3 100644 --- a/eventstore/store.go +++ b/eventstore/store.go @@ -2,6 +2,7 @@ package eventstore import ( "context" + "time" "github.com/cloudevents/sdk-go/v2/event" ) @@ -11,3 +12,9 @@ type AppendOnly interface { // AppendEvent pushes an event to the event store and assumes that the event has already been validated before receiving Append(ctx context.Context, event *event.Event) error } + +// Query looks up events in event store +type Query interface { + GetByID(ctx context.Context, id string) (*event.Event, error) + EventsBefore(ctx context.Context, until time.Time) (events []*event.Event, cursor any, err error) +} diff --git a/grpc/grpc.go b/grpc/grpc.go new file mode 100644 index 0000000..a4110a7 --- /dev/null +++ b/grpc/grpc.go @@ -0,0 +1,8 @@ +package grpc + +import "github.com/z5labs/evrys/eventstore" + +// EventStore combines single method interfaces from eventstore into one interface +type EventStore interface { + eventstore.AppendOnly +} diff --git a/grpc/service.go b/grpc/service.go index fe78c3e..72f0644 100644 --- a/grpc/service.go +++ b/grpc/service.go @@ -16,8 +16,10 @@ package grpc import ( "context" + "fmt" "net" + "github.com/go-playground/validator/v10" evryspb "github.com/z5labs/evrys/proto" format "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" @@ -29,22 +31,35 @@ import ( "google.golang.org/grpc/status" ) +// ErrServerStopped wraps grpc.ErrServerStopped var ErrServerStopped = grpc.ErrServerStopped var _ evryspb.EvrysServer = &EvrysService{} +// EvrysServiceConfig stores configuration values for evrys service +type EvrysServiceConfig struct { + EventStore EventStore `validate:"required"` + Log *zap.Logger `validate:"required"` +} + +// EvrysService is defines the grpc for evrys and implements the interface from evrys proto type EvrysService struct { evryspb.UnimplementedEvrysServer - - log *zap.Logger + config EvrysServiceConfig } -func NewEvrysService(logger *zap.Logger) *EvrysService { - return &EvrysService{ - log: logger, +// NewEvrysService creates an instance of EvrysService +func NewEvrysService(config EvrysServiceConfig) (*EvrysService, error) { + if err := validator.New().Struct(&config); err != nil { + return nil, fmt.Errorf("failed to validate config") } + + return &EvrysService{ + config: config, + }, nil } +// Serve creates and runs the grpc server func (s *EvrysService) Serve(ctx context.Context, ls net.Listener) error { grpcServer := grpc.NewServer(grpc.Creds(insecure.NewCredentials())) evryspb.RegisterEvrysServer(grpcServer, s) @@ -70,25 +85,54 @@ func (s *EvrysService) Serve(ctx context.Context, ls net.Listener) error { } } +// GetEvent retrieves an event from the event store func (s *EvrysService) GetEvent(ctx context.Context, req *evryspb.GetEventRequest) (*cloudeventpb.CloudEvent, error) { return nil, status.Error(codes.NotFound, "") } +// RecordEvent records an event in the event store func (s *EvrysService) RecordEvent(ctx context.Context, req *cloudeventpb.CloudEvent) (*evryspb.RecordEventResponse, error) { event, err := format.FromProto(req) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } - s.log.Info( + s.config.Log.Info( "received event", zap.String("event_id", event.ID()), zap.String("event_source", event.Source()), zap.String("event_type", event.Type()), ) + + err = event.Validate() + if err != nil { + s.config.Log.Error( + "cloud event failed to validate", + zap.Error(err), + zap.String("event_id", event.ID()), + zap.String("event_source", event.Source()), + zap.String("event_type", event.Type()), + ) + return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("cloud event failed validation: %s", err.Error())) + } + + err = s.config.EventStore.Append(ctx, event) + if err != nil { + s.config.Log.Error( + "failed to append event to event store", + zap.Error(err), + zap.String("event_id", event.ID()), + zap.String("event_source", event.Source()), + zap.String("event_type", event.Type()), + ) + + return nil, status.Error(codes.Internal, fmt.Sprintf("failed to append event to event store: %s", err.Error())) + } + return new(evryspb.RecordEventResponse), nil } +// SliceEvents retrieves multiple events from the event store func (s *EvrysService) SliceEvents(req *evryspb.SliceEventsRequest, stream evryspb.Evrys_SliceEventsServer) error { - s.log.Info("received slice request") + s.config.Log.Info("received slice request") return nil } diff --git a/grpc/service_test.go b/grpc/service_test.go index 68604aa..5184515 100644 --- a/grpc/service_test.go +++ b/grpc/service_test.go @@ -26,6 +26,7 @@ import ( evryspb "github.com/z5labs/evrys/proto" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -33,6 +34,22 @@ import ( "google.golang.org/grpc/status" ) +func TestNewEvrysService(t *testing.T) { + s, err := NewEvrysService(EvrysServiceConfig{ + EventStore: nil, + Log: nil, + }) + require.Nil(t, s, "service should be nil") + require.Error(t, err, "new evrys service should return an error") + + s, err = NewEvrysService(EvrysServiceConfig{ + EventStore: &mockEventStore{}, + Log: zap.L(), + }) + require.NotNil(t, s, "service should not be nil") + require.NoError(t, err, "new evrys service should not return an error") +} + func TestEvrysService_GetEvent(t *testing.T) { t.Run("will return not found", func(t *testing.T) { t.Run("if no events exists in the store", func(t *testing.T) { @@ -44,7 +61,13 @@ func TestEvrysService_GetEvent(t *testing.T) { return } - s := NewEvrysService(zap.L()) + s, err := NewEvrysService(EvrysServiceConfig{ + EventStore: &mockEventStore{}, + Log: zap.L(), + }) + if !assert.NoError(t, err, "error should be nil") { + return + } errChan := make(chan error, 1) go func(ls net.Listener) { @@ -96,7 +119,13 @@ func TestEvrysService_RecordEvent(t *testing.T) { return } - s := NewEvrysService(zap.L()) + s, err := NewEvrysService(EvrysServiceConfig{ + EventStore: &mockEventStore{}, + Log: zap.L(), + }) + if !assert.NoError(t, err, "error should be nil") { + return + } errChan := make(chan error, 1) go func(ls net.Listener) { @@ -112,18 +141,153 @@ func TestEvrysService_RecordEvent(t *testing.T) { if !assert.Nil(t, err) { return } + ev := event.New() + ev.SetID("adsf") + ev.SetSource("asdf") + ev.SetSubject("asdfasfd") + ev.SetType("asdfas") + ev.SetData(event.ApplicationJSON, map[string]interface{}{"adfasd": 1}) + ev.SetTime(time.Now().UTC()) + event, err := format.ToProto(&ev) + if !assert.Nil(t, err) { + return + } - event, err := format.ToProto(&event.Event{}) + client := evryspb.NewEvrysClient(cc) + _, err = client.RecordEvent(ctx, event) if !assert.Nil(t, err) { return } + cancel() + + select { + case <-ctx.Done(): + case err := <-errChan: + if !assert.Nil(t, err) { + return + } + } + }) + }) + t.Run("will fail", func(t *testing.T) { + t.Run("event validation fail", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ls, err := net.Listen("tcp", ":0") + if !assert.Nil(t, err) { + return + } + + s, err := NewEvrysService(EvrysServiceConfig{ + EventStore: &mockEventStore{}, + Log: zap.L(), + }) + if !assert.NoError(t, err, "error should be nil") { + return + } + + errChan := make(chan error, 1) + go func(ls net.Listener) { + defer close(errChan) + + err := s.Serve(ctx, ls) + if err != nil && err != ErrServerStopped { + errChan <- err + } + }(ls) + + cc, err := grpc.Dial(ls.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if !assert.Nil(t, err) { + return + } + + // empty event should fail + ev := event.New() + + event, err := format.ToProto(&ev) + if !assert.NoError(t, err) { + return + } + client := evryspb.NewEvrysClient(cc) _, err = client.RecordEvent(ctx, event) + if !assert.Error(t, err) { + return + } + + grpcStatus := status.Convert(err) + require.Equal(t, codes.FailedPrecondition.String(), grpcStatus.Code().String(), "expected failed precondition error") + + cancel() + + select { + case <-ctx.Done(): + case err := <-errChan: + if !assert.Nil(t, err) { + return + } + } + }) + + t.Run("event append fail", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ls, err := net.Listen("tcp", ":0") + if !assert.Nil(t, err) { + return + } + + s, err := NewEvrysService(EvrysServiceConfig{ + EventStore: &mockEventStore{ + Fail: true, + }, + Log: zap.L(), + }) + if !assert.NoError(t, err, "error should be nil") { + return + } + + errChan := make(chan error, 1) + go func(ls net.Listener) { + defer close(errChan) + + err := s.Serve(ctx, ls) + if err != nil && err != ErrServerStopped { + errChan <- err + } + }(ls) + + cc, err := grpc.Dial(ls.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if !assert.Nil(t, err) { return } + // empty event should fail + ev := event.New() + ev.SetID("adsf") + ev.SetSource("asdf") + ev.SetSubject("asdfasfd") + ev.SetType("asdfas") + ev.SetData(event.ApplicationJSON, map[string]interface{}{"adfasd": 1}) + ev.SetTime(time.Now().UTC()) + + event, err := format.ToProto(&ev) + if !assert.Nil(t, err) { + return + } + + client := evryspb.NewEvrysClient(cc) + _, err = client.RecordEvent(ctx, event) + if !assert.Error(t, err) { + return + } + + grpcStatus := status.Convert(err) + require.Equal(t, codes.Internal.String(), grpcStatus.Code().String(), "expected internal error") + cancel() select { @@ -148,7 +312,13 @@ func TestEvrysService_SliceEvents(t *testing.T) { return } - s := NewEvrysService(zap.L()) + s, err := NewEvrysService(EvrysServiceConfig{ + EventStore: &mockEventStore{}, + Log: zap.L(), + }) + if !assert.NoError(t, err, "error should be nil") { + return + } errChan := make(chan error, 1) go func(ls net.Listener) { diff --git a/grpc/testing.go b/grpc/testing.go index 5aad467..bb55f9b 100644 --- a/grpc/testing.go +++ b/grpc/testing.go @@ -16,15 +16,29 @@ package grpc import ( "context" + "errors" "net" evryspb "github.com/z5labs/evrys/proto" cloudeventpb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "github.com/cloudevents/sdk-go/v2/event" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) +type mockEventStore struct { + Fail bool +} + +func (m *mockEventStore) Append(ctx context.Context, event *event.Event) error { + if m.Fail { + return errors.New("test fail") + } + + return nil +} + func MockEvrys(opts ...func(MockEvrysService) MockEvrysService) MockEvrysService { s := MockEvrysService{} for _, opt := range opts {