From 8962e2e1bfbbe2a3336e42edfdd40b1be16e6269 Mon Sep 17 00:00:00 2001 From: postables Date: Sun, 10 May 2020 15:07:04 -0700 Subject: [PATCH 01/12] badger: try and optimize query function --- badger/datastore.go | 164 -------------------------------------------- badger/query.go | 121 ++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 164 deletions(-) create mode 100644 badger/query.go diff --git a/badger/datastore.go b/badger/datastore.go index 254d872..9ef5861 100644 --- a/badger/datastore.go +++ b/badger/datastore.go @@ -1,7 +1,6 @@ package dsbadger import ( - "context" "errors" "log" "time" @@ -519,169 +518,6 @@ func (t *txn) Query(q dsq.Query) (dsq.Results, error) { return t.query(q) } -func (t *txn) query(q dsq.Query) (dsq.Results, error) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - opt := badger.DefaultIteratorOptions - opt.PrefetchValues = !q.KeysOnly - prefix := ds.NewKey(q.Prefix).String() - if prefix != "/" { - opt.Prefix = []byte(prefix + "/") - } - // Handle ordering - if len(q.Orders) > 0 { - switch q.Orders[0].(type) { - case dsq.OrderByKey, *dsq.OrderByKey: - // We order by key by default. - case dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending: - // Reverse order by key - opt.Reverse = true - default: - // Ok, we have a weird order we can't handle. Let's - // perform the _base_ query (prefix, filter, etc.), then - // handle sort/offset/limit later. - - // Skip the stuff we can't apply. - baseQuery := q - baseQuery.Limit = 0 - baseQuery.Offset = 0 - baseQuery.Orders = nil - - // perform the base query. - res, err := t.query(baseQuery) - if err != nil { - return nil, err - } - - // fix the query - res = dsq.ResultsReplaceQuery(res, q) - - // Remove the parts we've already applied. - naiveQuery := q - naiveQuery.Prefix = "" - naiveQuery.Filters = nil - - // Apply the rest of the query - return dsq.NaiveQueryApply(naiveQuery, res), nil - } - } - var ( - it = t.txn.NewIterator(opt) - done = make(chan bool) - resultChan = make(chan dsq.Result) - entries = make([]dsq.Entry, 0) - ) - go func() { - defer func() { - done <- true - }() - if t.ds.closed.IsSet() { - return - } - // this iterator is part of an implicit transaction, so when - // we're done we must discard the transaction. It's safe to - // discard the txn it because it contains the iterator only. - if t.implicit { - defer t.discard() - } - defer it.Close() - // All iterators must be started by rewinding. - it.Rewind() - // skip to the offset - for skipped := 0; skipped < q.Offset && it.Valid(); it.Next() { - // On the happy path, we have no filters and we can go - // on our way. - if len(q.Filters) == 0 { - skipped++ - continue - } - // On the sad path, we need to apply filters before - // counting the item as "skipped" as the offset comes - // _after_ the filter. - item := it.Item() - matches := true - check := func(value []byte) error { - e := dsq.Entry{Key: string(item.Key()), Value: value, Size: int(item.ValueSize())} - // Only calculate expirations if we need them. - if q.ReturnExpirations { - e.Expiration = expires(item) - } - matches = filter(q.Filters, e) - return nil - } - // Maybe check with the value, only if we need it. 
- var err error
- if q.KeysOnly {
- err = check(nil)
- } else {
- err = item.Value(check)
- }
- if err != nil {
- select {
- case resultChan <- dsq.Result{Error: err}:
- case <-t.ds.closing:
- return
- case <-ctx.Done():
- return
- }
- }
- if !matches {
- skipped++
- }
- }
- for sent := 0; (q.Limit <= 0 || sent < q.Limit) && it.Valid(); it.Next() {
- item := it.Item()
- e := dsq.Entry{Key: string(item.Key())}
- // Maybe get the value
- var result dsq.Result
- if !q.KeysOnly {
- b, err := item.ValueCopy(nil)
- if err != nil {
- result = dsq.Result{Error: err}
- } else {
- e.Value = b
- e.Size = len(b)
- result = dsq.Result{Entry: e}
- }
- } else {
- e.Size = int(item.ValueSize())
- result = dsq.Result{Entry: e}
- }
-
- if q.ReturnExpirations {
- result.Expiration = expires(item)
- }
-
- // Finally, filter it (unless we're dealing with an error).
- if result.Error == nil && filter(q.Filters, e) {
- continue
- }
-
- select {
- case resultChan <- result:
- sent++
- case <-ctx.Done():
- return
- case <-t.ds.closing:
- return
- }
- }
- }()
- for {
- select {
- case <-done:
- goto FINISHED
- case result := <-resultChan:
- if result.Error != nil {
- log.Println("query result failure: ", result.Error)
- }
- entries = append(entries, result.Entry)
- }
- }
-FINISHED:
- return dsq.ResultsWithEntries(q, entries), nil
-}
-
 func (t *txn) Commit() error {
 if t.ds.closed.IsSet() {
 return ErrClosed
diff --git a/badger/query.go b/badger/query.go
new file mode 100644
index 0000000..536d9ba
--- /dev/null
+++ b/badger/query.go
@@ -0,0 +1,121 @@
+// Copyright 2020 RTrade Technologies Ltd
+//
+// licensed under GNU AFFERO GENERAL PUBLIC LICENSE;
+// you may not use this file except in compliance with the License;
+// You may obtain the license via the LICENSE file in the repository root;
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dsbadger
+
+import (
+ badger "github.com/dgraph-io/badger/v2"
+ ds "github.com/ipfs/go-datastore"
+ dsq "github.com/ipfs/go-datastore/query"
+)
+
+/*
+ This file contains an optimized query function, as the one included upstream allocates a lot of memory
+ when parsing very large datasets.
A key-value store with approximately 1.7TB of data stored requires a runtime allocation + of 2GB to simply iterate over all of its keys +*/ + +func (t *txn) query(q dsq.Query) (dsq.Results, error) { + if t.ds.closed.IsSet() { + return nil, ErrClosed + } + opt := badger.DefaultIteratorOptions + opt.PrefetchValues = !q.KeysOnly + prefix := ds.NewKey(q.Prefix).String() + if prefix != "/" { + opt.Prefix = []byte(prefix + "/") + } + // Handle ordering + if len(q.Orders) > 0 { + switch q.Orders[0].(type) { + case dsq.OrderByKey, *dsq.OrderByKey: + case dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending: + opt.Reverse = true + default: + baseQuery := q + baseQuery.Limit = 0 + baseQuery.Offset = 0 + baseQuery.Orders = nil + res, err := t.query(baseQuery) + if err != nil { + return nil, err + } + res = dsq.ResultsReplaceQuery(res, q) + naiveQuery := q + naiveQuery.Prefix = "" + naiveQuery.Filters = nil + return dsq.NaiveQueryApply(naiveQuery, res), nil + } + } + var ( + it = t.txn.NewIterator(opt) + item *badger.Item + entries []dsq.Entry + ) + if t.implicit { + defer t.discard() + } + defer it.Close() + it.Rewind() + for skipped := 0; skipped < q.Offset && it.Valid(); it.Next() { + if len(q.Filters) == 0 { + skipped++ + continue + } + item = it.Item() + matches := true + check := func(value []byte) error { + e := dsq.Entry{Key: string(item.Key()), Value: value, Size: int(item.ValueSize())} + // Only calculate expirations if we need them. + if q.ReturnExpirations { + e.Expiration = expires(item) + } + matches = filter(q.Filters, e) + return nil + } + if q.KeysOnly { + _ = check(nil) + } else { + _ = item.Value(check) + } + if !matches { + skipped++ + } + } + var ( + value []byte + dest []byte + ) + for sent := 0; (q.Limit <= 0 || sent < q.Limit) && it.Valid(); it.Next() { + item = it.Item() + entry := dsq.Entry{Key: string(item.Key())} + var copyErr error + if !q.KeysOnly { + value, copyErr = item.ValueCopy(dest) + if copyErr == nil { + entry.Value = value + } + entry.Size = len(entry.Value) + } else { + entry.Size = int(item.ValueSize()) + } + if q.ReturnExpirations { + entry.Expiration = expires(item) + } + if copyErr == nil && filter(q.Filters, entry) { + continue + } + entries = append(entries, entry) + sent++ + } + return dsq.ResultsWithEntries(q, entries), nil +} From f01b45462eaaf04e4993d764a9bd780309749715 Mon Sep 17 00:00:00 2001 From: postables Date: Sun, 10 May 2020 15:12:02 -0700 Subject: [PATCH 02/12] badger: make sure to reuse the value slice --- badger/query.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/badger/query.go b/badger/query.go index 536d9ba..db73d21 100644 --- a/badger/query.go +++ b/badger/query.go @@ -91,16 +91,13 @@ func (t *txn) query(q dsq.Query) (dsq.Results, error) { skipped++ } } - var ( - value []byte - dest []byte - ) + var value []byte for sent := 0; (q.Limit <= 0 || sent < q.Limit) && it.Valid(); it.Next() { item = it.Item() entry := dsq.Entry{Key: string(item.Key())} var copyErr error if !q.KeysOnly { - value, copyErr = item.ValueCopy(dest) + value, copyErr = item.ValueCopy(value) if copyErr == nil { entry.Value = value } From aea32e489883146b901c8b803e72efb328715ddf Mon Sep 17 00:00:00 2001 From: postables Date: Sun, 10 May 2020 15:38:38 -0700 Subject: [PATCH 03/12] remove license header --- badger/query.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/badger/query.go b/badger/query.go index db73d21..30683f9 100644 --- a/badger/query.go +++ b/badger/query.go @@ -1,15 +1,3 @@ -// Copyright 2020 RTrade Technologies 
Ltd -// -// licensed under GNU AFFERO GENERAL PUBLIC LICENSE; -// you may not use this file except in compliance with the License; -// You may obtain the license via the LICENSE file in the repository root; -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package dsbadger import ( From 2720d4509c3285d33dd62f293f53c2dd232f344a Mon Sep 17 00:00:00 2001 From: postables Date: Mon, 11 May 2020 14:12:26 -0700 Subject: [PATCH 04/12] badger: fix query --- badger/query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/badger/query.go b/badger/query.go index 30683f9..35deb22 100644 --- a/badger/query.go +++ b/badger/query.go @@ -85,7 +85,7 @@ func (t *txn) query(q dsq.Query) (dsq.Results, error) { entry := dsq.Entry{Key: string(item.Key())} var copyErr error if !q.KeysOnly { - value, copyErr = item.ValueCopy(value) + value, copyErr = item.ValueCopy(nil) if copyErr == nil { entry.Value = value } From 4ce9416f1d6158386b0f007a85b51a22268f05dc Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 12 May 2020 22:32:13 -0700 Subject: [PATCH 05/12] switch to test utils package --- badger/ds_test.go | 79 ++++----------------- leveldb/ds_test.go | 99 ++++---------------------- sql/postgres/postgres_test.go | 129 ++++------------------------------ testutils/testutil.go | 116 ++++++++++++++++++++++++++++++ 4 files changed, 158 insertions(+), 265 deletions(-) create mode 100644 testutils/testutil.go diff --git a/badger/ds_test.go b/badger/ds_test.go index 48730f4..0b8fe2c 100644 --- a/badger/ds_test.go +++ b/badger/ds_test.go @@ -10,23 +10,12 @@ import ( "testing" "time" + "github.com/RTradeLtd/go-datastores/testutils" ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" dstest "github.com/ipfs/go-datastore/test" ) -var testcases = map[string]string{ - "/a": "a", - "/a/b": "ab", - "/a/b/c": "abc", - "/a/b/d": "a/b/d", - "/a/c": "ac", - "/a/d": "ad", - "/e": "e", - "/f": "f", - "/g": "", -} - // returns datastore, and a function to call on exit. // (this garbage collects). 
So: // @@ -66,26 +55,6 @@ func newDSSync(t *testing.T, sync bool) (*Datastore, func()) { } } -func addTestCases(t *testing.T, d *Datastore, testcases map[string]string) { - for k, v := range testcases { - dsk := ds.NewKey(k) - if err := d.Put(dsk, []byte(v)); err != nil { - t.Fatal(err) - } - } - - for k, v := range testcases { - dsk := ds.NewKey(k) - v2, err := d.Get(dsk) - if err != nil { - t.Fatal(err) - } - if string(v2) != v { - t.Errorf("%s values differ: %s != %s", k, v, v2) - } - } -} - func Test_Sync(t *testing.T) { type args struct { sync bool @@ -129,12 +98,12 @@ func TestQuery(t *testing.T) { d, done := newDS(t) defer done() - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) rs, err := d.Query(dsq.Query{Prefix: "/a/"}) if err != nil { t.Fatal(err) } - expectMatches(t, []string{ + testutils.ExpectMatches(t, []string{ "/a/b", "/a/b/c", "/a/b/d", @@ -147,7 +116,7 @@ func TestQuery(t *testing.T) { if err != nil { t.Fatal(err) } - expectMatches(t, []string{ + testutils.ExpectMatches(t, []string{ "/a/b/d", "/a/c", }, rs) @@ -156,7 +125,7 @@ func TestQuery(t *testing.T) { func TestHas(t *testing.T) { d, done := newDS(t) defer done() - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) has, err := d.Has(ds.NewKey("/a/b/c")) if err != nil { @@ -180,14 +149,14 @@ func TestHas(t *testing.T) { func TestGetSize(t *testing.T) { d, done := newDS(t) defer done() - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) size, err := d.GetSize(ds.NewKey("/a/b/c")) if err != nil { t.Error(err) } - if size != len(testcases["/a/b/c"]) { + if size != len(testutils.TestCases["/a/b/c"]) { t.Error("") } @@ -200,7 +169,7 @@ func TestGetSize(t *testing.T) { func TestNotExistGet(t *testing.T) { d, done := newDS(t) defer done() - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) has, err := d.Has(ds.NewKey("/a/b/c/d")) if err != nil { @@ -227,7 +196,7 @@ func TestNotExistGet(t *testing.T) { func TestDelete(t *testing.T) { d, done := newDS(t) defer done() - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) has, err := d.Has(ds.NewKey("/a/b/c")) if err != nil { @@ -270,28 +239,6 @@ func TestGetEmpty(t *testing.T) { } } -func expectMatches(t *testing.T, expect []string, actualR dsq.Results) { - actual, err := actualR.Rest() - if err != nil { - t.Error(err) - } - - if len(actual) != len(expect) { - t.Error("not enough", expect, actual) - } - for _, k := range expect { - found := false - for _, e := range actual { - if e.Key == k { - found = true - } - } - if !found { - t.Error(k, "not found") - } - } -} - func TestBatching(t *testing.T) { d, done := newDS(t) defer done() @@ -301,7 +248,7 @@ func TestBatching(t *testing.T) { t.Fatal(err) } - for k, v := range testcases { + for k, v := range testutils.TestCases { err := b.Put(ds.NewKey(k), []byte(v)) if err != nil { t.Fatal(err) @@ -313,7 +260,7 @@ func TestBatching(t *testing.T) { t.Fatal(err) } - for k, v := range testcases { + for k, v := range testutils.TestCases { val, err := d.Get(ds.NewKey(k)) if err != nil { t.Fatal(err) @@ -351,7 +298,7 @@ func TestBatching(t *testing.T) { t.Fatal(err) } - expectMatches(t, []string{ + testutils.ExpectMatches(t, []string{ "/a", "/a/b/d", "/a/c", @@ -656,7 +603,7 @@ func TestDiskUsage(t *testing.T) { if err != nil { t.Fatal(err) } - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) d.Close() d, err = NewDatastore(path, nil) diff --git 
a/leveldb/ds_test.go b/leveldb/ds_test.go index 7886859..bc827b0 100644 --- a/leveldb/ds_test.go +++ b/leveldb/ds_test.go @@ -8,23 +8,13 @@ import ( "sort" "testing" + "github.com/RTradeLtd/go-datastores/testutils" ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" dstest "github.com/ipfs/go-datastore/test" "github.com/ucwong/goleveldb/leveldb" ) -var testcases = map[string]string{ - "/a": "a", - "/a/b": "ab", - "/a/b/c": "abc", - "/a/b/d": "a/b/d", - "/a/c": "ac", - "/a/d": "ad", - "/e": "e", - "/f": "f", -} - // returns datastore, and a function to call on exit. // (this garbage collects). So: // @@ -44,36 +34,15 @@ func newDS(t *testing.T) (*Datastore, func()) { } } -func addTestCases(t *testing.T, d *Datastore, testcases map[string]string) { - for k, v := range testcases { - dsk := ds.NewKey(k) - if err := d.Put(dsk, []byte(v)); err != nil { - t.Fatal(err) - } - } - - for k, v := range testcases { - dsk := ds.NewKey(k) - v2, err := d.Get(dsk) - if err != nil { - t.Fatal(err) - } - if string(v2) != v { - t.Errorf("%s values differ: %s != %s", k, v, v2) - } - } - -} - func testQuery(t *testing.T, d *Datastore) { - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) rs, err := d.Query(dsq.Query{Prefix: "/a/"}) if err != nil { t.Fatal(err) } - expectMatches(t, []string{ + testutils.ExpectMatches(t, []string{ "/a/b", "/a/b/c", "/a/b/d", @@ -88,7 +57,7 @@ func testQuery(t *testing.T, d *Datastore) { t.Fatal(err) } - expectMatches(t, []string{ + testutils.ExpectMatches(t, []string{ "/a/b/d", "/a/c", }, rs) @@ -100,13 +69,13 @@ func testQuery(t *testing.T, d *Datastore) { t.Fatal(err) } - keys := make([]string, 0, len(testcases)) - for k := range testcases { + keys := make([]string, 0, len(testutils.TestCases)) + for k := range testutils.TestCases { keys = append(keys, k) } sort.Strings(keys) - expectOrderedMatches(t, keys, rs) + testutils.ExpectKeyOrderMatches(t, rs, keys) rs, err = d.Query(dsq.Query{Orders: []dsq.Order{dsq.OrderByKeyDescending{}}}) if err != nil { @@ -118,7 +87,7 @@ func testQuery(t *testing.T, d *Datastore) { keys[i], keys[j] = keys[j], keys[i] } - expectOrderedMatches(t, keys, rs) + testutils.ExpectKeyOrderMatches(t, rs, keys) } func TestEmptyOpts(t *testing.T) { @@ -142,7 +111,7 @@ func TestQuery(t *testing.T) { func TestQueryRespectsProcess(t *testing.T) { d, close := newDS(t) defer close() - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) } func TestCloseRace(t *testing.T) { @@ -161,7 +130,7 @@ func TestCloseRace(t *testing.T) { close() closeCh <- nil }() - for k := range testcases { + for k := range testutils.TestCases { tx.Get(ds.NewKey(k)) } tx.Commit() @@ -170,7 +139,7 @@ func TestCloseRace(t *testing.T) { func TestCloseSafety(t *testing.T) { d, close := newDS(t) - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) tx, _ := d.NewTransaction(false) err := tx.Put(ds.NewKey("test"), []byte("test")) @@ -187,53 +156,13 @@ func TestCloseSafety(t *testing.T) { } } -func expectMatches(t *testing.T, expect []string, actualR dsq.Results) { - t.Helper() - actual, err := actualR.Rest() - if err != nil { - t.Error(err) - } - - if len(actual) != len(expect) { - t.Error("not enough", expect, actual) - } - for _, k := range expect { - found := false - for _, e := range actual { - if e.Key == k { - found = true - } - } - if !found { - t.Error(k, "not found") - } - } -} - -func expectOrderedMatches(t *testing.T, expect []string, actualR dsq.Results) { - t.Helper() - 
actual, err := actualR.Rest() - if err != nil { - t.Error(err) - } - - if len(actual) != len(expect) { - t.Error("not enough", expect, actual) - } - for i := range expect { - if expect[i] != actual[i].Key { - t.Errorf("expected %q, got %q", expect[i], actual[i].Key) - } - } -} - func testBatching(t *testing.T, d *Datastore) { b, err := d.Batch() if err != nil { t.Fatal(err) } - for k, v := range testcases { + for k, v := range testutils.TestCases { err := b.Put(ds.NewKey(k), []byte(v)) if err != nil { t.Fatal(err) @@ -245,7 +174,7 @@ func testBatching(t *testing.T, d *Datastore) { t.Fatal(err) } - for k, v := range testcases { + for k, v := range testutils.TestCases { val, err := d.Get(ds.NewKey(k)) if err != nil { t.Fatal(err) @@ -265,7 +194,7 @@ func TestBatching(t *testing.T) { func TestDiskUsage(t *testing.T) { d, done := newDS(t) - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) du, err := d.DiskUsage() if err != nil { t.Fatal(err) diff --git a/sql/postgres/postgres_test.go b/sql/postgres/postgres_test.go index c59e53f..f772571 100644 --- a/sql/postgres/postgres_test.go +++ b/sql/postgres/postgres_test.go @@ -11,22 +11,11 @@ import ( dstest "github.com/ipfs/go-datastore/test" sqlds "github.com/RTradeLtd/go-datastores/sql" + "github.com/RTradeLtd/go-datastores/testutils" "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" ) -var testcases = map[string]string{ - "/a": "a", - "/a/b": "ab", - "/a/b/c": "abc", - "/a/b/d": "a/b/d", - "/a/c": "ac", - "/a/d": "ad", - "/e": "e", - "/f": "f", - "/g": "", -} - func newDS(t *testing.T) (*sqlds.Datastore, func(t *testing.T)) { opts := &Options{ Host: "127.0.0.1", @@ -49,27 +38,6 @@ func newDS(t *testing.T) (*sqlds.Datastore, func(t *testing.T)) { } } -func addTestCases(t *testing.T, d *sqlds.Datastore, testcases map[string]string) { - for k, v := range testcases { - dsk := datastore.NewKey(k) - if err := d.Put(dsk, []byte(v)); err != nil { - t.Fatal(err) - } - } - - for k, v := range testcases { - dsk := datastore.NewKey(k) - v2, err := d.Get(dsk) - if err != nil { - t.Fatal(err) - } - v2b := v2 - if string(v2b) != v { - t.Errorf("%s values differ: %s != %s", k, v, v2) - } - } -} - func TestPostgres_Queries(t *testing.T) { tableName := "querytabletest" queries := NewQueries(tableName) @@ -159,14 +127,14 @@ func TestQuery(t *testing.T) { d, done := newDS(t) defer done(t) - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) // test prefix rs, err := d.Query(dsq.Query{Prefix: "/a/"}) if err != nil { t.Fatal(err) } - expectMatches(t, []string{ + testutils.ExpectMatches(t, []string{ "/a/b", "/a/b/c", "/a/b/d", @@ -179,7 +147,7 @@ func TestQuery(t *testing.T) { if err != nil { t.Fatal(err) } - expectMatches(t, []string{ + testutils.ExpectMatches(t, []string{ "/a/b/d", "/a/c", }, rs) @@ -191,7 +159,7 @@ func TestQuery(t *testing.T) { if err != nil { t.Fatal(err) } - expectKeyOrderMatches(t, rs, []string{ + testutils.ExpectKeyOrderMatches(t, rs, []string{ "/a/b", "/a/b/c", "/a/b/d", @@ -205,7 +173,7 @@ func TestQuery(t *testing.T) { if err != nil { t.Fatal(err) } - expectKeyOrderMatches(t, rs, []string{ + testutils.ExpectKeyOrderMatches(t, rs, []string{ "/a/d", "/a/c", "/a/b/d", @@ -220,7 +188,7 @@ func TestQuery(t *testing.T) { if err != nil { t.Fatal(err) } - expectKeyFilterMatches(t, rs, []string{"/a/b"}) + testutils.ExpectKeyFilterMatches(t, rs, []string{"/a/b"}) greaterThanFilter := dsq.FilterKeyCompare{Op: dsq.GreaterThan, Key: "/a/b"} greaterThanFilters := 
[]dsq.Filter{greaterThanFilter} @@ -228,7 +196,7 @@ func TestQuery(t *testing.T) { if err != nil { t.Fatal(err) } - expectKeyFilterMatches(t, rs, []string{ + testutils.ExpectKeyFilterMatches(t, rs, []string{ "/a/b/c", "/a/b/d", "/a/c", @@ -241,7 +209,7 @@ func TestQuery(t *testing.T) { if err != nil { t.Fatal(err) } - expectKeyFilterMatches(t, rs, []string{ + testutils.ExpectKeyFilterMatches(t, rs, []string{ "/a/b", "/a/b/c", }) @@ -250,7 +218,7 @@ func TestQuery(t *testing.T) { func TestHas(t *testing.T) { d, done := newDS(t) defer done(t) - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) has, err := d.Has(datastore.NewKey("/a/b/c")) if err != nil { @@ -274,7 +242,7 @@ func TestHas(t *testing.T) { func TestNotExistGet(t *testing.T) { d, done := newDS(t) defer done(t) - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) has, err := d.Has(datastore.NewKey("/a/b/c/d")) if err != nil { @@ -301,7 +269,7 @@ func TestNotExistGet(t *testing.T) { func TestDelete(t *testing.T) { d, done := newDS(t) defer done(t) - addTestCases(t, d, testcases) + testutils.AddTestCases(t, d, testutils.TestCases) has, err := d.Has(datastore.NewKey("/a/b/c")) if err != nil { @@ -353,7 +321,7 @@ func TestBatching(t *testing.T) { t.Fatal(err) } - for k, v := range testcases { + for k, v := range testutils.TestCases { err := b.Put(datastore.NewKey(k), []byte(v)) if err != nil { t.Fatal(err) @@ -365,7 +333,7 @@ func TestBatching(t *testing.T) { t.Fatal(err) } - for k, v := range testcases { + for k, v := range testutils.TestCases { val, err := d.Get(datastore.NewKey(k)) if err != nil { t.Fatal(err) @@ -402,7 +370,7 @@ func TestBatching(t *testing.T) { t.Fatal(err) } - expectMatches(t, []string{ + testutils.ExpectMatches(t, []string{ "/a", "/a/b/d", "/a/c", @@ -810,70 +778,3 @@ func TestSuite(t *testing.T) { defer done(t) dstest.SubtestAll(t, d) } - -func expectKeyFilterMatches(t *testing.T, actual dsq.Results, expect []string) { - t.Helper() - actualE, err := actual.Rest() - if err != nil { - t.Error(err) - return - } - actualS := make([]string, len(actualE)) - for i, e := range actualE { - actualS[i] = e.Key - } - - if len(actualS) != len(expect) { - t.Error("length doesn't match.", expect, actualS) - return - } - - if strings.Join(actualS, "") != strings.Join(expect, "") { - t.Error("expect != actual.", expect, actualS) - return - } -} - -func expectMatches(t *testing.T, expect []string, actualR dsq.Results) { - t.Helper() - actual, err := actualR.Rest() - if err != nil { - t.Error(err) - } - - if len(actual) != len(expect) { - t.Error("not enough", expect, actual) - } - for _, k := range expect { - found := false - for _, e := range actual { - if e.Key == k { - found = true - } - } - if !found { - t.Error(k, "not found") - } - } -} - -func expectKeyOrderMatches(t *testing.T, actual dsq.Results, expect []string) { - t.Helper() - rs, err := actual.Rest() - if err != nil { - t.Error("error fetching dsq.Results", expect, actual) - return - } - - if len(rs) != len(expect) { - t.Error("expect != actual.", expect, actual) - return - } - - for i, r := range rs { - if r.Key != expect[i] { - t.Error("expect != actual.", expect, actual) - return - } - } -} diff --git a/testutils/testutil.go b/testutils/testutil.go new file mode 100644 index 0000000..ba6dfc8 --- /dev/null +++ b/testutils/testutil.go @@ -0,0 +1,116 @@ +package testutils + +import ( + "strings" + "testing" + + "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" +) + +var ( + // 
TestCases are key-values to use in datastore testing
+ TestCases = map[string]string{
+ "/a": "a",
+ "/a/b": "ab",
+ "/a/b/c": "abc",
+ "/a/b/d": "a/b/d",
+ "/a/c": "ac",
+ "/a/d": "ad",
+ "/e": "e",
+ "/f": "f",
+ "/g": "",
+ }
+)
+
+// AddTestCases stores the key-value pairs in a datastore for testing
+func AddTestCases(t *testing.T, d datastore.Datastore, testcases map[string]string) {
+ t.Helper()
+ for k, v := range testcases {
+ dsk := datastore.NewKey(k)
+ if err := d.Put(dsk, []byte(v)); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ for k, v := range testcases {
+ dsk := datastore.NewKey(k)
+ v2, err := d.Get(dsk)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if string(v2) != v {
+ t.Errorf("%s values differ: %s != %s", k, v, v2)
+ }
+ }
+}
+
+// ExpectMatches is used to match expected results with actual results
+func ExpectMatches(t *testing.T, expect []string, actualR dsq.Results) {
+ t.Helper()
+ actual, err := actualR.Rest()
+ if err != nil {
+ t.Error(err)
+ }
+
+ if len(actual) != len(expect) {
+ t.Error("not enough", expect, actual)
+ }
+ for _, k := range expect {
+ found := false
+ for _, e := range actual {
+ if e.Key == k {
+ found = true
+ }
+ }
+ if !found {
+ t.Error(k, "not found")
+ }
+ }
+}
+
+// ExpectKeyFilterMatches is used to verify filtering results
+func ExpectKeyFilterMatches(t *testing.T, actual dsq.Results, expect []string) {
+ t.Helper()
+ actualE, err := actual.Rest()
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ actualS := make([]string, len(actualE))
+ for i, e := range actualE {
+ actualS[i] = e.Key
+ }
+
+ if len(actualS) != len(expect) {
+ t.Error("length doesn't match.", expect, actualS)
+ return
+ }
+
+ if strings.Join(actualS, "") != strings.Join(expect, "") {
+ t.Error("expect != actual.", expect, actualS)
+ return
+ }
+}
+
+// ExpectKeyOrderMatches is used to verify ordering results
+func ExpectKeyOrderMatches(t *testing.T, actual dsq.Results, expect []string) {
+ t.Helper()
+ rs, err := actual.Rest()
+ if err != nil {
+ t.Error("error fetching dsq.Results", expect, actual)
+ return
+ }
+
+ if len(rs) != len(expect) {
+ t.Error("expect != actual.", expect, actual)
+ return
+ }
+
+ for i, r := range rs {
+ if r.Key != expect[i] {
+ t.Error("expect != actual.", expect, actual)
+ return
+ }
+ }
+}

From 865e3df02ae78b229ea452f8a14676549ff1ecd1 Mon Sep 17 00:00:00 2001
From: postables 
Date: Tue, 12 May 2020 22:57:51 -0700
Subject: [PATCH 06/12] sql/postgres: make it harder to drop tables

---
 sql/README.md | 52 +++++++++++++++++++++++++++++++++++
 sql/postgres/postgres.go | 43 ++++++++++++++++++++++-------
 sql/postgres/postgres_test.go | 18 ++++++------
 3 files changed, 95 insertions(+), 18 deletions(-)
 create mode 100644 sql/README.md

diff --git a/sql/README.md b/sql/README.md
new file mode 100644
index 0000000..e996ff9
--- /dev/null
+++ b/sql/README.md
@@ -0,0 +1,52 @@
+# SQL Datastore
+
+`sqlds` allows using arbitrary SQL databases as an IPFS datastore. It defines a set of functionality that can be used by any golang client that satisfies the `database/sql` interface. Included is an implementation for the PostgreSQL database.
This datastore is forked from `ipfs/go-ds-sql` with a few modifications and cleanup to the codebase, consisting of:
+
+* Enable table recreation, migration, and index creation as an optional capability when using PostgreSQL
+* Change testing scheme to ensure that the postgres functionality is tested and subsequently the datastore implementation is tested
+ * Upstream doesn't specifically test the postgres implementation, which is likely to introduce bugs.
+* Use the `uber-go/multierr` package to combine errors where appropriately
+ * In transaction mode, if any errors happen, the error returned from the rollback was not returned; instead we combine the errors for better visibility
+
+# Usage (Postgres)
+
+## Easy (Automatic)
+
+
+The `postgres` datastore has some helper utility to enable automatic table and index creation, as well as the ability to recreate the table by dropping it, and creating it.
+
+## Hard (Manual)
+
+Ensure a database is created and a table exists with `key` and `data` columns. For example, in PostgreSQL you can create a table with the following structure (replacing `table_name` with the name of the table the datastore will use - by default this is `blocks`):
+
+```sql
+CREATE TABLE IF NOT EXISTS table_name (key TEXT NOT NULL UNIQUE, data BYTEA)
+```
+
+It's recommended to create an index on the `key` column that is optimised for prefix scans. For example, in PostgreSQL you can create a `text_pattern_ops` index on the table:
+
+```sql
+CREATE INDEX IF NOT EXISTS table_name_key_text_pattern_ops_idx ON table_name (key text_pattern_ops)
+```
+
+Import and use in your application:
+
+```go
+import (
+ "database/sql"
+ "github.com/RTradeLtd/go-datastores/sql"
+ pg "github.com/RTradeLtd/go-datastores/sql/postgres"
+)
+
+mydb, _ := sql.Open("yourdb", "yourdbparameters")
+
+// Implement the Queries interface for your SQL impl.
+// ...or use the provided PostgreSQL queries
+queries := pg.NewQueries("blocks")
+
+ds := sqlds.NewDatastore(mydb, queries)
+```
+
+# license
+
+As this is forked from upstream, it retains its existing license which can be found in `LICENSE.orig`.
\ No newline at end of file
diff --git a/sql/postgres/postgres.go b/sql/postgres/postgres.go
index cdf423a..31990b6 100644
--- a/sql/postgres/postgres.go
+++ b/sql/postgres/postgres.go
@@ -10,17 +10,28 @@ import (
 _ "github.com/lib/pq" //postgres driver
 )
 
+var (
+ // AcceptTableRecreationWarning indicates that you accept all consequences from dropping your table and recreating it
+ AcceptTableRecreationWarning = "destructive warning accepted"
+)
+
 // Options are the postgres datastore options, reexported here for convenience.
 type Options struct {
- Host string
- Port string
- User string
- Password string
- Database string
- Table string
- SSLMode string
- RunMigrations bool
- RecreateTables bool
+ // must be set to "warning is accepted"
+ // this will prevent anyone from accidentally nuking their database
+ // the act of a specific string ensures that its more difficult
+ // to take this action
+ AcceptRecreateWarning string
+ Host string
+ Port string
+ User string
+ Password string
+ Database string
+ Table string
+ SSLMode string
+ RunMigrations bool
+ RecreateTables bool
+ CreateIndex bool
 }
 
 // Queries are the postgres queries for a given table.
@@ -106,7 +117,8 @@ func (opts *Options) Create() (*sqlds.Datastore, error) { if err != nil { return nil, err } - if opts.RecreateTables { + // only recreate the tables *IF* warning is accepted + if opts.RecreateTables && opts.AcceptRecreateWarning == AcceptTableRecreationWarning { if _, err := db.Exec( "DROP TABLE IF EXISTS blocks", ); err != nil { @@ -120,6 +132,17 @@ func (opts *Options) Create() (*sqlds.Datastore, error) { return nil, multierr.Combine(err, db.Close()) } } + if opts.CreateIndex { + if _, err := db.Exec( + fmt.Sprintf( + "CREATE INDEX IF NOT EXISTS %s_key_text_pattern_ops_idx ON %s (key text_pattern_ops)", + opts.Table, opts.Table, + ), + ); err != nil { + return nil, multierr.Combine(err, db.Close()) + } + + } return sqlds.NewDatastore(db, NewQueries(opts.Table)), nil } diff --git a/sql/postgres/postgres_test.go b/sql/postgres/postgres_test.go index f772571..82cc35b 100644 --- a/sql/postgres/postgres_test.go +++ b/sql/postgres/postgres_test.go @@ -18,14 +18,16 @@ import ( func newDS(t *testing.T) (*sqlds.Datastore, func(t *testing.T)) { opts := &Options{ - Host: "127.0.0.1", - Port: "5432", - User: "postgres", - Database: "datastores", - Password: "password123", - SSLMode: "disable", - RunMigrations: true, - RecreateTables: true, + Host: "127.0.0.1", + Port: "5432", + User: "postgres", + Database: "datastores", + Password: "password123", + SSLMode: "disable", + AcceptRecreateWarning: AcceptTableRecreationWarning, + RunMigrations: true, + RecreateTables: true, + CreateIndex: true, } ds, err := opts.Create() if err != nil { From a7bb2e22094ee8788fe1b4ef6b7045c9fbd996ee Mon Sep 17 00:00:00 2001 From: postables Date: Tue, 12 May 2020 23:02:42 -0700 Subject: [PATCH 07/12] sql: update readme --- sql/README.md | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/sql/README.md b/sql/README.md index e996ff9..e5980be 100644 --- a/sql/README.md +++ b/sql/README.md @@ -13,7 +13,39 @@ ## Easy (Automatic) -The `postgres` datastore has some helper utility to enable automatic table and index creation, as well as the ability to recreate the table by dropping it, and creating it. +The `postgres` datastore has some helper utility to enable automatic table and index creation, as well as the ability to recreate the table by dropping it, and creating it. 
The following will make it easy to do this:
+
+
+```Go
+import (
+ pgds "github.com/RTradeLtd/go-datastores/sql/postgres"
+ "github.com/ipfs/go-datastore"
+)
+opts := &pgds.Options{
+ Host: "127.0.0.1",
+ Port: "5432",
+ User: "postgres",
+ Database: "datastores",
+ Password: "password123",
+ SSLMode: "disable",
+ Table: "blocks",
+ // this will dropthe existing table
+ AcceptRecreateWarning: pgds.AcceptTableRecreationWarning,
+ RunMigrations: true,
+ RecreateTables: true,
+ CreateIndex: true,
+}
+ds, err := opts.Create()
+if err != nil {
+ log.Fatal(err)
+}
+if err := ds.Put(
+ datastore.NewKey("much test very wow"),
+ []byte("who's even going to read this anyways"),
+); err != nil {
+ log.Fatal(err)
+}
+```
 
 ## Hard (Manual)

From 9dcc20f1796e49285c37e2a812ecfc96cdca2500 Mon Sep 17 00:00:00 2001
From: postables 
Date: Tue, 12 May 2020 23:14:27 -0700
Subject: [PATCH 08/12] update main readme

---
 README.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/README.md b/README.md
index b17d061..d7c769a 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,8 @@
 # go-datastores
 
+[![codecov](https://codecov.io/gh/RTradeLtd/go-datastores/branch/master/graph/badge.svg)](https://codecov.io/gh/RTradeLtd/go-datastores) [![Build Status](https://travis-ci.com/RTradeLtd/go-datastores.svg?branch=master)](https://travis-ci.com/RTradeLtd/go-datastores) [![RTradeLtd](https://circleci.com/gh/RTradeLtd/go-datastores.svg?style=shield)](https://app.circleci.com/pipelines/github/RTradeLtd/go-datastores) [![Go Report Card](https://goreportcard.com/badge/github.com/RTradeLtd/go-datastores)](https://goreportcard.com/report/github.com/RTradeLtd/go-datastores)
+
+
 `go-datastores` is a collection of a variety of IPFS datastores to be used by TemporalX in a single monorepo for easy maintenance. A majority of these datastores are forked from upstream repositories, with minor modifications to facilitate easier integration with TemporalX, along with performance improvements and optimizations where possible. Additionally it allows us to pull in all datastores we need from a single repository. If you are a user of TemporalX and want to be able to use datastores that we do not yet support, you can submit a PR and we'll enable usage of the datastore within our next release.

From d14979ef680f40254ff40f34fa08c5e90b5799d0 Mon Sep 17 00:00:00 2001
From: postables 
Date: Sun, 17 May 2020 15:20:59 -0700
Subject: [PATCH 09/12] sql: implement code review changes

---
 sql/README.md | 6 +++---
 sql/postgres/postgres.go | 15 +++++++--------
 sql/postgres/postgres_test.go | 2 +-
 3 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/sql/README.md b/sql/README.md
index e5980be..9bd57d9 100644
--- a/sql/README.md
+++ b/sql/README.md
@@ -13,7 +13,7 @@
 ## Easy (Automatic)
 
 
-The `postgres` datastore has some helper utility to enable automatic table and index creation, as well as the ability to recreate the table by dropping it, and creating it. The following will make it easy to do this:
+The `postgres` datastore has some helper utility to enable automatic table and index creation, as well as the ability to recreate the table by dropping it, and creating it. Note that this setting requires `RunMigrations` also set to true. If not, the `CreateIndex` setting will be ignored.
The following options can be used to enable table creation, and index creation:
 ```Go
 import (
 pgds "github.com/RTradeLtd/go-datastores/sql/postgres"
 "github.com/ipfs/go-datastore"
 )
 opts := &pgds.Options{
 Host: "127.0.0.1",
 Port: "5432",
 User: "postgres",
 Database: "datastores",
 Password: "password123",
 SSLMode: "disable",
 Table: "blocks",
- // this will dropthe existing table
- AcceptRecreateWarning: pgds.AcceptTableRecreationWarning,
+ // this will drop the existing table
+ AcceptRecreateWarning: pgds.RecreateTables,
 RunMigrations: true,
 RecreateTables: true,
 CreateIndex: true,
 }
diff --git a/sql/postgres/postgres.go b/sql/postgres/postgres.go
index 31990b6..0dad245 100644
--- a/sql/postgres/postgres.go
+++ b/sql/postgres/postgres.go
@@ -11,16 +11,15 @@ import (
 )
 
 var (
- // AcceptTableRecreationWarning indicates that you accept all consequences from dropping your table and recreating it
- AcceptTableRecreationWarning = "destructive warning accepted"
+ // RecreateTables indicates that you accept all consequences from dropping your table and recreating it
+ RecreateTables = "recreate tables"
 )
 
 // Options are the postgres datastore options, reexported here for convenience.
 type Options struct {
- // must be set to "warning is accepted"
- // this will prevent anyone from accidentally nuking their database
- // the act of a specific string ensures that its more difficult
- // to take this action
+ // AcceptRecreateWarning is used as a safety check to prevent accidental deletion of existing tables and data
+ // To accept the warning, you must set the value of this field to `recreate tables`, which can be done manually
+ // or via the usage of the public `RecreateTables` variable
 AcceptRecreateWarning string
 Host string
 Port string
@@ -118,7 +117,7 @@ func (opts *Options) Create() (*sqlds.Datastore, error) {
 return nil, err
 }
 // only recreate the tables *IF* warning is accepted
- if opts.RecreateTables && opts.AcceptRecreateWarning == AcceptTableRecreationWarning {
+ if opts.RecreateTables && opts.AcceptRecreateWarning == RecreateTables {
 if _, err := db.Exec(
 "DROP TABLE IF EXISTS blocks",
 ); err != nil {
@@ -132,7 +131,7 @@
 return nil, multierr.Combine(err, db.Close())
 }
 }
- if opts.CreateIndex {
+ if opts.CreateIndex && opts.RunMigrations {
 if _, err := db.Exec(
 fmt.Sprintf(
 "CREATE INDEX IF NOT EXISTS %s_key_text_pattern_ops_idx ON %s (key text_pattern_ops)",
diff --git a/sql/postgres/postgres_test.go b/sql/postgres/postgres_test.go
index 82cc35b..dec4f71 100644
--- a/sql/postgres/postgres_test.go
+++ b/sql/postgres/postgres_test.go
@@ -24,7 +24,7 @@ func newDS(t *testing.T) (*sqlds.Datastore, func(t *testing.T)) {
 Database: "datastores",
 Password: "password123",
 SSLMode: "disable",
- AcceptRecreateWarning: AcceptTableRecreationWarning,
+ AcceptRecreateWarning: RecreateTables,
 RunMigrations: true,
 RecreateTables: true,
 CreateIndex: true,

From 411f621b86c20c4199cfbded65465d77f8898142 Mon Sep 17 00:00:00 2001
From: postables 
Date: Sun, 17 May 2020 15:22:37 -0700
Subject: [PATCH 10/12] sql/postgres: cleanup index check

---
 sql/postgres/postgres.go | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/sql/postgres/postgres.go b/sql/postgres/postgres.go
index 0dad245..18166ac 100644
--- a/sql/postgres/postgres.go
+++ b/sql/postgres/postgres.go
@@ -130,17 +130,17 @@ func (opts *Options) Create() (*sqlds.Datastore, error) {
 ); err != nil {
 return nil, multierr.Combine(err, db.Close())
 }
- }
- if opts.CreateIndex && opts.RunMigrations {
- if _, err := db.Exec(
- fmt.Sprintf(
- "CREATE INDEX IF NOT EXISTS %s_key_text_pattern_ops_idx ON %s (key
text_pattern_ops)", - opts.Table, opts.Table, - ), - ); err != nil { - return nil, multierr.Combine(err, db.Close()) - } + if opts.CreateIndex { + if _, err := db.Exec( + fmt.Sprintf( + "CREATE INDEX IF NOT EXISTS %s_key_text_pattern_ops_idx ON %s (key text_pattern_ops)", + opts.Table, opts.Table, + ), + ); err != nil { + return nil, multierr.Combine(err, db.Close()) + } + } } return sqlds.NewDatastore(db, NewQueries(opts.Table)), nil } From d5f08d33b4971826c1748d8ad8776d804ab5ebb4 Mon Sep 17 00:00:00 2001 From: postables Date: Sun, 17 May 2020 16:04:12 -0700 Subject: [PATCH 11/12] update circlci test timeout --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 6e7620b..e357901 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -27,5 +27,5 @@ jobs: # specify any bash command here prefixed with `run: ` - run: go get -v -t -d ./... - - run: go test -v -race -short -coverprofile=coverage.txt ./... + - run: go test -v -race -short -coverprofile=coverage.txt -timeout 1800s ./... - run: bash <(curl -s https://codecov.io/bash) \ No newline at end of file From 29bfd3aaf79f969eba366c60653bc35583647a96 Mon Sep 17 00:00:00 2001 From: postables Date: Sun, 17 May 2020 19:10:04 -0700 Subject: [PATCH 12/12] sql: update readme to note table deletion mechanics --- sql/README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/README.md b/sql/README.md index 9bd57d9..4eaafac 100644 --- a/sql/README.md +++ b/sql/README.md @@ -13,7 +13,11 @@ ## Easy (Automatic) -The `postgres` datastore has some helper utility to enable automatic table and index creation, as well as the ability to recreate the table by dropping it, and creating it. Note that this setting requires `RunMigrations` also set to true. If not the `CreateIndex` setting will be ignored. The following options can be used to enable table creation, and index creation: +The `postgres` datastore has some helper utility to enable automatic table and index creation, as well as the ability to recreate the table by dropping it, and creating it. Note that this setting requires `RunMigrations` also set to true. If not the `CreateIndex` setting will be ignored. Additionally note that we will never delete tables unless specified. + +Note that we will never delete tables automatically, and tables will only be deleted when `RecreateTables` is set to true, and `AcceptRecreateWarning` is set to the value of `RecreateTables`. Additionally you must make sure to set `RunMigrations` otherwise we wont actually create the tables, and instead just delete the existing tables. + +The following options can be used to enable table creation, and index creation: ```Go