Skip to content

[WIP] Synchronous and Asynchronous reconnection #79

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: reconnection-sink
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions instrumented/reconnector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package instrumented

import (
"github.com/utilitywarehouse/go-pubsub"
"sync"
"time"
)

var _ pubsub.MessageSink = (*ReconnectSink)(nil)

// ReconnectSink represents a Sink decorator
// that enhance others sink with retry logic
// mechanism in case the connection drops
type ReconnectSink struct {
sync.RWMutex
newSink func() (pubsub.MessageSink, error)

options *ReconnectionOptions
sink pubsub.MessageSink
needReconnect chan struct{}
}

// ReconnectionOptions represents the options
// to configure and hook up to reconnection events
type ReconnectionOptions struct {
ReconnectSynchronously bool
OnReconnectFailed func(err error)
OnReconnectSuccess func(sink pubsub.MessageSink, err error)
}

// NewReconnectorSink creates a new reconnector sink
func NewReconnectorSink(
newSink func() (pubsub.MessageSink, error),
options *ReconnectionOptions,
) (pubsub.MessageSink, error) {
pubSubSink, err := newSink()

if err != nil {
return nil, err
}

reconnectSink := &ReconnectSink{
newSink: newSink,
options: options,
sink: pubSubSink,
needReconnect: make(chan struct{}),
}

go reconnectSink.reconnect()

return reconnectSink, nil
}

// PutMessage decorates the pubsub.MessageSink interface making
// aware the sink of disconnection errors
func (mq *ReconnectSink) PutMessage(m pubsub.ProducerMessage) error {
if mq.options.ReconnectSynchronously == false {
mq.RLock()
defer mq.RUnlock()
}

err := mq.sink.PutMessage(m)

if err != nil {
status, errStatus := mq.Status()
if errStatus == nil && status.Working == true {
return mq.PutMessage(m)
}
}

return err
}

// Close decorates the pubsub.MessageSink interface making
// aware the sink of disconnection errors
func (mq *ReconnectSink) Close() error {
if mq.options.ReconnectSynchronously == false {
mq.RLock()
defer mq.RUnlock()
}
return mq.sink.Close()
}

// Status decorates the pubsub.MessageSink interface making
// aware the sink of disconnection errors
func (mq *ReconnectSink) Status() (*pubsub.Status, error) {
if mq.options.ReconnectSynchronously == false {
mq.RLock()
defer mq.RUnlock()
}

status, err := mq.sink.Status()

if err != nil {
return status, err
}

if status.Working == false {
if mq.options.ReconnectSynchronously == true {
mq.Lock()
mq.retryStrategy(nil)
mq.Unlock()
return mq.Status()
}

mq.needReconnect <- struct{}{}
}

return status, err
}

func (mq *ReconnectSink) setSink(s pubsub.MessageSink) (sink pubsub.MessageSink, err error) {
if mq.options.ReconnectSynchronously == false {
mq.Lock()
defer mq.Unlock()
}
if mq.sink != nil {
err = mq.sink.Close()
}
mq.sink = s

return s, err
}

func (mq *ReconnectSink) reconnect() {
// If the reconnector is synchronous
// we can just clean up this go routine
if mq.options.ReconnectSynchronously == true {
close(mq.needReconnect)
return
}

reconnecting := false
reconnected := make(chan struct{})

for {
select {
case <-mq.needReconnect:
if reconnecting != true {
reconnecting = true
go mq.retryStrategy(reconnected)
}
case <-reconnected:
reconnecting = false
}
}
}

func (mq *ReconnectSink) retryStrategy(reconnected chan struct{}) {
t := time.NewTicker(time.Second * 2)
for {
<-t.C

newSink, err := mq.newSink()

if err != nil {
// Fire OnReconnectFailed if we have a func passed
if mq.options.OnReconnectFailed != nil {
mq.options.OnReconnectFailed(err)
}
} else {
sink, err := mq.setSink(newSink)

// Fire OnReconnectSuccess if we have a func passed
if mq.options != nil && mq.options.OnReconnectSuccess != nil {
mq.options.OnReconnectSuccess(sink, err)
}

if reconnected != nil {
reconnected <- struct{}{}
}

t.Stop()

break
}
}
}
127 changes: 127 additions & 0 deletions instrumented/reconnector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package instrumented_test

import (
"errors"
"github.com/stretchr/testify/assert"
"github.com/utilitywarehouse/go-pubsub"
"github.com/utilitywarehouse/go-pubsub/instrumented"
"github.com/utilitywarehouse/go-pubsub/mocks"
"sync"
"testing"
)

//go:generate mockery -dir ../ -name MessageSink --output ../mocks -case underscore
//go:generate mockery -dir ../ -name ProducerMessage --output ../mocks -case underscore

func TestNewReconnectorSink(t *testing.T) {
sink, err := instrumented.NewReconnectorSink(
func() (pubsub.MessageSink, error) {
return &mocks.MessageSink{}, nil
},
&instrumented.ReconnectionOptions{},
)

if err != nil {
t.Fail()
}

assert.Nil(t, err)
assert.NotNil(t, sink)
}

func TestWillReconnectAsynchronouslyWhenPutMessageDetectADisconnectionError(t *testing.T) {
sinkMock := &mocks.MessageSink{}

wg := sync.WaitGroup{}
wg.Add(1)

sink, err := instrumented.NewReconnectorSink(func() (pubsub.MessageSink, error) {
return sinkMock, nil
}, &instrumented.ReconnectionOptions{
OnReconnectSuccess: func(sink pubsub.MessageSink, err error) {
wg.Done()
},
})

if err != nil {
t.Fail()
}

called := 0
sinkMock.
On("PutMessage", &mocks.ProducerMessage{}).
Return(func (m pubsub.ProducerMessage) (error) {
if called == 0 {
called++
return errors.New("broker can't be reached")
}
return nil
})

sinkMock.
On("Status").
Return(&pubsub.Status{
Working: false,
}, nil)

sinkMock.On("Close").Return(nil)

err = sink.PutMessage(&mocks.ProducerMessage{})

// Will return error right away ( re-connection happens under the hood )
assert.NotNil(t, err)

wg.Wait()

// Re-Calling put message should now succeed
err = sink.PutMessage(&mocks.ProducerMessage{})

assert.Nil(t, err)
}

func TestWillReconnectSynchronouslyWhenPutMessageDetectADisconnectionError(t *testing.T) {
sinkMock := &mocks.MessageSink{}

sink, err := instrumented.NewReconnectorSink(func() (pubsub.MessageSink, error) {
return sinkMock, nil
}, &instrumented.ReconnectionOptions{
ReconnectSynchronously: true,
})

if err != nil {
t.Fail()
}

putMessageCalled := 0
sinkMock.
On("PutMessage", &mocks.ProducerMessage{}).
Return(func (m pubsub.ProducerMessage) (error) {
if putMessageCalled == 0 {
putMessageCalled++
return errors.New("broker can't be reached")
}
return nil
})

statusCalled := 0
sinkMock.
On("Status").
Return(func () (*pubsub.Status) {
if statusCalled == 0 {
statusCalled++
return &pubsub.Status{
Working: false,
}
}
return &pubsub.Status{
Working: true,
}
}, nil)

sinkMock.On("Close").Return(nil)

err = sink.PutMessage(&mocks.ProducerMessage{})

// reconnection happened synchronously
assert.Nil(t, err)
}
62 changes: 62 additions & 0 deletions mocks/message_sink.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading