diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..6e7620b --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,31 @@ +# Golang CircleCI 2.0 configuration file +# +# Check https://circleci.com/docs/2.0/language-go/ for more details +version: 2 +jobs: + build: + docker: + # specify the version + - image: circleci/golang:1.14.2 + + # Specify service dependencies here if necessary + # CircleCI maintains a library of pre-built images + # documented at https://circleci.com/docs/2.0/circleci-images/ + - image: circleci/postgres:latest + environment: + POSTGRES_HOST_AUTH_METHOD: trust + POSTGRES_DB: datastores + + + #### TEMPLATE_NOTE: go expects specific checkout path representing url + #### expecting it in the form of + #### /go/src/github.com/circleci/go-tool + #### /go/src/bitbucket.org/circleci/go-tool + working_directory: /go/src/github.com/{{ORG_NAME}}/{{REPO_NAME}} + steps: + - checkout + + # specify any bash command here prefixed with `run: ` + - run: go get -v -t -d ./... + - run: go test -v -race -short -coverprofile=coverage.txt ./... + - run: bash <(curl -s https://codecov.io/bash) \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 17701d9..60551f6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,9 +7,16 @@ env: global: - GO111MODULE=on +services: + - docker + +sudo: required + install: - go get honnef.co/go/tools/cmd/staticcheck - go mod download +# run this in cricleci +# - (cd sql ; make start-testenv ) before_script: - go vet ./... @@ -18,7 +25,8 @@ before_script: script: - make verifiers -- go test -v -race -short -coverprofile=coverage.txt ./... +# run this in circleci +# - go test -v -race -short -coverprofile=coverage.txt -timeout 1800s ./... after_success: - bash <(curl -s https://codecov.io/bash) diff --git a/README.md b/README.md index 655c29e..b17d061 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,44 @@ # go-datastores -`go-datastores` is a collection of a variety of IPFS datastores to be used by TemporalX in a single monorepo for easy maintenance. If you are a user of TemporalX and want to use a datastore we dont currently support, you can submit PRs to this repository and they will be included in TemporalX's configuration. \ No newline at end of file +`go-datastores` is a collection of a variety of IPFS datastores to be used by TemporalX in a single monorepo for easy maintenance. A majority of these datastores are forked from upstream repositories, with minor modifications to faciltiate easier integration with TemporalX, along with performance improvements and optimizations where possible. Additionally it allows us to pull in all datastores we need from a single repository. + +If you are a user of TemporalX and want to be able to use datastores that we do not yet support, you can submit a PR and we'll enable usage of the datastore within our next release + +# supported datastores + +## full support + +The following datastores are marked as "fully supported" and should have no problems with usage outside of edge case bugs + +* badger +* leveldb +* sql + * includes a postgresql implementation + * note this may have unknown issues + +## partial support + +The following datastores are marked as "partially supported" and may have problems with usage, namely related to query functionality. + +* pebble + * functions supported: + * Get + * GetSize + * Delete + * Put + * Has + * problematic: + * Query +* nutsdb + * functions supported: + * Get + * GetSize + * Delete + * Put + * Has + * problematic: + * Query + +# license + +Some of the datastores were forked from upstream, and as such are licensed under the upstream licenses, any forked datastores will have a `LICENSE.orig` in their folder to indicate the corresponding upstream license. Any datastores without the `LICENSE.orig` in their folders are licensed under AGPL-v3 license, for which you can find in the `LICENSE` file in the repository root. \ No newline at end of file diff --git a/badger/LICENSE.orig b/badger/LICENSE.orig new file mode 100644 index 0000000..1e2cfe1 --- /dev/null +++ b/badger/LICENSE.orig @@ -0,0 +1,21 @@ +The MIT License + +Copyright (c) 2016 Ɓukasz Magiera + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/go.mod b/go.mod index 9ec5b0c..e587a78 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,9 @@ require ( github.com/cockroachdb/pebble v0.0.0-20200414215203-ce3b19e019b3 github.com/dgraph-io/badger/v2 v2.0.3 github.com/ipfs/go-datastore v0.4.4 + github.com/lib/pq v1.5.2 github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5 github.com/ucwong/goleveldb v1.0.3-0.20200508074755-578cba616f37 go.uber.org/atomic v1.6.0 + go.uber.org/multierr v1.5.0 ) diff --git a/go.sum b/go.sum index 72d35c9..685921e 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,4 @@ +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM= @@ -47,6 +48,7 @@ github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf h1:gFVkHXmVAhEbxZVDln5V9GKrLaluNoFHDbrZwAWZgws= github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= @@ -65,6 +67,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.5.2 h1:yTSXVswvWUOQ3k1sd7vJfDrbSl8lKuscqFJRqjC0ifw= +github.com/lib/pq v1.5.2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= @@ -81,6 +85,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= @@ -106,16 +111,23 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20190426190305-956cc1757749 h1:Bduxdpx1O6126WsH6F6NwKywZ/FPncphlTduoPxFG78= golang.org/x/exp v0.0.0-20190426190305-956cc1757749/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200506145744-7e3656a0809f h1:QBjCr1Fz5kw158VqdE9JfI9cJnl/ymnJWAdMuinqL7Y= @@ -126,6 +138,7 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M= golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -141,14 +154,19 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= @@ -158,3 +176,5 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/leveldb/ds_test.go b/leveldb/ds_test.go index f5844c0..7886859 100644 --- a/leveldb/ds_test.go +++ b/leveldb/ds_test.go @@ -2,6 +2,7 @@ package leveldb import ( "bytes" + "errors" "fmt" "os" "sort" @@ -10,6 +11,7 @@ import ( ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" dstest "github.com/ipfs/go-datastore/test" + "github.com/ucwong/goleveldb/leveldb" ) var testcases = map[string]string{ @@ -391,6 +393,24 @@ func TestTransactionManyOperations(t *testing.T) { txn.Discard() } +func TestHandleGetErr(t *testing.T) { + if err := handleGetError(nil); err != nil { + t.Fatal(err) + } + if err := handleGetError( + leveldb.ErrNotFound, + ); err != ds.ErrNotFound { + t.Fatal("bad error") + } + if err := handleGetError( + errors.New("misc err"), + ); err == nil { + t.Fatal("error expected") + } else if err.Error() != "misc err" { + t.Fatal("bad error returned") + } +} + func TestSuite(t *testing.T) { d, close := newDS(t) defer close() diff --git a/sql/LICENSE.orig b/sql/LICENSE.orig new file mode 100644 index 0000000..2610033 --- /dev/null +++ b/sql/LICENSE.orig @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Jeromy Johnson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/sql/batching.go b/sql/batching.go new file mode 100644 index 0000000..4c71c3b --- /dev/null +++ b/sql/batching.go @@ -0,0 +1,63 @@ +package sqlds + +import ( + "context" + + ds "github.com/ipfs/go-datastore" + "go.uber.org/multierr" +) + +type op struct { + delete bool + value []byte +} + +type batch struct { + ds *Datastore + ops map[ds.Key]op +} + +// Batch creates a set of deferred updates to the database. +// Since SQL does not support a true batch of updates, +// operations are buffered and then executed sequentially +// over a single connection when Commit is called. +func (d *Datastore) Batch() (ds.Batch, error) { + return &batch{ + ds: d, + ops: make(map[ds.Key]op), + }, nil +} + +func (bt *batch) Put(key ds.Key, val []byte) error { + bt.ops[key] = op{value: val} + return nil +} + +func (bt *batch) Delete(key ds.Key) error { + bt.ops[key] = op{delete: true} + return nil +} + +func (bt *batch) Commit() error { + return bt.CommitContext(context.Background()) +} + +func (bt *batch) CommitContext(ctx context.Context) error { + conn, err := bt.ds.db.Conn(ctx) + if err != nil { + return err + } + + for k, op := range bt.ops { + if op.delete { + _, err = conn.ExecContext(ctx, bt.ds.queries.Delete(), k.String()) + } else { + _, err = conn.ExecContext(ctx, bt.ds.queries.Put(), k.String(), op.value) + } + if err != nil { + break + } + } + + return multierr.Combine(err, conn.Close()) +} diff --git a/sql/datastore.go b/sql/datastore.go new file mode 100644 index 0000000..4575919 --- /dev/null +++ b/sql/datastore.go @@ -0,0 +1,200 @@ +package sqlds + +import ( + "database/sql" + "fmt" + + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" +) + +var ( + _ ds.Datastore = (*Datastore)(nil) + _ ds.Batching = (*Datastore)(nil) + _ ds.TxnDatastore = (*Datastore)(nil) + // ErrNotImplemented is returned when the SQL datastore does not yet implement the function call. + ErrNotImplemented = fmt.Errorf("not implemented") +) + +// Queries generates SQL queries for datastore operations. +type Queries interface { + Delete() string + Exists() string + Get() string + Put() string + Query() string + Prefix() string + Limit() string + Offset() string + GetSize() string +} + +// Datastore is a SQL backed datastore. +type Datastore struct { + db *sql.DB + queries Queries +} + +// NewDatastore returns a new SQL datastore. +func NewDatastore(db *sql.DB, queries Queries) *Datastore { + return &Datastore{db: db, queries: queries} +} + +// Close closes the underying SQL database. +func (d *Datastore) Close() error { + return d.db.Close() +} + +// Delete removes a row from the SQL database by the given key. +func (d *Datastore) Delete(key ds.Key) error { + _, err := d.db.Exec(d.queries.Delete(), key.String()) + return err +} + +// Get retrieves a value from the SQL database by the given key. +func (d *Datastore) Get(key ds.Key) (value []byte, err error) { + var out []byte + row := d.db.QueryRow(d.queries.Get(), key.String()) + + switch err := row.Scan(&out); err { + case sql.ErrNoRows: + return nil, ds.ErrNotFound + case nil: + return out, nil + default: + return nil, err + } +} + +// Has determines if a value for the given key exists in the SQL database. +func (d *Datastore) Has(key ds.Key) (exists bool, err error) { + row := d.db.QueryRow(d.queries.Exists(), key.String()) + + switch err := row.Scan(&exists); err { + case sql.ErrNoRows: + return exists, nil + case nil: + return exists, nil + default: + return exists, err + } +} + +// Put "upserts" a row into the SQL database. +func (d *Datastore) Put(key ds.Key, value []byte) error { + _, err := d.db.Exec(d.queries.Put(), key.String(), value) + return err +} + +// Query returns multiple rows from the SQL database based on the passed query parameters. +func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { + raw, err := d.rawQuery(q) + if err != nil { + return nil, err + } + + for _, f := range q.Filters { + raw = dsq.NaiveFilter(raw, f) + } + + raw = dsq.NaiveOrder(raw, q.Orders...) + + // if we have filters or orders, offset and limit won't have been applied in the query + if len(q.Filters) > 0 || len(q.Orders) > 0 { + if q.Offset != 0 { + raw = dsq.NaiveOffset(raw, q.Offset) + } + if q.Limit != 0 { + raw = dsq.NaiveLimit(raw, q.Limit) + } + } + + return raw, nil +} + +func (d *Datastore) rawQuery(q dsq.Query) (dsq.Results, error) { + var rows *sql.Rows + var err error + + rows, err = queryWithParams(d, q) + if err != nil { + return nil, err + } + + it := dsq.Iterator{ + Next: func() (dsq.Result, bool) { + if !rows.Next() { + return dsq.Result{}, false + } + + var key string + var out []byte + + err := rows.Scan(&key, &out) + if err != nil { + return dsq.Result{Error: err}, false + } + + entry := dsq.Entry{Key: key} + + if !q.KeysOnly { + entry.Value = out + } + if q.ReturnsSizes { + entry.Size = len(out) + } + + return dsq.Result{Entry: entry}, true + }, + Close: func() error { + return rows.Close() + }, + } + + return dsq.ResultsFromIterator(q, it), nil +} + +// Sync is noop for SQL databases. +func (d *Datastore) Sync(key ds.Key) error { + return nil +} + +// GetSize determines the size in bytes of the value for a given key. +func (d *Datastore) GetSize(key ds.Key) (int, error) { + var size int + row := d.db.QueryRow(d.queries.GetSize(), key.String()) + + switch err := row.Scan(&size); err { + case sql.ErrNoRows: + return -1, ds.ErrNotFound + case nil: + return size, nil + default: + return 0, err + } +} + +// queryWithParams applies prefix, limit, and offset params in pg query +func queryWithParams(d *Datastore, q dsq.Query) (*sql.Rows, error) { + var qNew = d.queries.Query() + + if q.Prefix != "" { + // normalize + prefix := ds.NewKey(q.Prefix).String() + if prefix != "/" { + qNew += fmt.Sprintf(d.queries.Prefix(), prefix+"/") + } + } + + // only apply limit and offset if we do not have to naive filter/order the results + if len(q.Filters) == 0 && len(q.Orders) == 0 { + if q.Limit != 0 { + qNew += fmt.Sprintf(d.queries.Limit(), q.Limit) + } + if q.Offset != 0 { + qNew += fmt.Sprintf(d.queries.Offset(), q.Offset) + } + } + + return d.db.Query(qNew) +} diff --git a/sql/postgres/postgres.go b/sql/postgres/postgres.go new file mode 100644 index 0000000..cdf423a --- /dev/null +++ b/sql/postgres/postgres.go @@ -0,0 +1,149 @@ +package postgres + +import ( + "database/sql" + "fmt" + + sqlds "github.com/RTradeLtd/go-datastores/sql" + "go.uber.org/multierr" + + _ "github.com/lib/pq" //postgres driver +) + +// Options are the postgres datastore options, reexported here for convenience. +type Options struct { + Host string + Port string + User string + Password string + Database string + Table string + SSLMode string + RunMigrations bool + RecreateTables bool +} + +// Queries are the postgres queries for a given table. +type Queries struct { + deleteQuery string + existsQuery string + getQuery string + putQuery string + queryQuery string + prefixQuery string + limitQuery string + offsetQuery string + getSizeQuery string +} + +// NewQueries creates a new PostgreSQL set of queries for the passed table +func NewQueries(tbl string) Queries { + return Queries{ + deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl), + existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl), + getQuery: fmt.Sprintf("SELECT data FROM %s WHERE key = $1", tbl), + putQuery: fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", tbl), + queryQuery: fmt.Sprintf("SELECT key, data FROM %s", tbl), + prefixQuery: ` WHERE key LIKE '%s%%' ORDER BY key`, + limitQuery: ` LIMIT %d`, + offsetQuery: ` OFFSET %d`, + getSizeQuery: fmt.Sprintf("SELECT octet_length(data) FROM %s WHERE key = $1", tbl), + } +} + +// Delete returns the postgres query for deleting a row. +func (q Queries) Delete() string { + return q.deleteQuery +} + +// Exists returns the postgres query for determining if a row exists. +func (q Queries) Exists() string { + return q.existsQuery +} + +// Get returns the postgres query for getting a row. +func (q Queries) Get() string { + return q.getQuery +} + +// Put returns the postgres query for putting a row. +func (q Queries) Put() string { + return q.putQuery +} + +// Query returns the postgres query for getting multiple rows. +func (q Queries) Query() string { + return q.queryQuery +} + +// Prefix returns the postgres query fragment for getting a rows with a key prefix. +func (q Queries) Prefix() string { + return q.prefixQuery +} + +// Limit returns the postgres query fragment for limiting results. +func (q Queries) Limit() string { + return q.limitQuery +} + +// Offset returns the postgres query fragment for returning rows from a given offset. +func (q Queries) Offset() string { + return q.offsetQuery +} + +// GetSize returns the postgres query for determining the size of a value. +func (q Queries) GetSize() string { + return q.getSizeQuery +} + +// Create returns a datastore connected to postgres +func (opts *Options) Create() (*sqlds.Datastore, error) { + opts.setDefaults() + db, err := sql.Open("postgres", fmt.Sprintf( + "host=%s port=%s user=%s dbname=%s password=%s sslmode=%s", + opts.Host, opts.Port, opts.User, opts.Database, opts.Password, opts.SSLMode, + )) + if err != nil { + return nil, err + } + if opts.RecreateTables { + if _, err := db.Exec( + "DROP TABLE IF EXISTS blocks", + ); err != nil { + return nil, err + } + } + if opts.RunMigrations { + if _, err := db.Exec( + "CREATE TABLE IF NOT EXISTS blocks (key TEXT NOT NULL UNIQUE, data BYTEA NOT NULL)", + ); err != nil { + return nil, multierr.Combine(err, db.Close()) + } + } + return sqlds.NewDatastore(db, NewQueries(opts.Table)), nil +} + +func (opts *Options) setDefaults() { + if opts.Host == "" { + opts.Host = "127.0.0.1" + } + + if opts.Port == "" { + opts.Port = "5432" + } + + if opts.User == "" { + opts.User = "postgres" + } + + if opts.Database == "" { + opts.Database = "datastore" + } + + if opts.Table == "" { + opts.Table = "blocks" + } + if opts.SSLMode == "" { + opts.SSLMode = "disable" + } +} diff --git a/sql/postgres/postgres_test.go b/sql/postgres/postgres_test.go new file mode 100644 index 0000000..c59e53f --- /dev/null +++ b/sql/postgres/postgres_test.go @@ -0,0 +1,879 @@ +package postgres + +import ( + "bytes" + "crypto/rand" + "fmt" + "sort" + "strings" + "testing" + + dstest "github.com/ipfs/go-datastore/test" + + sqlds "github.com/RTradeLtd/go-datastores/sql" + "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" +) + +var testcases = map[string]string{ + "/a": "a", + "/a/b": "ab", + "/a/b/c": "abc", + "/a/b/d": "a/b/d", + "/a/c": "ac", + "/a/d": "ad", + "/e": "e", + "/f": "f", + "/g": "", +} + +func newDS(t *testing.T) (*sqlds.Datastore, func(t *testing.T)) { + opts := &Options{ + Host: "127.0.0.1", + Port: "5432", + User: "postgres", + Database: "datastores", + Password: "password123", + SSLMode: "disable", + RunMigrations: true, + RecreateTables: true, + } + ds, err := opts.Create() + if err != nil { + t.Fatal(err) + } + return ds, func(t *testing.T) { + if err := ds.Close(); err != nil { + t.Fatal(err) + } + } +} + +func addTestCases(t *testing.T, d *sqlds.Datastore, testcases map[string]string) { + for k, v := range testcases { + dsk := datastore.NewKey(k) + if err := d.Put(dsk, []byte(v)); err != nil { + t.Fatal(err) + } + } + + for k, v := range testcases { + dsk := datastore.NewKey(k) + v2, err := d.Get(dsk) + if err != nil { + t.Fatal(err) + } + v2b := v2 + if string(v2b) != v { + t.Errorf("%s values differ: %s != %s", k, v, v2) + } + } +} + +func TestPostgres_Queries(t *testing.T) { + tableName := "querytabletest" + queries := NewQueries(tableName) + if !strings.Contains(queries.deleteQuery, tableName) { + t.Fatal("bad query") + } + if !strings.Contains(queries.existsQuery, tableName) { + t.Fatal("bad query") + } + if !strings.Contains(queries.getQuery, tableName) { + t.Fatal("bad query") + } + if !strings.Contains(queries.putQuery, tableName) { + t.Fatal("bad query") + } + if !strings.Contains(queries.queryQuery, tableName) { + t.Fatal("bad query") + } + if !strings.Contains(queries.prefixQuery, "WHERE key LIKE") { + t.Fatal("bad query") + } + if !strings.Contains(queries.limitQuery, "LIMIT") { + t.Fatal("bad query") + } + if !strings.Contains(queries.offsetQuery, "OFFSET") { + t.Fatal("bad query") + } + if !strings.Contains(queries.getSizeQuery, tableName) { + t.Fatal("bad query") + } + if queries.Delete() != queries.deleteQuery { + t.Fatal("bad query returned") + } + if queries.Exists() != queries.existsQuery { + t.Fatal("bad query returned") + } + if queries.Get() != queries.getQuery { + t.Fatal("bad query returned") + } + if queries.Put() != queries.putQuery { + t.Fatal("bad query returned") + } + if queries.Query() != queries.queryQuery { + t.Fatal("bad query returned") + } + if queries.Prefix() != queries.prefixQuery { + t.Fatal("bad query returned") + } + if queries.Limit() != queries.limitQuery { + t.Fatal("bad query returned") + } + if queries.Offset() != queries.offsetQuery { + t.Fatal("bad query returned") + } + if queries.GetSize() != queries.getSizeQuery { + t.Fatal("bad query returned") + } +} + +func TestSetDefaultOptions(t *testing.T) { + opts := &Options{} + opts.setDefaults() + if opts.Host != "127.0.0.1" { + t.Fatal("bad host") + } + if opts.Port != "5432" { + t.Fatal("bad ports") + } + if opts.User != "postgres" { + t.Fatal("bad user") + } + if opts.Database != "datastore" { + t.Fatal("bad database") + } + if opts.Table != "blocks" { + t.Fatal("bad table") + } + if opts.SSLMode != "disable" { + t.Fatal("badd sslmode") + } + if opts.RunMigrations { + t.Fatal("run migrations should be false") + } +} + +func TestQuery(t *testing.T) { + d, done := newDS(t) + defer done(t) + + addTestCases(t, d, testcases) + + // test prefix + rs, err := d.Query(dsq.Query{Prefix: "/a/"}) + if err != nil { + t.Fatal(err) + } + expectMatches(t, []string{ + "/a/b", + "/a/b/c", + "/a/b/d", + "/a/c", + "/a/d", + }, rs) + + // test offset and limit + rs, err = d.Query(dsq.Query{Prefix: "/a/", Offset: 2, Limit: 2}) + if err != nil { + t.Fatal(err) + } + expectMatches(t, []string{ + "/a/b/d", + "/a/c", + }, rs) + + // test orders + orbk := dsq.OrderByKey{} + orderByKey := []dsq.Order{orbk} + rs, err = d.Query(dsq.Query{Prefix: "/a/", Orders: orderByKey}) + if err != nil { + t.Fatal(err) + } + expectKeyOrderMatches(t, rs, []string{ + "/a/b", + "/a/b/c", + "/a/b/d", + "/a/c", + "/a/d", + }) + + orbkd := dsq.OrderByKeyDescending{} + orderByDesc := []dsq.Order{orbkd} + rs, err = d.Query(dsq.Query{Prefix: "/a/", Orders: orderByDesc}) + if err != nil { + t.Fatal(err) + } + expectKeyOrderMatches(t, rs, []string{ + "/a/d", + "/a/c", + "/a/b/d", + "/a/b/c", + "/a/b", + }) + + // test filters + equalFilter := dsq.FilterKeyCompare{Op: dsq.Equal, Key: "/a/b"} + equalFilters := []dsq.Filter{equalFilter} + rs, err = d.Query(dsq.Query{Prefix: "/a/", Filters: equalFilters}) + if err != nil { + t.Fatal(err) + } + expectKeyFilterMatches(t, rs, []string{"/a/b"}) + + greaterThanFilter := dsq.FilterKeyCompare{Op: dsq.GreaterThan, Key: "/a/b"} + greaterThanFilters := []dsq.Filter{greaterThanFilter} + rs, err = d.Query(dsq.Query{Prefix: "/a/", Filters: greaterThanFilters}) + if err != nil { + t.Fatal(err) + } + expectKeyFilterMatches(t, rs, []string{ + "/a/b/c", + "/a/b/d", + "/a/c", + "/a/d", + }) + + lessThanFilter := dsq.FilterKeyCompare{Op: dsq.LessThanOrEqual, Key: "/a/b/c"} + lessThanFilters := []dsq.Filter{lessThanFilter} + rs, err = d.Query(dsq.Query{Prefix: "/a/", Filters: lessThanFilters}) + if err != nil { + t.Fatal(err) + } + expectKeyFilterMatches(t, rs, []string{ + "/a/b", + "/a/b/c", + }) +} + +func TestHas(t *testing.T) { + d, done := newDS(t) + defer done(t) + addTestCases(t, d, testcases) + + has, err := d.Has(datastore.NewKey("/a/b/c")) + if err != nil { + t.Error(err) + } + + if !has { + t.Error("Key should be found") + } + + has, err = d.Has(datastore.NewKey("/a/b/c/d")) + if err != nil { + t.Error(err) + } + + if has { + t.Error("Key should not be found") + } +} + +func TestNotExistGet(t *testing.T) { + d, done := newDS(t) + defer done(t) + addTestCases(t, d, testcases) + + has, err := d.Has(datastore.NewKey("/a/b/c/d")) + if err != nil { + t.Error(err) + } + + if has { + t.Error("Key should not be found") + } + + val, err := d.Get(datastore.NewKey("/a/b/c/d")) + if val != nil { + t.Error("Key should not be found") + } + + if err != datastore.ErrNotFound { + t.Error("Error was not set to datastore.ErrNotFound") + if err != nil { + t.Error(err) + } + } +} + +func TestDelete(t *testing.T) { + d, done := newDS(t) + defer done(t) + addTestCases(t, d, testcases) + + has, err := d.Has(datastore.NewKey("/a/b/c")) + if err != nil { + t.Error(err) + } + if !has { + t.Error("Key should be found") + } + + err = d.Delete(datastore.NewKey("/a/b/c")) + if err != nil { + t.Error(err) + } + + has, err = d.Has(datastore.NewKey("/a/b/c")) + if err != nil { + t.Error(err) + } + if has { + t.Error("Key should not be found") + } +} + +func TestGetEmpty(t *testing.T) { + d, done := newDS(t) + defer done(t) + + err := d.Put(datastore.NewKey("/a"), []byte{}) + if err != nil { + t.Error(err) + } + + v, err := d.Get(datastore.NewKey("/a")) + if err != nil { + t.Error(err) + } + + if len(v) != 0 { + t.Error("expected 0 len []byte form get") + } +} + +func TestBatching(t *testing.T) { + d, done := newDS(t) + defer done(t) + + b, err := d.Batch() + if err != nil { + t.Fatal(err) + } + + for k, v := range testcases { + err := b.Put(datastore.NewKey(k), []byte(v)) + if err != nil { + t.Fatal(err) + } + } + + err = b.Commit() + if err != nil { + t.Fatal(err) + } + + for k, v := range testcases { + val, err := d.Get(datastore.NewKey(k)) + if err != nil { + t.Fatal(err) + } + + if v != string(val) { + t.Fatal("got wrong data!") + } + } + + //Test delete + b, err = d.Batch() + if err != nil { + t.Fatal(err) + } + + err = b.Delete(datastore.NewKey("/a/b")) + if err != nil { + t.Fatal(err) + } + + err = b.Delete(datastore.NewKey("/a/b/c")) + if err != nil { + t.Fatal(err) + } + + err = b.Commit() + if err != nil { + t.Fatal(err) + } + + rs, err := d.Query(dsq.Query{Prefix: "/"}) + if err != nil { + t.Fatal(err) + } + + expectMatches(t, []string{ + "/a", + "/a/b/d", + "/a/c", + "/a/d", + "/e", + "/f", + "/g", + }, rs) +} + +func SubtestBasicPutGet(t *testing.T) { + d, done := newDS(t) + defer done(t) + + k := datastore.NewKey("foo") + val := []byte("Hello Datastore!") + + err := d.Put(k, val) + if err != nil { + t.Fatal("error putting to datastore: ", err) + } + + have, err := d.Has(k) + if err != nil { + t.Fatal("error calling has on key we just put: ", err) + } + + if !have { + t.Fatal("should have key foo, has returned false") + } + + size, err := d.GetSize(k) + if err != nil { + t.Fatal("error getting size after put: ", err) + } + if size != len(val) { + t.Fatalf("incorrect size: expected %d, got %d", len(val), size) + } + + out, err := d.Get(k) + if err != nil { + t.Fatal("error getting value after put: ", err) + } + + if !bytes.Equal(out, val) { + t.Fatal("value received on get wasnt what we expected:", out) + } + + have, err = d.Has(k) + if err != nil { + t.Fatal("error calling has after get: ", err) + } + + if !have { + t.Fatal("should have key foo, has returned false") + } + + size, err = d.GetSize(k) + if err != nil { + t.Fatal("error getting size after get: ", err) + } + if size != len(val) { + t.Fatalf("incorrect size: expected %d, got %d", len(val), size) + } + + err = d.Delete(k) + if err != nil { + t.Fatal("error calling delete: ", err) + } + + have, err = d.Has(k) + if err != nil { + t.Fatal("error calling has after delete: ", err) + } + + if have { + t.Fatal("should not have key foo, has returned true") + } + + size, err = d.GetSize(k) + switch err { + case datastore.ErrNotFound: + case nil: + t.Fatal("expected error getting size after delete") + default: + t.Fatal("wrong error getting size after delete: ", err) + } + if size != -1 { + t.Fatal("expected missing size to be -1") + } +} + +func TestNotFounds(t *testing.T) { + d, done := newDS(t) + defer done(t) + + badk := datastore.NewKey("notreal") + + val, err := d.Get(badk) + if err != datastore.ErrNotFound { + t.Fatal("expected ErrNotFound for key that doesnt exist, got: ", err) + } + + if val != nil { + t.Fatal("get should always return nil for not found values") + } + + have, err := d.Has(badk) + if err != nil { + t.Fatal("error calling has on not found key: ", err) + } + if have { + t.Fatal("has returned true for key we don't have") + } + + size, err := d.GetSize(badk) + switch err { + case datastore.ErrNotFound: + case nil: + t.Fatal("expected error getting size after delete") + default: + t.Fatal("wrong error getting size after delete: ", err) + } + if size != -1 { + t.Fatal("expected missing size to be -1") + } +} + +func SubtestManyKeysAndQuery(t *testing.T) { + d, done := newDS(t) + defer done(t) + + var keys []datastore.Key + var keystrs []string + var values [][]byte + count := 100 + for i := 0; i < count; i++ { + s := fmt.Sprintf("%dkey%d", i, i) + dsk := datastore.NewKey(s) + keystrs = append(keystrs, dsk.String()) + keys = append(keys, dsk) + buf := make([]byte, 64) + _, _ = rand.Read(buf) + values = append(values, buf) + } + + t.Logf("putting %d values", count) + for i, k := range keys { + err := d.Put(k, values[i]) + if err != nil { + t.Fatalf("error on put[%d]: %s", i, err) + } + } + + t.Log("getting values back") + for i, k := range keys { + val, err := d.Get(k) + if err != nil { + t.Fatalf("error on get[%d]: %s", i, err) + } + + if !bytes.Equal(val, values[i]) { + t.Fatal("input value didnt match the one returned from Get") + } + } + + t.Log("querying values") + q := dsq.Query{KeysOnly: true} + resp, err := d.Query(q) + if err != nil { + t.Fatal("calling query: ", err) + } + + t.Log("aggregating query results") + var outkeys []string + for { + res, ok := resp.NextSync() + if res.Error != nil { + t.Fatal("query result error: ", res.Error) + } + if !ok { + break + } + + outkeys = append(outkeys, res.Key) + } + + t.Log("verifying query output") + sort.Strings(keystrs) + sort.Strings(outkeys) + + if len(keystrs) != len(outkeys) { + t.Fatal("got wrong number of keys back") + } + + for i, s := range keystrs { + if outkeys[i] != s { + t.Fatalf("in key output, got %s but expected %s", outkeys[i], s) + } + } + + t.Log("deleting all keys") + for _, k := range keys { + if err := d.Delete(k); err != nil { + t.Fatal(err) + } + } +} + +// Tests from basic_tests from go-datastore +func TestBasicPutGet(t *testing.T) { + d, done := newDS(t) + defer done(t) + + k := datastore.NewKey("foo") + val := []byte("Hello Datastore!") + + err := d.Put(k, val) + if err != nil { + t.Fatal("error putting to datastore: ", err) + } + + have, err := d.Has(k) + if err != nil { + t.Fatal("error calling has on key we just put: ", err) + } + + if !have { + t.Fatal("should have key foo, has returned false") + } + + out, err := d.Get(k) + if err != nil { + t.Fatal("error getting value after put: ", err) + } + + if !bytes.Equal(out, val) { + t.Fatal("value received on get wasnt what we expected:", out) + } + + have, err = d.Has(k) + if err != nil { + t.Fatal("error calling has after get: ", err) + } + + if !have { + t.Fatal("should have key foo, has returned false") + } + + err = d.Delete(k) + if err != nil { + t.Fatal("error calling delete: ", err) + } + + have, err = d.Has(k) + if err != nil { + t.Fatal("error calling has after delete: ", err) + } + + if have { + t.Fatal("should not have key foo, has returned true") + } + SubtestBasicPutGet(t) +} + +func TestManyKeysAndQuery(t *testing.T) { + d, done := newDS(t) + defer done(t) + + var keys []datastore.Key + var keystrs []string + var values [][]byte + count := 100 + for i := 0; i < count; i++ { + s := fmt.Sprintf("%dkey%d", i, i) + dsk := datastore.NewKey(s) + keystrs = append(keystrs, dsk.String()) + keys = append(keys, dsk) + buf := make([]byte, 64) + _, _ = rand.Read(buf) + values = append(values, buf) + } + + t.Logf("putting %d values", count) + for i, k := range keys { + err := d.Put(k, values[i]) + if err != nil { + t.Fatalf("error on put[%d]: %s", i, err) + } + } + + t.Log("getting values back") + for i, k := range keys { + val, err := d.Get(k) + if err != nil { + t.Fatalf("error on get[%d]: %s", i, err) + } + + if !bytes.Equal(val, values[i]) { + t.Fatal("input value didnt match the one returned from Get") + } + } + + t.Log("querying values") + q := dsq.Query{KeysOnly: true} + resp, err := d.Query(q) + if err != nil { + t.Fatal("calling query: ", err) + } + + t.Log("aggregating query results") + var outkeys []string + for { + res, ok := resp.NextSync() + if res.Error != nil { + t.Fatal("query result error: ", res.Error) + } + if !ok { + break + } + + outkeys = append(outkeys, res.Key) + } + + t.Log("verifying query output") + sort.Strings(keystrs) + sort.Strings(outkeys) + + if len(keystrs) != len(outkeys) { + t.Fatalf("got wrong number of keys back, %d != %d", len(keystrs), len(outkeys)) + } + + for i, s := range keystrs { + if outkeys[i] != s { + t.Fatalf("in key output, got %s but expected %s", outkeys[i], s) + } + } + + t.Log("deleting all keys") + for _, k := range keys { + if err := d.Delete(k); err != nil { + t.Fatal(err) + } + } + + SubtestManyKeysAndQuery(t) +} + +func TestTxn(t *testing.T) { + d, done := newDS(t) + defer done(t) + txn, err := d.NewTransaction(false) + if err != nil { + t.Fatal(err) + } + testKey := datastore.NewKey("helloworld") + failKey := datastore.NewKey("donothave") + testValue := []byte("hello world") + if err := txn.Put(testKey, testValue); err != nil { + t.Fatal(err) + } + if err := txn.Delete(testKey); err != nil { + t.Fatal(err) + } + if err := txn.Put(testKey, testValue); err != nil { + t.Fatal(err) + } + if _, err := txn.Query(dsq.Query{}); err == nil { + t.Fatal("error expected") + } + if size, err := txn.GetSize(testKey); err != nil { + t.Fatal(err) + } else if size != len(testValue) { + t.Fatalf("bad size, got %v, wanted %v", size, len(testValue)) + } + if has, err := txn.Has(testKey); err != nil { + t.Fatal(err) + } else if !has { + t.Fatal("should have key") + } + if has, err := txn.Has(failKey); err != nil { + t.Fatal(err) + } else if has { + t.Fatal("should not have key") + } + if val, err := txn.Get(testKey); err != nil { + t.Fatal(err) + } else if string(val) != string(testValue) { + t.Fatal("bad value returned") + } + if _, err := txn.Get(failKey); err != datastore.ErrNotFound { + t.Fatal("bad error returned") + } + if err := txn.Commit(); err != nil { + t.Fatal(err) + } + if err := txn.Commit(); err == nil { + t.Fatal("error expected") + } + txn.Discard() +} + +func TestSuite(t *testing.T) { + d, done := newDS(t) + defer done(t) + dstest.SubtestAll(t, d) +} + +func expectKeyFilterMatches(t *testing.T, actual dsq.Results, expect []string) { + t.Helper() + actualE, err := actual.Rest() + if err != nil { + t.Error(err) + return + } + actualS := make([]string, len(actualE)) + for i, e := range actualE { + actualS[i] = e.Key + } + + if len(actualS) != len(expect) { + t.Error("length doesn't match.", expect, actualS) + return + } + + if strings.Join(actualS, "") != strings.Join(expect, "") { + t.Error("expect != actual.", expect, actualS) + return + } +} + +func expectMatches(t *testing.T, expect []string, actualR dsq.Results) { + t.Helper() + actual, err := actualR.Rest() + if err != nil { + t.Error(err) + } + + if len(actual) != len(expect) { + t.Error("not enough", expect, actual) + } + for _, k := range expect { + found := false + for _, e := range actual { + if e.Key == k { + found = true + } + } + if !found { + t.Error(k, "not found") + } + } +} + +func expectKeyOrderMatches(t *testing.T, actual dsq.Results, expect []string) { + t.Helper() + rs, err := actual.Rest() + if err != nil { + t.Error("error fetching dsq.Results", expect, actual) + return + } + + if len(rs) != len(expect) { + t.Error("expect != actual.", expect, actual) + return + } + + for i, r := range rs { + if r.Key != expect[i] { + t.Error("expect != actual.", expect, actual) + return + } + } +} diff --git a/sql/transaction.go b/sql/transaction.go new file mode 100644 index 0000000..22e437d --- /dev/null +++ b/sql/transaction.go @@ -0,0 +1,112 @@ +package sqlds + +import ( + "database/sql" + + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + "go.uber.org/multierr" +) + +type txn struct { + db *sql.DB + queries Queries + txn *sql.Tx +} + +// NewTransaction creates a new database transaction, note the readOnly parameter is ignored by this implementation. +func (ds *Datastore) NewTransaction(_ bool) (ds.Txn, error) { + sqlTxn, err := ds.db.Begin() + if err != nil { + if sqlTxn != nil { + // wrap errors + err = multierr.Combine(err, sqlTxn.Rollback()) + } + return nil, err + } + + return &txn{ + db: ds.db, + queries: ds.queries, + txn: sqlTxn, + }, nil +} + +func (t *txn) Get(key ds.Key) ([]byte, error) { + var out []byte + row := t.txn.QueryRow(t.queries.Get(), key.String()) + + switch err := row.Scan(&out); err { + case sql.ErrNoRows: + return nil, ds.ErrNotFound + case nil: + return out, nil + default: + return nil, err + } +} + +func (t *txn) Has(key ds.Key) (bool, error) { + var exists bool + row := t.txn.QueryRow(t.queries.Exists(), key.String()) + + switch err := row.Scan(&exists); err { + case sql.ErrNoRows: + return exists, nil + case nil: + return exists, nil + default: + return exists, err + } +} + +func (t *txn) GetSize(key ds.Key) (int, error) { + var size int + row := t.txn.QueryRow(t.queries.GetSize(), key.String()) + + switch err := row.Scan(&size); err { + case sql.ErrNoRows: + return -1, ds.ErrNotFound + case nil: + return size, nil + default: + return 0, err + } +} + +func (t *txn) Query(q dsq.Query) (dsq.Results, error) { + return nil, ErrNotImplemented +} + +// Put adds a value to the datastore identified by the given key. +func (t *txn) Put(key ds.Key, val []byte) error { + _, err := t.txn.Exec(t.queries.Put(), key.String(), val) + if err != nil { + err = multierr.Combine(err, t.txn.Rollback()) + } + return err +} + +// Delete removes a value from the datastore that matches the given key. +func (t *txn) Delete(key ds.Key) error { + _, err := t.txn.Exec(t.queries.Delete(), key.String()) + if err != nil { + err = multierr.Combine(err, t.txn.Rollback()) + } + return err +} + +// Commit finalizes a transaction. +func (t *txn) Commit() error { + err := t.txn.Commit() + if err != nil { + err = multierr.Combine(err, t.txn.Rollback()) + } + return err +} + +// Discard throws away changes recorded in a transaction without committing +// them to the underlying Datastore. +func (t *txn) Discard() { + _ = t.txn.Rollback() +}