-
Notifications
You must be signed in to change notification settings - Fork 164
/
Copy pathsession_window.go
140 lines (119 loc) · 3.5 KB
/
session_window.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package flow
import (
"sync"
"time"
"github.com/reugn/go-streams"
)
// SessionWindow groups incoming elements into sessions of activity: elements
// are collected while they keep arriving and the collected batch is emitted
// once no element has been received for inactivityGap.
// Session windows never overlap and have no fixed start or end time.
// T is the type of the incoming elements; every emitted window is a []T.
type SessionWindow[T any] struct {
	// mu guards buffer, which is touched by both the receive and the emit
	// goroutines.
	mu sync.Mutex
	// inactivityGap is the idle period after which the current session closes.
	inactivityGap time.Duration
	// buffer accumulates the elements of the session in progress.
	buffer []T
	// in carries upstream elements; out carries closed windows downstream.
	in, out chan any
	// reset signals that an element arrived and the inactivity timer should be
	// restarted; done is closed once the input channel has been drained.
	reset, done chan struct{}
}
// Compile-time check that *SessionWindow implements the streams.Flow
// interface. The type is instantiated with any only because the conversion
// needs a concrete type argument; the value itself is discarded.
var _ streams.Flow = (*SessionWindow[any])(nil)
// NewSessionWindow returns a new SessionWindow operator.
// T specifies the incoming element type, and the outgoing element type is []T.
//
// inactivityGap is the period of inactivity after which the current session
// window is closed and emitted.
//
// The constructor starts two goroutines: one that buffers incoming elements
// and one that emits windows. They run until the input channel is closed and
// the final window has been handed downstream.
func NewSessionWindow[T any](inactivityGap time.Duration) *SessionWindow[T] {
	// The reset channel has capacity 1 on purpose. Reset notifications are sent
	// without blocking, so with an unbuffered channel a notification issued
	// while the emit goroutine is busy dispatching a window would be dropped.
	// Because the timer has already fired at that point and is only re-armed by
	// a reset, the newly buffered element would then wait for the next element
	// (or end of stream) before being emitted. A single buffered slot keeps one
	// pending notification and coalesces any further ones.
	sessionWindow := &SessionWindow[T]{
		inactivityGap: inactivityGap,
		in:            make(chan any),
		out:           make(chan any),
		reset:         make(chan struct{}, 1),
		done:          make(chan struct{}),
	}

	// start buffering incoming stream elements
	go sessionWindow.receive()
	// emit a session window based on the gap of inactivity
	go sessionWindow.emit()

	return sessionWindow
}
// Via streams the closed windows to the given Flow and returns that same Flow
// so that further operators can be chained onto it.
// The transfer runs in its own goroutine (see transmit), so Via itself returns
// immediately.
func (sw *SessionWindow[T]) Via(flow streams.Flow) streams.Flow {
go sw.transmit(flow)
return flow
}
// To streams the closed windows to the given Sink and blocks until the Sink
// has completed processing all data.
// The transfer is performed on the calling goroutine; once the output channel
// has been drained, To waits on sink.AwaitCompletion before returning.
func (sw *SessionWindow[T]) To(sink streams.Sink) {
sw.transmit(sink)
sink.AwaitCompletion()
}
// Out returns the receive-only output channel of the SessionWindow operator.
// Each value delivered on it is one closed session window (a []T); the channel
// is closed by the emit goroutine after the input has been exhausted and the
// last window dispatched.
func (sw *SessionWindow[T]) Out() <-chan any {
return sw.out
}
// In returns the send-only input channel of the SessionWindow operator.
// Every value sent on it must have dynamic type T: the receive goroutine
// type-asserts each element without a comma-ok check, so a mismatch panics.
// Closing the channel ends the stream and causes the remaining buffered
// elements to be emitted as a final window.
func (sw *SessionWindow[T]) In() chan<- any {
return sw.in
}
// transmit forwards every closed window from the output channel to the next
// Inlet. Once the output channel has been closed and drained, it closes the
// Inlet's input channel so that completion propagates downstream.
func (sw *SessionWindow[T]) transmit(inlet streams.Inlet) {
	for {
		closedWindow, ok := <-sw.out
		if !ok {
			// output channel closed: nothing more to forward
			break
		}
		inlet.In() <- closedWindow
	}
	close(inlet.In())
}
// receive drains the input channel, appending each element to the session
// buffer under the mutex and then requesting a restart of the inactivity
// timer. When the input channel is closed it closes done, which tells the emit
// goroutine to flush and shut down.
func (sw *SessionWindow[T]) receive() {
	for {
		item, open := <-sw.in
		if !open {
			break
		}

		sw.mu.Lock()
		// the assertion panics if the element is not of type T
		sw.buffer = append(sw.buffer, item.(T))
		sw.mu.Unlock()

		// activity observed: ask for the inactivity timer to be restarted
		sw.notifyTimerReset()
	}
	close(sw.done)
}
// notifyTimerReset asks the emit goroutine to restart the inactivity timer.
// The send is non-blocking: if it cannot proceed immediately, the default
// branch is taken and the notification is skipped rather than stalling the
// caller.
func (sw *SessionWindow[T]) notifyTimerReset() {
select {
case sw.reset <- struct{}{}:
default:
}
}
// emit drives the session lifecycle. It waits on three events:
//   - the inactivity timer expiring, which closes the current session and
//     dispatches its elements as a window;
//   - a reset notification, which restarts the inactivity timer;
//   - done being closed, which flushes the remaining elements, closes the
//     output channel and ends the goroutine.
//
// The timer is not re-armed after it fires; it starts counting again only
// when the next reset notification arrives.
func (sw *SessionWindow[T]) emit() {
	idle := time.NewTimer(sw.inactivityGap)
	for {
		select {
		case <-sw.done:
			// input exhausted: flush what is left and complete the output
			idle.Stop()
			sw.dispatchWindow()
			close(sw.out)
			return
		case <-sw.reset:
			if stopped := idle.Stop(); !stopped {
				// The timer had already expired or been stopped; discard a
				// stale tick, if one is pending, so it cannot be observed
				// right after the Reset below.
				select {
				case <-idle.C:
				default:
				}
			}
			idle.Reset(sw.inactivityGap)
		case <-idle.C:
			// inactivity gap elapsed: close the current session
			sw.dispatchWindow()
		}
	}
}
// dispatchWindow takes the buffered elements as a window, clears the buffer
// and, if the window is non-empty, sends it on the output channel.
// The mutex is held for the whole call, including the channel send, so the
// receive goroutine cannot append new elements until the window has been
// handed over.
func (sw *SessionWindow[T]) dispatchWindow() {
	sw.mu.Lock()
	defer sw.mu.Unlock()

	pending := sw.buffer
	sw.buffer = nil

	// an empty session produces no output
	if len(pending) == 0 {
		return
	}
	sw.out <- pending
}