-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtable.go
131 lines (115 loc) · 2.87 KB
/
table.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package hltscl
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
// Fallback values used by TableWriter.Activate for options the caller
// does not override.
const (
	// defaultTimeout is the initial value of the timeout option (see WithTimeout).
	defaultTimeout = time.Second * 10

	// defaultMinTimeout is the initial value of the minimum-timeout option
	// (see WithMinTimeout).
	defaultMinTimeout = time.Second

	// defaultBufferSize is the initial capacity used for both the entry
	// channel and the error channel (see WithBufferSize).
	defaultBufferSize = 100
)
// WriteError is the value delivered on the error channel returned by
// TableWriter.Activate.
type WriteError struct {
	// Entry is the entry whose write failed. It is left at its zero value
	// when the error is not tied to a single entry (Activate's shutdown
	// report about entries still waiting in the buffer).
	Entry Entry
	// Err describes what went wrong.
	Err error
}
// TableWriter writes entries to a database through a pgx connection pool.
// Create it with NewTableWriter and start the background writer with Activate.
type TableWriter struct {
	// conn is the pool the writer goroutine executes statements on.
	conn *pgxpool.Pool
	// name and tsColName are handed to Entry.ExportForSQL when the SQL
	// statement is built (presumably the target table name and the name of
	// its timestamp column - confirm against ExportForSQL).
	name      string
	tsColName string
	// loc is the location every entry timestamp is converted to in NewEntry.
	loc *time.Location
	// ch is the incoming entry channel created by Activate.
	ch chan Entry
	// chErr is the error-reporting channel created by Activate.
	chErr chan WriteError
	// timeoutCtrl is created by Activate; it is nil before the first call.
	timeoutCtrl *timeoutController
}
// optItems collects the settings of a single Activate call. It starts from
// the package defaults and is then modified by the supplied OptsFn values.
type optItems struct {
	// timeout is set by WithTimeout.
	timeout time.Duration
	// minTimeout is set by WithMinTimeout.
	minTimeout time.Duration
	// bufferSize is set by WithBufferSize; it is the capacity of both the
	// entry channel and the error channel.
	bufferSize int
}
// OptsFn is a functional option for TableWriter.Activate. Values are obtained
// from WithTimeout, WithMinTimeout and WithBufferSize.
type OptsFn func(*optItems)
// WithTimeout returns an Activate option that replaces the default timeout
// (defaultTimeout) with tm.
func WithTimeout(tm time.Duration) OptsFn {
	apply := func(items *optItems) {
		items.timeout = tm
	}
	return apply
}
// WithMinTimeout returns an Activate option that replaces the default minimum
// timeout (defaultMinTimeout) with tm.
func WithMinTimeout(tm time.Duration) OptsFn {
	apply := func(items *optItems) {
		items.minTimeout = tm
	}
	return apply
}
// WithBufferSize returns an Activate option that replaces the default channel
// capacity (defaultBufferSize) with size. The value is used for both the
// entry channel and the error channel.
func WithBufferSize(size int) OptsFn {
	apply := func(items *optItems) {
		items.bufferSize = size
	}
	return apply
}
// CurrentQueryTimeout returns the timeout value currently held by the
// writer's timeout controller.
//
// The controller is only created by Activate, so zero is returned when the
// writer has not been activated yet (instead of dereferencing a nil pointer).
func (table *TableWriter) CurrentQueryTimeout() time.Duration {
	if table.timeoutCtrl == nil {
		return 0
	}
	return table.timeoutCtrl.currentTimeout
}
// Activate starts TimescaleDB writer and provides a writing channel to the client along with
// a reading channel for error reporting. Writing uses an adaptive timeout to prevent flooding
// of the incoming channel with requests in case the target database cannot handle queries.
//
// The writer goroutine runs until ctx is done or until the client closes the
// writing channel; in both cases the error channel is closed afterwards, so
// ranging over it terminates. If ctx ends while entries are still buffered,
// a single WriteError without an Entry is reported (best effort: it is
// dropped when the error channel is full). The client should keep draining
// the error channel; a failed write waits for free space in it, and is given
// up only once ctx is done.
//
// A negative buffer size falls back to defaultBufferSize.
//
// defaults:
//
//	defaultTimeout = 10 * time.Second
//	defaultMinTimeout = 1 * time.Second
//	defaultBufferSize = 100
func (table *TableWriter) Activate(ctx context.Context, opts ...OptsFn) (chan<- Entry, <-chan WriteError) {
	o := optItems{
		timeout:    defaultTimeout,
		bufferSize: defaultBufferSize,
		minTimeout: defaultMinTimeout,
	}
	for _, oItem := range opts {
		oItem(&o)
	}
	if o.bufferSize < 0 {
		// make(chan T, n) panics for a negative n.
		o.bufferSize = defaultBufferSize
	}

	// The goroutine works with its own references so that it keeps using
	// (and closing) the channels it was started with even if the struct
	// fields are reassigned later.
	timeoutCtrl := newTimeoutController(ctx, o.timeout, o.minTimeout)
	entries := make(chan Entry, o.bufferSize)
	errs := make(chan WriteError, o.bufferSize)
	table.timeoutCtrl = timeoutCtrl
	table.ch = entries
	table.chErr = errs

	go func() {
		defer close(errs)
		for {
			select {
			case entry, ok := <-entries:
				if !ok {
					// The client closed the writing channel. Without this
					// check the receive would keep yielding zero-value
					// entries and the loop would spin forever.
					return
				}
				sql, args := entry.ExportForSQL(table.name, table.tsColName)
				// Use the controller's current value - the one reported by
				// CurrentQueryTimeout - so the adaptive timeout is applied.
				// NOTE(review): assumes newTimeoutController initializes
				// currentTimeout from o.timeout - confirm.
				ctx2, cancel := context.WithTimeout(ctx, timeoutCtrl.currentTimeout)
				_, err := table.conn.Exec(ctx2, sql, args...)
				cancel()
				if err == nil {
					timeoutCtrl.reportSuccess()
					continue
				}
				timeoutCtrl.ReportFailure()
				// Do not block forever on a full, unread error channel:
				// give up on the report once ctx is done.
				select {
				case errs <- WriteError{
					Entry: entry,
					Err:   fmt.Errorf("failed to write TimescaleDB entry: %w", err),
				}:
				case <-ctx.Done():
				}
			case <-ctx.Done():
				if remaining := len(entries); remaining > 0 {
					// Non-blocking send; nobody may be reading any more and
					// the goroutine must still terminate and close errs.
					select {
					case errs <- WriteError{
						Err: fmt.Errorf("writer timed out with %d entries remaining", remaining),
					}:
					default:
					}
				}
				return
			}
		}
	}()
	return entries, errs
}
// NewEntry creates an empty entry for the moment ts. The timestamp is
// converted to the writer's location, and the entry starts with an empty
// data map.
func (table *TableWriter) NewEntry(ts time.Time) *Entry {
	location := table.loc
	localized := ts.In(location)
	entry := Entry{
		ts:   localized,
		loc:  location,
		data: make(map[string]any),
	}
	return &entry
}
// NewTableWriter creates a writer that executes its statements on conn.
//
// name and tsColName are passed on to Entry.ExportForSQL whenever an entry is
// written. loc is the location entry timestamps are converted to in NewEntry;
// a nil loc is replaced by time.UTC because time.Time.In panics on a nil
// location.
//
// The returned writer does nothing until Activate is called.
func NewTableWriter(conn *pgxpool.Pool, name, tsColName string, loc *time.Location) *TableWriter {
	if loc == nil {
		loc = time.UTC
	}
	return &TableWriter{
		conn:      conn,
		name:      name,
		tsColName: tsColName,
		loc:       loc,
	}
}