@@ -20,22 +20,23 @@ package event
20
20
import (
21
21
"errors"
22
22
"fmt"
23
+ "sync"
24
+ "time"
25
+
23
26
"github.com/optimizely/go-sdk/pkg/logging"
24
27
"github.com/optimizely/go-sdk/pkg/notification"
25
28
"github.com/optimizely/go-sdk/pkg/registry"
26
29
"github.com/optimizely/go-sdk/pkg/utils"
27
- "sync"
28
- "time"
29
30
)
30
31
31
32
// Processor processes events
32
33
type Processor interface {
33
34
ProcessEvent (event UserEvent )
34
35
}
35
36
36
- // QueueingEventProcessor is used out of the box by the SDK
37
- type QueueingEventProcessor struct {
38
- sdkKey string
37
+ // BatchEventProcessor is used out of the box by the SDK
38
+ type BatchEventProcessor struct {
39
+ sdkKey string
39
40
MaxQueueSize int // max size of the queue before flush
40
41
FlushInterval time.Duration // in milliseconds
41
42
BatchSize int
@@ -56,55 +57,55 @@ const DefaultEventFlushInterval = 30 * time.Second
56
57
57
58
var pLogger = logging .GetLogger ("EventProcessor" )
58
59
59
- // QPConfigOption is the QueuingProcessor options that give you the ability to add one more more options before the processor is initialized.
60
- type QPConfigOption func (qp * QueueingEventProcessor )
60
+ // BPOptionConfig is the BatchProcessor options that give you the ability to add one more more options before the processor is initialized.
61
+ type BPOptionConfig func (qp * BatchEventProcessor )
61
62
62
- // BatchSize sets the batch size as a config option to be passed into the NewProcessor method
63
- func BatchSize (bsize int ) QPConfigOption {
64
- return func (qp * QueueingEventProcessor ) {
63
+ // WithBatchSize sets the batch size as a config option to be passed into the NewProcessor method
64
+ func WithBatchSize (bsize int ) BPOptionConfig {
65
+ return func (qp * BatchEventProcessor ) {
65
66
qp .BatchSize = bsize
66
67
}
67
68
}
68
69
69
- // QueueSize sets the queue size as a config option to be passed into the NewProcessor method
70
- func QueueSize (qsize int ) QPConfigOption {
71
- return func (qp * QueueingEventProcessor ) {
70
+ // WithQueueSize sets the queue size as a config option to be passed into the NewProcessor method
71
+ func WithQueueSize (qsize int ) BPOptionConfig {
72
+ return func (qp * BatchEventProcessor ) {
72
73
qp .MaxQueueSize = qsize
73
74
}
74
75
}
75
76
76
- // FlushInterval sets the flush interval as a config option to be passed into the NewProcessor method
77
- func FlushInterval (flushInterval time.Duration ) QPConfigOption {
78
- return func (qp * QueueingEventProcessor ) {
77
+ // WithFlushInterval sets the flush interval as a config option to be passed into the NewProcessor method
78
+ func WithFlushInterval (flushInterval time.Duration ) BPOptionConfig {
79
+ return func (qp * BatchEventProcessor ) {
79
80
qp .FlushInterval = flushInterval
80
81
}
81
82
}
82
83
83
- // PQ sets the Processor Queue as a config option to be passed into the NewProcessor method
84
- func PQ (q Queue ) QPConfigOption {
85
- return func (qp * QueueingEventProcessor ) {
84
+ // WithQueue sets the Processor Queue as a config option to be passed into the NewProcessor method
85
+ func WithQueue (q Queue ) BPOptionConfig {
86
+ return func (qp * BatchEventProcessor ) {
86
87
qp .Q = q
87
88
}
88
89
}
89
90
90
- // PDispatcher sets the Processor Dispatcher as a config option to be passed into the NewProcessor method
91
- func PDispatcher (d Dispatcher ) QPConfigOption {
92
- return func (qp * QueueingEventProcessor ) {
91
+ // WithEventDispatcher sets the Processor Dispatcher as a config option to be passed into the NewProcessor method
92
+ func WithEventDispatcher (d Dispatcher ) BPOptionConfig {
93
+ return func (qp * BatchEventProcessor ) {
93
94
qp .EventDispatcher = d
94
95
}
95
96
}
96
97
97
- // SDKKey sets the SDKKey used to register for notifications. This should be removed when the project
98
+ // WithSDKKey sets the SDKKey used to register for notifications. This should be removed when the project
98
99
// config supports sdk key.
99
- func SDKKey (sdkKey string ) QPConfigOption {
100
- return func (qp * QueueingEventProcessor ) {
100
+ func WithSDKKey (sdkKey string ) BPOptionConfig {
101
+ return func (qp * BatchEventProcessor ) {
101
102
qp .sdkKey = sdkKey
102
103
}
103
104
}
104
105
105
- // NewEventProcessor returns a new instance of QueueingEventProcessor with queueSize and flushInterval
106
- func NewEventProcessor (options ... QPConfigOption ) * QueueingEventProcessor {
107
- p := & QueueingEventProcessor {}
106
+ // NewBatchEventProcessor returns a new instance of BatchEventProcessor with queueSize and flushInterval
107
+ func NewBatchEventProcessor (options ... BPOptionConfig ) * BatchEventProcessor {
108
+ p := & BatchEventProcessor {}
108
109
109
110
for _ , opt := range options {
110
111
opt (p )
@@ -130,7 +131,7 @@ func NewEventProcessor(options ...QPConfigOption) *QueueingEventProcessor {
130
131
}
131
132
132
133
// Start initializes the event processor
133
- func (p * QueueingEventProcessor ) Start (exeCtx utils.ExecutionCtx ) {
134
+ func (p * BatchEventProcessor ) Start (exeCtx utils.ExecutionCtx ) {
134
135
if p .EventDispatcher == nil {
135
136
p .EventDispatcher = NewQueueEventDispatcher (exeCtx .GetContext ())
136
137
}
@@ -140,7 +141,7 @@ func (p *QueueingEventProcessor) Start(exeCtx utils.ExecutionCtx) {
140
141
}
141
142
142
143
// ProcessEvent processes the given impression event
143
- func (p * QueueingEventProcessor ) ProcessEvent (event UserEvent ) {
144
+ func (p * BatchEventProcessor ) ProcessEvent (event UserEvent ) {
144
145
p .Q .Add (event )
145
146
146
147
if p .Q .Size () >= p .MaxQueueSize {
@@ -151,22 +152,22 @@ func (p *QueueingEventProcessor) ProcessEvent(event UserEvent) {
151
152
}
152
153
153
154
// EventsCount returns size of an event queue
154
- func (p * QueueingEventProcessor ) EventsCount () int {
155
+ func (p * BatchEventProcessor ) EventsCount () int {
155
156
return p .Q .Size ()
156
157
}
157
158
158
159
// GetEvents returns events from event queue for count
159
- func (p * QueueingEventProcessor ) GetEvents (count int ) []interface {} {
160
+ func (p * BatchEventProcessor ) GetEvents (count int ) []interface {} {
160
161
return p .Q .Get (count )
161
162
}
162
163
163
164
// Remove removes events from queue for count
164
- func (p * QueueingEventProcessor ) Remove (count int ) []interface {} {
165
+ func (p * BatchEventProcessor ) Remove (count int ) []interface {} {
165
166
return p .Q .Remove (count )
166
167
}
167
168
168
169
// StartTicker starts new ticker for flushing events
169
- func (p * QueueingEventProcessor ) startTicker (exeCtx utils.ExecutionCtx ) {
170
+ func (p * BatchEventProcessor ) startTicker (exeCtx utils.ExecutionCtx ) {
170
171
if p .Ticker != nil {
171
172
return
172
173
}
@@ -194,7 +195,7 @@ func (p *QueueingEventProcessor) startTicker(exeCtx utils.ExecutionCtx) {
194
195
}
195
196
196
197
// check if user event can be batched in the current batch
197
- func (p * QueueingEventProcessor ) canBatch (current * Batch , user UserEvent ) bool {
198
+ func (p * BatchEventProcessor ) canBatch (current * Batch , user UserEvent ) bool {
198
199
if current .ProjectID == user .EventContext .ProjectID &&
199
200
current .Revision == user .EventContext .Revision {
200
201
return true
@@ -204,13 +205,13 @@ func (p *QueueingEventProcessor) canBatch(current *Batch, user UserEvent) bool {
204
205
}
205
206
206
207
// add the visitor to the current batch
207
- func (p * QueueingEventProcessor ) addToBatch (current * Batch , visitor Visitor ) {
208
+ func (p * BatchEventProcessor ) addToBatch (current * Batch , visitor Visitor ) {
208
209
visitors := append (current .Visitors , visitor )
209
210
current .Visitors = visitors
210
211
}
211
212
212
213
// FlushEvents flushes events in queue
213
- func (p * QueueingEventProcessor ) FlushEvents () {
214
+ func (p * BatchEventProcessor ) FlushEvents () {
214
215
// we flush when queue size is reached.
215
216
// however, if there is a ticker cycle already processing, we should wait
216
217
p .Mux .Lock ()
@@ -275,7 +276,7 @@ func (p *QueueingEventProcessor) FlushEvents() {
275
276
}
276
277
277
278
// OnEventDispatch registers a handler for LogEvent notifications
278
- func (p * QueueingEventProcessor ) OnEventDispatch (callback func (logEvent LogEvent )) (int , error ) {
279
+ func (p * BatchEventProcessor ) OnEventDispatch (callback func (logEvent LogEvent )) (int , error ) {
279
280
notificationCenter := registry .GetNotificationCenter (p .sdkKey )
280
281
281
282
handler := func (payload interface {}) {
@@ -294,7 +295,7 @@ func (p *QueueingEventProcessor) OnEventDispatch(callback func(logEvent LogEvent
294
295
}
295
296
296
297
// RemoveOnEventDispatch removes handler for LogEvent notification with given id
297
- func (p * QueueingEventProcessor ) RemoveOnEventDispatch (id int ) error {
298
+ func (p * BatchEventProcessor ) RemoveOnEventDispatch (id int ) error {
298
299
notificationCenter := registry .GetNotificationCenter (p .sdkKey )
299
300
300
301
if err := notificationCenter .RemoveHandler (id , notification .LogEvent ); err != nil {
0 commit comments