Skip to content

Commit 859bc2a

Browse files
committed
add otlp event collector
1 parent 3f54b5c commit 859bc2a

File tree

2 files changed

+117
-0
lines changed

2 files changed

+117
-0
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package eventhandler
6+
7+
import (
8+
"bytes"
9+
"fmt"
10+
"strconv"
11+
"time"
12+
13+
"github.com/tidwall/gjson"
14+
)
15+
16+
type OTLPEventCollector struct{}
17+
18+
// Filter skips processing RUM related events.
19+
func (a *OTLPEventCollector) Filter(line []byte) error {
20+
if bytes.HasPrefix(line, rumMetaHeader) {
21+
return fmt.Errorf("rum data support not implemented")
22+
}
23+
24+
return nil
25+
}
26+
27+
// IsMeta identifies metadata lines from APM protocol.
28+
func (a *OTLPEventCollector) IsMeta(line []byte) bool {
29+
return bytes.HasPrefix(line, metaHeader)
30+
}
31+
32+
// Process processes single lines extracting APM events.
33+
// It uniforms events timestamp.
34+
func (a *OTLPEventCollector) Process(linecopy []byte) event {
35+
event := event{payload: linecopy}
36+
result := gjson.ParseBytes(linecopy)
37+
38+
result.ForEach(func(key, value gjson.Result) bool {
39+
event.objectType = key.Str // lines look like {"span":{...}}
40+
41+
switch event.objectType {
42+
case "resourceLogs":
43+
// compute minimum timestamp from all resource logs
44+
value.Get("#.scopeLogs.#.logRecords").
45+
ForEach(func(key, value gjson.Result) bool {
46+
value.ForEach(func(key, value gjson.Result) bool {
47+
value.ForEach(func(key, value gjson.Result) bool {
48+
s, err := strconv.ParseInt(value.Get("timeUnixNano").String(), 10, 64)
49+
if err != nil {
50+
return true
51+
}
52+
t := time.Unix(0, s)
53+
if event.timestamp.IsZero() || t.Before(event.timestamp) {
54+
event.timestamp = t
55+
}
56+
return true
57+
})
58+
return true
59+
})
60+
return true
61+
})
62+
}
63+
64+
// timestampResult := value.Get("timestamp")
65+
// if timestampResult.Exists() {
66+
// switch timestampResult.Type {
67+
// case gjson.Number:
68+
// us := timestampResult.Int()
69+
// if us >= 0 {
70+
// s := us / 1000000
71+
// ns := (us - (s * 1000000)) * 1000
72+
// event.timestamp = time.Unix(s, ns)
73+
// }
74+
// case gjson.String:
75+
// tstr := timestampResult.Str
76+
// for _, f := range supportedTSFormats {
77+
// if t, err := time.Parse(f, tstr); err == nil {
78+
// event.timestamp = t
79+
// break
80+
// }
81+
// }
82+
// }
83+
// }
84+
return true
85+
})
86+
87+
return event
88+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package eventhandler
6+
7+
import (
8+
"strconv"
9+
"testing"
10+
"time"
11+
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
func TestOTLPEventCollector_Process(t *testing.T) {
16+
o := OTLPEventCollector{}
17+
18+
minTimestamp := "1581452773000000789"
19+
s, err := strconv.ParseInt(minTimestamp, 10, 64)
20+
assert.NoError(t, err)
21+
et := time.Unix(0, s)
22+
23+
line := `{"resourceLogs":[{"resource":{"attributes":[{"key":"resource-attr","value":{"stringValue":"resource-attr-val-1"}}]},"scopeLogs":[{"scope":{},"logRecords":[{"timeUnixNano":"1581452773000009875","severityNumber":9,"severityText":"Info","name":"logA","body":{"stringValue":"This is a log message"},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"instance_num","value":{"intValue":"1"}}],"droppedAttributesCount":1,"traceId":"08040201000000000000000000000000","spanId":"0102040800000000"},{"timeUnixNano":"1581452773000000789","severityNumber":9,"severityText":"Info","name":"logB","body":{"stringValue":"something happened"},"attributes":[{"key":"customer","value":{"stringValue":"acme"}},{"key":"env","value":{"stringValue":"dev"}}],"droppedAttributesCount":1,"traceId":"","spanId":""}]}]}]}`
24+
25+
event := o.Process([]byte(line))
26+
27+
assert.Equal(t, "resourceLogs", event.objectType)
28+
assert.Equal(t, et, event.timestamp)
29+
}

0 commit comments

Comments
 (0)