forked from ajeetraina/plugins
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.go
83 lines (73 loc) · 1.8 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main
import (
"log"
"github.com/infinimesh/plugins/pkg/api"
"github.com/infinimesh/plugins/pkg/wrappers"
redistimeseries "github.com/RedisTimeSeries/redistimeseries-go"
)
// objectWorkerFactory carries the dependencies that are handed to every
// objectWorker it creates.
type objectWorkerFactory struct {
	// api is copied into each worker created by NewObjectWorker.
	api api.Handler

	// redisClient is the RedisTimeSeries client copied into each worker.
	redisClient *redistimeseries.Client
}
// NewObjectWorker builds a worker for obj. The worker reuses the factory's API
// handler and Redis client and gets its own freshly allocated done channel.
func (f *objectWorkerFactory) NewObjectWorker(obj api.Object) wrappers.Process {
	worker := new(objectWorker)
	worker.obj = obj
	worker.api = f.api
	worker.redisClient = f.redisClient
	worker.done = make(chan struct{})
	return worker
}
// objectWorker consumes the device state stream of a single object and writes
// the reported numeric values to RedisTimeSeries.
type objectWorker struct {
	// obj is the object whose state is streamed; its UID, Name and Kind are
	// used as series labels.
	obj api.Object

	// api is used by Start to open the device state stream.
	api api.Handler

	// redisClient is used by Start to create series and add samples.
	redisClient *redistimeseries.Client

	// done is closed by Stop to make Start return.
	done chan struct{}
}
// Start opens the device state stream for the worker's object and writes every
// float64 value found in the reported state data to RedisTimeSeries under the
// key "<data key>:<object UID>", labelled with the object's uid, name and kind.
//
// It blocks until one of the following happens:
//   - Stop closes the done channel,
//   - the state stream cannot be opened (the error is logged),
//   - the state stream channel is closed.
//
// Nil states, nil values and values that are not float64 are skipped.
func (w *objectWorker) Start() {
	createOpts := redistimeseries.CreateOptions{
		Labels: map[string]string{
			"uid":  w.obj.UID,
			"name": w.obj.Name,
			"kind": w.obj.Kind,
		},
	}

	ch, err := w.api.GetDevicesStateStream(w.obj.UID)
	if err != nil {
		log.Printf("error on get devices state stream: %s\n", err)
		// Without a stream there is nothing to consume; looping on the
		// returned channel would only park this goroutine until Stop.
		return
	}

	// Tracks the data keys for which series creation was already attempted,
	// so creation is tried at most once per key for the life of this worker.
	createdKeys := map[string]bool{}

	for {
		select {
		case <-w.done:
			return
		case state, ok := <-ch:
			if !ok {
				// A closed channel yields zero values forever; returning here
				// avoids spinning on it and flooding the log.
				log.Printf("devices state stream closed for object %v", w.obj)
				return
			}
			if state == nil {
				log.Printf("received nil state for object %v", w.obj)
				continue
			}

			for k, v := range state.Result.ReportedState.Data {
				if v == nil {
					continue
				}
				f, ok := v.(float64)
				if !ok {
					log.Printf("invalid data type found for object %v and key %s", w.obj, k)
					continue
				}

				seriesKey := k + ":" + w.obj.UID

				if !createdKeys[k] {
					// Creation is best-effort: a failure is logged once and the
					// add below is still attempted.
					if err := w.redisClient.CreateKeyWithOptions(seriesKey, createOpts); err != nil {
						log.Printf("failed to create time series key %s: %s\n", seriesKey, err)
					}
					createdKeys[k] = true
				}

				if _, err := w.redisClient.AddAutoTsWithOptions(seriesKey, f, createOpts); err != nil {
					log.Printf("failed to add time series item: %s\n", err)
					continue
				}
				log.Printf("added time series item: object=%v key=%s\n", w.obj, k)
			}
		}
	}
}
// Stop signals Start to return by closing the done channel.
//
// Calling Stop again after it has returned is a no-op rather than a panic on
// closing an already-closed channel. The check and the close are not atomic,
// so Stop must not be called from several goroutines at the same time.
func (w *objectWorker) Stop() {
	select {
	case <-w.done:
		// Nothing is ever sent on done, so a successful receive means an
		// earlier Stop already closed it.
	default:
		close(w.done)
	}
}