Skip to content

Commit ba2df5d

Browse files
committed
json adapter
1 parent b9e5a1f commit ba2df5d

File tree

3 files changed

+157
-4
lines changed

3 files changed

+157
-4
lines changed

adapters/json/json.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package json
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"log"
7+
"net"
8+
"os"
9+
"strconv"
10+
"strings"
11+
"text/template"
12+
13+
"github.com/gliderlabs/logspout/adapters/syslog"
14+
"github.com/gliderlabs/logspout/router"
15+
)
16+
17+
var configDefaults = map[string]string{
18+
"JSON_FIELDS": "time:uint,message,docker.hostname,docker.image",
19+
"JSON_TIME": "{{.Time.Unix}}",
20+
"JSON_MESSAGE": "{{.Data}}",
21+
"JSON_SOURCE": "{{.Source}}",
22+
"JSON_DOCKER_HOSTNAME": "{{.Container.Config.Hostname}}",
23+
"JSON_DOCKER_IMAGE": "{{.Container.Config.Image}}",
24+
"JSON_DOCKER_ID": "{{.Container.ID}}",
25+
"JSON_DOCKER_NAME": "{{.ContainerName}}",
26+
}
27+
28+
func init() {
29+
router.AdapterFactories.Register(NewJSONAdapter, "json")
30+
}
31+
32+
func getopt(name string) string {
33+
value := os.Getenv(name)
34+
if value == "" {
35+
value = configDefaults[name]
36+
}
37+
return value
38+
}
39+
40+
type JSONAdapter struct {
41+
conn net.Conn
42+
route *router.Route
43+
tmpl *template.Template
44+
types map[string]string
45+
}
46+
47+
func NewJSONAdapter(route *router.Route) (router.LogAdapter, error) {
48+
transport, found := router.AdapterTransports.Lookup(route.AdapterTransport("udp"))
49+
if !found {
50+
return nil, errors.New("unable to find adapter: " + route.Adapter)
51+
}
52+
conn, err := transport.Dial(route.Address, route.Options)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
fields := strings.Split(getopt("JSON_FIELDS"), ",")
58+
types := make(map[string]string)
59+
var values []string
60+
for _, field := range fields {
61+
parts := strings.Split(field, ":")
62+
if len(parts) > 1 {
63+
types[parts[0]] = parts[1]
64+
}
65+
config := "JSON_" + strings.ToUpper(strings.Replace(parts[0], ".", "_", -1))
66+
values = append(values, parts[0]+":"+getopt(config))
67+
}
68+
tmplStr := strings.Join(values, "\x00")
69+
tmpl, err := template.New("prejson").Parse(tmplStr)
70+
if err != nil {
71+
return nil, err
72+
}
73+
74+
return &JSONAdapter{
75+
route: route,
76+
conn: conn,
77+
tmpl: tmpl,
78+
types: types,
79+
}, nil
80+
}
81+
82+
func (a *JSONAdapter) Stream(logstream chan *router.Message) {
83+
defer a.route.Close()
84+
for message := range logstream {
85+
m := syslog.NewSyslogMessage(message, a.conn)
86+
buf, err := m.Render(a.tmpl)
87+
if err != nil {
88+
log.Println("json:", err)
89+
return
90+
}
91+
data, err := json.Marshal(buildMap(buf.String(), a.types))
92+
if err != nil {
93+
log.Println("json:", err)
94+
return
95+
}
96+
_, err = a.conn.Write(data)
97+
if err != nil {
98+
log.Println("json:", err)
99+
return
100+
}
101+
}
102+
}
103+
104+
func buildMap(input string, types map[string]string) map[string]interface{} {
105+
m := make(map[string]interface{})
106+
fields := strings.Split(input, "\x00")
107+
for _, field := range fields {
108+
kvp := strings.SplitN(field, ":", 2)
109+
keys := strings.Split(kvp[0], ".")
110+
mm := m
111+
if len(keys) > 1 {
112+
for _, key := range keys[:len(keys)-1] {
113+
if mm[key] == nil {
114+
mm[key] = make(map[string]interface{})
115+
}
116+
mm = mm[key].(map[string]interface{})
117+
}
118+
}
119+
var value interface{}
120+
var err error
121+
switch types[field] {
122+
case "uint":
123+
value, err = strconv.ParseUint(kvp[1], 10, 64)
124+
if err != nil {
125+
value = nil
126+
}
127+
case "int":
128+
value, err = strconv.ParseInt(kvp[1], 10, 64)
129+
if err != nil {
130+
value = nil
131+
}
132+
case "float":
133+
value, err = strconv.ParseFloat(kvp[1], 64)
134+
if err != nil {
135+
value = nil
136+
}
137+
case "bool":
138+
value, err = strconv.ParseBool(kvp[1])
139+
if err != nil {
140+
value = nil
141+
}
142+
case "":
143+
value = kvp[1]
144+
}
145+
mm[keys[len(keys)-1]] = value
146+
}
147+
return m
148+
}

adapters/syslog/syslog.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,13 @@ type SyslogAdapter struct {
8181
func (a *SyslogAdapter) Stream(logstream chan *router.Message) {
8282
defer a.route.Close()
8383
for message := range logstream {
84-
m := &SyslogMessage{message, a.conn}
84+
m := NewSyslogMessage(message, a.conn)
8585
buf, err := m.Render(a.tmpl)
8686
if err != nil {
8787
log.Println("syslog:", err)
8888
return
8989
}
90-
_, err = a.conn.Write(buf)
90+
_, err = a.conn.Write(buf.Bytes())
9191
if err != nil {
9292
log.Println("syslog:", err)
9393
return
@@ -100,13 +100,17 @@ type SyslogMessage struct {
100100
conn net.Conn
101101
}
102102

103-
func (m *SyslogMessage) Render(tmpl *template.Template) ([]byte, error) {
103+
func NewSyslogMessage(message *router.Message, conn net.Conn) *SyslogMessage {
104+
return &SyslogMessage{message, conn}
105+
}
106+
107+
func (m *SyslogMessage) Render(tmpl *template.Template) (*bytes.Buffer, error) {
104108
buf := new(bytes.Buffer)
105109
err := tmpl.Execute(buf, m)
106110
if err != nil {
107111
return nil, err
108112
}
109-
return buf.Bytes(), nil
113+
return buf, nil
110114
}
111115

112116
func (m *SyslogMessage) Priority() syslog.Priority {

modules.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
_ "github.com/gliderlabs/logspout/adapters/json"
45
_ "github.com/gliderlabs/logspout/adapters/raw"
56
_ "github.com/gliderlabs/logspout/adapters/syslog"
67
_ "github.com/gliderlabs/logspout/httpstream"

0 commit comments

Comments
 (0)