Add CrateDB database support
This includes support for data generation and querying for
the devops use case.
kovrus authored and RobAtticus committed Jun 17, 2019
1 parent ad198f4 commit 3adcb55
Showing 25 changed files with 1,875 additions and 25 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
@@ -1,5 +1,7 @@
 language: go
 
+go_import_path: github.com/timescale/tsbs
+
 go:
 - 1.9.x
 - 1.10.x
7 changes: 5 additions & 2 deletions README.md
@@ -1,6 +1,6 @@
 # Time Series Benchmark Suite (TSBS)
 This repo contains code for benchmarking several time series databases,
-including TimescaleDB, MongoDB, InfluxDB, and Cassandra.
+including TimescaleDB, MongoDB, InfluxDB, CrateDB, and Cassandra.
 This code is based on a fork of work initially made public by InfluxDB
 at https://github.com/influxdata/influxdb-comparisons.

@@ -11,6 +11,7 @@ Current databases supported:
 + InfluxDB [(supplemental docs)](docs/influx.md)
 + Cassandra [(supplemental docs)](docs/cassandra.md)
 + ClickHouse [(supplemental docs)](docs/clickhouse.md)
++ CrateDB [(supplemental docs)](docs/cratedb.md)
 + SiriDB [(supplemental docs)](docs/siridb.md)

## Overview
@@ -101,7 +102,9 @@ Variables needed:
 1. a start time for the data's timestamps. E.g., `2016-01-01T00:00:00Z`
 1. an end time. E.g., `2016-01-04T00:00:00Z`
 1. how much time should be between each reading per device, in seconds. E.g., `10s`
-1. and which database(s) you want to generate for. E.g., `timescaledb` (choose from `cassandra`, `clickhouse`, `influx`, `mongo`, `siridb` or `timescaledb`)
+1. and which database(s) you want to generate for. E.g., `timescaledb`
+   (choose from `cassandra`, `clickhouse`, `cratedb`, `influx`, `mongo`, `siridb`,
+   or `timescaledb`)
 
 Given the above steps you can now generate a dataset (or multiple
 datasets, if you chose to generate for multiple databases) that can
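[Editor's note: the elided README text goes on to show the full generation command. For orientation, an invocation targeting the new format might look like the following; this is illustrative only, and the exact flag names track the tsbs_generate_data CLI and can vary between TSBS versions.]

$ tsbs_generate_data --use-case="devops" --seed=123 --scale=4000 \
    --timestamp-start="2016-01-01T00:00:00Z" \
    --timestamp-end="2016-01-04T00:00:00Z" \
    --log-interval="10s" --format="cratedb" \
    | gzip > /tmp/cratedb-data.gz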
56 changes: 56 additions & 0 deletions cmd/tsbs_generate_data/serialize/cratedb.go
@@ -0,0 +1,56 @@
package serialize

import (
	"fmt"
	"io"
)

// TAB is the TSV field separator used between columns.
const TAB = '\t'

// CrateDBSerializer writes a Point in a serialized form for CrateDB.
type CrateDBSerializer struct{}

// Serialize writes Point p to the given Writer w so that it can be loaded by
// the CrateDB loader. The format is TSV with one line per point, containing
// the measurement type, the tags as a JSON object of keys and values, the
// timestamp, and the metric values.
//
// An example of a serialized point:
// cpu\t{"hostname":"host_0","rack":"1"}\t1451606400000000000\t38\t0\t50\t41234
func (s *CrateDBSerializer) Serialize(p *Point, w io.Writer) error {
	buf := make([]byte, 0, 256)

	// measurement type
	buf = append(buf, p.measurementName...)
	buf = append(buf, TAB)

	// tags as a JSON object, or SQL NULL if the point has no tags
	if len(p.tagKeys) > 0 {
		buf = append(buf, '{')
		for i, key := range p.tagKeys {
			buf = append(buf, '"')
			buf = append(buf, key...)
			buf = append(buf, []byte("\":\"")...)
			buf = append(buf, p.tagValues[i]...)
			buf = append(buf, []byte("\",")...)
		}
		buf = buf[:len(buf)-1] // drop the trailing comma
		buf = append(buf, '}')
	} else {
		buf = append(buf, []byte("null")...)
	}

	// timestamp in nanoseconds since the Unix epoch
	buf = append(buf, TAB)
	ts := fmt.Sprintf("%d", p.timestamp.UTC().UnixNano())
	buf = append(buf, ts...)

	// metric values
	for _, v := range p.fieldValues {
		buf = append(buf, TAB)
		buf = fastFormatAppend(v, buf)
	}
	buf = append(buf, '\n')

	_, err := w.Write(buf)
	return err
}
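[Editor's note: to make the TSV layout concrete, here is a small, self-contained sketch that splits one serialized line back into its components. It is not part of the commit; parseLine and parsedPoint are hypothetical names used only for illustration.]

package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
)

// parsedPoint mirrors what a loader would need to recover from one line.
type parsedPoint struct {
	measurement string
	tags        map[string]string
	tsNanos     int64
	metrics     []string
}

// parseLine splits a TSV line produced by CrateDBSerializer back into the
// measurement, the JSON tags, the nanosecond timestamp, and the raw metrics.
func parseLine(line string) (*parsedPoint, error) {
	parts := strings.Split(strings.TrimSuffix(line, "\n"), "\t")
	if len(parts) < 4 {
		return nil, fmt.Errorf("expected at least 4 fields, got %d", len(parts))
	}
	p := &parsedPoint{measurement: parts[0], metrics: parts[3:]}
	if parts[1] != "null" {
		if err := json.Unmarshal([]byte(parts[1]), &p.tags); err != nil {
			return nil, err
		}
	}
	ts, err := strconv.ParseInt(parts[2], 10, 64)
	if err != nil {
		return nil, err
	}
	p.tsNanos = ts
	return p, nil
}

func main() {
	line := "cpu\t{\"hostname\":\"host_0\",\"rack\":\"1\"}\t1451606400000000000\t38\t0\t50\t41234\n"
	p, err := parseLine(line)
	if err != nil {
		panic(err)
	}
	fmt.Println(p.measurement, p.tags, p.tsNanos, p.metrics)
}

Note that the serializer does not JSON-escape tag keys or values; that is safe for the generated devops data, where tags contain no quotes or control characters.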
44 changes: 44 additions & 0 deletions cmd/tsbs_generate_data/serialize/cratedb_test.go
@@ -0,0 +1,44 @@
package serialize

import (
	"testing"
)

func TestCrateDBSerializerSerialize(t *testing.T) {
	cases := []serializeCase{
		{
			desc:       "a regular Point",
			inputPoint: testPointDefault,
			output:     "cpu\t{\"hostname\":\"host_0\",\"region\":\"eu-west-1\",\"datacenter\":\"eu-west-1b\"}\t1451606400000000000\t38.24311829\n",
		},
		{
			desc:       "a regular Point using int as value",
			inputPoint: testPointInt,
			output:     "cpu\t{\"hostname\":\"host_0\",\"region\":\"eu-west-1\",\"datacenter\":\"eu-west-1b\"}\t1451606400000000000\t38\n",
		},
		{
			desc:       "a regular Point with multiple fields",
			inputPoint: testPointMultiField,
			output:     "cpu\t{\"hostname\":\"host_0\",\"region\":\"eu-west-1\",\"datacenter\":\"eu-west-1b\"}\t1451606400000000000\t5000000000\t38\t38.24311829\n",
		},
		{
			desc:       "a Point with no tags",
			inputPoint: testPointNoTags,
			output:     "cpu\tnull\t1451606400000000000\t38.24311829\n",
		},
	}

	testSerializer(t, cases, &CrateDBSerializer{})
}

func TestCrateDBSerializerSerializeErr(t *testing.T) {
	p := testPointMultiField
	s := &CrateDBSerializer{}
	err := s.Serialize(p, &errWriter{})
	if err == nil {
		t.Errorf("no error returned when expected")
	} else if err.Error() != errWriterAlwaysErr {
		t.Errorf("unexpected writer error: %v", err)
	}
}
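[Editor's note: errWriter and errWriterAlwaysErr are shared test fixtures defined elsewhere in the serialize package; the exact names and error text come from the package's own helpers. A minimal equivalent, shown only so the error-path test reads standalone, might look like this.]

package serialize

import "errors"

// errWriterAlwaysErr is the error text the test compares against.
const errWriterAlwaysErr = "always errors"

// errWriter is an io.Writer whose Write call always fails, used to
// exercise the serializer's error path.
type errWriter struct{}

func (w *errWriter) Write(p []byte) (int, error) {
	return 0, errors.New(errWriterAlwaysErr)
}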
236 changes: 236 additions & 0 deletions cmd/tsbs_generate_queries/databases/cratedb/devops.go
@@ -0,0 +1,236 @@
package cratedb

import (
	"fmt"
	"strings"
	"time"

	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
	"github.com/timescale/tsbs/query"
)

// TODO: Remove the need for this by continuing to bubble up errors
func panicIfErr(err error) {
	if err != nil {
		panic(err.Error())
	}
}

// Devops produces CrateDB-specific queries for all the devops query types.
type Devops struct {
	*devops.Core
}

// NewDevops makes a Devops object ready to generate Queries.
func NewDevops(start, end time.Time, scale int) *Devops {
	core, err := devops.NewCore(start, end, scale)
	panicIfErr(err)
	return &Devops{core}
}

const hostnameField = "tags['hostname']"

// GenerateEmptyQuery returns an empty query.CrateDB.
func (d *Devops) GenerateEmptyQuery() query.Query {
	return query.NewCrateDB()
}

// getSelectAggClauses builds the specified aggregate-function clauses for
// a set of column idents.
//
// For instance:
// max(cpu_time) AS max_cpu_time
func (d *Devops) getSelectAggClauses(aggFunc string, idents []string) []string {
	selectAggClauses := make([]string, len(idents))
	for i, ident := range idents {
		selectAggClauses[i] = fmt.Sprintf("%[1]s(%[2]s) AS %[1]s_%[2]s", aggFunc, ident)
	}
	return selectAggClauses
}
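[Editor's note: an illustration, not part of the commit — called from inside the package, two CPU columns expand as follows.]

clauses := d.getSelectAggClauses("max", []string{"usage_user", "usage_system"})
// clauses == []string{
//	"max(usage_user) AS max_usage_user",
//	"max(usage_system) AS max_usage_system",
// }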

// MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for N random
// hosts.
//
// Queries:
// cpu-max-all-1
// cpu-max-all-8
func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) {
	interval := d.Interval.MustRandWindow(devops.MaxAllDuration)
	selectClauses := d.getSelectAggClauses("max", devops.GetAllCPUMetrics())
	hosts, err := d.GetRandomHosts(nHosts)
	panicIfErr(err)

	sql := fmt.Sprintf(`
		SELECT
			date_trunc('hour', ts) AS hour,
			%s
		FROM cpu
		WHERE %s IN ('%s')
		  AND ts >= %d
		  AND ts < %d
		GROUP BY hour
		ORDER BY hour`,
		strings.Join(selectClauses, ", "),
		hostnameField,
		strings.Join(hosts, "', '"),
		interval.StartUnixMillis(),
		interval.EndUnixMillis())

	humanLabel := devops.GetMaxAllLabel("CrateDB", nHosts)
	humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
	d.fillInQuery(qi, humanLabel, humanDesc, sql)
}
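[Editor's note: substituting illustrative values — two hosts and a millisecond epoch window, with the full list of max() clauses (one per CPU metric) abbreviated — MaxAllCPU renders SQL of this shape.]

SELECT
    date_trunc('hour', ts) AS hour,
    max(usage_user) AS max_usage_user,
    ...,
    max(usage_guest_nice) AS max_usage_guest_nice
FROM cpu
WHERE tags['hostname'] IN ('host_3', 'host_7')
  AND ts >= 1451606400000
  AND ts < 1451635200000
GROUP BY hour
ORDER BY hour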

// GroupByTimeAndPrimaryTag selects the AVG of metrics in the group `cpu` per
// device per hour for a day.
//
// Queries:
// double-groupby-1
// double-groupby-5
// double-groupby-all
func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
	metrics, err := devops.GetCPUMetricsSlice(numMetrics)
	panicIfErr(err)
	interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration)
	selectClauses := d.getSelectAggClauses("mean", metrics)

	sql := fmt.Sprintf(`
		SELECT
			date_trunc('hour', ts) AS hour,
			%s
		FROM cpu
		WHERE ts >= %d
		  AND ts < %d
		GROUP BY hour, %s
		ORDER BY hour`,
		strings.Join(selectClauses, ", "),
		interval.StartUnixMillis(),
		interval.EndUnixMillis(),
		hostnameField)

	humanLabel := devops.GetDoubleGroupByLabel("CrateDB", numMetrics)
	humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
	d.fillInQuery(qi, humanLabel, humanDesc, sql)
}

// GroupByOrderByLimit populates a query.Query that has a time WHERE clause,
// groups by a truncated date, orders by that date, and takes a limit.
//
// Queries:
// groupby-orderby-limit
func (d *Devops) GroupByOrderByLimit(qi query.Query) {
	interval := d.Interval.MustRandWindow(time.Hour)
	sql := fmt.Sprintf(`
		SELECT
			date_trunc('minute', ts) AS minute,
			max(usage_user)
		FROM cpu
		WHERE ts < %d
		GROUP BY minute
		ORDER BY minute DESC
		LIMIT 5`,
		interval.EndUnixMillis())

	humanLabel := "CrateDB max cpu over last 5 min-intervals (random end)"
	humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString())
	d.fillInQuery(qi, humanLabel, humanDesc, sql)
}

// LastPointPerHost finds the last row for every host in the dataset.
func (d *Devops) LastPointPerHost(qi query.Query) {
	sql := fmt.Sprintf(`
		SELECT *
		FROM
		(
			SELECT %[1]s AS host, max(ts) AS max_ts
			FROM cpu
			GROUP BY %[1]s
		) t, cpu c
		WHERE t.max_ts = c.ts
		  AND t.host = c.%[1]s`, hostnameField)

	humanLabel := "CrateDB last row per host"
	humanDesc := humanLabel
	d.fillInQuery(qi, humanLabel, humanDesc, sql)
}
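[Editor's note: because its only placeholder is the hostnameField constant, LastPointPerHost always renders the same SQL — a self-join of cpu against each host's maximum timestamp.]

SELECT *
FROM
(
    SELECT tags['hostname'] AS host, max(ts) AS max_ts
    FROM cpu
    GROUP BY tags['hostname']
) t, cpu c
WHERE t.max_ts = c.ts
  AND t.host = c.tags['hostname']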

// HighCPUForHosts populates a query that gets CPU metrics when the CPU has
// high usage during a time period for a number of hosts (if nHosts is 0, it
// will search all hosts).
//
// Queries:
// high-cpu-1
// high-cpu-all
func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
	interval := d.Interval.MustRandWindow(devops.HighCPUDuration)
	hosts, err := d.GetRandomHosts(nHosts)
	panicIfErr(err)

	sql := fmt.Sprintf(`
		SELECT *
		FROM cpu
		WHERE usage_user > 90.0
		  AND ts >= %d
		  AND ts < %d
		  AND %s IN ('%s')`,
		interval.StartUnixMillis(),
		interval.EndUnixMillis(),
		hostnameField,
		strings.Join(hosts, "', '"))

	humanLabel, err := devops.GetHighCPULabel("CrateDB", nHosts)
	panicIfErr(err)
	humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
	d.fillInQuery(qi, humanLabel, humanDesc, sql)
}

// GroupByTime selects the MAX for metrics under 'cpu', per minute, for N
// random hosts.
//
// Queries:
// single-groupby-1-1-12
// single-groupby-1-1-1
// single-groupby-1-8-1
// single-groupby-5-1-12
// single-groupby-5-1-1
// single-groupby-5-8-1
func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) {
	interval := d.Interval.MustRandWindow(timeRange)
	metrics, err := devops.GetCPUMetricsSlice(numMetrics)
	panicIfErr(err)
	selectClauses := d.getSelectAggClauses("max", metrics)
	hosts, err := d.GetRandomHosts(nHosts)
	panicIfErr(err)

	sql := fmt.Sprintf(`
		SELECT
			date_trunc('minute', ts) AS minute,
			%s
		FROM cpu
		WHERE %s IN ('%s')
		  AND ts >= %d
		  AND ts < %d
		GROUP BY minute
		ORDER BY minute ASC`,
		strings.Join(selectClauses, ", "),
		hostnameField,
		strings.Join(hosts, "', '"),
		interval.StartUnixMillis(),
		interval.EndUnixMillis())

	humanLabel := fmt.Sprintf(
		"CrateDB %d cpu metric(s), random %4d hosts, random %s by 1m",
		numMetrics, nHosts, timeRange)
	humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
	d.fillInQuery(qi, humanLabel, humanDesc, sql)
}

// fillInQuery fills the query struct with data.
func (d *Devops) fillInQuery(qi query.Query, humanLabel, humanDesc, sql string) {
	q := qi.(*query.CrateDB)
	q.HumanLabel = []byte(humanLabel)
	q.HumanDescription = []byte(humanDesc)
	q.Table = []byte("cpu")
	q.SqlQuery = []byte(sql)
}
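[Editor's note: for context, a minimal sketch of how a caller could drive this generator. The real entry point is the tsbs_generate_queries tool; query.NewCrateDB comes from the query package added elsewhere in this commit, the HumanLabelName accessor is assumed from TSBS's query.Query interface, and this standalone program is illustrative only.]

package main

import (
	"fmt"
	"time"

	"github.com/timescale/tsbs/cmd/tsbs_generate_queries/databases/cratedb"
)

func main() {
	start := time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC)
	end := time.Date(2016, 1, 4, 0, 0, 0, 0, time.UTC)

	// scale = number of simulated hosts in the dataset
	d := cratedb.NewDevops(start, end, 4000)

	// Build one cpu-max-all-8 query against 8 random hosts.
	q := d.GenerateEmptyQuery()
	d.MaxAllCPU(q, 8)
	fmt.Printf("%s\n", q.HumanLabelName())
}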
