Skip to content

Commit fc28d33

Browse files
Merge pull request #439 from kaleido-io/backport-438
v0.11.x backport: Ready state changes require a bump to the message to re-sequence it
2 parents b58663e + eab45d8 commit fc28d33

File tree

6 files changed

+92
-6
lines changed

6 files changed

+92
-6
lines changed

internal/database/sqlcommon/message_sql.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,34 @@ func (s *SQLCommon) UpsertMessage(ctx context.Context, message *fftypes.Message,
183183
return s.commitTx(ctx, tx, autoCommit)
184184
}
185185

186+
// In SQL update+bump is a delete+insert within a TX
187+
func (s *SQLCommon) ReplaceMessage(ctx context.Context, message *fftypes.Message) (err error) {
188+
ctx, tx, autoCommit, err := s.beginOrUseTx(ctx)
189+
if err != nil {
190+
return err
191+
}
192+
defer s.rollbackTx(ctx, tx, autoCommit)
193+
194+
if err := s.deleteTx(ctx, tx,
195+
sq.Delete("messages").
196+
Where(sq.And{
197+
sq.Eq{"id": message.Header.ID},
198+
}),
199+
nil, // no change event
200+
); err != nil {
201+
return err
202+
}
203+
204+
if err = s.attemptMessageInsert(ctx, tx, message); err != nil {
205+
return err
206+
}
207+
208+
// Note there is no call to updateMessageDataRefs as the data refs are not allowed to change,
209+
// and are correlated by UUID (not sequence)
210+
211+
return s.commitTx(ctx, tx, autoCommit)
212+
}
213+
186214
func (s *SQLCommon) updateMessageDataRefs(ctx context.Context, tx *txWrapper, message *fftypes.Message, recreateDatarefs bool) error {
187215

188216
if recreateDatarefs {

internal/database/sqlcommon/message_sql_test.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,15 @@ func TestUpsertE2EWithDB(t *testing.T) {
6161
TxType: fftypes.TransactionTypeNone,
6262
},
6363
Hash: fftypes.NewRandB32(),
64-
State: fftypes.MessageStateReady,
64+
State: fftypes.MessageStateStaged,
6565
Confirmed: nil,
6666
Data: []*fftypes.DataRef{
6767
{ID: dataID1, Hash: rand1},
6868
{ID: dataID2, Hash: rand2},
6969
},
7070
}
7171

72-
s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeCreated, "ns12345", msgID, mock.Anything).Return()
72+
s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeCreated, "ns12345", msgID, mock.Anything).Return().Twice()
7373
s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeUpdated, "ns12345", msgID, mock.Anything).Return()
7474

7575
err := s.UpsertMessage(ctx, msg, database.UpsertOptimizationNew)
@@ -205,6 +205,15 @@ func TestUpsertE2EWithDB(t *testing.T) {
205205
assert.Equal(t, 1, len(msgs))
206206
assert.Equal(t, *bid2, *msgs[0].BatchID)
207207

208+
// Bump and Update - this is for a ready transition
209+
msgUpdated.State = fftypes.MessageStateReady
210+
err = s.ReplaceMessage(context.Background(), msgUpdated)
211+
assert.NoError(t, err)
212+
msgRead, err = s.GetMessageByID(ctx, msgUpdated.Header.ID)
213+
msgJson, _ = json.Marshal(&msgUpdated)
214+
msgReadJson, _ = json.Marshal(msgRead)
215+
assert.Equal(t, string(msgJson), string(msgReadJson))
216+
208217
s.callbacks.AssertExpectations(t)
209218
}
210219

@@ -276,6 +285,38 @@ func TestUpsertMessageFailCommit(t *testing.T) {
276285
assert.NoError(t, mock.ExpectationsWereMet())
277286
}
278287

288+
func TestReplaceMessageFailBegin(t *testing.T) {
289+
s, mock := newMockProvider().init()
290+
mock.ExpectBegin().WillReturnError(fmt.Errorf("pop"))
291+
msgID := fftypes.NewUUID()
292+
err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}})
293+
assert.Regexp(t, "FF10114", err)
294+
assert.NoError(t, mock.ExpectationsWereMet())
295+
}
296+
297+
func TestReplaceMessageFailDelete(t *testing.T) {
298+
s, mock := newMockProvider().init()
299+
mock.ExpectBegin()
300+
mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop"))
301+
mock.ExpectRollback()
302+
msgID := fftypes.NewUUID()
303+
err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}})
304+
assert.Regexp(t, "FF10118", err)
305+
assert.NoError(t, mock.ExpectationsWereMet())
306+
}
307+
308+
func TestReplaceMessageFailInsert(t *testing.T) {
309+
s, mock := newMockProvider().init()
310+
mock.ExpectBegin()
311+
mock.ExpectExec("DELETE .*").WillReturnResult(sqlmock.NewResult(1, 1))
312+
mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop"))
313+
mock.ExpectRollback()
314+
msgID := fftypes.NewUUID()
315+
err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}})
316+
assert.Regexp(t, "FF10116", err)
317+
assert.NoError(t, mock.ExpectationsWereMet())
318+
}
319+
279320
func TestUpdateMessageDataRefsNilID(t *testing.T) {
280321
s, mock := newMockProvider().init()
281322
msgID := fftypes.NewUUID()

internal/events/tokens_transferred.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (em *eventManager) TokensTransferred(ti tokens.Plugin, poolProtocolID strin
128128
if msg.State == fftypes.MessageStateStaged {
129129
// Message can now be sent
130130
msg.State = fftypes.MessageStateReady
131-
if err := em.database.UpsertMessage(ctx, msg, database.UpsertOptimizationExisting); err != nil {
131+
if err := em.database.ReplaceMessage(ctx, msg); err != nil {
132132
return err
133133
}
134134
} else {

internal/events/tokens_transferred_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -371,10 +371,9 @@ func TestTokensTransferredWithMessageSend(t *testing.T) {
371371
mdi.On("UpsertTokenTransfer", em.ctx, transfer).Return(nil).Times(2)
372372
mdi.On("UpdateTokenBalances", em.ctx, transfer).Return(nil).Times(2)
373373
mdi.On("GetMessageByID", em.ctx, mock.Anything).Return(message, nil).Times(2)
374-
mdi.On("UpsertMessage", em.ctx, mock.Anything, database.UpsertOptimizationExisting).Return(fmt.Errorf("pop"))
375-
mdi.On("UpsertMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool {
374+
mdi.On("ReplaceMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool {
376375
return msg.State == fftypes.MessageStateReady
377-
}), database.UpsertOptimizationExisting).Return(nil)
376+
})).Return(fmt.Errorf("pop"))
378377
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool {
379378
return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace
380379
})).Return(nil).Once()

mocks/databasemocks/plugin.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/database/plugin.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ type iMessageCollection interface {
8080
// UpdateMessage - Update message
8181
UpdateMessage(ctx context.Context, id *fftypes.UUID, update Update) (err error)
8282

83+
// ReplaceMessage updates the message, and assigns it a new sequence number at the front of the list.
84+
// A new event is raised for the message, with the new sequence number - as if it was brand new.
85+
ReplaceMessage(ctx context.Context, message *fftypes.Message) (err error)
86+
8387
// UpdateMessages - Update messages
8488
UpdateMessages(ctx context.Context, filter Filter, update Update) (err error)
8589

0 commit comments

Comments
 (0)