Skip to content

Commit

Permalink
Add Akumuli database support
Browse files Browse the repository at this point in the history
This PR adds support for the Akumuli database for the devops use case. Akumuli is able to handle all queries for devops except for the GroupByOrderByLimit.
  • Loading branch information
Lazin authored and RobAtticus committed Dec 28, 2019
1 parent f38a8a8 commit 938f015
Show file tree
Hide file tree
Showing 17 changed files with 1,128 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Current databases supported:
+ ClickHouse [(supplemental docs)](docs/clickhouse.md)
+ CrateDB [(supplemental docs)](docs/cratedb.md)
+ SiriDB [(supplemental docs)](docs/siridb.md)
+ Akumuli [(supplemental docs)](docs/akumuli.md)

## Overview

Expand Down
141 changes: 141 additions & 0 deletions cmd/tsbs_generate_data/serialize/akumuli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package serialize

import (
"encoding/binary"
"errors"
"fmt"
"io"
)

const (
placeholderText = "AAAAFFEE"
)

// AkumuliSerializer writes a series of Point elements into RESP encoded
// buffer.
type AkumuliSerializer struct {
book map[string]uint32
bookClosed bool
deferred []byte
index uint32
}

// NewAkumuliSerializer initializes AkumuliSerializer instance.
func NewAkumuliSerializer() *AkumuliSerializer {
s := &AkumuliSerializer{}
s.book = make(map[string]uint32)
s.deferred = make([]byte, 0, 4096)
s.bookClosed = false
return s
}

// Serialize writes Point data to the given writer, conforming to the
// AKUMULI RESP protocol. Serializer adds extra data to guide data loader.
// This function writes output that contains binary and text data in RESP format.
func (s *AkumuliSerializer) Serialize(p *Point, w io.Writer) (err error) {
deferPoint := false

buf := make([]byte, 0, 1024)
// Add cue
const HeaderLength = 8
buf = append(buf, placeholderText...)
buf = append(buf, "+"...)

// Series name
for i := 0; i < len(p.fieldKeys); i++ {
buf = append(buf, p.measurementName...)
buf = append(buf, '.')
buf = append(buf, p.fieldKeys[i]...)
if i+1 < len(p.fieldKeys) {
buf = append(buf, '|')
} else {
buf = append(buf, ' ')
}
}

for i := 0; i < len(p.tagKeys); i++ {
buf = append(buf, ' ')
buf = append(buf, p.tagKeys[i]...)
buf = append(buf, '=')
buf = append(buf, p.tagValues[i].(string)...)
}

series := string(buf[HeaderLength:])
if !s.bookClosed {
// Save point for later
if id, ok := s.book[series]; ok {
s.bookClosed = true
_, err = w.Write(s.deferred)
if err != nil {
return err
}
buf = buf[:HeaderLength]
buf = append(buf, fmt.Sprintf(":%d", id)...)
binary.LittleEndian.PutUint32(buf[:4], id)
} else {
// Shortcut
s.index++
tmp := make([]byte, 0, 1024)
tmp = append(tmp, placeholderText...)
tmp = append(tmp, "*2\n"...)
tmp = append(tmp, buf[HeaderLength:]...)
tmp = append(tmp, '\n')
tmp = append(tmp, fmt.Sprintf(":%d\n", s.index)...)
s.book[series] = s.index
// Update cue
binary.LittleEndian.PutUint16(tmp[4:6], uint16(len(tmp)))
binary.LittleEndian.PutUint16(tmp[6:HeaderLength], uint16(0))
binary.LittleEndian.PutUint32(tmp[:4], s.index)
binary.LittleEndian.PutUint32(buf[:4], s.index)
_, err = w.Write(tmp)
if err != nil {
return err
}
deferPoint = true
buf = buf[:HeaderLength]
buf = append(buf, fmt.Sprintf(":%d", s.index)...)
}
} else {
// Replace the series name with the value from the book
if id, ok := s.book[series]; ok {
buf = buf[:HeaderLength]
buf = append(buf, fmt.Sprintf(":%d", id)...)
binary.LittleEndian.PutUint16(buf[4:6], uint16(len(buf)))
binary.LittleEndian.PutUint16(buf[6:HeaderLength], uint16(0))
binary.LittleEndian.PutUint32(buf[:4], id)
} else {
return errors.New("unexpected series name")
}
}

buf = append(buf, '\n')

// Timestamp
buf = append(buf, ':')
buf = fastFormatAppend(p.timestamp.UTC().UnixNano(), buf)
buf = append(buf, '\n')

// Values
buf = append(buf, fmt.Sprintf("*%d\n", len(p.fieldValues))...)
for i := 0; i < len(p.fieldValues); i++ {
v := p.fieldValues[i]
switch v.(type) {
case int, int64:
buf = append(buf, ':')
case float64:
buf = append(buf, '+')
}
buf = fastFormatAppend(v, buf)
buf = append(buf, '\n')
}

// Update cue
binary.LittleEndian.PutUint16(buf[4:6], uint16(len(buf)))
binary.LittleEndian.PutUint16(buf[6:HeaderLength], uint16(len(p.fieldValues)))
if deferPoint {
s.deferred = append(s.deferred, buf...)
return nil
}
_, err = w.Write(buf)
return err
}
78 changes: 78 additions & 0 deletions cmd/tsbs_generate_data/serialize/akumuli_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package serialize

