Merge pull request #12 from RTradeLtd/badgeropt
Codebase Cleanup, Badger Query Optimization, PostgreSQL Index Creation
bonedaddy authored May 18, 2020

2 parents 77686be + 29bfd3a commit bc2b375
Showing 10 changed files with 398 additions and 448 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -27,5 +27,5 @@ jobs:

       # specify any bash command here prefixed with `run: `
       - run: go get -v -t -d ./...
-      - run: go test -v -race -short -coverprofile=coverage.txt ./...
+      - run: go test -v -race -short -coverprofile=coverage.txt -timeout 1800s ./...
       - run: bash <(curl -s https://codecov.io/bash)
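
The functional change is the added `-timeout 1800s`: `go test` panics and aborts once its default 10-minute deadline expires, so a longer-running suite needs an explicit limit. For illustration only, a test can also keep heavy work out of such time-limited runs by honoring the `-short` flag already present in this command; a minimal, hypothetical sketch (not part of this repository):

```go
package dsbadger_test

import "testing"

// Hypothetical example: a long-running test that respects the -short
// flag used in the CI command above, so it is skipped before it can
// approach the 1800s timeout.
func TestHeavyWorkload(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping heavy workload in -short mode")
	}
	// long-running assertions would go here
}
```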
3 changes: 3 additions & 0 deletions README.md
@@ -1,5 +1,8 @@
 # go-datastores

+[![codecov](https://codecov.io/gh/RTradeLtd/go-datastores/branch/master/graph/badge.svg)](https://codecov.io/gh/RTradeLtd/go-datastores) [![Build Status](https://travis-ci.com/RTradeLtd/go-datastores.svg?branch=master)](https://travis-ci.com/RTradeLtd/go-datastores) [![RTradeLtd](https://circleci.com/gh/RTradeLtd/go-datastores.svg?style=shield)](https://app.circleci.com/pipelines/github/RTradeLtd/go-datastores) [![Go Report Card](https://goreportcard.com/badge/github.com/RTradeLtd/go-datastores)](https://goreportcard.com/report/github.com/RTradeLtd/go-datastores)
+
+
 `go-datastores` is a collection of IPFS datastores used by TemporalX, kept in a single monorepo for easy maintenance. Most of these datastores are forked from upstream repositories, with minor modifications to facilitate easier integration with TemporalX, along with performance improvements and optimizations where possible. The monorepo also lets us pull in every datastore we need from a single repository.

 If you are a user of TemporalX and want to use a datastore we do not yet support, submit a PR and we'll enable it in our next release.
164 changes: 0 additions & 164 deletions badger/datastore.go
@@ -1,7 +1,6 @@
 package dsbadger

 import (
-	"context"
 	"errors"
 	"log"
 	"time"
@@ -519,169 +518,6 @@ func (t *txn) Query(q dsq.Query) (dsq.Results, error) {
 	return t.query(q)
 }

-func (t *txn) query(q dsq.Query) (dsq.Results, error) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	opt := badger.DefaultIteratorOptions
-	opt.PrefetchValues = !q.KeysOnly
-	prefix := ds.NewKey(q.Prefix).String()
-	if prefix != "/" {
-		opt.Prefix = []byte(prefix + "/")
-	}
-	// Handle ordering
-	if len(q.Orders) > 0 {
-		switch q.Orders[0].(type) {
-		case dsq.OrderByKey, *dsq.OrderByKey:
-			// We order by key by default.
-		case dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
-			// Reverse order by key
-			opt.Reverse = true
-		default:
-			// Ok, we have a weird order we can't handle. Let's
-			// perform the _base_ query (prefix, filter, etc.), then
-			// handle sort/offset/limit later.
-
-			// Skip the stuff we can't apply.
-			baseQuery := q
-			baseQuery.Limit = 0
-			baseQuery.Offset = 0
-			baseQuery.Orders = nil
-
-			// perform the base query.
-			res, err := t.query(baseQuery)
-			if err != nil {
-				return nil, err
-			}
-
-			// fix the query
-			res = dsq.ResultsReplaceQuery(res, q)
-
-			// Remove the parts we've already applied.
-			naiveQuery := q
-			naiveQuery.Prefix = ""
-			naiveQuery.Filters = nil
-
-			// Apply the rest of the query
-			return dsq.NaiveQueryApply(naiveQuery, res), nil
-		}
-	}
-	var (
-		it         = t.txn.NewIterator(opt)
-		done       = make(chan bool)
-		resultChan = make(chan dsq.Result)
-		entries    = make([]dsq.Entry, 0)
-	)
-	go func() {
-		defer func() {
-			done <- true
-		}()
-		if t.ds.closed.IsSet() {
-			return
-		}
-		// this iterator is part of an implicit transaction, so when
-		// we're done we must discard the transaction. It's safe to
-		// discard the txn it because it contains the iterator only.
-		if t.implicit {
-			defer t.discard()
-		}
-		defer it.Close()
-		// All iterators must be started by rewinding.
-		it.Rewind()
-		// skip to the offset
-		for skipped := 0; skipped < q.Offset && it.Valid(); it.Next() {
-			// On the happy path, we have no filters and we can go
-			// on our way.
-			if len(q.Filters) == 0 {
-				skipped++
-				continue
-			}
-			// On the sad path, we need to apply filters before
-			// counting the item as "skipped" as the offset comes
-			// _after_ the filter.
-			item := it.Item()
-			matches := true
-			check := func(value []byte) error {
-				e := dsq.Entry{Key: string(item.Key()), Value: value, Size: int(item.ValueSize())}
-				// Only calculate expirations if we need them.
-				if q.ReturnExpirations {
-					e.Expiration = expires(item)
-				}
-				matches = filter(q.Filters, e)
-				return nil
-			}
-			// Maybe check with the value, only if we need it.
-			var err error
-			if q.KeysOnly {
-				err = check(nil)
-			} else {
-				err = item.Value(check)
-			}
-			if err != nil {
-				select {
-				case resultChan <- dsq.Result{Error: err}:
-				case <-t.ds.closing:
-					return
-				case <-ctx.Done():
-					return
-				}
-			}
-			if !matches {
-				skipped++
-			}
-		}
-		for sent := 0; (q.Limit <= 0 || sent < q.Limit) && it.Valid(); it.Next() {
-			item := it.Item()
-			e := dsq.Entry{Key: string(item.Key())}
-			// Maybe get the value
-			var result dsq.Result
-			if !q.KeysOnly {
-				b, err := item.ValueCopy(nil)
-				if err != nil {
-					result = dsq.Result{Error: err}
-				} else {
-					e.Value = b
-					e.Size = len(b)
-					result = dsq.Result{Entry: e}
-				}
-			} else {
-				e.Size = int(item.ValueSize())
-				result = dsq.Result{Entry: e}
-			}
-
-			if q.ReturnExpirations {
-				result.Expiration = expires(item)
-			}
-
-			// Finally, filter it (unless we're dealing with an error).
-			if result.Error == nil && filter(q.Filters, e) {
-				continue
-			}
-
-			select {
-			case resultChan <- result:
-				sent++
-			case <-ctx.Done():
-				return
-			case <-t.ds.closing:
-				return
-			}
-		}
-	}()
-	for {
-		select {
-		case <-done:
-			goto FINISHED
-		case result := <-resultChan:
-			if result.Error != nil {
-				log.Println("query result failure: ", result.Error)
-			}
-			entries = append(entries, result.Entry)
-		}
-	}
-FINISHED:
-	return dsq.ResultsWithEntries(q, entries), nil
-}

 func (t *txn) Commit() error {
 	if t.ds.closed.IsSet() {
 		return ErrClosed
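
Every changed line in this file is a deletion: the `query` implementation removed above pushed each entry through a `resultChan`/`done` goroutine pair, only for the receiving loop to collect everything into a slice anyway. The optimized replacement is not part of this excerpt; purely as an illustration, a synchronous version of the same iteration could look roughly like the sketch below, which reuses this file's `txn` type and its `expires` and `filter` helpers (offset, limit, and ordering are omitted for brevity; this is not the actual patch):

```go
// Illustrative sketch only: iterate the badger transaction directly
// and collect entries without the done/resultChan goroutine handoff.
func (t *txn) querySimple(q dsq.Query) (dsq.Results, error) {
	opt := badger.DefaultIteratorOptions
	opt.PrefetchValues = !q.KeysOnly
	if prefix := ds.NewKey(q.Prefix).String(); prefix != "/" {
		opt.Prefix = []byte(prefix + "/")
	}
	it := t.txn.NewIterator(opt)
	defer it.Close()

	var entries []dsq.Entry
	for it.Rewind(); it.Valid(); it.Next() {
		item := it.Item()
		e := dsq.Entry{Key: string(item.Key())}
		if q.KeysOnly {
			e.Size = int(item.ValueSize())
		} else {
			v, err := item.ValueCopy(nil)
			if err != nil {
				return nil, err
			}
			e.Value = v
			e.Size = len(v)
		}
		if q.ReturnExpirations {
			e.Expiration = expires(item)
		}
		// as in the removed code, filter returning true drops the entry
		if filter(q.Filters, e) {
			continue
		}
		entries = append(entries, e)
	}
	return dsq.ResultsWithEntries(q, entries), nil
}
```

Dropping the channel handoff removes one goroutine plus a send/receive per entry, while buffering the same `entries` slice the removed code already built on the receiving side.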
79 changes: 13 additions & 66 deletions badger/ds_test.go
@@ -10,23 +10,12 @@ import (
"testing"
"time"

"github.com/RTradeLtd/go-datastores/testutils"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
dstest "github.com/ipfs/go-datastore/test"
)

-var testcases = map[string]string{
-	"/a":     "a",
-	"/a/b":   "ab",
-	"/a/b/c": "abc",
-	"/a/b/d": "a/b/d",
-	"/a/c":   "ac",
-	"/a/d":   "ad",
-	"/e":     "e",
-	"/f":     "f",
-	"/g":     "",
-}

// returns datastore, and a function to call on exit.
// (this garbage collects). So:
//
@@ -66,26 +55,6 @@ func newDSSync(t *testing.T, sync bool) (*Datastore, func()) {
 	}
 }

-func addTestCases(t *testing.T, d *Datastore, testcases map[string]string) {
-	for k, v := range testcases {
-		dsk := ds.NewKey(k)
-		if err := d.Put(dsk, []byte(v)); err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	for k, v := range testcases {
-		dsk := ds.NewKey(k)
-		v2, err := d.Get(dsk)
-		if err != nil {
-			t.Fatal(err)
-		}
-		if string(v2) != v {
-			t.Errorf("%s values differ: %s != %s", k, v, v2)
-		}
-	}
-}

func Test_Sync(t *testing.T) {
 	type args struct {
 		sync bool
@@ -129,12 +98,12 @@ func TestQuery(t *testing.T) {
 	d, done := newDS(t)
 	defer done()

-	addTestCases(t, d, testcases)
+	testutils.AddTestCases(t, d, testutils.TestCases)
 	rs, err := d.Query(dsq.Query{Prefix: "/a/"})
 	if err != nil {
 		t.Fatal(err)
 	}
-	expectMatches(t, []string{
+	testutils.ExpectMatches(t, []string{
 		"/a/b",
 		"/a/b/c",
 		"/a/b/d",
@@ -147,7 +116,7 @@ func TestQuery(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	expectMatches(t, []string{
+	testutils.ExpectMatches(t, []string{
 		"/a/b/d",
 		"/a/c",
 	}, rs)
@@ -156,7 +125,7 @@ func TestQuery(t *testing.T) {
func TestHas(t *testing.T) {
 	d, done := newDS(t)
 	defer done()
-	addTestCases(t, d, testcases)
+	testutils.AddTestCases(t, d, testutils.TestCases)

 	has, err := d.Has(ds.NewKey("/a/b/c"))
 	if err != nil {
@@ -180,14 +149,14 @@ func TestHas(t *testing.T) {
func TestGetSize(t *testing.T) {
 	d, done := newDS(t)
 	defer done()
-	addTestCases(t, d, testcases)
+	testutils.AddTestCases(t, d, testutils.TestCases)

 	size, err := d.GetSize(ds.NewKey("/a/b/c"))
 	if err != nil {
 		t.Error(err)
 	}

-	if size != len(testcases["/a/b/c"]) {
+	if size != len(testutils.TestCases["/a/b/c"]) {
 		t.Error("")
 	}

@@ -200,7 +169,7 @@ func TestGetSize(t *testing.T) {
func TestNotExistGet(t *testing.T) {
 	d, done := newDS(t)
 	defer done()
-	addTestCases(t, d, testcases)
+	testutils.AddTestCases(t, d, testutils.TestCases)

 	has, err := d.Has(ds.NewKey("/a/b/c/d"))
 	if err != nil {
@@ -227,7 +196,7 @@ func TestNotExistGet(t *testing.T) {
func TestDelete(t *testing.T) {
 	d, done := newDS(t)
 	defer done()
-	addTestCases(t, d, testcases)
+	testutils.AddTestCases(t, d, testutils.TestCases)

 	has, err := d.Has(ds.NewKey("/a/b/c"))
 	if err != nil {
@@ -270,28 +239,6 @@ func TestGetEmpty(t *testing.T) {
 	}
 }

-func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
-	actual, err := actualR.Rest()
-	if err != nil {
-		t.Error(err)
-	}
-
-	if len(actual) != len(expect) {
-		t.Error("not enough", expect, actual)
-	}
-	for _, k := range expect {
-		found := false
-		for _, e := range actual {
-			if e.Key == k {
-				found = true
-			}
-		}
-		if !found {
-			t.Error(k, "not found")
-		}
-	}
-}

func TestBatching(t *testing.T) {
 	d, done := newDS(t)
 	defer done()
@@ -301,7 +248,7 @@ func TestBatching(t *testing.T) {
 		t.Fatal(err)
 	}

-	for k, v := range testcases {
+	for k, v := range testutils.TestCases {
 		err := b.Put(ds.NewKey(k), []byte(v))
 		if err != nil {
 			t.Fatal(err)
@@ -313,7 +260,7 @@ func TestBatching(t *testing.T) {
 		t.Fatal(err)
 	}

-	for k, v := range testcases {
+	for k, v := range testutils.TestCases {
 		val, err := d.Get(ds.NewKey(k))
 		if err != nil {
 			t.Fatal(err)
@@ -351,7 +298,7 @@ func TestBatching(t *testing.T) {
 		t.Fatal(err)
 	}

-	expectMatches(t, []string{
+	testutils.ExpectMatches(t, []string{
 		"/a",
 		"/a/b/d",
 		"/a/c",
@@ -656,7 +603,7 @@ func TestDiskUsage(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	addTestCases(t, d, testcases)
+	testutils.AddTestCases(t, d, testutils.TestCases)
 	d.Close()

 	d, err = NewDatastore(path, nil)
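
The helpers deleted from this file (`testcases`, `addTestCases`, `expectMatches`) reappear at the new call sites as `testutils.TestCases`, `testutils.AddTestCases`, and `testutils.ExpectMatches`. The testutils package itself isn't shown in this excerpt; a plausible reconstruction assembled from the removed code above (the real package may differ in signatures or details):

```go
// Package testutils collects fixtures and helpers shared by the
// datastore test suites in this monorepo.
package testutils

import (
	"testing"

	ds "github.com/ipfs/go-datastore"
	dsq "github.com/ipfs/go-datastore/query"
)

// TestCases mirrors the fixture map deleted from badger/ds_test.go.
var TestCases = map[string]string{
	"/a":     "a",
	"/a/b":   "ab",
	"/a/b/c": "abc",
	"/a/b/d": "a/b/d",
	"/a/c":   "ac",
	"/a/d":   "ad",
	"/e":     "e",
	"/f":     "f",
	"/g":     "",
}

// AddTestCases writes each fixture and reads it back, failing the
// test on any mismatch.
func AddTestCases(t *testing.T, d ds.Datastore, testcases map[string]string) {
	for k, v := range testcases {
		dsk := ds.NewKey(k)
		if err := d.Put(dsk, []byte(v)); err != nil {
			t.Fatal(err)
		}
	}
	for k, v := range testcases {
		dsk := ds.NewKey(k)
		v2, err := d.Get(dsk)
		if err != nil {
			t.Fatal(err)
		}
		if string(v2) != v {
			t.Errorf("%s values differ: %s != %s", k, v, v2)
		}
	}
}

// ExpectMatches asserts that the query results contain exactly the
// expected keys.
func ExpectMatches(t *testing.T, expect []string, actualR dsq.Results) {
	actual, err := actualR.Rest()
	if err != nil {
		t.Error(err)
	}
	if len(actual) != len(expect) {
		t.Error("not enough", expect, actual)
	}
	for _, k := range expect {
		found := false
		for _, e := range actual {
			if e.Key == k {
				found = true
			}
		}
		if !found {
			t.Error(k, "not found")
		}
	}
}
```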