-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsse.go
More file actions
145 lines (126 loc) · 3.44 KB
/
sse.go
File metadata and controls
145 lines (126 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// SPDX-License-Identifier: EUPL-1.2
package api
import (
"encoding/json"
"fmt"
"net/http"
"sync"
"github.com/gin-gonic/gin"
)
// SSEBroker manages Server-Sent Events connections and broadcasts events
// to subscribed clients. Clients connect via a GET endpoint and receive
// a streaming text/event-stream response. Each client may optionally
// subscribe to a specific channel via the ?channel= query parameter.
type SSEBroker struct {
	// mu guards clients. Publish and ClientCount take the read lock;
	// Handler (when registering/unregistering) and Drain take the write lock.
	mu sync.RWMutex
	// clients is the set of currently registered connections, keyed by
	// client pointer. It must be initialised before use (see NewSSEBroker).
	clients map[*sseClient]struct{}
}
// sseClient represents a single connected SSE consumer.
type sseClient struct {
	// channel is the channel name this client filters on. An empty string
	// means no filter: the client receives events published on any channel.
	channel string
	// events queues messages waiting to be written to the client's response.
	// Publish sends to it without blocking and drops the event if it is full.
	events chan sseEvent
	// done is closed when the client's handler returns, so senders can tell
	// that nobody is reading events any more.
	done chan struct{}
}
// sseEvent is an internal representation of a single SSE message.
type sseEvent struct {
	// Event is written as the value of the "event:" line of the frame.
	Event string
	// Data is written as the value of the "data:" line of the frame.
	// Publish fills it with the JSON encoding of the caller's payload.
	Data string
}
// NewSSEBroker returns an SSE broker whose client registry is already
// allocated, so it can accept connections and publish events immediately.
func NewSSEBroker() *SSEBroker {
	broker := new(SSEBroker)
	broker.clients = map[*sseClient]struct{}{}
	return broker
}
// Publish delivers an event to every client listening on the given channel.
// Clients that connected without a ?channel= filter (empty channel) receive
// events from all channels. The data value is JSON-encoded first; if it
// cannot be encoded, nothing is sent.
//
// Delivery is best-effort and never blocks: an event is dropped for any
// client whose buffer is full or whose handler has already finished.
func (b *SSEBroker) Publish(channel, event string, data any) {
	payload, err := json.Marshal(data)
	if err != nil {
		return
	}
	evt := sseEvent{Event: event, Data: string(payload)}

	b.mu.RLock()
	defer b.mu.RUnlock()

	for sub := range b.clients {
		// Skip subscribers filtering on some other channel.
		if sub.channel != "" && sub.channel != channel {
			continue
		}
		select {
		case sub.events <- evt:
		case <-sub.done:
		default:
			// Subscriber's buffer is full: drop rather than block.
		}
	}
}
// Handler returns a Gin handler for the SSE endpoint. Clients connect with
// a GET request and receive events as text/event-stream. An optional
// ?channel=<name> query parameter subscribes the client to a specific channel;
// without it the client receives events from every channel.
//
// The handler blocks for the lifetime of the connection. It returns when the
// request context is cancelled, when a write to the client fails, or when the
// client's events channel is closed (which is how Drain ends a stream).
func (b *SSEBroker) Handler() gin.HandlerFunc {
	return func(c *gin.Context) {
		client := &sseClient{
			channel: c.Query("channel"),
			// Buffered so Publish can hand off events without blocking.
			events: make(chan sseEvent, 64),
			done:   make(chan struct{}),
		}

		b.mu.Lock()
		b.clients[client] = struct{}{}
		b.mu.Unlock()

		defer func() {
			// Close done before unregistering so that a Publish still
			// holding a reference can see that this client has gone.
			close(client.done)
			b.mu.Lock()
			delete(b.clients, client)
			b.mu.Unlock()
		}()

		// Set SSE headers. X-Accel-Buffering asks nginx-style reverse proxies
		// not to buffer the stream.
		header := c.Writer.Header()
		header.Set("Content-Type", "text/event-stream")
		header.Set("Cache-Control", "no-cache")
		header.Set("Connection", "keep-alive")
		header.Set("X-Accel-Buffering", "no")
		c.Status(http.StatusOK)
		c.Writer.Flush()

		// Stream events until the client disconnects or the broker drains.
		ctx := c.Request.Context()
		for {
			select {
			case <-ctx.Done():
				return
			case evt, ok := <-client.events:
				if !ok {
					// The channel was closed. Without this check the receive
					// would keep yielding zero-value events and the loop
					// would spin writing empty frames.
					return
				}
				if _, err := fmt.Fprintf(c.Writer, "event: %s\ndata: %s\n\n", evt.Event, evt.Data); err != nil {
					return
				}
				// Flush so the event is sent immediately rather than buffered.
				c.Writer.Flush()
			}
		}
	}
}
// ClientCount reports how many SSE clients are registered with the broker
// at the moment of the call.
func (b *SSEBroker) ClientCount() int {
	b.mu.RLock()
	n := len(b.clients)
	b.mu.RUnlock()
	return n
}
// Drain disconnects every registered client. Useful for graceful shutdown.
//
// For each client whose handler has not yet finished, the events channel is
// closed; that closure is the shutdown signal delivered to the client's
// stream loop. Every client is also removed from the broker while the write
// lock is held, so that:
//
//   - a later Publish can never send on a channel Drain has closed, and
//   - calling Drain again can never close the same channel twice.
//
// Both of those would otherwise panic. Drain is therefore safe to call
// repeatedly and concurrently with Publish.
func (b *SSEBroker) Drain() {
	b.mu.Lock()
	defer b.mu.Unlock()

	for client := range b.clients {
		select {
		case <-client.done:
			// Handler has already returned; there is nobody left to signal.
		default:
			close(client.events)
		}
		// Removing entries while ranging over a map is permitted in Go. The
		// handler's own deferred delete simply becomes a no-op.
		delete(b.clients, client)
	}
}