import (
"bytes"
"strings"
"testing"
)

func TestAkumuliSerializerSerialize(t *testing.T) {

serializer := NewAkumuliSerializer()

points := []*Point{
testPointDefault,
testPointInt,
testPointMultiField,
testPointDefault,
testPointInt,
testPointMultiField,
}

type testCase struct {
expCount int
expValue string
name string
}

cases := []testCase{
{
expCount: 1,
expValue: "+cpu.usage_guest_nice hostname=host_0 region=eu-west-1 datacenter=eu-west-1b",
name: "series name default",
},
{
expCount: 1,
expValue: "+cpu.usage_guest hostname=host_0 region=eu-west-1 datacenter=eu-west-1b",
name: "series name int",
},
{
expCount: 1,
expValue: "+cpu.big_usage_guest|cpu.usage_guest|cpu.usage_guest_nice hostname=host_0 region=eu-west-1 datacenter=eu-west-1b",
name: "series name multi-field",
},
{
expCount: 2,
expValue: "*1\n+38.24311829",
name: "value default",
},
{
expCount: 2,
expValue: "*1\n:38",
name: "value int",
},
{
expCount: 2,
expValue: "*3\n:5000000000\n:38\n+38.24311829",
name: "value multi-field",
},
{
expCount: 6,
expValue: ":1451606400000000000",
name: "timestamp",
},
}
buf := new(bytes.Buffer)
for _, point := range points {
serializer.Serialize(point, buf)
}

got := buf.String()

for _, c := range cases {
actualCnt := strings.Count(got, c.expValue)
if actualCnt != c.expCount {
t.Errorf("Output incorrect: %s expected %d times got %d times", c.name, c.expCount, actualCnt)
}
}
}
45 changes: 45 additions & 0 deletions cmd/tsbs_generate_queries/databases/akumuli/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package akumuli

import (
"time"

"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops"
"github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils"
"github.com/timescale/tsbs/query"
)

// BaseGenerator contains settings specific for Akumuli database.
type BaseGenerator struct {
}

// GenerateEmptyQuery returns an empty query.HTTP
func (d *Devops) GenerateEmptyQuery() query.Query {
return query.NewHTTP()
}

// fillInQuery fills the query struct with data.
func (g *BaseGenerator) fillInQuery(qi query.Query, humanLabel, humanDesc, body string, begin, end int64) {
q := qi.(*query.HTTP)
q.HumanLabel = []byte(humanLabel)
q.HumanDescription = []byte(humanDesc)
q.Method = []byte("POST")
q.Path = []byte("/api/query")
q.Body = []byte(body)
q.StartTimestamp = begin
q.EndTimestamp = end
}

// NewDevops makes an Devops object ready to generate Queries.
func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryGenerator, error) {
core, err := devops.NewCore(start, end, scale)
if err != nil {
return nil, err
}

devops := &Devops{
BaseGenerator: g,
Core: core,
}

return devops, nil
}
Loading

0 comments on commit 938f015

Please sign in to comment.