Skip to content

Commit abd8d2d

Browse files
author
Rene Kaufmann
committed
add nats kv support
1 parent c9cc147 commit abd8d2d

File tree

6 files changed

+298
-6
lines changed

6 files changed

+298
-6
lines changed

README.md

+5-5
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ type ReadWatcher interface {
2222

2323
## Compatibility matrix
2424

25-
| Calls | Consul | Etcdv2 | Etcdv3 | env | file | redis | vault | zookeeper |
26-
|-----------------------|:----------:|:------:|:-------:|:-----:|:----:|:-------:|:-------:|:----------:|
27-
| GetValues | X | X | X | X | X | X | X | X |
28-
| WatchPrefix | X | X | X | | X | | | X |
29-
| Close | X | X | X | X | X | X | X | X |
25+
| Calls | Consul | Etcdv2 | Etcdv3 | env | file | redis | vault | zookeeper | nats kv |
26+
|-----------------------|:----------:|:------:|:-------:|:-----:|:----:|:-------:|:-------:|:----------:|:-------:|
27+
| GetValues | X | X | X | X | X | X | X | X | X |
28+
| WatchPrefix | X | X | X | | X | | | X | X |
29+
| Close | X | X | X | X | X | X | X | X | X |

go.mod

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
github.com/mattn/go-colorable v0.1.11 // indirect
2323
github.com/mitchellh/copystructure v1.2.0 // indirect
2424
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
25+
github.com/nats-io/nats.go v1.13.0 // indirect
2526
github.com/oklog/run v1.1.0 // indirect
2627
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
2728
github.com/tevino/go-zookeeper v0.0.0-20170512024026-c218ec636bef
@@ -30,7 +31,7 @@ require (
3031
go.etcd.io/etcd/client/v3 v3.5.2
3132
go.uber.org/multierr v1.7.0 // indirect
3233
go.uber.org/zap v1.21.0 // indirect
33-
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
34+
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
3435
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
3536
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect
3637
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect

go.sum

+12
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,12 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9
265265
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
266266
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
267267
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
268+
github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE=
269+
github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
270+
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
271+
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
272+
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
273+
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
268274
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
269275
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
270276
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
@@ -355,9 +361,12 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
355361
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
356362
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
357363
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
364+
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
358365
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
359366
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
360367
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
368+
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 h1:f+lwQ+GtmgoY+A2YaQxlSOnDjXcQ7ZRLWOHbC6HtRqE=
369+
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
361370
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
362371
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
363372
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@@ -386,6 +395,7 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
386395
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
387396
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
388397
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
398+
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
389399
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
390400
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
391401
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -425,6 +435,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
425435
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
426436
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
427437
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
438+
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
428439
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
429440
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
430441
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -439,6 +450,7 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
439450
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
440451
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
441452
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
453+
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
442454
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
443455
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
444456
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

nats/client.go

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* This file is part of easyKV.
3+
* © 2022 The easyKV Authors
4+
*
5+
* For the full copyright and license information, please view the LICENSE
6+
* file that was distributed with this source code.
7+
*/
8+
9+
package nats
10+
11+
import (
12+
"context"
13+
"fmt"
14+
"strings"
15+
16+
"github.com/HeavyHorst/easykv"
17+
"github.com/nats-io/nats.go"
18+
)
19+
20+
var cleanReplacer = strings.NewReplacer(".", "/")
21+
22+
// Client provides a shell for the env client
23+
type Client struct {
24+
nc *nats.Conn
25+
kv nats.KeyValue
26+
}
27+
28+
// New returns a new client
29+
func New(nodes []string, bucket string, opts ...Option) (*Client, error) {
30+
var options Options
31+
for _, o := range opts {
32+
o(&options)
33+
}
34+
35+
if len(nodes) == 0 {
36+
nodes = append(nodes, nats.DefaultURL)
37+
}
38+
39+
natsOptions := []nats.Option{nats.MaxReconnects(-1)}
40+
41+
// override authentication, if any was specified
42+
if options.Auth.Username != "" && options.Auth.Password != "" {
43+
natsOptions = append(natsOptions, nats.UserInfo(options.Auth.Username, options.Auth.Password))
44+
}
45+
46+
if options.Token != "" {
47+
natsOptions = append(natsOptions, nats.Token(options.Token))
48+
}
49+
50+
if options.Creds != "" {
51+
natsOptions = append(natsOptions, nats.UserCredentials(options.Creds))
52+
}
53+
54+
nc, err := nats.Connect(strings.Join(nodes, ","), natsOptions...)
55+
if err != nil {
56+
return nil, fmt.Errorf("could't connect to nats: %w", err)
57+
}
58+
59+
js, err := nc.JetStream()
60+
if err != nil {
61+
return nil, fmt.Errorf("could't initialize jetstream: %w", err)
62+
}
63+
64+
kv, err := js.KeyValue(bucket)
65+
if err != nil {
66+
return nil, fmt.Errorf("could't open kv bucket: %w", err)
67+
}
68+
69+
return &Client{
70+
nc: nc,
71+
kv: kv,
72+
}, nil
73+
}
74+
75+
// Close is only meant to fulfill the easykv.ReadWatcher interface.
76+
// Does nothing.
77+
func (c *Client) Close() {
78+
c.nc.Close()
79+
}
80+
81+
func clean(key string) string {
82+
newKey := "/" + key
83+
return cleanReplacer.Replace(strings.ToLower(newKey))
84+
}
85+
86+
// GetValues is used to lookup all keys with a prefix.
87+
// Several prefixes can be specified in the keys array.
88+
func (c *Client) GetValues(keys []string) (map[string]string, error) {
89+
allKeys, err := c.kv.Keys()
90+
if err != nil {
91+
return nil, fmt.Errorf("couldn't get keys: %w", err)
92+
}
93+
94+
// filter keys
95+
var filteredKeys []string
96+
for _, key := range keys {
97+
for _, k := range allKeys {
98+
if strings.HasPrefix(clean(k), key) {
99+
filteredKeys = append(filteredKeys, k)
100+
}
101+
}
102+
}
103+
104+
vars := make(map[string]string)
105+
for _, key := range filteredKeys {
106+
val, err := c.kv.Get(key)
107+
if err != nil {
108+
return nil, fmt.Errorf("couldn't get key: %v %w", key, err)
109+
}
110+
vars[clean(key)] = string(val.Value())
111+
}
112+
113+
return vars, nil
114+
}
115+
116+
// WatchPrefix
117+
func (c *Client) WatchPrefix(ctx context.Context, prefix string, opts ...easykv.WatchOption) (uint64, error) {
118+
var (
119+
options easykv.WatchOptions
120+
watcher nats.KeyWatcher
121+
err error
122+
)
123+
for _, o := range opts {
124+
o(&options)
125+
}
126+
127+
watcher, err = c.kv.WatchAll(nats.Context(ctx))
128+
if err != nil {
129+
return 0, fmt.Errorf("couldn't create nats watcher: %w", err)
130+
}
131+
132+
defer watcher.Stop()
133+
for v := range watcher.Updates() {
134+
if v == nil {
135+
break
136+
}
137+
for _, k := range options.Keys {
138+
if strings.HasPrefix(clean(string(v.Key())), k) {
139+
return v.Revision(), nil
140+
}
141+
}
142+
}
143+
144+
if ctx.Err() == context.Canceled {
145+
return options.WaitIndex, easykv.ErrWatchCanceled
146+
}
147+
return 0, err
148+
}

nats/client_test.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* This file is part of easyKV.
3+
* © 2022 The easyKV Authors
4+
*
5+
* For the full copyright and license information, please view the LICENSE
6+
* file that was distributed with this source code.
7+
*/
8+
9+
package nats
10+
11+
import (
12+
"context"
13+
"sync"
14+
"testing"
15+
"time"
16+
17+
"github.com/HeavyHorst/easykv/testutils"
18+
19+
. "gopkg.in/check.v1"
20+
)
21+
22+
// Hook up gocheck into the "go test" runner.
23+
func Test(t *testing.T) { TestingT(t) }
24+
25+
type FilterSuite struct{}
26+
27+
var _ = Suite(&FilterSuite{})
28+
29+
func (s *FilterSuite) TestGetValues(t *C) {
30+
c, err := New([]string{}, "config")
31+
if err != nil {
32+
t.Fatal(err)
33+
}
34+
35+
c.kv.PutString("premtest.database.url", "www.google.de")
36+
c.kv.PutString("premtest.database.user", "Boris")
37+
c.kv.PutString("remtest.database.hosts.0.name", "test1")
38+
c.kv.PutString("remtest.database.hosts.0.ip", "192.168.0.1")
39+
c.kv.PutString("remtest.database.hosts.0.size", "60")
40+
c.kv.PutString("remtest.database.hosts.1.name", "test2")
41+
c.kv.PutString("remtest.database.hosts.1.ip", "192.168.0.2")
42+
c.kv.PutString("remtest.database.hosts.1.size", "80")
43+
44+
testutils.GetValues(t, c)
45+
}
46+
47+
func (s *FilterSuite) TestWatchPrefix(t *C) {
48+
c, err := New([]string{}, "config")
49+
if err != nil {
50+
t.Fatal(err)
51+
}
52+
defer c.Close()
53+
54+
wg := sync.WaitGroup{}
55+
wg.Add(1)
56+
go func() {
57+
defer wg.Done()
58+
testutils.WatchPrefix(context.Background(), t, c, "", []string{"/"})
59+
}()
60+
61+
time.Sleep(100 * time.Millisecond)
62+
c.kv.Put("remtest.database.hosts.192.168.0.3", []byte("test3"))
63+
c.kv.Delete("remtest.database.hosts.192.168.0.3")
64+
wg.Wait()
65+
}
66+
67+
func (s *FilterSuite) TestWatchPrefixCancel(t *C) {
68+
c, err := New([]string{}, "config")
69+
if err != nil {
70+
t.Fatal(err)
71+
}
72+
defer c.Close()
73+
74+
ctx, cancel := context.WithCancel(context.Background())
75+
wg := sync.WaitGroup{}
76+
wg.Add(1)
77+
go func() {
78+
defer wg.Done()
79+
testutils.WatchPrefix(ctx, t, c, "", []string{"/"})
80+
}()
81+
82+
cancel()
83+
wg.Wait()
84+
}

nats/options.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* This file is part of easyKV.
3+
* © 2022 The easyKV Authors
4+
*
5+
* For the full copyright and license information, please view the LICENSE
6+
* file that was distributed with this source code.
7+
*/
8+
9+
package nats
10+
11+
// Options contains all values that are needed to connect to nats.
12+
type Options struct {
13+
Nodes []string
14+
Auth BasicAuthOptions
15+
Token string
16+
Creds string
17+
}
18+
19+
// BasicAuthOptions contains options regarding to basic authentication.
20+
type BasicAuthOptions struct {
21+
Username string
22+
Password string
23+
}
24+
25+
// Option configures the nats client.
26+
type Option func(*Options)
27+
28+
// WithBasicAuth enables the basic authentication and sets the username and password.
29+
func WithBasicAuth(b BasicAuthOptions) Option {
30+
return func(o *Options) {
31+
o.Auth = b
32+
}
33+
}
34+
35+
// WithCredentials enables the NATS 2.0 and NATS NGS compatible user credentials and sets the path to the credentials file
36+
func WithCredentials(c string) Option {
37+
return func(o *Options) {
38+
o.Creds = c
39+
}
40+
}
41+
42+
// WithToken enables the token authentication and sets the token.
43+
func WithToken(t string) Option {
44+
return func(o *Options) {
45+
o.Token = t
46+
}
47+
}

0 commit comments

Comments
 (0)