Skip to content

Commit 80f6839

Browse files
Merge pull request #256 from kaleido-io/re-sub
Re-subscribe if the instance path changes
2 parents ea1f5ca + 352d60a commit 80f6839

File tree

4 files changed

+49
-40
lines changed

4 files changed

+49
-40
lines changed

internal/apiserver/http_server.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"io/ioutil"
2525
"net"
2626
"net/http"
27+
"time"
2728

2829
"github.com/gorilla/mux"
2930
"github.com/hyperledger/firefly/internal/config"
@@ -63,14 +64,15 @@ type IServer interface {
6364
}
6465

6566
type httpServer struct {
66-
name string
67-
s IServer
68-
l net.Listener
69-
conf config.Prefix
70-
onClose chan error
71-
tlsEnabled bool
72-
tlsCertFile string
73-
tlsKeyFile string
67+
name string
68+
s IServer
69+
l net.Listener
70+
conf config.Prefix
71+
onClose chan error
72+
tlsEnabled bool
73+
tlsCertFile string
74+
tlsKeyFile string
75+
shutdownTimeout time.Duration
7476
}
7577

7678
func initHTTPConfPrefx(prefix config.Prefix, defaultPort int) {
@@ -88,12 +90,13 @@ func initHTTPConfPrefx(prefix config.Prefix, defaultPort int) {
8890

8991
func newHTTPServer(ctx context.Context, name string, r *mux.Router, onClose chan error, conf config.Prefix) (hs *httpServer, err error) {
9092
hs = &httpServer{
91-
name: name,
92-
onClose: onClose,
93-
conf: conf,
94-
tlsEnabled: conf.GetBool(HTTPConfTLSEnabled),
95-
tlsCertFile: conf.GetString(HTTPConfTLSCertFile),
96-
tlsKeyFile: conf.GetString(HTTPConfTLSKeyFile),
93+
name: name,
94+
onClose: onClose,
95+
conf: conf,
96+
tlsEnabled: conf.GetBool(HTTPConfTLSEnabled),
97+
tlsCertFile: conf.GetString(HTTPConfTLSCertFile),
98+
tlsKeyFile: conf.GetString(HTTPConfTLSKeyFile),
99+
shutdownTimeout: config.GetDuration(config.APIShutdownTimeout),
97100
}
98101
hs.l, err = hs.createListener(ctx)
99102
if err == nil {
@@ -172,7 +175,7 @@ func (hs *httpServer) serveHTTP(ctx context.Context) {
172175
select {
173176
case <-ctx.Done():
174177
log.L(ctx).Infof("API server context cancelled - shutting down")
175-
shutdownContext, cancel := context.WithTimeout(context.Background(), config.GetDuration(config.APIShutdownTimeout))
178+
shutdownContext, cancel := context.WithTimeout(context.Background(), hs.shutdownTimeout)
176179
defer cancel()
177180
if err := hs.s.Shutdown(shutdownContext); err != nil {
178181
hs.onClose <- err

internal/blockchain/ethereum/ethereum.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package ethereum
1818

1919
import (
2020
"context"
21+
"crypto/sha256"
2122
"encoding/hex"
2223
"encoding/json"
2324
"fmt"
@@ -215,6 +216,11 @@ func (e *Ethereum) afterConnect(ctx context.Context, w wsclient.WSClient) error
215216
}
216217

217218
func (e *Ethereum) ensureSubscriptions() error {
219+
// Include a hash of the instance path in the subscription, so if we ever point at a different
220+
// contract configuration, we re-subscribe from block 0.
221+
// We don't need full strength hashing, so just use the first 16 chars for readability.
222+
instanceUniqueHash := hex.EncodeToString(sha256.New().Sum([]byte(e.instancePath)))[0:16]
223+
218224
for eventType, subDesc := range requiredSubscriptions {
219225

220226
var existingSubs []*subscription
@@ -224,15 +230,20 @@ func (e *Ethereum) ensureSubscriptions() error {
224230
}
225231

226232
var sub *subscription
233+
subName := fmt.Sprintf("%s_%s", eventType, instanceUniqueHash)
227234
for _, s := range existingSubs {
228-
if s.Name == eventType {
235+
if s.Name == subName ||
236+
/* Check for the plain name we used to use originally, before adding uniqueness qualifier.
237+
If one of these very early environments needed a new subscription, the existing one would need to
238+
be deleted manually. */
239+
s.Name == eventType {
229240
sub = s
230241
}
231242
}
232243

233244
if sub == nil {
234245
newSub := subscription{
235-
Name: eventType,
246+
Name: subName,
236247
Description: subDesc,
237248
Stream: e.initInfo.stream.ID,
238249
FromBlock: "0",

internal/blockchain/ethereum/ethereum_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ func TestInitAllExistingStreams(t *testing.T) {
206206
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
207207
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
208208
httpmock.NewJsonResponderOrPanic(200, []subscription{
209-
{ID: "sub12345", Name: "BatchPin"},
209+
{ID: "sub12345", Name: "BatchPin_2f696e7374616e63" /* this is the subname for our combo of instance path and BatchPin */},
210210
}))
211211

212212
resetConf()

internal/config/config.go

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -393,9 +393,9 @@ func MergeConfig(configRecords []*fftypes.ConfigRecord) error {
393393
return nil
394394
}
395395

396-
var root = &configPrefix{
397-
keys: map[string]bool{}, // All keys go here, including those defined in sub prefixies
398-
}
396+
var knownKeys = map[string]bool{} // All keys go here, including those defined in sub prefixies
397+
var keysMutex sync.Mutex
398+
var root = &configPrefix{}
399399

400400
// ark adds a root key, used to define the keys that are used within the core
401401
func rootKey(k string) RootKey {
@@ -405,10 +405,10 @@ func rootKey(k string) RootKey {
405405

406406
// GetKnownKeys gets the known keys
407407
func GetKnownKeys() []string {
408-
var keys []string
409-
root.keysMutex.Lock()
410-
defer root.keysMutex.Unlock()
411-
for k := range root.keys {
408+
keys := make([]string, 0, len(knownKeys))
409+
keysMutex.Lock()
410+
defer keysMutex.Unlock()
411+
for k := range knownKeys {
412412
keys = append(keys, k)
413413
}
414414
sort.Strings(keys)
@@ -417,9 +417,7 @@ func GetKnownKeys() []string {
417417

418418
// configPrefix is the main config structure passed to plugins, and used for root to wrap viper
419419
type configPrefix struct {
420-
prefix string
421-
keys map[string]bool
422-
keysMutex sync.Mutex
420+
prefix string
423421
}
424422

425423
// configPrefixArray is a point in the config that supports an array
@@ -435,15 +433,14 @@ func NewPluginConfig(prefix string) Prefix {
435433
}
436434
return &configPrefix{
437435
prefix: prefix,
438-
keys: root.keys,
439436
}
440437
}
441438

442439
func (c *configPrefix) prefixKey(k string) string {
443-
c.keysMutex.Lock()
444-
defer c.keysMutex.Unlock()
440+
keysMutex.Lock()
441+
defer keysMutex.Unlock()
445442
key := c.prefix + k
446-
if !c.keys[key] {
443+
if !knownKeys[key] {
447444
panic(fmt.Sprintf("Undefined configuration key '%s'", key))
448445
}
449446
return key
@@ -452,7 +449,6 @@ func (c *configPrefix) prefixKey(k string) string {
452449
func (c *configPrefix) SubPrefix(suffix string) Prefix {
453450
return &configPrefix{
454451
prefix: c.prefix + suffix + ".",
455-
keys: root.keys,
456452
}
457453
}
458454

@@ -476,7 +472,6 @@ func (c *configPrefixArray) ArraySize() int {
476472
func (c *configPrefixArray) ArrayEntry(i int) Prefix {
477473
cp := &configPrefix{
478474
prefix: c.base + fmt.Sprintf(".%d.", i),
479-
keys: root.keys,
480475
}
481476
for knownKey, defValue := range c.defaults {
482477
cp.AddKnownKey(knownKey, defValue...)
@@ -495,9 +490,9 @@ func (c *configPrefixArray) ArrayEntry(i int) Prefix {
495490

496491
func (c *configPrefixArray) AddKnownKey(k string, defValue ...interface{}) {
497492
// Put a simulated key in the known keys array, to pop into the help info.
498-
root.keysMutex.Lock()
499-
defer root.keysMutex.Unlock()
500-
root.keys[fmt.Sprintf("%s[].%s", c.base, k)] = true
493+
keysMutex.Lock()
494+
defer keysMutex.Unlock()
495+
knownKeys[fmt.Sprintf("%s[].%s", c.base, k)] = true
501496
c.defaults[k] = defValue
502497
}
503498

@@ -508,9 +503,9 @@ func (c *configPrefix) AddKnownKey(k string, defValue ...interface{}) {
508503
} else if len(defValue) > 0 {
509504
c.SetDefault(k, defValue)
510505
}
511-
c.keysMutex.Lock()
512-
defer c.keysMutex.Unlock()
513-
c.keys[key] = true
506+
keysMutex.Lock()
507+
defer keysMutex.Unlock()
508+
knownKeys[key] = true
514509
}
515510

516511
func (c *configPrefix) SetDefault(k string, defValue interface{}) {

0 commit comments

Comments
 (0)