Skip to content

Commit b8c6173

Browse files
committed
[CF-557] Changelog updates should wait for transaction commit
1 parent 3652065 commit b8c6173

File tree

6 files changed

+191
-6
lines changed

6 files changed

+191
-6
lines changed

pkg/cmd/ctlstore/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ type supervisorCliConfig struct {
114114
type ledgerHealthConfig struct {
115115
Disable bool `conf:"disable" help:"disable ledger latency health attributing (DEPRECATED: use disable-ecs-behavior instead)"`
116116
DisableECSBehavior bool `conf:"disable-ecs-behavior" help:"disable ledger latency health attributing"`
117-
MaxHealthyLatency time.Duration `conf:"max-healty-latency" help:"Max latency considered healthy"`
117+
MaxHealthyLatency time.Duration `conf:"max-healthy-latency" help:"Max latency considered healthy"`
118118
AttributeName string `conf:"attribute-name" help:"The name of the attribute"`
119119
HealthyAttributeValue string `conf:"healthy-attribute-value" help:"The value of the attribute if healthy"`
120120
UnhealthyAttributeValue string `conf:"unhealth-attribute-value" help:"The value of the attribute if unhealthy"`

pkg/executive/db_executive.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (e *dbExecutive) AddFields(familyName string, tableName string, fieldNames
319319
}
320320

321321
// We first write the column modification to the DML ledger within the transaction.
322-
// It's important that this is done befored the DDL is applied to the ctldb, as
322+
// It's important that this is done before the DDL is applied to the ctldb, as
323323
// the DDL is not able to be rolled back. In this way, if the DDL fails, the DML
324324
// can be rolled back.
325325
dlw := dmlLedgerWriter{Tx: tx, TableName: dmlLedgerTableName}

pkg/ldb/ldbs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,7 @@ func FetchSeqFromLdb(ctx context.Context, db *sql.DB) (schema.DMLSequence, error
121121
}
122122
return schema.DMLSequence(seq), err
123123
}
124+
125+
func IsInternalTable(name string) bool {
126+
return name == LDBSeqTableName || name == LDBLastUpdateTableName
127+
}

pkg/ldbwriter/ldb_callback_writer.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,52 @@ import (
77
"github.com/segmentio/ctlstore/pkg/schema"
88
"github.com/segmentio/ctlstore/pkg/sqlite"
99
"github.com/segmentio/events/v2"
10+
"github.com/segmentio/stats/v4"
1011
)
1112

1213
// CallbackWriter is an LDBWriter that delegates to another
1314
// writer and then, upon a successful write, executes N callbacks.
1415
type CallbackWriter struct {
15-
DB *sql.DB
16-
Delegate LDBWriter
17-
Callbacks []LDBWriteCallback
16+
DB *sql.DB
17+
Delegate LDBWriter
18+
Callbacks []LDBWriteCallback
19+
// Buffer between SQLite Callback and our code
1820
ChangeBuffer *sqlite.SQLChangeBuffer
21+
// Accumulated changes across multiple ApplyDMLStatement calls
22+
transactionChanges []sqlite.SQLiteWatchChange
1923
}
2024

2125
func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema.DMLStatement) error {
2226
err := w.Delegate.ApplyDMLStatement(ctx, statement)
2327
if err != nil {
2428
return err
2529
}
30+
31+
// If beginning a transaction then start accumulating changes
32+
if statement.Statement == schema.DMLTxBeginKey {
33+
w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0)
34+
stats.Set("ldb_changes_accumulated", len(w.transactionChanges))
35+
return nil
36+
}
37+
2638
changes := w.ChangeBuffer.Pop()
39+
40+
// Are we in a transaction?
41+
if w.transactionChanges != nil {
42+
if statement.Statement == schema.DMLTxEndKey {
43+
// Transaction done! Send out the accumulated changes
44+
changes = append(w.transactionChanges, changes...)
45+
stats.Set("ldb_changes_accumulated", len(changes))
46+
w.transactionChanges = nil
47+
} else {
48+
// Transaction isn't over yet, save the latest changes
49+
w.transactionChanges = append(w.transactionChanges, changes...)
50+
stats.Set("ldb_changes_accumulated", len(w.transactionChanges))
51+
return nil
52+
}
53+
}
54+
55+
stats.Observe("ldb_changes_written", len(changes))
2756
for _, callback := range w.Callbacks {
2857
events.Debug("Writing DML callback for %{cb}T", callback)
2958
callback.LDBWritten(ctx, LDBWriteMetadata{
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package ldbwriter
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"github.com/segmentio/ctlstore/pkg/ldb"
7+
"github.com/segmentio/ctlstore/pkg/schema"
8+
"github.com/segmentio/ctlstore/pkg/sqlite"
9+
"github.com/stretchr/testify/assert"
10+
"testing"
11+
)
12+
13+
/*
14+
* Simple LDBWriteCallback handler that just stores the changes it gets.
15+
*/
16+
type TestUpdateCallbackHandler struct {
17+
Changes []sqlite.SQLiteWatchChange
18+
}
19+
20+
func (u *TestUpdateCallbackHandler) LDBWritten(ctx context.Context, data LDBWriteMetadata) {
21+
// The [:0] slice operation will reuse the underlying array of u.Changes if it's large enough
22+
// to hold all elements of data.Changes, otherwise it will allocate a new one.
23+
u.Changes = append(u.Changes[:0], data.Changes...)
24+
}
25+
26+
func (u *TestUpdateCallbackHandler) UpdateCount() int {
27+
return len(u.Changes)
28+
}
29+
30+
func (u *TestUpdateCallbackHandler) Reset() {
31+
u.Changes = u.Changes[:0]
32+
return
33+
}
34+
35+
/*
36+
* Test strategy:
37+
* Check how many times we get callbacks while applying DML statements,
38+
* and how many updates we get per callback.
39+
*/
40+
func TestCallbackWriter_ApplyDMLStatement(t *testing.T) {
41+
// Begin boilerplate
42+
var err error
43+
ctx := context.Background()
44+
var changeBuffer sqlite.SQLChangeBuffer
45+
dbName := "test_ldb_callback_writer"
46+
_ = sqlite.RegisterSQLiteWatch(dbName, &changeBuffer)
47+
48+
db, err := sql.Open(dbName, ":memory:")
49+
if err != nil {
50+
t.Fatalf("Unexpected error: %+v", err)
51+
}
52+
defer db.Close()
53+
54+
err = ldb.EnsureLdbInitialized(ctx, db)
55+
if err != nil {
56+
t.Fatalf("Couldn't initialize SQLite db, error %v", err)
57+
}
58+
// End boilerplate
59+
60+
// Set up the callback writer with our test callback handler
61+
ldbWriteCallback := &TestUpdateCallbackHandler{}
62+
63+
writer := CallbackWriter{
64+
DB: db,
65+
Delegate: &SqlLdbWriter{Db: db},
66+
Callbacks: []LDBWriteCallback{ldbWriteCallback},
67+
ChangeBuffer: &changeBuffer,
68+
}
69+
70+
err = writer.ApplyDMLStatement(ctx, schema.NewTestDMLStatement("CREATE TABLE foo (bar VARCHAR);"))
71+
if err != nil {
72+
t.Fatalf("Could not issue CREATE TABLE statements, error %v", err)
73+
}
74+
75+
type args struct {
76+
ctx context.Context
77+
statements []schema.DMLStatement
78+
}
79+
tests := []struct {
80+
name string
81+
args args
82+
expectedCallbacks int
83+
expectedUpdatesPerCallback int
84+
wantErr bool
85+
}{
86+
{
87+
name: "Test 1",
88+
args: args{
89+
ctx: ctx,
90+
statements: []schema.DMLStatement{schema.NewTestDMLStatement("INSERT INTO foo VALUES('dummy');")},
91+
},
92+
expectedCallbacks: 1,
93+
expectedUpdatesPerCallback: 1,
94+
wantErr: false,
95+
},
96+
{
97+
name: "Test 2",
98+
args: args{
99+
ctx: ctx,
100+
statements: []schema.DMLStatement{
101+
schema.NewTestDMLStatement("INSERT INTO foo VALUES('boston');"),
102+
schema.NewTestDMLStatement("INSERT INTO foo VALUES('detroit');"),
103+
schema.NewTestDMLStatement("INSERT INTO foo VALUES('chicago');"),
104+
},
105+
},
106+
// bare statements outside of a transaction should get a callback each time
107+
expectedCallbacks: 3,
108+
expectedUpdatesPerCallback: 1,
109+
wantErr: false,
110+
},
111+
{
112+
name: "Test 3",
113+
args: args{
114+
ctx: ctx,
115+
statements: []schema.DMLStatement{
116+
schema.NewTestDMLStatement(schema.DMLTxBeginKey),
117+
schema.NewTestDMLStatement("INSERT INTO foo VALUES('asdf');"),
118+
schema.NewTestDMLStatement("INSERT INTO foo VALUES('foo');"),
119+
schema.NewTestDMLStatement("INSERT INTO foo VALUES('bar');"),
120+
schema.NewTestDMLStatement(schema.DMLTxEndKey),
121+
},
122+
},
123+
// since it's a transaction, we expect only one callback, and it should have all 3 updates
124+
expectedCallbacks: 1,
125+
expectedUpdatesPerCallback: 3,
126+
wantErr: false,
127+
},
128+
}
129+
for _, tt := range tests {
130+
t.Run(tt.name, func(t *testing.T) {
131+
callbackCount := 0
132+
for _, statement := range tt.args.statements {
133+
if err := writer.ApplyDMLStatement(tt.args.ctx, statement); (err != nil) != tt.wantErr {
134+
t.Errorf("ApplyDMLStatement() error = %v, wantErr %v", err, tt.wantErr)
135+
}
136+
// did we get a callback from that statement being applied?
137+
if ldbWriteCallback.UpdateCount() > 0 {
138+
callbackCount++
139+
assert.Equal(t, tt.expectedUpdatesPerCallback, ldbWriteCallback.UpdateCount())
140+
// delete previous callback's update entries since we "handled" the callback
141+
ldbWriteCallback.Reset()
142+
}
143+
}
144+
assert.Equal(t, tt.expectedCallbacks, callbackCount)
145+
})
146+
}
147+
}

pkg/sqlite/sqlite_watch.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package sqlite
33
import (
44
"context"
55
"database/sql"
6-
76
"github.com/pkg/errors"
7+
"github.com/segmentio/ctlstore/pkg/ldb"
88
"github.com/segmentio/ctlstore/pkg/scanfunc"
99
"github.com/segmentio/ctlstore/pkg/schema"
1010
"github.com/segmentio/go-sqlite3"
@@ -40,6 +40,11 @@ func RegisterSQLiteWatch(dbName string, buffer *SQLChangeBuffer) error {
4040
var newRow []interface{}
4141
var oldRow []interface{}
4242

43+
// Don't bother propagating updates of our internal bookkeeping tables
44+
if ldb.IsInternalTable(pud.TableName) {
45+
return
46+
}
47+
4348
if pud.Op == sqlite3.SQLITE_UPDATE || pud.Op == sqlite3.SQLITE_DELETE {
4449
oldRow = make([]interface{}, cnt)
4550
err := pud.Old(oldRow...)

0 commit comments

Comments
 (0)