Skip to content

Commit f4ae231

Browse files
committed
feat(output): add cnosdb_subscription plugin
1 parent 585d5c2 commit f4ae231

12 files changed

+1559
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
//go:build !custom || inputs || inputs.bcache
2+
3+
package all
4+
5+
import _ "github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription" // register plugin
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# CnosDB Subscription Input Plugin
2+
3+
## Build
4+
5+
To compile this plugin it requires protoc-gen-go and protoc-gen-go-grpc
6+
7+
```shell
8+
# install protoc-gen-go
9+
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
10+
# install protoc-gen-go-grpc
11+
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
12+
```
13+
14+
## Usages
15+
16+
To listen on port 8803:
17+
18+
```toml
19+
[[inputs.cnosdb_subscription]]
20+
service_address = ":8803"
21+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
//go:generate ../../../tools/readme_config_includer/generator
2+
package cnosdb_subscription
3+
4+
import (
5+
_ "embed"
6+
"fmt"
7+
"net"
8+
"sync"
9+
10+
"github.com/influxdata/telegraf"
11+
"github.com/influxdata/telegraf/config"
12+
"github.com/influxdata/telegraf/plugins/inputs"
13+
"github.com/influxdata/telegraf/plugins/inputs/cnosdb_subscription/protos"
14+
"google.golang.org/grpc"
15+
)
16+
17+
func init() {
18+
inputs.Add("cnosdb_subscription", func() telegraf.Input {
19+
return &CnosDbSubscription{
20+
ServiceAddress: ":8803",
21+
}
22+
})
23+
}
24+
25+
//go:embed sample.conf
26+
var sampleConfig string
27+
28+
type CnosDbSubscription struct {
29+
ServiceAddress string `toml:"service_address"`
30+
Timeout config.Duration `toml:"timeout"`
31+
32+
Log telegraf.Logger `toml:"-"`
33+
34+
wg sync.WaitGroup `toml:"-"`
35+
36+
listener net.Listener `toml:"-"`
37+
grpcServer *grpc.Server `toml:"-"`
38+
}
39+
40+
func (*CnosDbSubscription) SampleConfig() string {
41+
return sampleConfig
42+
}
43+
44+
func (c *CnosDbSubscription) Init() error {
45+
c.Log.Info("Initialization completed.")
46+
return nil
47+
}
48+
49+
func (c *CnosDbSubscription) Gather(_ telegraf.Accumulator) error {
50+
return nil
51+
}
52+
53+
func (c *CnosDbSubscription) Start(acc telegraf.Accumulator) error {
54+
c.grpcServer = grpc.NewServer(grpc.MaxRecvMsgSize(10 * 1024 * 1024))
55+
protos.RegisterTSKVServiceServer(c.grpcServer, NewTSKVService(acc))
56+
57+
if c.listener == nil {
58+
listener, err := net.Listen("tcp", c.ServiceAddress)
59+
if err != nil {
60+
return err
61+
}
62+
c.listener = listener
63+
}
64+
65+
c.wg.Add(1)
66+
go func() {
67+
defer c.wg.Done()
68+
if err := c.grpcServer.Serve(c.listener); err != nil {
69+
acc.AddError(fmt.Errorf("failed to stop CnosDbSubscription gRPC service: %w", err))
70+
}
71+
}()
72+
73+
c.Log.Infof("Listening on %s", c.listener.Addr().String())
74+
75+
return nil
76+
}
77+
78+
func (c *CnosDbSubscription) Stop() {
79+
if c.grpcServer != nil {
80+
c.grpcServer.Stop()
81+
}
82+
c.wg.Wait()
83+
}
84+
85+
func (c *CnosDbSubscription) MarkHighPriority() {
86+
// Do nothing
87+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package cnosdb
2+
3+
type ColumnType int
4+
5+
const (
6+
ColumnTypeUnknown ColumnType = iota
7+
ColumnTypeTag
8+
ColumnTypeTime
9+
ColumnTypeFieldUnknown
10+
ColumnTypeFieldFloat
11+
ColumnTypeFieldInteger
12+
ColumnTypeFieldUnsigned
13+
ColumnTypeFieldBoolean
14+
ColumnTypeFieldString
15+
ColumnTypeFieldGeometry
16+
)
17+
18+
type TimeUnit int
19+
20+
const (
21+
TimeUnitUnknown TimeUnit = iota
22+
TimeUnitSecond
23+
TimeUnitMillisecond
24+
TimeUnitMicrosecond
25+
TimeUnitNanosecond
26+
)
27+
28+
type TskvTableSchema struct {
29+
Tenant string `json:"tenant"`
30+
Db string `json:"db"`
31+
Name string `json:"name"`
32+
SchemaVersion uint64 `json:"schema_version"`
33+
NextColumnID uint32 `json:"next_column_id"`
34+
Columns []TableColumn `json:"columns"`
35+
ColumnsIndex map[string]uint32 `json:"columns_index"`
36+
}
37+
38+
type TableColumn struct {
39+
ID uint64 `json:"id"`
40+
Name string `json:"name"`
41+
ColumnType interface{} `json:"column_type"`
42+
Encoding interface{} `json:"encoding"`
43+
}
44+
45+
type ColumnTypeUnited struct {
46+
ColumnType ColumnType
47+
TimeUnit TimeUnit
48+
}
49+
50+
func (c *TableColumn) GetColumnTypeUnited() ColumnTypeUnited {
51+
switch columnType := c.ColumnType.(type) {
52+
case string:
53+
if columnType == "Tag" {
54+
// "column_type": "Tag"
55+
return ColumnTypeUnited{
56+
ColumnType: ColumnTypeTag,
57+
TimeUnit: TimeUnitUnknown,
58+
}
59+
}
60+
case map[string]interface{}:
61+
if timeUnitObj := columnType["Time"]; timeUnitObj != nil {
62+
// "column_type": {"Time":"Microsecond"}
63+
if timeUnit, ok := timeUnitObj.(string); ok {
64+
timeUnitCode := TimeUnitUnknown
65+
switch timeUnit {
66+
case "Second":
67+
timeUnitCode = TimeUnitSecond
68+
case "Millisecond":
69+
timeUnitCode = TimeUnitMillisecond
70+
case "Microsecond":
71+
timeUnitCode = TimeUnitMicrosecond
72+
case "Nanosecond":
73+
timeUnitCode = TimeUnitNanosecond
74+
}
75+
return ColumnTypeUnited{
76+
ColumnType: ColumnTypeTime,
77+
TimeUnit: timeUnitCode,
78+
}
79+
}
80+
} else if fieldTypeObj := columnType["Field"]; fieldTypeObj != nil {
81+
fieldTypeCode := ColumnTypeFieldUnknown
82+
switch fieldType := fieldTypeObj.(type) {
83+
case string:
84+
switch fieldType {
85+
case "Float":
86+
// "column_type": {"Field":"Float"}
87+
fieldTypeCode = ColumnTypeFieldFloat
88+
case "Integer":
89+
// "column_type": {"Field":"Integer"}
90+
fieldTypeCode = ColumnTypeFieldInteger
91+
case "Unsigned":
92+
// "column_type": {"Field":"Unsigned"}
93+
fieldTypeCode = ColumnTypeFieldUnsigned
94+
case "Boolean":
95+
// "column_type": {"Field":"Boolean"}
96+
fieldTypeCode = ColumnTypeFieldBoolean
97+
case "String":
98+
// "column_type": {"Field":"String"}
99+
fieldTypeCode = ColumnTypeFieldString
100+
case "Geometry":
101+
// "column_type": {"Field":"Geometry"}
102+
fieldTypeCode = ColumnTypeFieldGeometry
103+
case "Unknown":
104+
// "column_type": {"Field":"Unknown"}
105+
fieldTypeCode = ColumnTypeFieldUnknown
106+
}
107+
case map[string]interface{}:
108+
if geometryInfo := fieldType["Geometry"]; geometryInfo != nil {
109+
// "column_type": {"Field":{"Geometry":{"sub_type":"Point","srid":10}}}
110+
fieldTypeCode = ColumnTypeFieldGeometry
111+
}
112+
}
113+
return ColumnTypeUnited{
114+
ColumnType: fieldTypeCode,
115+
TimeUnit: TimeUnitUnknown,
116+
}
117+
}
118+
}
119+
120+
return ColumnTypeUnited{
121+
ColumnType: ColumnTypeUnknown,
122+
TimeUnit: TimeUnitUnknown,
123+
}
124+
}

0 commit comments

Comments
 (0)