Skip to content

Commit 37da3dd

Browse files
authored
allowing custom action for elastic format (#796)
* allowing custom action for elastic format
1 parent 7ee95ba commit 37da3dd

File tree

5 files changed

+42
-10
lines changed

5 files changed

+42
-10
lines changed

cmd/ktranslate/main.go

+3
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,9 @@ func applyFlags(cfg *ktranslate.Config) error {
450450
cfg.OtelFormat.ClientKey = val
451451
case "otel.root_ca":
452452
cfg.OtelFormat.RootCA = val
453+
// pkg/formats/elasticsearch
454+
case "elastic.action":
455+
cfg.ElasticFormat.Action = val
453456
// pkg/formats/snmp
454457
case "snmp.format.conf":
455458
cfg.SnmpFormat.ConfigFile = val

config.go

+10
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ type PrometheusFormatConfig struct {
3535
FlowsNeeded int
3636
}
3737

38+
// ElasticFormatConfig is the config for the elastic format
39+
type ElasticFormatConfig struct {
40+
Action string
41+
}
42+
3843
// OtelFormatConfig is the config for the otel format
3944
type OtelFormatConfig struct {
4045
Endpoint string
@@ -291,6 +296,8 @@ type Config struct {
291296
OtelFormat *OtelFormatConfig
292297
// pkg/formats/snmp
293298
SnmpFormat *SnmpFormatConfig
299+
// pkg/formats/elasticsearch
300+
ElasticFormat *ElasticFormatConfig
294301

295302
// pkg/sinks/prom
296303
PrometheusSink *PrometheusSinkConfig
@@ -389,6 +396,9 @@ func DefaultConfig() *Config {
389396
ClientCert: "",
390397
RootCA: "",
391398
},
399+
ElasticFormat: &ElasticFormatConfig{
400+
Action: "index",
401+
},
392402
SnmpFormat: &SnmpFormatConfig{
393403
ConfigFile: "",
394404
},

pkg/formats/elasticsearch/elasticsearch.go

+21-7
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"bufio"
55
"bytes"
66
"compress/gzip"
7+
"flag"
78
"fmt"
89
"io"
910
"strings"
1011
"time"
1112

13+
"github.com/kentik/ktranslate"
1214
"github.com/kentik/ktranslate/pkg/formats/util"
1315
"github.com/kentik/ktranslate/pkg/kt"
1416
"github.com/kentik/ktranslate/pkg/rollup"
@@ -18,21 +20,31 @@ import (
1820
)
1921

2022
const (
21-
actionEntry = `{"index":{}}`
23+
actionEntryFormat = `{"%s":{}}`
2224
)
2325

26+
var (
27+
actionEntrySet string
28+
)
29+
30+
func init() {
31+
flag.StringVar(&actionEntrySet, "elastic.action", "index", "Use this action when sending to elastic.")
32+
}
33+
2434
var json = jsoniter.ConfigFastest
2535

2636
type ElasticsearchFormat struct {
2737
logger.ContextL
2838
compression kt.Compression
2939
useGzip bool
40+
action string
3041
}
3142

32-
func NewFormat(log logger.Underlying, compression kt.Compression) (*ElasticsearchFormat, error) {
43+
func NewFormat(log logger.Underlying, compression kt.Compression, cfg *ktranslate.ElasticFormatConfig) (*ElasticsearchFormat, error) {
3344
ef := &ElasticsearchFormat{
3445
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "elasticsearchFormat"}, log),
3546
compression: compression,
47+
action: fmt.Sprintf(actionEntryFormat, cfg.Action),
3648
}
3749

3850
switch compression {
@@ -44,6 +56,8 @@ func NewFormat(log logger.Underlying, compression kt.Compression) (*Elasticsearc
4456
return nil, fmt.Errorf("Invalid compression (%s): format json only supports none|gzip", compression)
4557
}
4658

59+
ef.Infof("Using action %s", ef.action)
60+
4761
return ef, nil
4862
}
4963

@@ -62,7 +76,7 @@ func (f *ElasticsearchFormat) To(msgs []*kt.JCHF, serBuf []byte) (*kt.Output, er
6276

6377
esBulkData := []string{}
6478
for _, m := range msgsNew {
65-
data, err := serialize(m)
79+
data, err := serialize(m, f.action)
6680
if err != nil {
6781
return nil, err
6882
}
@@ -108,7 +122,7 @@ func (f *ElasticsearchFormat) From(raw *kt.Output) ([]map[string]interface{}, er
108122
for sc.Scan() {
109123
// check for ES action and ignore
110124
v := sc.Text()
111-
if v == actionEntry {
125+
if v == f.action {
112126
continue
113127
}
114128
msg := &kt.JCHF{}
@@ -131,7 +145,7 @@ func (f *ElasticsearchFormat) Rollup(rolls []rollup.Rollup) (*kt.Output, error)
131145
// serialize rolls
132146
esBulkData := []string{}
133147
for _, m := range rolls {
134-
data, err := serialize(m)
148+
data, err := serialize(m, f.action)
135149
if err != nil {
136150
return nil, err
137151
}
@@ -166,8 +180,8 @@ func (f *ElasticsearchFormat) handleSynth(in *kt.JCHF) map[string]interface{} {
166180
return attr
167181
}
168182

169-
func serialize(o interface{}) (string, error) {
170-
s := actionEntry + "\n"
183+
func serialize(o interface{}, action string) (string, error) {
184+
s := action + "\n"
171185
data, err := json.Marshal(o)
172186
if err != nil {
173187
return "", err

pkg/formats/elasticsearch/elasticsearch_test.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@ package elasticsearch
33
import (
44
"testing"
55

6+
"github.com/kentik/ktranslate"
7+
"github.com/kentik/ktranslate/pkg/eggs/logger"
8+
lt "github.com/kentik/ktranslate/pkg/eggs/logger/testing"
69
"github.com/kentik/ktranslate/pkg/kt"
710
"github.com/stretchr/testify/assert"
811
)
912

1013
func TestSerializeElasticsearch(t *testing.T) {
1114
serBuf := make([]byte, 0)
1215
assert := assert.New(t)
16+
l := lt.NewTestContextL(logger.NilContext, t).GetLogger().GetUnderlyingLogger()
1317

14-
f, err := NewFormat(nil, kt.CompressionNone)
18+
f, err := NewFormat(l, kt.CompressionNone, &ktranslate.ElasticFormatConfig{Action: "index"})
1519
assert.NoError(err)
1620

1721
res, err := f.To(kt.InputTesting, serBuf)
@@ -29,7 +33,8 @@ func TestSerializeElasticsearch(t *testing.T) {
2933
func TestSerializeElasticsearchGzip(t *testing.T) {
3034
serBuf := make([]byte, 0)
3135
assert := assert.New(t)
32-
f, err := NewFormat(nil, kt.CompressionGzip)
36+
l := lt.NewTestContextL(logger.NilContext, t).GetLogger().GetUnderlyingLogger()
37+
f, err := NewFormat(l, kt.CompressionGzip, &ktranslate.ElasticFormatConfig{Action: "index"})
3338
assert.NoError(err)
3439
res, err := f.To(kt.InputTesting, serBuf)
3540
assert.NoError(err)

pkg/formats/format.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func NewFormat(ctx context.Context, format Format, log logger.Underlying, regist
6060
case FORMAT_AVRO:
6161
return avro.NewFormat(log, compression)
6262
case FORMAT_ELASTICSEARCH:
63-
return elasticsearch.NewFormat(log, compression)
63+
return elasticsearch.NewFormat(log, compression, cfg.ElasticFormat)
6464
case FORMAT_JSON:
6565
return json.NewFormat(log, compression, false)
6666
case FORMAT_NETFLOW:

0 commit comments

Comments
 (0)