Skip to content

Commit 8837bdd

Browse files
tmcokay
authored and
okay
committed
[grpc] add sybild grpc service (#37)
[proto] add proto file [grpc] simplify response [grpc] stub out implementation [grpc] add sybild cmd skeleton [grpc] add stub comments to satisfy linter [grpc] use x/net/context for go<1.9 [grpc] add Trim methods [grpc] add initial grpc implementation [grpc] remove panics [grpc] implement GetTable [grpc] implement more of query [grpc] query - implement samples [grpc] populate histogram [grpc] fixup [grpc] use exec, implement more of query [query] use x/net/context for 1.6 support [grpc] implement time series queries [grpc] set large fields to uint64 [grpc] add filter support [grpc] populate distinct count [grpc] add set fields support in samples [grpc] add basic doc.go [grpc] implement ReadIngestionLog [grpc] implement ingest
1 parent f2bbe7c commit 8837bdd

File tree

17 files changed

+2851
-14
lines changed

17 files changed

+2851
-14
lines changed

main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ func setupCommands() {
2020
CMD_FUNCS["inspect"] = cmd.RunInspectCmdLine
2121
CMD_FUNCS["aggregate"] = cmd.RunAggregateCmdLine
2222
CMD_FUNCS["version"] = cmd.RunVersionCmdLine
23+
CMD_FUNCS["serve"] = cmd.RunServeCmdLine
2324

2425
for k, _ := range CMD_FUNCS {
2526
CMD_KEYS = append(CMD_KEYS, k)

src/cmd/cmd_ingest.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,25 @@ package sybil_cmd
33
import sybil "github.com/logv/sybil/src/lib"
44

55
import (
6+
"bufio"
7+
"bytes"
8+
69
"encoding/csv"
710
"encoding/json"
811
"flag"
912
"fmt"
13+
"github.com/golang/protobuf/jsonpb"
14+
"github.com/golang/protobuf/ptypes/struct"
1015
"io"
1116
"os"
1217
"strconv"
1318
"strings"
1419
"syscall"
1520
"time"
21+
22+
pb "github.com/logv/sybil/src/sybilpb"
23+
"golang.org/x/net/context"
24+
"google.golang.org/grpc"
1625
)
1726

1827
type Dictionary map[string]interface{}
@@ -220,6 +229,20 @@ func RunIngestCmdLine() {
220229
f_REOPEN := flag.String("infile", "", "input file to use (instead of stdin)")
221230

222231
flag.Parse()
232+
if sybil.FLAGS.DIAL != "" {
233+
var r io.ReadCloser = os.Stdin
234+
235+
var err error
236+
if *f_REOPEN != "" {
237+
r, err = os.Open(*f_REOPEN)
238+
}
239+
if err != nil {
240+
return
241+
}
242+
243+
defer r.Close()
244+
runIngestGRPC(&sybil.FLAGS, r)
245+
}
223246

224247
digestfile := fmt.Sprintf("%s", *ingestfile)
225248

@@ -286,3 +309,58 @@ func RunIngestCmdLine() {
286309

287310
t.IngestRecords(digestfile)
288311
}
312+
313+
func runIngestGRPC(flags *sybil.FlagDefs, r io.Reader) error {
314+
ctx := context.Background()
315+
opts := []grpc.DialOption{
316+
// todo
317+
grpc.WithInsecure(),
318+
}
319+
conn, err := grpc.Dial(flags.DIAL, opts...)
320+
if err != nil {
321+
return err
322+
}
323+
defer conn.Close()
324+
c := pb.NewSybilClient(conn)
325+
m := &jsonpb.Marshaler{OrigName: true}
326+
maxErrs := 100
327+
var errs int
328+
var vals []*structpb.Struct
329+
s := bufio.NewScanner(r)
330+
for s.Scan() {
331+
v := &structpb.Struct{}
332+
if err := jsonpb.Unmarshal(bytes.NewReader(s.Bytes()), v); err != nil {
333+
if err == io.EOF {
334+
break
335+
}
336+
if err != nil {
337+
sybil.Debug("ERR", err)
338+
errs++
339+
if errs > maxErrs {
340+
break
341+
}
342+
continue
343+
}
344+
}
345+
vals = append(vals, v)
346+
}
347+
if err := s.Err(); err != nil {
348+
return err
349+
}
350+
i := &pb.IngestRequest{
351+
Dataset: flags.TABLE,
352+
Records: vals,
353+
}
354+
qr, err := c.Ingest(ctx, i)
355+
if err != nil {
356+
return err
357+
}
358+
if err := m.Marshal(os.Stdout, qr); err != nil {
359+
return err
360+
}
361+
if errs > 0 {
362+
fmt.Fprintln(os.Stderr, "exiting due to error threshold being reached:", errs)
363+
os.Exit(errs)
364+
}
365+
return nil
366+
}

src/cmd/cmd_query.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,20 @@ import sybil "github.com/logv/sybil/src/lib"
44

55
import "fmt"
66
import "flag"
7+
import "os"
78
import "strings"
89
import "time"
910
import "path"
1011
import "runtime/debug"
1112

13+
import (
14+
"golang.org/x/net/context"
15+
16+
"github.com/golang/protobuf/jsonpb"
17+
pb "github.com/logv/sybil/src/sybilpb"
18+
"google.golang.org/grpc"
19+
)
20+
1221
var MAX_RECORDS_NO_GC = 4 * 1000 * 1000 // 4 million
1322

1423
const (
@@ -68,6 +77,16 @@ func RunQueryCmdLine() {
6877
addPrintFlags()
6978
flag.Parse()
7079

80+
runQueryCmdLine()
81+
82+
}
83+
84+
func runQueryCmdLine() {
85+
if sybil.FLAGS.DIAL != "" {
86+
runQueryGRPC(&sybil.FLAGS)
87+
return
88+
}
89+
7190
if sybil.FLAGS.DECODE_FLAGS {
7291
sybil.DecodeFlags()
7392
}
@@ -301,3 +320,88 @@ func RunQueryCmdLine() {
301320
}
302321

303322
}
323+
324+
func split(s, sep string) []string {
325+
if s == "" {
326+
return nil
327+
}
328+
return strings.Split(s, sep)
329+
}
330+
func splitFilters(s, sep, fSep string) []*pb.QueryFilter {
331+
if s == "" {
332+
return nil
333+
}
334+
var result []*pb.QueryFilter
335+
for _, filter := range strings.Split(s, sep) {
336+
parts := strings.Split(filter, fSep)
337+
// TODO: check filter validity
338+
result = append(result, &pb.QueryFilter{
339+
Column: parts[0],
340+
Op: pb.QueryFilterOp(pb.QueryFilterOp_value[strings.ToUpper(parts[1])]),
341+
Value: parts[2],
342+
})
343+
}
344+
return result
345+
}
346+
func runQueryGRPC(flags *sybil.FlagDefs) error {
347+
ctx := context.Background()
348+
opts := []grpc.DialOption{
349+
// todo
350+
grpc.WithInsecure(),
351+
}
352+
conn, err := grpc.Dial(flags.DIAL, opts...)
353+
if err != nil {
354+
return err
355+
}
356+
defer conn.Close()
357+
c := pb.NewSybilClient(conn)
358+
m := &jsonpb.Marshaler{OrigName: true}
359+
if flags.LIST_TABLES {
360+
r, err := c.ListTables(ctx, &pb.ListTablesRequest{})
361+
if err != nil {
362+
return err
363+
}
364+
return m.Marshal(os.Stdout, r)
365+
}
366+
if flags.PRINT_INFO {
367+
r, err := c.GetTable(ctx, &pb.GetTableRequest{
368+
Name: flags.TABLE,
369+
})
370+
if err != nil {
371+
return err
372+
}
373+
return m.Marshal(os.Stdout, r)
374+
}
375+
fs, flts := flags.FIELD_SEPARATOR, flags.FILTER_SEPARATOR
376+
q := &pb.QueryRequest{
377+
Dataset: flags.TABLE,
378+
Ints: split(flags.INTS, fs),
379+
Strs: split(flags.STRS, fs),
380+
GroupBy: split(flags.GROUPS, fs),
381+
DistinctGroupBy: split(flags.DISTINCT, fs),
382+
Limit: int64(flags.LIMIT),
383+
SortBy: flags.SORT,
384+
IntFilters: splitFilters(flags.INT_FILTERS, fs, flts),
385+
StrFilters: splitFilters(flags.STR_FILTERS, fs, flts),
386+
SetFilters: splitFilters(flags.SET_FILTERS, fs, flts),
387+
ReadIngestionLog: flags.READ_ROWSTORE,
388+
// TODO: replacements
389+
}
390+
if flags.SAMPLES {
391+
q.Type = pb.QueryType_SAMPLES
392+
}
393+
if flags.OP == sybil.OP_AVG {
394+
q.Op = pb.QueryOp_AVERAGE
395+
} else if flags.OP == sybil.OP_HIST {
396+
q.Op = pb.QueryOp_HISTOGRAM
397+
q.Type = pb.QueryType_DISTRIBUTION
398+
}
399+
if flags.TIME {
400+
q.Type = pb.QueryType_TIME_SERIES
401+
}
402+
qr, err := c.Query(ctx, q)
403+
if err != nil {
404+
return err
405+
}
406+
return m.Marshal(os.Stdout, qr)
407+
}

src/cmd/cmd_serve.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package sybil_cmd
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"net"
7+
"os"
8+
9+
sybil "github.com/logv/sybil/src/lib"
10+
"github.com/logv/sybil/src/sybild"
11+
pb "github.com/logv/sybil/src/sybilpb"
12+
"github.com/pkg/errors"
13+
"google.golang.org/grpc"
14+
)
15+
16+
const defaultServeListenAddr = "localhost:7000"
17+
18+
func RunServeCmdLine() {
19+
flag.Parse()
20+
if err := runServeCmdLine(&sybil.FLAGS); err != nil {
21+
fmt.Fprintln(os.Stderr, errors.Wrap(err, "serve"))
22+
os.Exit(1)
23+
}
24+
}
25+
26+
func runServeCmdLine(flags *sybil.FlagDefs) error {
27+
// TODO: handle signals, shutdown
28+
// TODO: auth, tls
29+
// TODO: add configurable listening address
30+
lis, err := net.Listen("tcp", defaultServeListenAddr)
31+
if err != nil {
32+
return err
33+
}
34+
s, err := sybild.NewServer()
35+
if err != nil {
36+
return err
37+
}
38+
grpcServer := grpc.NewServer()
39+
pb.RegisterSybilServer(grpcServer, s)
40+
return grpcServer.Serve(lis)
41+
}

src/cmd/sybild/main.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
"github.com/logv/sybil/src/sybild"
8+
)
9+
10+
func main() {
11+
if err := run(); err != nil {
12+
fmt.Fprintln(os.Stderr, "sybild:", err)
13+
os.Exit(1)
14+
}
15+
}
16+
17+
func run() error {
18+
s, err := sybild.NewServer()
19+
if err != nil {
20+
return err
21+
}
22+
_ = s
23+
return nil
24+
}

src/lib/aggregate.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ type op string
2222
const SORT_COUNT = "$COUNT"
2323

2424
const (
25-
NO_OP = iota
26-
OP_AVG = iota
27-
OP_HIST = iota
28-
OP_DISTINCT = iota
25+
NO_OP = ""
26+
OP_AVG = "avg"
27+
OP_HIST = "hist"
28+
OP_DISTINCT = "distinct"
2929
)
3030

3131
var GROUP_DELIMITER = "\t"

src/lib/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ type FlagDefs struct {
8888

8989
UPDATE_TABLE_INFO bool
9090
SKIP_OUTLIERS bool
91+
92+
DIAL string // GRPC dialing
9193
}
9294

9395
type StrReplace struct {
@@ -140,6 +142,8 @@ func setDefaults() {
140142
flag.StringVar(&FLAGS.FIELD_SEPARATOR, "field-separator", ",", "Field separator used in command line params")
141143
flag.StringVar(&FLAGS.FILTER_SEPARATOR, "filter-separator", ":", "Filter separator used in filters")
142144

145+
flag.StringVar(&FLAGS.DIAL, "dial", "", "address of remote sybild to query")
146+
143147
FLAGS.UPDATE_TABLE_INFO = false
144148
FLAGS.SKIP_OUTLIERS = true
145149
FLAGS.SAMPLES = false

src/lib/query_spec.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ type Grouping struct {
7575

7676
type Aggregation struct {
7777
Op string
78-
op_id int
7978
Name string
8079
name_id int16
8180
HistType string
@@ -219,12 +218,8 @@ func (t *Table) Aggregation(name string, op string) Aggregation {
219218
col_id := t.get_key_id(name)
220219

221220
agg := Aggregation{Name: name, name_id: col_id, Op: op}
222-
if op == "avg" {
223-
agg.op_id = OP_AVG
224-
}
225221

226222
if op == "hist" {
227-
agg.op_id = OP_HIST
228223
agg.HistType = "basic"
229224
if FLAGS.LOG_HIST {
230225
agg.HistType = "multi"
@@ -236,10 +231,6 @@ func (t *Table) Aggregation(name string, op string) Aggregation {
236231
}
237232
}
238233

239-
if op == DISTINCT_STR {
240-
agg.op_id = OP_DISTINCT
241-
}
242-
243234
_, ok := t.IntInfo[col_id]
244235
if !ok {
245236
// TODO: tell our table we need to load all records!
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
eyJPUCI6ImF2ZyIsIlBSSU5UIjp0cnVlLCJFWFBPUlQiOmZhbHNlLCJMSVNUX1RBQkxFUyI6ZmFsc2UsIkRFQ09ERV9GTEFHUyI6ZmFsc2UsIkVOQ09ERV9GTEFHUyI6ZmFsc2UsIkVOQ09ERV9SRVNVTFRTIjpmYWxzZSwiSU5UX0ZJTFRFUlMiOiIiLCJTVFJfRklMVEVSUyI6IiIsIlNUUl9SRVBMQUNFIjoiIiwiU0VUX0ZJTFRFUlMiOiIiLCJJTlRTIjoiZm9vLGJhciIsIlNUUlMiOiIiLCJHUk9VUFMiOiJhLGIsYyIsIkRJU1RJTkNUIjoiIiwiQUREX1JFQ09SRFMiOjAsIlRJTUUiOmZhbHNlLCJUSU1FX0NPTCI6InRpbWUiLCJUSU1FX0JVQ0tFVCI6MzYwMCwiSElTVF9CVUNLRVQiOjAsIkhEUl9ISVNUIjpmYWxzZSwiTE9HX0hJU1QiOmZhbHNlLCJGSUVMRF9TRVBBUkFUT1IiOiIsIiwiRklMVEVSX1NFUEFSQVRPUiI6IjoiLCJQUklOVF9LRVlTIjpmYWxzZSwiTE9BRF9BTkRfUVVFUlkiOnRydWUsIkxPQURfVEhFTl9RVUVSWSI6ZmFsc2UsIlJFQURfSU5HRVNUSU9OX0xPRyI6ZmFsc2UsIlJFQURfUk9XU1RPUkUiOmZhbHNlLCJTS0lQX0NPTVBBQ1QiOmZhbHNlLCJQUk9GSUxFIjpmYWxzZSwiUFJPRklMRV9NRU0iOmZhbHNlLCJSRUNZQ0xFX01FTSI6dHJ1ZSwiQ0FDSEVEX1FVRVJJRVMiOmZhbHNlLCJXRUlHSFRfQ09MIjoiIiwiTElNSVQiOjEwMCwiREVCVUciOmZhbHNlLCJKU09OIjpmYWxzZSwiR0MiOnRydWUsIkRJUiI6Ii4vZGIvIiwiU09SVCI6IiRDT1VOVCIsIlBSVU5FX0JZIjoiJENPVU5UIiwiVEFCTEUiOiJ0ZXN0YWJsZSIsIlBSSU5UX0lORk8iOmZhbHNlLCJTQU1QTEVTIjpmYWxzZSwiVVBEQVRFX1RBQkxFX0lORk8iOmZhbHNlLCJTS0lQX09VVExJRVJTIjp0cnVlfQ==
1+
eyJPUCI6ImF2ZyIsIlBSSU5UIjp0cnVlLCJFWFBPUlQiOmZhbHNlLCJMSVNUX1RBQkxFUyI6ZmFsc2UsIkRFQ09ERV9GTEFHUyI6ZmFsc2UsIkVOQ09ERV9GTEFHUyI6ZmFsc2UsIkVOQ09ERV9SRVNVTFRTIjpmYWxzZSwiSU5UX0ZJTFRFUlMiOiIiLCJTVFJfRklMVEVSUyI6IiIsIlNUUl9SRVBMQUNFIjoiIiwiU0VUX0ZJTFRFUlMiOiIiLCJJTlRTIjoiZm9vLGJhciIsIlNUUlMiOiIiLCJHUk9VUFMiOiJhLGIsYyIsIkRJU1RJTkNUIjoiIiwiQUREX1JFQ09SRFMiOjAsIlRJTUUiOmZhbHNlLCJUSU1FX0NPTCI6InRpbWUiLCJUSU1FX0JVQ0tFVCI6MzYwMCwiSElTVF9CVUNLRVQiOjAsIkhEUl9ISVNUIjpmYWxzZSwiTE9HX0hJU1QiOmZhbHNlLCJGSUVMRF9TRVBBUkFUT1IiOiIsIiwiRklMVEVSX1NFUEFSQVRPUiI6IjoiLCJQUklOVF9LRVlTIjpmYWxzZSwiTE9BRF9BTkRfUVVFUlkiOnRydWUsIkxPQURfVEhFTl9RVUVSWSI6ZmFsc2UsIlJFQURfSU5HRVNUSU9OX0xPRyI6ZmFsc2UsIlJFQURfUk9XU1RPUkUiOmZhbHNlLCJTS0lQX0NPTVBBQ1QiOmZhbHNlLCJQUk9GSUxFIjpmYWxzZSwiUFJPRklMRV9NRU0iOmZhbHNlLCJSRUNZQ0xFX01FTSI6dHJ1ZSwiQ0FDSEVEX1FVRVJJRVMiOmZhbHNlLCJXRUlHSFRfQ09MIjoiIiwiTElNSVQiOjEwMCwiREVCVUciOmZhbHNlLCJKU09OIjpmYWxzZSwiR0MiOnRydWUsIkRJUiI6Ii4vZGIvIiwiU09SVCI6IiRDT1VOVCIsIlBSVU5FX0JZIjoiJENPVU5UIiwiVEFCTEUiOiJ0ZXN0YWJsZSIsIlBSSU5UX0lORk8iOmZhbHNlLCJTQU1QTEVTIjpmYWxzZSwiVVBEQVRFX1RBQkxFX0lORk8iOmZhbHNlLCJTS0lQX09VVExJRVJTIjp0cnVlLCJESUFMIjoiIn0=

src/sybild/doc.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
// Package sybild holds the implementation of the sybil grpc service.
2+
//
3+
// A sybil server can be started with the `serve` subcommand.
4+
//
5+
// Sybil queries are sent to a remote by supplying a value to the `-dial` argument.
6+
package sybild

0 commit comments

Comments
 (0)