Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

story(storage): support mongodb for reading events #65

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion cmd/serve_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,19 @@ func withServeGrpcCmd() func(*viper.Viper) *cobra.Command {
}
zap.L().Info("listening for grpc requests", zap.String("addr", addr))

evrys := grpc.NewEvrysService(zap.L())
// this will fail to validate right now and will not work!
evrys, err := grpc.NewEvrysService(grpc.EvrysServiceConfig{
EventStore: nil,
Log: zap.L(),
})
if err != nil {
zap.L().Fatal(
"unexpected error when initializing evrys service",
zap.String("addr", addr),
zap.Error(err),
)
return
}
err = evrys.Serve(cmd.Context(), ls)
if err != nil && !errors.Is(err, grpc.ErrServerStopped) {
zap.L().Fatal(
Expand Down
28 changes: 28 additions & 0 deletions eventstore/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,34 @@ func (p *PutError) Unwrap() error {
return p.Err
}

// QueryError defines an error when putting data into a database
type QueryError struct {
Source string
QueriedType string
Err error
IsNotFound bool
}

// NewQueryError creates a new PutError
func NewQueryError(source, queriedType string, isNotFound bool, err error) *QueryError {
return &QueryError{
Source: source,
QueriedType: queriedType,
Err: err,
IsNotFound: isNotFound,
}
}

// Error returns a string form of the error and implements the error interface
func (p *QueryError) Error() string {
return fmt.Sprintf("failed to put %s into %s. NotFound: %v. Error: %s", p.QueriedType, p.Source, p.IsNotFound, p.Err)
}

// Unwrap returns the inner error, making it compatible with errors.Unwrap
func (p *QueryError) Unwrap() error {
return p.Err
}

// InvalidValidationError Alias for validator package validator.InvalidValidationError
var InvalidValidationError = validator.InvalidValidationError{}

Expand Down
76 changes: 76 additions & 0 deletions eventstore/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/cloudevents/sdk-go/v2/event"
"github.com/go-playground/validator/v10"
Expand Down Expand Up @@ -157,3 +158,78 @@ func (m *Mongo) Append(ctx context.Context, event *event.Event) error {

return nil
}

func (m *Mongo) GetByID(ctx context.Context, id string) (*event.Event, error) {
m.logger.Info(
"attempting to lookup event by id",
zap.String("id", id),
)

coll := m.client.Database(m.config.Database).Collection(m.config.Collection)

filter := bson.D{{Key: "id", Value: id}}
result := coll.FindOne(ctx, filter)
if result.Err() != nil {
if errors.Is(result.Err(), mongo.ErrNoDocuments) {
m.logger.Error(
"event with id does not exist",
zap.String("id", id),
zap.Error(result.Err()),
)
return nil, NewQueryError("mongo", "*event.Event", true, result.Err())
}
m.logger.Error(
"failed to lookup event by id",
zap.String("id", id),
zap.Error(result.Err()),
)
return nil, NewQueryError("mongo", "*event.Event", false, result.Err())
}

var b bson.D
err := result.Decode(&b)
if err != nil {
return nil, err
}

// remove mongo id field since cloud events doesn't like it
// it will always be the first index
b, err = removeElement(b, 0)
if err != nil {
return nil, err
}

jsb, err := bson.MarshalExtJSON(b, false, false)
if err != nil {
return nil, err
}

ev := event.New()
err = ev.UnmarshalJSON(jsb)
if err != nil {
return nil, err
}

return &ev, nil
}

func (m *Mongo) EventsBefore(ctx context.Context, until time.Time) (events []*event.Event, cursor any, err error) {
return nil, nil, nil
}

func removeElement[T any](s []T, i int) ([]T, error) {
// s is [1,2,3,4,5,6], i is 2

// perform bounds checking first to prevent a panic!
if i >= len(s) || i < 0 {
return nil, fmt.Errorf("index is out of range. index is %d with slice length %d", i, len(s))
}

// copy first element (1) to index `i`. At this point,
// `s` will be [1,2,1,4,5,6]
s[i] = s[0]
// Remove the first element from the slice by truncating it
// This way, `s` will now include all the elements from index 1
// until the element at the last index
return s[1:], nil
}
19 changes: 19 additions & 0 deletions eventstore/mongo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,23 @@ func TestMongoIntegration(t *testing.T) {
}
req.True(idCheck && specVersionCheck && sourceCheck && typeCheck && subjectCheck && dataContentTypeCheck && timeCheck && dataCheck,
"all values have not been verified")

// validated its there

// test single lookup
lookupEv, err := mongoImpl.GetByID(ctx, id)
req.NoError(err, "should not fail")
req.NotNil(lookupEv)
req.Equal(id, lookupEv.ID(), "ids should match")
req.Equal("1.0", lookupEv.SpecVersion(), "spec version should match")
req.Equal("mongo_test", lookupEv.Source(), "source not expected value")
req.Equal("test", lookupEv.Type(), "type not expected value")
req.Equal("test", lookupEv.Subject(), "subject not expected value")
req.Equal(curTime, lookupEv.Time(), "time is not equal")
var m map[string]interface{}
err = lookupEv.DataAs(&m)
req.NoError(err, "should be able to convert")
value, ok := m["hello"]
req.True(ok, "key 'hello' should exist")
req.Equal("world", value, "value does not match expected")
}
7 changes: 7 additions & 0 deletions eventstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eventstore

import (
"context"
"time"

"github.com/cloudevents/sdk-go/v2/event"
)
Expand All @@ -11,3 +12,9 @@ type AppendOnly interface {
// AppendEvent pushes an event to the event store and assumes that the event has already been validated before receiving
Append(ctx context.Context, event *event.Event) error
}

// Query looks up events in event store
type Query interface {
GetByID(ctx context.Context, id string) (*event.Event, error)
EventsBefore(ctx context.Context, until time.Time) (events []*event.Event, cursor any, err error)
}
8 changes: 8 additions & 0 deletions grpc/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package grpc

import "github.com/z5labs/evrys/eventstore"

// EventStore combines single method interfaces from eventstore into one interface
type EventStore interface {
eventstore.AppendOnly
}
58 changes: 51 additions & 7 deletions grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package grpc

import (
"context"
"fmt"
"net"

"github.com/go-playground/validator/v10"
evryspb "github.com/z5labs/evrys/proto"

format "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
Expand All @@ -29,22 +31,35 @@ import (
"google.golang.org/grpc/status"
)

// ErrServerStopped wraps grpc.ErrServerStopped
var ErrServerStopped = grpc.ErrServerStopped

var _ evryspb.EvrysServer = &EvrysService{}

// EvrysServiceConfig stores configuration values for evrys service
type EvrysServiceConfig struct {
EventStore EventStore `validate:"required"`
Log *zap.Logger `validate:"required"`
}

// EvrysService is defines the grpc for evrys and implements the interface from evrys proto
type EvrysService struct {
evryspb.UnimplementedEvrysServer

log *zap.Logger
config EvrysServiceConfig
}

func NewEvrysService(logger *zap.Logger) *EvrysService {
return &EvrysService{
log: logger,
// NewEvrysService creates an instance of EvrysService
func NewEvrysService(config EvrysServiceConfig) (*EvrysService, error) {
if err := validator.New().Struct(&config); err != nil {
return nil, fmt.Errorf("failed to validate config")
}

return &EvrysService{
config: config,
}, nil
}

// Serve creates and runs the grpc server
func (s *EvrysService) Serve(ctx context.Context, ls net.Listener) error {
grpcServer := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
evryspb.RegisterEvrysServer(grpcServer, s)
Expand All @@ -70,25 +85,54 @@ func (s *EvrysService) Serve(ctx context.Context, ls net.Listener) error {
}
}

// GetEvent retrieves an event from the event store
func (s *EvrysService) GetEvent(ctx context.Context, req *evryspb.GetEventRequest) (*cloudeventpb.CloudEvent, error) {
return nil, status.Error(codes.NotFound, "")
}

// RecordEvent records an event in the event store
func (s *EvrysService) RecordEvent(ctx context.Context, req *cloudeventpb.CloudEvent) (*evryspb.RecordEventResponse, error) {
event, err := format.FromProto(req)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
s.log.Info(
s.config.Log.Info(
"received event",
zap.String("event_id", event.ID()),
zap.String("event_source", event.Source()),
zap.String("event_type", event.Type()),
)

err = event.Validate()
if err != nil {
s.config.Log.Error(
"cloud event failed to validate",
zap.Error(err),
zap.String("event_id", event.ID()),
zap.String("event_source", event.Source()),
zap.String("event_type", event.Type()),
)
return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("cloud event failed validation: %s", err.Error()))
}

err = s.config.EventStore.Append(ctx, event)
if err != nil {
s.config.Log.Error(
"failed to append event to event store",
zap.Error(err),
zap.String("event_id", event.ID()),
zap.String("event_source", event.Source()),
zap.String("event_type", event.Type()),
)

return nil, status.Error(codes.Internal, fmt.Sprintf("failed to append event to event store: %s", err.Error()))
}

return new(evryspb.RecordEventResponse), nil
}

// SliceEvents retrieves multiple events from the event store
func (s *EvrysService) SliceEvents(req *evryspb.SliceEventsRequest, stream evryspb.Evrys_SliceEventsServer) error {
s.log.Info("received slice request")
s.config.Log.Info("received slice request")
return nil
}
Loading