Skip to content

Commit de3d90a

Browse files
(feat): add a way to create a custom event processor taking advantage of Exec… (#113)
* add a way to create a custom event processor taking advantage of ExecutionContext * remove redundant ticker start and bath setting * add full test coverage * fix race condition * add option config stuff * add comment for options function * up code coverage * refactor to include all option configs * fix lint * rename and drop timers package
1 parent cda1d9a commit de3d90a

File tree

4 files changed

+204
-76
lines changed

4 files changed

+204
-76
lines changed

optimizely/client/factory.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ func (f OptimizelyFactory) Client(clientOptions ...OptionFunc) (*OptimizelyClien
4545
appClient := &OptimizelyClient{
4646
executionCtx: executionCtx,
4747
decisionService: decision.NewCompositeService(f.SDKKey),
48-
eventProcessor: event.NewEventProcessor(executionCtx, event.DefaultBatchSize, event.DefaultEventQueueSize, event.DefaultEventFlushInterval),
48+
eventProcessor: event.NewEventProcessor(executionCtx, event.BatchSize(event.DefaultBatchSize),
49+
event.QueueSize(event.DefaultEventQueueSize), event.FlushInterval(event.DefaultEventFlushInterval)),
4950
}
5051

5152
for _, opt := range clientOptions {
@@ -97,7 +98,8 @@ func DecisionService(decisionService decision.Service) OptionFunc {
9798
// BatchEventProcessor sets event processor on a client
9899
func BatchEventProcessor(batchSize, queueSize int, flushInterval time.Duration) OptionFunc {
99100
return func(f *OptimizelyClient, executionCtx utils.ExecutionCtx) {
100-
f.eventProcessor = event.NewEventProcessor(executionCtx, batchSize, queueSize, flushInterval)
101+
f.eventProcessor = event.NewEventProcessor(executionCtx, event.BatchSize(batchSize),
102+
event.QueueSize(queueSize), event.FlushInterval(flushInterval))
101103
}
102104
}
103105

optimizely/event/factory_test.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,24 @@ func BuildTestConversionEvent() UserEvent {
104104
return conversionUserEvent
105105
}
106106

107+
func TestCreateEmptyEvent(t *testing.T) {
108+
109+
impressionUserEvent := BuildTestImpressionEvent()
110+
111+
impressionUserEvent.Impression = nil
112+
impressionUserEvent.Conversion = nil
113+
114+
visitor := createVisitorFromUserEvent(impressionUserEvent)
115+
116+
assert.Nil(t, visitor.Snapshots)
117+
118+
}
119+
107120
func TestCreateAndSendImpressionEvent(t *testing.T) {
108121

109122
impressionUserEvent := BuildTestImpressionEvent()
110123

111-
processor := NewEventProcessor(utils.NewCancelableExecutionCtx(), 10, 100, 100)
124+
processor := NewEventProcessor(utils.NewCancelableExecutionCtx(), BatchSize(10), QueueSize(100), FlushInterval(100))
112125

113126
processor.ProcessEvent(impressionUserEvent)
114127

@@ -123,7 +136,7 @@ func TestCreateAndSendConversionEvent(t *testing.T) {
123136

124137
conversionUserEvent := BuildTestConversionEvent()
125138

126-
processor := NewEventProcessor(utils.NewCancelableExecutionCtx(), 10, 100, 100)
139+
processor := NewEventProcessor(utils.NewCancelableExecutionCtx(), FlushInterval(100))
127140

128141
processor.ProcessEvent(conversionUserEvent)
129142

optimizely/event/processor.go

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,21 +56,75 @@ const DefaultEventFlushInterval = 30 * time.Second
5656

5757
var pLogger = logging.GetLogger("EventProcessor")
5858

59+
// QPConfigOption is the QueuingProcessor options that give you the ability to add one more more options before the processor is initialized.
60+
type QPConfigOption func(qp *QueueingEventProcessor)
61+
62+
// BatchSize sets the batch size as a config option to be passed into the NewProcessor method
63+
func BatchSize(bsize int) QPConfigOption {
64+
return func(qp *QueueingEventProcessor) {
65+
qp.BatchSize = bsize
66+
}
67+
}
68+
69+
// QueueSize sets the queue size as a config option to be passed into the NewProcessor method
70+
func QueueSize(qsize int) QPConfigOption {
71+
return func(qp *QueueingEventProcessor) {
72+
qp.MaxQueueSize = qsize
73+
}
74+
}
75+
76+
// FlushInterval sets the flush interval as a config option to be passed into the NewProcessor method
77+
func FlushInterval(flushInterval time.Duration) QPConfigOption {
78+
return func(qp *QueueingEventProcessor) {
79+
qp.FlushInterval = flushInterval
80+
}
81+
}
82+
83+
// PQ sets the Processor Queue as a config option to be passed into the NewProcessor method
84+
func PQ(q Queue) QPConfigOption {
85+
return func(qp *QueueingEventProcessor) {
86+
qp.Q = q
87+
}
88+
}
89+
// PDispatcher sets the Processor Dispatcher as a config option to be passed into the NewProcessor method
90+
func PDispatcher(d Dispatcher) QPConfigOption {
91+
return func(qp *QueueingEventProcessor) {
92+
qp.EventDispatcher = d
93+
}
94+
}
95+
5996
// NewEventProcessor returns a new instance of QueueingEventProcessor with queueSize and flushInterval
60-
func NewEventProcessor(exeCtx utils.ExecutionCtx, batchSize, queueSize int, flushInterval time.Duration) *QueueingEventProcessor {
97+
func NewEventProcessor(exeCtx utils.ExecutionCtx, options ...QPConfigOption) *QueueingEventProcessor {
6198
p := &QueueingEventProcessor{
62-
MaxQueueSize: queueSize,
63-
FlushInterval: flushInterval,
64-
Q: NewInMemoryQueue(queueSize),
65-
EventDispatcher: NewQueueEventDispatcher(exeCtx.GetContext()),
6699
wg: exeCtx.GetWaitSync(),
67100
}
68-
p.BatchSize = DefaultBatchSize
69-
if batchSize > 0 {
70-
p.BatchSize = batchSize
101+
102+
for _, opt := range options {
103+
opt(p)
104+
}
105+
106+
if p.MaxQueueSize == 0 {
107+
p.MaxQueueSize = defaultQueueSize
108+
}
109+
110+
if p.FlushInterval == 0 {
111+
p.FlushInterval = DefaultEventFlushInterval
112+
}
113+
114+
if p.BatchSize == 0 {
115+
p.BatchSize = DefaultBatchSize
116+
}
117+
118+
if p.Q == nil {
119+
p.Q = NewInMemoryQueue(p.MaxQueueSize)
120+
}
121+
122+
if p.EventDispatcher == nil {
123+
p.EventDispatcher = NewQueueEventDispatcher(exeCtx.GetContext())
71124
}
72125

73126
p.StartTicker(exeCtx.GetContext())
127+
74128
return p
75129
}
76130

0 commit comments

Comments
 (0)