Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

story(notifications): support apache kafka #55

Closed
29 changes: 27 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,64 @@ go 1.19
require (
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.12.0
github.com/cloudevents/sdk-go/v2 v2.12.0
github.com/docker/go-connections v0.4.0
github.com/segmentio/kafka-go v0.4.36
github.com/spf13/cobra v1.6.0
github.com/spf13/viper v1.13.0
github.com/stretchr/testify v1.8.1
github.com/testcontainers/testcontainers-go v0.15.0
go.uber.org/zap v1.17.0
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
google.golang.org/grpc v1.50.0
google.golang.org/protobuf v1.28.1
)

require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Microsoft/hcsshim v0.9.4 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/containerd/containerd v1.6.8 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v20.10.17+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/sys/mount v0.3.3 // indirect
github.com/moby/sys/mountinfo v0.6.2 // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/opencontainers/runc v1.1.3 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 // indirect
golang.org/x/net v0.0.0-20220706163947-c90051bbdb60 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect
Expand Down
725 changes: 722 additions & 3 deletions go.sum

Large diffs are not rendered by default.

137 changes: 137 additions & 0 deletions notification/bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2022 Z5Labs and Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package notification provides a common abstraction over various notification bus services.
package notification

import (
"context"
"errors"
"fmt"
"strings"

evryspb "github.com/z5labs/evrys/proto"
)

var (
ErrMissingLogger = errors.New("logger is required")
)

// Bus
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more descriptive comment needed

type Bus interface {
// Publish
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more descriptive comment needed

Publish(context.Context, *evryspb.Notification) error
}

// Error represents a generic error a Bus implementation encountered
type Error struct {
Bus string
Cause error
}

func (e Error) Error() string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need comment for exported function

return fmt.Sprintf("%s: encountered unexpected error: %s", e.Bus, e.Cause)
}

func (e Error) Unwrap() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need comment for exported function

return e.Cause
}

// MarshalError
type MarshalError struct {
Protocol string
Cause error
}

func (e MarshalError) Error() string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need comment for exported function

return fmt.Sprintf("%s: failed to marshal notification: %s", e.Protocol, e.Cause)
}

func (e MarshalError) Unwrap() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need comment for exported function

return e.Cause
}

// ConfigurationError
type ConfigurationError struct {
Bus string
Cause error
}

func (e ConfigurationError) Error() string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need comment for exported function

return fmt.Sprintf("%s: invalid configuration: %s", e.Bus, e.Cause)
}

func (e ConfigurationError) Unwrap() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need comment for exported function

return e.Cause
}

// ValidationError
type ValidationError struct {
Cause error
}

func (e ValidationError) Error() string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need comment for exported function

return fmt.Sprintf("notification: must provide a valid notification: %s", e.Cause)
}

func (e ValidationError) Unwrap() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need comment for exported function

return e.Cause
}

var (
ErrInvalidEventType = errors.New("event type must be non-empty and non-blank")
ErrInvalidEventId = errors.New("event id must be non-empty and non-blank")
ErrInvalidEventSource = errors.New("event source must be non-empty and non-blank")
Comment on lines +93 to +95
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need comments for exported types

)

type validator func(*evryspb.Notification) error

func validateNotification(n *evryspb.Notification) error {
validators := []validator{
validateEventType,
validateEventId,
validateEventSource,
}
for _, validator := range validators {
err := validator(n)
if err != nil {
return err
}
}
return nil
}

func validateEventType(n *evryspb.Notification) error {
eventType := strings.TrimSpace(n.EventType)
if len(eventType) == 0 {
return ErrInvalidEventType
}
return nil
}

func validateEventId(n *evryspb.Notification) error {
eventId := strings.TrimSpace(n.EventId)
if len(eventId) == 0 {
return ErrInvalidEventId
}
return nil
}

func validateEventSource(n *evryspb.Notification) error {
eventSource := strings.TrimSpace(n.EventSource)
if len(eventSource) == 0 {
return ErrInvalidEventSource
}
return nil
}
142 changes: 142 additions & 0 deletions notification/bus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2022 Z5Labs and Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notification

import (
"testing"

evryspb "github.com/z5labs/evrys/proto"

"github.com/stretchr/testify/assert"
)

func TestValidateNotification(t *testing.T) {
t.Run("will report a notification as invalid", func(t *testing.T) {
testCases := []struct {
Condition string
N *evryspb.Notification
Err error
}{
// Event Type cases
{
Condition: "if the event type is not set",
N: &evryspb.Notification{
EventSource: "source",
EventId: "id",
},
Err: ErrInvalidEventType,
},
{
Condition: "if the event type is an empty string",
N: &evryspb.Notification{
EventSource: "source",
EventId: "id",
EventType: "",
},
Err: ErrInvalidEventType,
},
{
Condition: "if the event type is a blank string",
N: &evryspb.Notification{
EventSource: "source",
EventId: "id",
EventType: " ",
},
Err: ErrInvalidEventType,
},
// Event Id cases
{
Condition: "if the event id is not set",
N: &evryspb.Notification{
EventSource: "source",
EventType: "type",
},
Err: ErrInvalidEventId,
},
{
Condition: "if the event id is an empty string",
N: &evryspb.Notification{
EventSource: "source",
EventId: "",
EventType: "type",
},
Err: ErrInvalidEventId,
},
{
Condition: "if the event id is a blank string",
N: &evryspb.Notification{
EventSource: "source",
EventId: " ",
EventType: "type",
},
Err: ErrInvalidEventId,
},
// Event Source cases
{
Condition: "if the event source is not set",
N: &evryspb.Notification{
EventId: "id",
EventType: "type",
},
Err: ErrInvalidEventSource,
},
{
Condition: "if the event source is an empty string",
N: &evryspb.Notification{
EventSource: "",
EventId: "id",
EventType: "type",
},
Err: ErrInvalidEventSource,
},
{
Condition: "if the event source is a blank string",
N: &evryspb.Notification{
EventSource: " ",
EventId: "id",
EventType: "type",
},
Err: ErrInvalidEventSource,
},
}

for _, testCase := range testCases {
t.Run(testCase.Condition, func(t *testing.T) {
err := validateNotification(testCase.N)
if !assert.Error(t, err) {
return
}
if !assert.Equal(t, testCase.Err, err) {
return
}
})
}
})

t.Run("will report a notification as valid", func(t *testing.T) {
t.Run("if the event id, source, and type are set to non blank or empty strings", func(t *testing.T) {
n := &evryspb.Notification{
EventSource: "source",
EventId: "id",
EventType: "type",
}

err := validateNotification(n)
if !assert.Nil(t, err) {
return
}
})
})
}
Loading