Skip to content

Commit 85eee12

Browse files
Handle FFTM new style acks with batchNumber
Signed-off-by: Peter Broadhurst <[email protected]>
1 parent d8c6514 commit 85eee12

File tree

2 files changed

+90
-6
lines changed

2 files changed

+90
-6
lines changed

internal/blockchain/ethereum/ethereum.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,9 @@ type queryOutput struct {
7777
}
7878

7979
type ethWSCommandPayload struct {
80-
Type string `json:"type"`
81-
Topic string `json:"topic,omitempty"`
80+
Type string `json:"type"`
81+
Topic string `json:"topic,omitempty"`
82+
BatchNumber int64 `json:"batchNumber,omitempty"`
8283
}
8384

8485
type ethError struct {
@@ -438,7 +439,6 @@ func (e *Ethereum) eventLoop() {
438439
defer close(e.closed)
439440
l := log.L(e.ctx).WithField("role", "event-loop")
440441
ctx := log.WithLogger(e.ctx, l)
441-
ack, _ := json.Marshal(map[string]string{"type": "ack", "topic": e.topic})
442442
for {
443443
select {
444444
case <-ctx.Done():
@@ -461,10 +461,32 @@ func (e *Ethereum) eventLoop() {
461461
case []interface{}:
462462
err = e.handleMessageBatch(ctx, msgTyped)
463463
if err == nil {
464+
ack, _ := json.Marshal(&ethWSCommandPayload{
465+
Type: "ack",
466+
Topic: e.topic,
467+
})
464468
err = e.wsconn.Send(ctx, ack)
465469
}
466470
case map[string]interface{}:
467-
e.handleReceipt(ctx, fftypes.JSONObject(msgTyped))
471+
isBatch := false
472+
if batchNumber, ok := msgTyped["batchNumber"].(float64); ok {
473+
if events, ok := msgTyped["events"].([]interface{}); ok {
474+
// FFTM delivery with a batch number to use in the ack
475+
isBatch = true
476+
err = e.handleMessageBatch(ctx, events)
477+
if err == nil {
478+
ack, _ := json.Marshal(&ethWSCommandPayload{
479+
Type: "ack",
480+
Topic: e.topic,
481+
BatchNumber: int64(batchNumber),
482+
})
483+
err = e.wsconn.Send(ctx, ack)
484+
}
485+
}
486+
}
487+
if !isBatch {
488+
e.handleReceipt(ctx, fftypes.JSONObject(msgTyped))
489+
}
468490
default:
469491
l.Errorf("Message unexpected: %+v", msgTyped)
470492
continue

internal/blockchain/ethereum/ethereum_test.go

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,68 @@ func TestInitMissingTopic(t *testing.T) {
162162
assert.Regexp(t, "FF10138.*topic", err)
163163
}
164164

165+
func TestInitAndStartWithEthConnect(t *testing.T) {
166+
167+
log.SetLevel("trace")
168+
e, cancel := newTestEthereum()
169+
defer cancel()
170+
171+
toServer, fromServer, wsURL, done := wsclient.NewTestWSServer(nil)
172+
defer done()
173+
174+
mockedClient := &http.Client{}
175+
httpmock.ActivateNonDefault(mockedClient)
176+
defer httpmock.DeactivateAndReset()
177+
178+
u, _ := url.Parse(wsURL)
179+
u.Scheme = "http"
180+
httpURL := u.String()
181+
182+
httpmock.RegisterResponder("GET", fmt.Sprintf("%s/eventstreams", httpURL),
183+
httpmock.NewJsonResponderOrPanic(200, []eventStream{}))
184+
httpmock.RegisterResponder("POST", fmt.Sprintf("%s/eventstreams", httpURL),
185+
httpmock.NewJsonResponderOrPanic(200, eventStream{ID: "es12345"}))
186+
187+
resetConf(e)
188+
utEthconnectConf.Set(ffresty.HTTPConfigURL, httpURL)
189+
utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient)
190+
utEthconnectConf.Set(EthconnectConfigInstanceDeprecated, "/instances/0x71C7656EC7ab88b098defB751B7401B5f6d8976F")
191+
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")
192+
utFFTMConf.Set(ffresty.HTTPConfigURL, "http://ethc.example.com:12345")
193+
194+
cmi := &cachemocks.Manager{}
195+
cmi.On("GetCache", mock.Anything).Return(cache.NewUmanagedCache(e.ctx, 100, 5*time.Minute), nil)
196+
err := e.Init(e.ctx, e.cancelCtx, utConfig, e.metrics, cmi)
197+
assert.NoError(t, err)
198+
assert.NotNil(t, e.fftmClient)
199+
200+
assert.Equal(t, "ethereum", e.Name())
201+
assert.Equal(t, core.VerifierTypeEthAddress, e.VerifierType())
202+
203+
assert.NoError(t, err)
204+
205+
assert.Equal(t, 2, httpmock.GetTotalCallCount())
206+
assert.Equal(t, "es12345", e.streamID)
207+
assert.NotNil(t, e.Capabilities())
208+
209+
err = e.Start()
210+
assert.NoError(t, err)
211+
212+
startupMessage := <-toServer
213+
assert.Equal(t, `{"type":"listen","topic":"topic1"}`, startupMessage)
214+
startupMessage = <-toServer
215+
assert.Equal(t, `{"type":"listenreplies"}`, startupMessage)
216+
fromServer <- `[]` // empty batch, will be ignored, but acked
217+
reply := <-toServer
218+
assert.Equal(t, `{"type":"ack","topic":"topic1"}`, reply)
219+
220+
// Bad data will be ignored
221+
fromServer <- `!json`
222+
fromServer <- `{"not": "a reply"}`
223+
fromServer <- `42`
224+
225+
}
226+
165227
func TestInitAndStartWithFFTM(t *testing.T) {
166228

167229
log.SetLevel("trace")
@@ -213,9 +275,9 @@ func TestInitAndStartWithFFTM(t *testing.T) {
213275
assert.Equal(t, `{"type":"listen","topic":"topic1"}`, startupMessage)
214276
startupMessage = <-toServer
215277
assert.Equal(t, `{"type":"listenreplies"}`, startupMessage)
216-
fromServer <- `[]` // empty batch, will be ignored, but acked
278+
fromServer <- `{"batchNumber":12345,"events":[]}` // empty batch, will be ignored, but acked
217279
reply := <-toServer
218-
assert.Equal(t, `{"topic":"topic1","type":"ack"}`, reply)
280+
assert.Equal(t, `{"type":"ack","topic":"topic1","batchNumber":12345}`, reply)
219281

220282
// Bad data will be ignored
221283
fromServer <- `!json`

0 commit comments

Comments
 (0)