Skip to content

Commit 7423b5e

Browse files
Merge pull request #1050 from kaleido-io/fftm-acks
Handle FFTM new style acks with batchNumber
2 parents d8c6514 + 92d190b commit 7423b5e

File tree

3 files changed

+96
-12
lines changed

3 files changed

+96
-12
lines changed

internal/blockchain/ethereum/ethereum.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,9 @@ type queryOutput struct {
7777
}
7878

7979
type ethWSCommandPayload struct {
80-
Type string `json:"type"`
81-
Topic string `json:"topic,omitempty"`
80+
Type string `json:"type"`
81+
Topic string `json:"topic,omitempty"`
82+
BatchNumber int64 `json:"batchNumber,omitempty"`
8283
}
8384

8485
type ethError struct {
@@ -438,7 +439,6 @@ func (e *Ethereum) eventLoop() {
438439
defer close(e.closed)
439440
l := log.L(e.ctx).WithField("role", "event-loop")
440441
ctx := log.WithLogger(e.ctx, l)
441-
ack, _ := json.Marshal(map[string]string{"type": "ack", "topic": e.topic})
442442
for {
443443
select {
444444
case <-ctx.Done():
@@ -461,10 +461,32 @@ func (e *Ethereum) eventLoop() {
461461
case []interface{}:
462462
err = e.handleMessageBatch(ctx, msgTyped)
463463
if err == nil {
464+
ack, _ := json.Marshal(&ethWSCommandPayload{
465+
Type: "ack",
466+
Topic: e.topic,
467+
})
464468
err = e.wsconn.Send(ctx, ack)
465469
}
466470
case map[string]interface{}:
467-
e.handleReceipt(ctx, fftypes.JSONObject(msgTyped))
471+
isBatch := false
472+
if batchNumber, ok := msgTyped["batchNumber"].(float64); ok {
473+
if events, ok := msgTyped["events"].([]interface{}); ok {
474+
// FFTM delivery with a batch number to use in the ack
475+
isBatch = true
476+
err = e.handleMessageBatch(ctx, events)
477+
if err == nil {
478+
ack, _ := json.Marshal(&ethWSCommandPayload{
479+
Type: "ack",
480+
Topic: e.topic,
481+
BatchNumber: int64(batchNumber),
482+
})
483+
err = e.wsconn.Send(ctx, ack)
484+
}
485+
}
486+
}
487+
if !isBatch {
488+
e.handleReceipt(ctx, fftypes.JSONObject(msgTyped))
489+
}
468490
default:
469491
l.Errorf("Message unexpected: %+v", msgTyped)
470492
continue

internal/blockchain/ethereum/ethereum_test.go

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,68 @@ func TestInitMissingTopic(t *testing.T) {
162162
assert.Regexp(t, "FF10138.*topic", err)
163163
}
164164

165+
func TestInitAndStartWithEthConnect(t *testing.T) {
166+
167+
log.SetLevel("trace")
168+
e, cancel := newTestEthereum()
169+
defer cancel()
170+
171+
toServer, fromServer, wsURL, done := wsclient.NewTestWSServer(nil)
172+
defer done()
173+
174+
mockedClient := &http.Client{}
175+
httpmock.ActivateNonDefault(mockedClient)
176+
defer httpmock.DeactivateAndReset()
177+
178+
u, _ := url.Parse(wsURL)
179+
u.Scheme = "http"
180+
httpURL := u.String()
181+
182+
httpmock.RegisterResponder("GET", fmt.Sprintf("%s/eventstreams", httpURL),
183+
httpmock.NewJsonResponderOrPanic(200, []eventStream{}))
184+
httpmock.RegisterResponder("POST", fmt.Sprintf("%s/eventstreams", httpURL),
185+
httpmock.NewJsonResponderOrPanic(200, eventStream{ID: "es12345"}))
186+
187+
resetConf(e)
188+
utEthconnectConf.Set(ffresty.HTTPConfigURL, httpURL)
189+
utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient)
190+
utEthconnectConf.Set(EthconnectConfigInstanceDeprecated, "/instances/0x71C7656EC7ab88b098defB751B7401B5f6d8976F")
191+
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")
192+
utFFTMConf.Set(ffresty.HTTPConfigURL, "http://ethc.example.com:12345")
193+
194+
cmi := &cachemocks.Manager{}
195+
cmi.On("GetCache", mock.Anything).Return(cache.NewUmanagedCache(e.ctx, 100, 5*time.Minute), nil)
196+
err := e.Init(e.ctx, e.cancelCtx, utConfig, e.metrics, cmi)
197+
assert.NoError(t, err)
198+
assert.NotNil(t, e.fftmClient)
199+
200+
assert.Equal(t, "ethereum", e.Name())
201+
assert.Equal(t, core.VerifierTypeEthAddress, e.VerifierType())
202+
203+
assert.NoError(t, err)
204+
205+
assert.Equal(t, 2, httpmock.GetTotalCallCount())
206+
assert.Equal(t, "es12345", e.streamID)
207+
assert.NotNil(t, e.Capabilities())
208+
209+
err = e.Start()
210+
assert.NoError(t, err)
211+
212+
startupMessage := <-toServer
213+
assert.Equal(t, `{"type":"listen","topic":"topic1"}`, startupMessage)
214+
startupMessage = <-toServer
215+
assert.Equal(t, `{"type":"listenreplies"}`, startupMessage)
216+
fromServer <- `[]` // empty batch, will be ignored, but acked
217+
reply := <-toServer
218+
assert.Equal(t, `{"type":"ack","topic":"topic1"}`, reply)
219+
220+
// Bad data will be ignored
221+
fromServer <- `!json`
222+
fromServer <- `{"not": "a reply"}`
223+
fromServer <- `42`
224+
225+
}
226+
165227
func TestInitAndStartWithFFTM(t *testing.T) {
166228

167229
log.SetLevel("trace")
@@ -213,9 +275,9 @@ func TestInitAndStartWithFFTM(t *testing.T) {
213275
assert.Equal(t, `{"type":"listen","topic":"topic1"}`, startupMessage)
214276
startupMessage = <-toServer
215277
assert.Equal(t, `{"type":"listenreplies"}`, startupMessage)
216-
fromServer <- `[]` // empty batch, will be ignored, but acked
278+
fromServer <- `{"batchNumber":12345,"events":[]}` // empty batch, will be ignored, but acked
217279
reply := <-toServer
218-
assert.Equal(t, `{"topic":"topic1","type":"ack"}`, reply)
280+
assert.Equal(t, `{"type":"ack","topic":"topic1","batchNumber":12345}`, reply)
219281

220282
// Bad data will be ignored
221283
fromServer <- `!json`

manifest.json

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
},
77
"evmconnect": {
88
"image": "ghcr.io/hyperledger/firefly-evmconnect",
9-
"tag": "v1.1.6",
10-
"sha": "1fc4bbf662477e96701684446f7b4639bb1cfa1c39ceb2bd0a422877f32578cd"
9+
"tag": "v1.1.7",
10+
"sha": "a7d1282c75a6a19d399829de1b2868187c41489e55c4ddd2c669131776b7c1f6"
1111
},
1212
"fabconnect": {
1313
"image": "ghcr.io/hyperledger/firefly-fabconnect",
@@ -21,13 +21,13 @@
2121
},
2222
"tokens-erc1155": {
2323
"image": "ghcr.io/hyperledger/firefly-tokens-erc1155",
24-
"tag": "v1.1.2",
25-
"sha": "552e65b971709f605fb07d98c5a8899f5f567b717c783eeb0282136dc961c6d4"
24+
"tag": "v1.1.3",
25+
"sha": "48dd255d84cf9ce3682a8b6a3379dd173ecaf48fdf1777383ac40705778ee725"
2626
},
2727
"tokens-erc20-erc721": {
2828
"image": "ghcr.io/hyperledger/firefly-tokens-erc20-erc721",
29-
"tag": "v1.1.1",
30-
"sha": "2e04f9be8ed0b72ac0754ff8bb9ce52c4e1d79b45716fd2677e5bf1e193697f3"
29+
"tag": "v1.1.2",
30+
"sha": "e1612497060206137d43bf9bdbc39907f1835a05d6c2dcb20a130c13224c68c9"
3131
},
3232
"signer": {
3333
"image": "ghcr.io/hyperledger/firefly-signer",

0 commit comments

Comments
 (0)