Skip to content

Commit b790eb9

Browse files
committed
feat(parser): add parser opentsdb_json
1 parent 621864a commit b790eb9

File tree

4 files changed

+213
-0
lines changed

4 files changed

+213
-0
lines changed

plugins/parsers/all/opentsdb_json.go

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
//go:build !custom || parsers || parsers.opentsdbtelnet
2+
3+
package all
4+
5+
import _ "github.com/influxdata/telegraf/plugins/parsers/opentsdb_json" // register plugin
+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# OpenTSDB JSON Style Parser Plugin
2+
3+
## Configuration
4+
5+
```toml
6+
[[inputs.file]]
7+
files = ["example"]
8+
data_format = "opentsdb_json"
9+
```
10+
11+
## Example
12+
13+
```json
14+
[
15+
{
16+
"metric": "sys.cpu.nice",
17+
"timestamp": 1346846400,
18+
"value": 18,
19+
"tags": {
20+
"host": "web01",
21+
"dc": "lga"
22+
}
23+
},
24+
{
25+
"metric": "sys.cpu.nice",
26+
"timestamp": 1346846400,
27+
"value": 9,
28+
"tags": {
29+
"host": "web02",
30+
"dc": "lga"
31+
}
32+
}
33+
]
34+
```
+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package opentsdb
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"time"
9+
10+
"github.com/influxdata/telegraf"
11+
"github.com/influxdata/telegraf/metric"
12+
"github.com/influxdata/telegraf/plugins/parsers"
13+
)
14+
15+
func init() {
16+
parsers.Add("opentsdb_json",
17+
func(_ string) telegraf.Parser {
18+
return &Parser{}
19+
},
20+
)
21+
}
22+
23+
type point struct {
24+
Metric string `json:"metric"`
25+
Time int64 `json:"timestamp"`
26+
Value float64 `json:"value"`
27+
Tags map[string]string `json:"tags,omitempty"`
28+
}
29+
30+
type Parser struct {
31+
DefaultTags map[string]string `toml:"-"`
32+
}
33+
34+
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
35+
var multi bool
36+
switch buf[0] {
37+
case '{':
38+
case '[':
39+
multi = true
40+
default:
41+
return nil, errors.New("expected JSON array or hash")
42+
}
43+
44+
points := make([]point, 1)
45+
if dec := json.NewDecoder(bytes.NewReader(buf)); multi {
46+
if err := dec.Decode(&points); err != nil {
47+
return nil, errors.New("json array decode error for data format: opentsdb")
48+
}
49+
} else {
50+
if err := dec.Decode(&points[0]); err != nil {
51+
return nil, errors.New("json object decode error for data format: opentsdb")
52+
}
53+
}
54+
55+
metrics := make([]telegraf.Metric, 0, len(points))
56+
for i := range points {
57+
pt := points[i]
58+
59+
// Convert timestamp to Go time.
60+
// If time value is over a billion then it's microseconds.
61+
var ts time.Time
62+
if pt.Time < 10000000000 {
63+
ts = time.Unix(pt.Time, 0)
64+
} else {
65+
ts = time.Unix(pt.Time/1000, (pt.Time%1000)*1000)
66+
}
67+
68+
var tags map[string]string
69+
if len(p.DefaultTags) > 0 {
70+
tags = make(map[string]string)
71+
for k, v := range p.DefaultTags {
72+
tags[k] = v
73+
}
74+
for k, v := range pt.Tags {
75+
tags[k] = v
76+
}
77+
} else {
78+
tags = pt.Tags
79+
}
80+
81+
mt := metric.New(pt.Metric, tags, map[string]interface{}{"value": pt.Value}, ts)
82+
metrics = append(metrics, mt)
83+
}
84+
85+
return metrics, nil
86+
}
87+
88+
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
89+
metrics, err := p.Parse([]byte(line))
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
if len(metrics) < 1 {
95+
return nil, fmt.Errorf("can not parse the line: %s, for data format: opentsdb", line)
96+
}
97+
98+
return metrics[0], nil
99+
}
100+
101+
func (p *Parser) SetDefaultTags(tags map[string]string) {
102+
p.DefaultTags = tags
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package opentsdb
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
const JsonArray = `[
11+
{
12+
"metric": "sys.cpu.nice",
13+
"timestamp": 1346846400,
14+
"value": 18,
15+
"tags": {
16+
"host": "web01",
17+
"dc": "lga"
18+
}
19+
},
20+
{
21+
"metric": "sys.cpu.nice",
22+
"timestamp": 1346846400,
23+
"value": 9,
24+
"tags": {
25+
"host": "web02",
26+
"dc": "lga"
27+
}
28+
}
29+
]`
30+
31+
const JsonObject = `{
32+
"metric": "sys.cpu.nice",
33+
"timestamp": 1346846400,
34+
"value": 18,
35+
"tags": {
36+
"host": "web01",
37+
"dc": "lga"
38+
}
39+
}`
40+
41+
func TestParseJSONArray(t *testing.T) {
42+
parser := &Parser{}
43+
metrics, err := parser.Parse([]byte(JsonArray))
44+
require.NoError(t, err)
45+
require.Len(t, metrics, 2)
46+
require.Equal(t, "sys.cpu.nice", metrics[0].Name())
47+
require.Equal(t, map[string]string{
48+
"host": "web01",
49+
"dc": "lga",
50+
}, metrics[0].Tags())
51+
require.Equal(t, map[string]interface{}{
52+
"value": float64(18),
53+
}, metrics[0].Fields())
54+
require.Equal(t, time.Unix(1346846400, 0), metrics[0].Time())
55+
}
56+
57+
func TestParseJSONObject(t *testing.T) {
58+
parser := &Parser{}
59+
metrics, err := parser.Parse([]byte(JsonObject))
60+
require.NoError(t, err)
61+
require.Len(t, metrics, 1)
62+
require.Equal(t, "sys.cpu.nice", metrics[0].Name())
63+
require.Equal(t, map[string]string{
64+
"host": "web01",
65+
"dc": "lga",
66+
}, metrics[0].Tags())
67+
require.Equal(t, map[string]interface{}{
68+
"value": float64(18),
69+
}, metrics[0].Fields())
70+
require.Equal(t, time.Unix(1346846400, 0), metrics[0].Time())
71+
}

0 commit comments

Comments
 (0)