From 862d4022e15fcfa7cccc90498872c5379574d34a Mon Sep 17 00:00:00 2001 From: postables <postables@rtradetechnologies.com> Date: Thu, 7 May 2020 13:16:26 -0700 Subject: [PATCH 1/3] cleanup git branch for leveldb stuff --- go.mod | 2 + go.sum | 29 ++++ leveldb/LICENSE.orig | 21 +++ leveldb/batch.go | 37 ++++ leveldb/datastore.go | 157 +++++++++++++++++ leveldb/ds_test.go | 386 +++++++++++++++++++++++++++++++++++++++++ leveldb/helpers.go | 58 +++++++ leveldb/transaction.go | 80 +++++++++ 8 files changed, 770 insertions(+) create mode 100644 leveldb/LICENSE.orig create mode 100644 leveldb/batch.go create mode 100644 leveldb/datastore.go create mode 100644 leveldb/ds_test.go create mode 100644 leveldb/helpers.go create mode 100644 leveldb/transaction.go diff --git a/go.mod b/go.mod index 7dd0a3a..64f3ed6 100644 --- a/go.mod +++ b/go.mod @@ -6,5 +6,7 @@ require ( github.com/cockroachdb/pebble v0.0.0-20200414215203-ce3b19e019b3 github.com/dgraph-io/badger/v2 v2.0.3 github.com/ipfs/go-datastore v0.4.4 + github.com/syndtr/goleveldb v1.0.0 github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5 + go.uber.org/atomic v1.6.0 ) diff --git a/go.sum b/go.sum index 1edf17c..c06bb8c 100644 --- a/go.sum +++ b/go.sum @@ -31,12 +31,14 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs= github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9/go.mod h1:106OIgooyS7OzLDOpUGgm9fA3bQENb/cFSyyBmMoJDs= github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= @@ -46,6 +48,8 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/ipfs/go-datastore v0.4.4 h1:rjvQ9+muFaJ+QZ7dN5B1MSDNQ0JVZKkkES/rMZmA8X8= github.com/ipfs/go-datastore v0.4.4/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA= @@ -62,6 +66,11 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -83,33 +92,53 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= +github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5 h1:hNna6Fi0eP1f2sMBe/rJicDmaHmoXGe1Ta84FPYHLuE= github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5/go.mod h1:f1SCnEOt6sc3fOJfPQDRDzHOtSXuTtnz0ImG9kPRDV0= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/exp v0.0.0-20190426190305-956cc1757749 h1:Bduxdpx1O6126WsH6F6NwKywZ/FPncphlTduoPxFG78= golang.org/x/exp v0.0.0-20190426190305-956cc1757749/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M= golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/leveldb/LICENSE.orig b/leveldb/LICENSE.orig new file mode 100644 index 0000000..6152c32 --- /dev/null +++ b/leveldb/LICENSE.orig @@ -0,0 +1,21 @@ +The MIT License + +Copyright (c) 2016 Jeromy Johnson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/leveldb/batch.go b/leveldb/batch.go new file mode 100644 index 0000000..8c50d2e --- /dev/null +++ b/leveldb/batch.go @@ -0,0 +1,37 @@ +package leveldb + +import ( + ds "github.com/ipfs/go-datastore" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +type leveldbBatch struct { + b *leveldb.Batch + db *leveldb.DB + syncWrites bool +} + +// Batch returns a new levelDB batcher +func (d *Datastore) Batch() (ds.Batch, error) { + return &leveldbBatch{ + b: new(leveldb.Batch), + db: d.db, + syncWrites: d.syncWrites, + }, nil +} + +func (b *leveldbBatch) Put(key ds.Key, value []byte) error { + b.b.Put(key.Bytes(), value) + return nil +} + +func (b *leveldbBatch) Commit() error { + return b.db.Write(b.b, &opt.WriteOptions{Sync: b.syncWrites}) +} + +func (b *leveldbBatch) Delete(key ds.Key) error { + b.b.Delete(key.Bytes()) + return nil +} diff --git a/leveldb/datastore.go b/leveldb/datastore.go new file mode 100644 index 0000000..1f9b99a --- /dev/null +++ b/leveldb/datastore.go @@ -0,0 +1,157 @@ +package leveldb + +import ( + "errors" + "os" + "path/filepath" + "sync" + + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" + "go.uber.org/atomic" +) + +var ( + _ ds.Datastore = (*Datastore)(nil) + _ ds.TxnDatastore = (*Datastore)(nil) + // ErrClosed is an error message returned when the datastore is no longer open + ErrClosed = errors.New("datastore closed") +) + +// Datastore is a go-datastore implement using leveldb +type Datastore struct { + db *leveldb.DB + path string + closed *atomic.Bool + syncWrites bool + close sync.Once + closeLock sync.RWMutex +} + +// Options is an alias of syndtr/goleveldb/opt.Options which might be extended +// in the future. +type Options = opt.Options + +// NewDatastore returns a new datastore backed by leveldb +func NewDatastore(path string, opts *Options) (*Datastore, error) { + noSync := opts.NoSync + db, err := leveldb.OpenFile(path, opts) + if err != nil { + return nil, err + } + + ds := Datastore{ + db: db, + path: path, + closed: atomic.NewBool(false), + // if noSync is false this will be true + syncWrites: !noSync, + } + return &ds, nil +} + +// Put stores a key-value pair in leveldb +func (d *Datastore) Put(key ds.Key, value []byte) (err error) { + if d.closed.Load() { + return ErrClosed + } + return d.db.Put(key.Bytes(), value, &opt.WriteOptions{Sync: d.syncWrites}) +} + +// Sync is a noop +func (d *Datastore) Sync(prefix ds.Key) error { + return nil +} + +// Get returns the value corresponding to the key +func (d *Datastore) Get(key ds.Key) (value []byte, err error) { + if d.closed.Load() { + return nil, ErrClosed + } + val, err := d.db.Get(key.Bytes(), nil) + if err != nil { + return nil, handleGetError(err) + } + return val, nil +} + +// Has returns whether or not we have the key +func (d *Datastore) Has(key ds.Key) (exists bool, err error) { + if d.closed.Load() { + return false, ErrClosed + } + return ds.GetBackedHas(d, key) +} + +// GetSize returns the size of the associated key +func (d *Datastore) GetSize(key ds.Key) (size int, err error) { + if d.closed.Load() { + return 0, ErrClosed + } + return ds.GetBackedSize(d, key) +} + +// Delete removed the key from our datastore +func (d *Datastore) Delete(key ds.Key) (err error) { + if d.closed.Load() { + return ErrClosed + } + return d.db.Delete(key.Bytes(), &opt.WriteOptions{Sync: d.syncWrites}) +} + +// Query searches for keys in our datastore +func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { + if d.closed.Load() { + return nil, ErrClosed + } + // closing is only unsafe when there are pending iterators + // so we only lock when closing, and invoking iterators (query) + d.closeLock.Lock() + var rnge *util.Range + + // make a copy of the query for the fallback naive query implementation. + // don't modify the original so res.Query() returns the correct results. + qNaive := q + prefix := ds.NewKey(q.Prefix).String() + if prefix != "/" { + rnge = util.BytesPrefix([]byte(prefix + "/")) + qNaive.Prefix = "" + } + iter := d.db.NewIterator(rnge, nil) + res, err := query(iter, q, qNaive) + d.closeLock.Unlock() + return res, err +} + +// DiskUsage returns the current disk size used by this levelDB. +// For in-mem datastores, it will return 0. +func (d *Datastore) DiskUsage() (du uint64, err error) { + if d.closed.Load() { + return 0, ErrClosed + } + err = filepath.Walk(d.path, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + du += uint64(info.Size()) + return nil + }) + return +} + +// Close shuts down leveldb +func (d *Datastore) Close() (err error) { + if d.closed.Load() { + err = ErrClosed + } + d.close.Do(func() { + d.closeLock.Lock() + err = d.db.Close() + d.closed.Store(true) + d.closeLock.Unlock() + }) + return +} diff --git a/leveldb/ds_test.go b/leveldb/ds_test.go new file mode 100644 index 0000000..66a86c8 --- /dev/null +++ b/leveldb/ds_test.go @@ -0,0 +1,386 @@ +package leveldb + +import ( + "bytes" + "fmt" + "os" + "sort" + "testing" + + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + dstest "github.com/ipfs/go-datastore/test" +) + +var testcases = map[string]string{ + "/a": "a", + "/a/b": "ab", + "/a/b/c": "abc", + "/a/b/d": "a/b/d", + "/a/c": "ac", + "/a/d": "ad", + "/e": "e", + "/f": "f", +} + +// returns datastore, and a function to call on exit. +// (this garbage collects). So: +// +// d, close := newDS(t) +// defer close() +func newDS(t *testing.T) (*Datastore, func()) { + path := "testpath" + opts := Options{} + opts.NoSync = true + d, err := NewDatastore(path, &opts) + if err != nil { + t.Fatal(err) + } + return d, func() { + d.Close() + os.RemoveAll(path) + } +} + +func addTestCases(t *testing.T, d *Datastore, testcases map[string]string) { + for k, v := range testcases { + dsk := ds.NewKey(k) + if err := d.Put(dsk, []byte(v)); err != nil { + t.Fatal(err) + } + } + + for k, v := range testcases { + dsk := ds.NewKey(k) + v2, err := d.Get(dsk) + if err != nil { + t.Fatal(err) + } + if string(v2) != v { + t.Errorf("%s values differ: %s != %s", k, v, v2) + } + } + +} + +func testQuery(t *testing.T, d *Datastore) { + addTestCases(t, d, testcases) + + rs, err := d.Query(dsq.Query{Prefix: "/a/"}) + if err != nil { + t.Fatal(err) + } + + expectMatches(t, []string{ + "/a/b", + "/a/b/c", + "/a/b/d", + "/a/c", + "/a/d", + }, rs) + + // test offset and limit + + rs, err = d.Query(dsq.Query{Prefix: "/a/", Offset: 2, Limit: 2}) + if err != nil { + t.Fatal(err) + } + + expectMatches(t, []string{ + "/a/b/d", + "/a/c", + }, rs) + + // test order + + rs, err = d.Query(dsq.Query{Orders: []dsq.Order{dsq.OrderByKey{}}}) + if err != nil { + t.Fatal(err) + } + + keys := make([]string, 0, len(testcases)) + for k := range testcases { + keys = append(keys, k) + } + sort.Strings(keys) + + expectOrderedMatches(t, keys, rs) + + rs, err = d.Query(dsq.Query{Orders: []dsq.Order{dsq.OrderByKeyDescending{}}}) + if err != nil { + t.Fatal(err) + } + + // reverse + for i, j := 0, len(keys)-1; i < j; i, j = i+1, j-1 { + keys[i], keys[j] = keys[j], keys[i] + } + + expectOrderedMatches(t, keys, rs) +} + +func TestQuery(t *testing.T) { + d, close := newDS(t) + defer close() + testQuery(t, d) +} + +func TestQueryRespectsProcess(t *testing.T) { + d, close := newDS(t) + defer close() + addTestCases(t, d, testcases) +} + +func TestCloseRace(t *testing.T) { + d, close := newDS(t) + for n := 0; n < 100; n++ { + d.Put(ds.NewKey(fmt.Sprintf("%d", n)), []byte(fmt.Sprintf("test%d", n))) + } + tx, err := d.NewTransaction(false) + if err != nil { + t.Fatal(err) + } + tx.Put(ds.NewKey("txnversion"), []byte("bump")) + closeCh := make(chan interface{}) + + go func() { + close() + closeCh <- nil + }() + for k := range testcases { + tx.Get(ds.NewKey(k)) + } + tx.Commit() + <-closeCh +} + +func TestCloseSafety(t *testing.T) { + d, close := newDS(t) + addTestCases(t, d, testcases) + + tx, _ := d.NewTransaction(false) + err := tx.Put(ds.NewKey("test"), []byte("test")) + if err != nil { + t.Error("Failed to put in a txn.") + } + close() + err = tx.Commit() + if err == nil { + t.Error("committing after close should fail.") + } + if err := d.Close(); err != ErrClosed { + t.Fatal("bad error returned, got: ", err) + } +} + +func expectMatches(t *testing.T, expect []string, actualR dsq.Results) { + t.Helper() + actual, err := actualR.Rest() + if err != nil { + t.Error(err) + } + + if len(actual) != len(expect) { + t.Error("not enough", expect, actual) + } + for _, k := range expect { + found := false + for _, e := range actual { + if e.Key == k { + found = true + } + } + if !found { + t.Error(k, "not found") + } + } +} + +func expectOrderedMatches(t *testing.T, expect []string, actualR dsq.Results) { + t.Helper() + actual, err := actualR.Rest() + if err != nil { + t.Error(err) + } + + if len(actual) != len(expect) { + t.Error("not enough", expect, actual) + } + for i := range expect { + if expect[i] != actual[i].Key { + t.Errorf("expected %q, got %q", expect[i], actual[i].Key) + } + } +} + +func testBatching(t *testing.T, d *Datastore) { + b, err := d.Batch() + if err != nil { + t.Fatal(err) + } + + for k, v := range testcases { + err := b.Put(ds.NewKey(k), []byte(v)) + if err != nil { + t.Fatal(err) + } + } + + err = b.Commit() + if err != nil { + t.Fatal(err) + } + + for k, v := range testcases { + val, err := d.Get(ds.NewKey(k)) + if err != nil { + t.Fatal(err) + } + + if v != string(val) { + t.Fatal("got wrong data!") + } + } +} + +func TestBatching(t *testing.T) { + d, done := newDS(t) + defer done() + testBatching(t, d) +} + +func TestDiskUsage(t *testing.T) { + d, done := newDS(t) + addTestCases(t, d, testcases) + du, err := d.DiskUsage() + if err != nil { + t.Fatal(err) + } + + if du == 0 { + t.Fatal("expected some disk usage") + } + + k := ds.NewKey("more") + err = d.Put(k, []byte("value")) + if err != nil { + t.Fatal(err) + } + + du2, err := d.DiskUsage() + if err != nil { + t.Fatal(err) + } + if du2 <= du { + t.Fatal("size should have increased") + } + + done() + + // This should fail + _, err = d.DiskUsage() + if err == nil { + t.Fatal("DiskUsage should fail when we cannot walk path") + } +} + +func TestTransactionCommit(t *testing.T) { + key := ds.NewKey("/test/key1") + + d, done := newDS(t) + defer done() + + txn, err := d.NewTransaction(false) + if err != nil { + t.Fatal(err) + } + defer txn.Discard() + + if err := txn.Put(key, []byte("hello")); err != nil { + t.Fatal(err) + } + if val, err := d.Get(key); err != ds.ErrNotFound { + t.Fatalf("expected ErrNotFound, got err: %v, value: %v", err, val) + } + if err := txn.Commit(); err != nil { + t.Fatal(err) + } + if val, err := d.Get(key); err != nil || !bytes.Equal(val, []byte("hello")) { + t.Fatalf("expected entry present after commit, got err: %v, value: %v", err, val) + } +} + +func TestTransactionDiscard(t *testing.T) { + key := ds.NewKey("/test/key1") + + d, done := newDS(t) + defer done() + + txn, err := d.NewTransaction(false) + if err != nil { + t.Fatal(err) + } + defer txn.Discard() + + if err := txn.Put(key, []byte("hello")); err != nil { + t.Fatal(err) + } + if val, err := d.Get(key); err != ds.ErrNotFound { + t.Fatalf("expected ErrNotFound, got err: %v, value: %v", err, val) + } + if txn.Discard(); err != nil { + t.Fatal(err) + } + if val, err := d.Get(key); err != ds.ErrNotFound { + t.Fatalf("expected ErrNotFound, got err: %v, value: %v", err, val) + } +} + +func TestTransactionManyOperations(t *testing.T) { + keys := []ds.Key{ds.NewKey("/test/key1"), ds.NewKey("/test/key2"), ds.NewKey("/test/key3"), ds.NewKey("/test/key4"), ds.NewKey("/test/key5")} + + d, done := newDS(t) + defer done() + + txn, err := d.NewTransaction(false) + if err != nil { + t.Fatal(err) + } + defer txn.Discard() + + // Insert all entries. + for i := 0; i < 5; i++ { + if err := txn.Put(keys[i], []byte(fmt.Sprintf("hello%d", i))); err != nil { + t.Fatal(err) + } + } + + // Remove the third entry. + if err := txn.Delete(keys[2]); err != nil { + t.Fatal(err) + } + + // Check existences. + if has, err := txn.Has(keys[1]); err != nil || !has { + t.Fatalf("expected key[1] to be present, err: %v, has: %v", err, has) + } + if has, err := txn.Has(keys[2]); err != nil || has { + t.Fatalf("expected key[2] to be absent, err: %v, has: %v", err, has) + } + + var res dsq.Results + if res, err = txn.Query(dsq.Query{Prefix: "/test"}); err != nil { + t.Fatalf("query failed, err: %v", err) + } + if entries, err := res.Rest(); err != nil || len(entries) != 4 { + t.Fatalf("query failed or contained unexpected number of entries, err: %v, results: %v", err, entries) + } + + txn.Discard() +} + +func TestSuite(t *testing.T) { + d, close := newDS(t) + defer close() + dstest.SubtestAll(t, d) +} diff --git a/leveldb/helpers.go b/leveldb/helpers.go new file mode 100644 index 0000000..60667b1 --- /dev/null +++ b/leveldb/helpers.go @@ -0,0 +1,58 @@ +package leveldb + +import ( + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" +) + +func query(i iterator.Iterator, q dsq.Query, qNaive dsq.Query) (dsq.Results, error) { + next := i.Next + if len(q.Orders) > 0 { + switch q.Orders[0].(type) { + case dsq.OrderByKey, *dsq.OrderByKey: + qNaive.Orders = nil + case dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending: + next = func() bool { + next = i.Prev + return i.Last() + } + qNaive.Orders = nil + default: + } + } + r := dsq.ResultsFromIterator(q, dsq.Iterator{ + Next: func() (dsq.Result, bool) { + if !next() { + return dsq.Result{}, false + } + k := string(i.Key()) + e := dsq.Entry{Key: k, Size: len(i.Value())} + + if !q.KeysOnly { + buf := make([]byte, len(i.Value())) + copy(buf, i.Value()) + e.Value = buf + } + return dsq.Result{Entry: e}, true + }, + Close: func() error { + i.Release() + return nil + }, + }) + return dsq.NaiveQueryApply(qNaive, r), nil +} + +// handle get error is a helper function +// to return the right error +func handleGetError(err error) error { + if err == nil { + return nil + } + if err == leveldb.ErrNotFound { + return ds.ErrNotFound + } + return err +} diff --git a/leveldb/transaction.go b/leveldb/transaction.go new file mode 100644 index 0000000..338b5db --- /dev/null +++ b/leveldb/transaction.go @@ -0,0 +1,80 @@ +package leveldb + +import ( + ds "github.com/ipfs/go-datastore" + + dsq "github.com/ipfs/go-datastore/query" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" +) + +type transaction struct { + ds *Datastore + tx *leveldb.Transaction +} + +func (t *transaction) Commit() error { + t.ds.closeLock.RLock() + err := t.tx.Commit() + t.ds.closeLock.RUnlock() + return err +} + +func (t *transaction) Discard() { + t.tx.Discard() +} + +func (t *transaction) Delete(key ds.Key) error { + err := t.tx.Delete(key.Bytes(), &opt.WriteOptions{Sync: false}) + return err +} +func (t *transaction) Get(key ds.Key) ([]byte, error) { + val, err := t.tx.Get(key.Bytes(), nil) + err = handleGetError(err) + return val, err +} + +func (t *transaction) Has(key ds.Key) (bool, error) { + return ds.GetBackedHas(t, key) +} + +func (t *transaction) GetSize(key ds.Key) (size int, err error) { + return ds.GetBackedSize(t, key) +} +func (t *transaction) Put(key ds.Key, value []byte) (err error) { + return t.tx.Put(key.Bytes(), value, &opt.WriteOptions{Sync: false}) +} + +func (t *transaction) Query(q dsq.Query) (dsq.Results, error) { + // so we only lock when closing, and invoking iterators (query & x commit) + t.ds.closeLock.Lock() + var rnge *util.Range + // make a copy of the query for the fallback naive query implementation. + // don't modify the original so res.Query() returns the correct results. + qNaive := q + prefix := ds.NewKey(q.Prefix).String() + if prefix != "/" { + rnge = util.BytesPrefix([]byte(prefix + "/")) + qNaive.Prefix = "" + } + iter := t.tx.NewIterator(rnge, nil) + res, err := query(iter, q, qNaive) + t.ds.closeLock.Unlock() + return res, err +} + +// NewTransaction returns a new transaction handler +func (d *Datastore) NewTransaction(readOnly bool) (ds.Txn, error) { + if d.closed.Load() { + return nil, ErrClosed + } + d.closeLock.RLock() + tx, err := d.db.OpenTransaction() + if err != nil { + return nil, err + } + txx := &transaction{d, tx} + d.closeLock.RUnlock() + return txx, nil +} From cc52fd8771c2ff427dba4a29ec7ca6d4c12ab95d Mon Sep 17 00:00:00 2001 From: postables <postables@rtradetechnologies.com> Date: Fri, 8 May 2020 17:26:36 -0700 Subject: [PATCH 2/3] leveldb: fix empty opts causing panic and add test --- leveldb/datastore.go | 5 ++++- leveldb/ds_test.go | 12 ++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/leveldb/datastore.go b/leveldb/datastore.go index 1f9b99a..f3c752a 100644 --- a/leveldb/datastore.go +++ b/leveldb/datastore.go @@ -37,7 +37,10 @@ type Options = opt.Options // NewDatastore returns a new datastore backed by leveldb func NewDatastore(path string, opts *Options) (*Datastore, error) { - noSync := opts.NoSync + var noSync bool + if opts != nil { + noSync = opts.NoSync + } db, err := leveldb.OpenFile(path, opts) if err != nil { return nil, err diff --git a/leveldb/ds_test.go b/leveldb/ds_test.go index 66a86c8..f5844c0 100644 --- a/leveldb/ds_test.go +++ b/leveldb/ds_test.go @@ -119,6 +119,18 @@ func testQuery(t *testing.T, d *Datastore) { expectOrderedMatches(t, keys, rs) } +func TestEmptyOpts(t *testing.T) { + path := "emptyoptstest" + ds, err := NewDatastore(path, nil) + if err != nil { + t.Fatal(err) + } + if err := ds.Close(); err != nil { + t.Fatal(err) + } + os.RemoveAll(path) +} + func TestQuery(t *testing.T) { d, close := newDS(t) defer close() From 4d57573bac128b7ecacaf1336993d8030734dd7d Mon Sep 17 00:00:00 2001 From: postables <postables@rtradetechnologies.com> Date: Fri, 8 May 2020 23:42:09 -0700 Subject: [PATCH 3/3] leveldb: implement code review suggestions --- leveldb/datastore.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/leveldb/datastore.go b/leveldb/datastore.go index f3c752a..c041be3 100644 --- a/leveldb/datastore.go +++ b/leveldb/datastore.go @@ -147,9 +147,7 @@ func (d *Datastore) DiskUsage() (du uint64, err error) { // Close shuts down leveldb func (d *Datastore) Close() (err error) { - if d.closed.Load() { - err = ErrClosed - } + err = ErrClosed d.close.Do(func() { d.closeLock.Lock() err = d.db.Close()