Skip to content

Commit 83a1ab4

Browse files
author
Michael Ng
authored
feat(event): Use the cancelable context in the event processor. (#73)
1 parent 0e8e95e commit 83a1ab4

File tree

4 files changed

+81
-18
lines changed

4 files changed

+81
-18
lines changed

optimizely/client/factory.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,7 @@ func (f OptimizelyFactory) ClientWithOptions(clientOptions Options) (*Optimizely
118118
}
119119

120120
// @TODO: allow event processor to be passed in
121-
// @TODO: pass the context object to the event processor
122-
client.eventProcessor = event.NewEventProcessor(defaultEventQueueSize, defaultEventFlushInterval)
121+
client.eventProcessor = event.NewEventProcessor(ctx, defaultEventQueueSize, defaultEventFlushInterval)
123122
client.isValid = true
124123
return client, nil
125124
}

optimizely/event/factory_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package event
22

33
import (
4+
"context"
45
"math/rand"
56
"testing"
67
"time"
@@ -88,10 +89,11 @@ func BuildTestConversionEvent() UserEvent {
8889
}
8990

9091
func TestCreateAndSendImpressionEvent(t *testing.T) {
92+
ctx := context.Background()
9193

9294
impressionUserEvent := BuildTestImpressionEvent()
9395

94-
processor := NewEventProcessor(100, 100)
96+
processor := NewEventProcessor(ctx, 100, 100)
9597

9698
processor.ProcessEvent(impressionUserEvent)
9799

@@ -103,10 +105,11 @@ func TestCreateAndSendImpressionEvent(t *testing.T) {
103105
}
104106

105107
func TestCreateAndSendConversionEvent(t *testing.T) {
108+
ctx := context.Background()
106109

107110
conversionUserEvent := BuildTestConversionEvent()
108111

109-
processor := NewEventProcessor(100, 100)
112+
processor := NewEventProcessor(ctx, 100, 100)
110113

111114
processor.ProcessEvent(conversionUserEvent)
112115

optimizely/event/processor.go

+19-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package event
22

33
import (
4+
"context"
45
"errors"
56
"sync"
67
"time"
@@ -27,10 +28,15 @@ type QueueingEventProcessor struct {
2728
var pLogger = logging.GetLogger("EventProcessor")
2829

2930
// NewEventProcessor returns a new instance of QueueingEventProcessor with queueSize and flushInterval
30-
func NewEventProcessor(queueSize int, flushInterval time.Duration) *QueueingEventProcessor {
31-
p := &QueueingEventProcessor{MaxQueueSize: queueSize, FlushInterval: flushInterval, Q: NewInMemoryQueue(queueSize), EventDispatcher: &HTTPEventDispatcher{}}
31+
func NewEventProcessor(ctx context.Context, queueSize int, flushInterval time.Duration) *QueueingEventProcessor {
32+
p := &QueueingEventProcessor{
33+
MaxQueueSize: queueSize,
34+
FlushInterval: flushInterval,
35+
Q: NewInMemoryQueue(queueSize),
36+
EventDispatcher: &HTTPEventDispatcher{},
37+
}
3238
p.BatchSize = 10
33-
p.StartTicker()
39+
p.StartTicker(ctx)
3440
return p
3541
}
3642

@@ -61,14 +67,21 @@ func (p *QueueingEventProcessor) Remove(count int) []interface{} {
6167
}
6268

6369
// StartTicker starts new ticker for flushing events
64-
func (p *QueueingEventProcessor) StartTicker() {
70+
func (p *QueueingEventProcessor) StartTicker(ctx context.Context) {
6571
if p.Ticker != nil {
6672
return
6773
}
6874
p.Ticker = time.NewTicker(p.FlushInterval * time.Millisecond)
6975
go func() {
70-
for range p.Ticker.C {
71-
p.FlushEvents()
76+
for {
77+
select {
78+
case <-p.Ticker.C:
79+
p.FlushEvents()
80+
case <-ctx.Done():
81+
pLogger.Debug("Event processor stopped, flushing events.")
82+
p.FlushEvents()
83+
return
84+
}
7285
}
7386
}()
7487
}

optimizely/event/processor_test.go

+56-8
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package event
22

33
import (
4+
"context"
45
"testing"
56
"time"
67

78
"github.com/stretchr/testify/assert"
89
)
910

1011
func TestDefaultEventProcessor_ProcessImpression(t *testing.T) {
11-
processor := NewEventProcessor(100, 100)
12+
ctx := context.Background()
13+
14+
processor := NewEventProcessor(ctx, 100, 100)
1215

1316
impression := BuildTestImpressionEvent()
1417

@@ -21,7 +24,6 @@ func TestDefaultEventProcessor_ProcessImpression(t *testing.T) {
2124
assert.NotNil(t, processor.Ticker)
2225

2326
assert.Equal(t, 0, processor.EventsCount())
24-
2527
}
2628

2729
type MockDispatcher struct {
@@ -34,9 +36,14 @@ func (f *MockDispatcher) DispatchEvent(event LogEvent, callback func(success boo
3436
}
3537

3638
func TestDefaultEventProcessor_ProcessBatch(t *testing.T) {
37-
processor := &QueueingEventProcessor{MaxQueueSize: 100, FlushInterval: 100, Q: NewInMemoryQueue(100), EventDispatcher: &MockDispatcher{}}
39+
processor := &QueueingEventProcessor{
40+
MaxQueueSize: 100,
41+
FlushInterval: 100,
42+
Q: NewInMemoryQueue(100),
43+
EventDispatcher: &MockDispatcher{},
44+
}
3845
processor.BatchSize = 10
39-
processor.StartTicker()
46+
processor.StartTicker(context.TODO())
4047

4148
impression := BuildTestImpressionEvent()
4249
conversion := BuildTestConversionEvent()
@@ -63,10 +70,46 @@ func TestDefaultEventProcessor_ProcessBatch(t *testing.T) {
6370
}
6471
}
6572

73+
func TestBatchEventProcessor_FlushesOnClose(t *testing.T) {
74+
ctx, cancelFn := context.WithCancel(context.Background())
75+
processor := &QueueingEventProcessor{
76+
MaxQueueSize: 100,
77+
FlushInterval: 30 * time.Second,
78+
Q: NewInMemoryQueue(100),
79+
EventDispatcher: &MockDispatcher{},
80+
}
81+
processor.BatchSize = 10
82+
processor.StartTicker(ctx)
83+
84+
impression := BuildTestImpressionEvent()
85+
conversion := BuildTestConversionEvent()
86+
87+
processor.ProcessEvent(impression)
88+
processor.ProcessEvent(impression)
89+
processor.ProcessEvent(conversion)
90+
processor.ProcessEvent(conversion)
91+
92+
assert.Equal(t, 4, processor.EventsCount())
93+
94+
time.Sleep(500 * time.Millisecond)
95+
96+
// Triggers the flush in the processor
97+
cancelFn()
98+
99+
time.Sleep(500 * time.Millisecond)
100+
101+
assert.Equal(t, 0, processor.EventsCount())
102+
}
103+
66104
func TestDefaultEventProcessor_ProcessBatchRevisionMismatch(t *testing.T) {
67-
processor := &QueueingEventProcessor{MaxQueueSize: 100, FlushInterval: 100, Q: NewInMemoryQueue(100), EventDispatcher: &MockDispatcher{}}
105+
processor := &QueueingEventProcessor{
106+
MaxQueueSize: 100,
107+
FlushInterval: 100,
108+
Q: NewInMemoryQueue(100),
109+
EventDispatcher: &MockDispatcher{},
110+
}
68111
processor.BatchSize = 10
69-
processor.StartTicker()
112+
processor.StartTicker(context.TODO())
70113

71114
impression := BuildTestImpressionEvent()
72115
conversion := BuildTestConversionEvent()
@@ -95,9 +138,14 @@ func TestDefaultEventProcessor_ProcessBatchRevisionMismatch(t *testing.T) {
95138
}
96139

97140
func TestDefaultEventProcessor_ProcessBatchProjectMismatch(t *testing.T) {
98-
processor := &QueueingEventProcessor{MaxQueueSize: 100, FlushInterval: 100, Q: NewInMemoryQueue(100), EventDispatcher: &MockDispatcher{}}
141+
processor := &QueueingEventProcessor{
142+
MaxQueueSize: 100,
143+
FlushInterval: 100,
144+
Q: NewInMemoryQueue(100),
145+
EventDispatcher: &MockDispatcher{},
146+
}
99147
processor.BatchSize = 10
100-
processor.StartTicker()
148+
processor.StartTicker(context.TODO())
101149

102150
impression := BuildTestImpressionEvent()
103151
conversion := BuildTestConversionEvent()

0 commit comments

Comments
 (0)