diff --git a/cmd/serve_grpc.go b/cmd/serve_grpc.go index 1c1189b..dceec6b 100644 --- a/cmd/serve_grpc.go +++ b/cmd/serve_grpc.go @@ -48,7 +48,19 @@ func withServeGrpcCmd() func(*viper.Viper) *cobra.Command { } zap.L().Info("listening for grpc requests", zap.String("addr", addr)) - evrys := grpc.NewEvrysService(zap.L()) + // this will fail to validate right now and will not work! + evrys, err := grpc.NewEvrysService(grpc.EvrysServiceConfig{ + EventStore: nil, + Log: zap.L(), + }) + if err != nil { + zap.L().Fatal( + "unexpected error when initializing evrys service", + zap.String("addr", addr), + zap.Error(err), + ) + return + } err = evrys.Serve(cmd.Context(), ls) if err != nil && !errors.Is(err, grpc.ErrServerStopped) { zap.L().Fatal( diff --git a/grpc/grpc.go b/grpc/grpc.go new file mode 100644 index 0000000..a4110a7 --- /dev/null +++ b/grpc/grpc.go @@ -0,0 +1,8 @@ +package grpc + +import "github.com/z5labs/evrys/eventstore" + +// EventStore combines single method interfaces from eventstore into one interface +type EventStore interface { + eventstore.AppendOnly +} diff --git a/grpc/service.go b/grpc/service.go index fe78c3e..72f0644 100644 --- a/grpc/service.go +++ b/grpc/service.go @@ -16,8 +16,10 @@ package grpc import ( "context" + "fmt" "net" + "github.com/go-playground/validator/v10" evryspb "github.com/z5labs/evrys/proto" format "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" @@ -29,22 +31,35 @@ import ( "google.golang.org/grpc/status" ) +// ErrServerStopped wraps grpc.ErrServerStopped var ErrServerStopped = grpc.ErrServerStopped var _ evryspb.EvrysServer = &EvrysService{} +// EvrysServiceConfig stores configuration values for evrys service +type EvrysServiceConfig struct { + EventStore EventStore `validate:"required"` + Log *zap.Logger `validate:"required"` +} + +// EvrysService is defines the grpc for evrys and implements the interface from evrys proto type EvrysService struct { evryspb.UnimplementedEvrysServer - - log *zap.Logger + config EvrysServiceConfig } -func NewEvrysService(logger *zap.Logger) *EvrysService { - return &EvrysService{ - log: logger, +// NewEvrysService creates an instance of EvrysService +func NewEvrysService(config EvrysServiceConfig) (*EvrysService, error) { + if err := validator.New().Struct(&config); err != nil { + return nil, fmt.Errorf("failed to validate config") } + + return &EvrysService{ + config: config, + }, nil } +// Serve creates and runs the grpc server func (s *EvrysService) Serve(ctx context.Context, ls net.Listener) error { grpcServer := grpc.NewServer(grpc.Creds(insecure.NewCredentials())) evryspb.RegisterEvrysServer(grpcServer, s) @@ -70,25 +85,54 @@ func (s *EvrysService) Serve(ctx context.Context, ls net.Listener) error { } } +// GetEvent retrieves an event from the event store func (s *EvrysService) GetEvent(ctx context.Context, req *evryspb.GetEventRequest) (*cloudeventpb.CloudEvent, error) { return nil, status.Error(codes.NotFound, "") } +// RecordEvent records an event in the event store func (s *EvrysService) RecordEvent(ctx context.Context, req *cloudeventpb.CloudEvent) (*evryspb.RecordEventResponse, error) { event, err := format.FromProto(req) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } - s.log.Info( + s.config.Log.Info( "received event", zap.String("event_id", event.ID()), zap.String("event_source", event.Source()), zap.String("event_type", event.Type()), ) + + err = event.Validate() + if err != nil { + s.config.Log.Error( + "cloud event failed to validate", + zap.Error(err), + zap.String("event_id", event.ID()), + zap.String("event_source", event.Source()), + zap.String("event_type", event.Type()), + ) + return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("cloud event failed validation: %s", err.Error())) + } + + err = s.config.EventStore.Append(ctx, event) + if err != nil { + s.config.Log.Error( + "failed to append event to event store", + zap.Error(err), + zap.String("event_id", event.ID()), + zap.String("event_source", event.Source()), + zap.String("event_type", event.Type()), + ) + + return nil, status.Error(codes.Internal, fmt.Sprintf("failed to append event to event store: %s", err.Error())) + } + return new(evryspb.RecordEventResponse), nil } +// SliceEvents retrieves multiple events from the event store func (s *EvrysService) SliceEvents(req *evryspb.SliceEventsRequest, stream evryspb.Evrys_SliceEventsServer) error { - s.log.Info("received slice request") + s.config.Log.Info("received slice request") return nil } diff --git a/grpc/service_test.go b/grpc/service_test.go index 68604aa..5184515 100644 --- a/grpc/service_test.go +++ b/grpc/service_test.go @@ -26,6 +26,7 @@ import ( evryspb "github.com/z5labs/evrys/proto" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -33,6 +34,22 @@ import ( "google.golang.org/grpc/status" ) +func TestNewEvrysService(t *testing.T) { + s, err := NewEvrysService(EvrysServiceConfig{ + EventStore: nil, + Log: nil, + }) + require.Nil(t, s, "service should be nil") + require.Error(t, err, "new evrys service should return an error") + + s, err = NewEvrysService(EvrysServiceConfig{ + EventStore: &mockEventStore{}, + Log: zap.L(), + }) + require.NotNil(t, s, "service should not be nil") + require.NoError(t, err, "new evrys service should not return an error") +} + func TestEvrysService_GetEvent(t *testing.T) { t.Run("will return not found", func(t *testing.T) { t.Run("if no events exists in the store", func(t *testing.T) { @@ -44,7 +61,13 @@ func TestEvrysService_GetEvent(t *testing.T) { return } - s := NewEvrysService(zap.L()) + s, err := NewEvrysService(EvrysServiceConfig{ + EventStore: &mockEventStore{}, + Log: zap.L(), + }) + if !assert.NoError(t, err, "error should be nil") { + return + } errChan := make(chan error, 1) go func(ls net.Listener) { @@ -96,7 +119,13 @@ func TestEvrysService_RecordEvent(t *testing.T) { return } - s := NewEvrysService(zap.L()) + s, err := NewEvrysService(EvrysServiceConfig{ + EventStore: &mockEventStore{}, + Log: zap.L(), + }) + if !assert.NoError(t, err, "error should be nil") { + return + } errChan := make(chan error, 1) go func(ls net.Listener) { @@ -112,18 +141,153 @@ func TestEvrysService_RecordEvent(t *testing.T) { if !assert.Nil(t, err) { return } + ev := event.New() + ev.SetID("adsf") + ev.SetSource("asdf") + ev.SetSubject("asdfasfd") + ev.SetType("asdfas") + ev.SetData(event.ApplicationJSON, map[string]interface{}{"adfasd": 1}) + ev.SetTime(time.Now().UTC()) + event, err := format.ToProto(&ev) + if !assert.Nil(t, err) { + return + } - event, err := format.ToProto(&event.Event{}) + client := evryspb.NewEvrysClient(cc) + _, err = client.RecordEvent(ctx, event) if !assert.Nil(t, err) { return } + cancel() + + select { + case <-ctx.Done(): + case err := <-errChan: + if !assert.Nil(t, err) { + return + } + } + }) + }) + t.Run("will fail", func(t *testing.T) { + t.Run("event validation fail", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ls, err := net.Listen("tcp", ":0") + if !assert.Nil(t, err) { + return + } + + s, err := NewEvrysService(EvrysServiceConfig{ + EventStore: &mockEventStore{}, + Log: zap.L(), + }) + if !assert.NoError(t, err, "error should be nil") { + return + } + + errChan := make(chan error, 1) + go func(ls net.Listener) { + defer close(errChan) + + err := s.Serve(ctx, ls) + if err != nil && err != ErrServerStopped { + errChan <- err + } + }(ls) + + cc, err := grpc.Dial(ls.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if !assert.Nil(t, err) { + return + } + + // empty event should fail + ev := event.New() + + event, err := format.ToProto(&ev) + if !assert.NoError(t, err) { + return + } + client := evryspb.NewEvrysClient(cc) _, err = client.RecordEvent(ctx, event) + if !assert.Error(t, err) { + return + } + + grpcStatus := status.Convert(err) + require.Equal(t, codes.FailedPrecondition.String(), grpcStatus.Code().String(), "expected failed precondition error") + + cancel() + + select { + case <-ctx.Done(): + case err := <-errChan: + if !assert.Nil(t, err) { + return + } + } + }) + + t.Run("event append fail", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ls, err := net.Listen("tcp", ":0") + if !assert.Nil(t, err) { + return + } + + s, err := NewEvrysService(EvrysServiceConfig{ + EventStore: &mockEventStore{ + Fail: true, + }, + Log: zap.L(), + }) + if !assert.NoError(t, err, "error should be nil") { + return + } + + errChan := make(chan error, 1) + go func(ls net.Listener) { + defer close(errChan) + + err := s.Serve(ctx, ls) + if err != nil && err != ErrServerStopped { + errChan <- err + } + }(ls) + + cc, err := grpc.Dial(ls.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if !assert.Nil(t, err) { return } + // empty event should fail + ev := event.New() + ev.SetID("adsf") + ev.SetSource("asdf") + ev.SetSubject("asdfasfd") + ev.SetType("asdfas") + ev.SetData(event.ApplicationJSON, map[string]interface{}{"adfasd": 1}) + ev.SetTime(time.Now().UTC()) + + event, err := format.ToProto(&ev) + if !assert.Nil(t, err) { + return + } + + client := evryspb.NewEvrysClient(cc) + _, err = client.RecordEvent(ctx, event) + if !assert.Error(t, err) { + return + } + + grpcStatus := status.Convert(err) + require.Equal(t, codes.Internal.String(), grpcStatus.Code().String(), "expected internal error") + cancel() select { @@ -148,7 +312,13 @@ func TestEvrysService_SliceEvents(t *testing.T) { return } - s := NewEvrysService(zap.L()) + s, err := NewEvrysService(EvrysServiceConfig{ + EventStore: &mockEventStore{}, + Log: zap.L(), + }) + if !assert.NoError(t, err, "error should be nil") { + return + } errChan := make(chan error, 1) go func(ls net.Listener) { diff --git a/grpc/testing.go b/grpc/testing.go index 5aad467..bb55f9b 100644 --- a/grpc/testing.go +++ b/grpc/testing.go @@ -16,15 +16,29 @@ package grpc import ( "context" + "errors" "net" evryspb "github.com/z5labs/evrys/proto" cloudeventpb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "github.com/cloudevents/sdk-go/v2/event" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) +type mockEventStore struct { + Fail bool +} + +func (m *mockEventStore) Append(ctx context.Context, event *event.Event) error { + if m.Fail { + return errors.New("test fail") + } + + return nil +} + func MockEvrys(opts ...func(MockEvrysService) MockEvrysService) MockEvrysService { s := MockEvrysService{} for _, opt := range opts {