Skip to content

Commit 261b13d

Browse files
authored
sqlreplay: fix Execute commands are not logged in replay output (#987)
1 parent 415b73e commit 261b13d

File tree

4 files changed

+171
-57
lines changed

4 files changed

+171
-57
lines changed

pkg/sqlreplay/cmd/audit_log_plugin.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@ func (decoder *AuditLogPluginDecoder) parseGeneralEvent(kvs map[string]string, c
504504
CapturedPsID: stmtID,
505505
Type: pnet.ComStmtPrepare,
506506
StmtType: kvs[auditPluginKeyStmtType],
507+
PreparedStmt: sql,
507508
Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, hack.Slice(sql)...),
508509
})
509510
}
@@ -517,6 +518,8 @@ func (decoder *AuditLogPluginDecoder) parseGeneralEvent(kvs map[string]string, c
517518
CapturedPsID: stmtID,
518519
Type: pnet.ComStmtExecute,
519520
StmtType: kvs[auditPluginKeyStmtType],
521+
PreparedStmt: sql,
522+
Params: args,
520523
Payload: executeReq,
521524
})
522525

@@ -527,6 +530,7 @@ func (decoder *AuditLogPluginDecoder) parseGeneralEvent(kvs map[string]string, c
527530
CapturedPsID: stmtID,
528531
Type: pnet.ComStmtClose,
529532
StmtType: kvs[auditPluginKeyStmtType],
533+
PreparedStmt: sql,
530534
Payload: pnet.MakeCloseStmtRequest(stmtID),
531535
})
532536
}

pkg/sqlreplay/cmd/audit_log_plugin_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,7 @@ func TestDecodeSingleLine(t *testing.T) {
575575
CapturedPsID: 1,
576576
Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("select \"?\"")...),
577577
StmtType: "Select",
578+
PreparedStmt: "select \"?\"",
578579
Success: true,
579580
},
580581
{
@@ -587,6 +588,8 @@ func TestDecodeSingleLine(t *testing.T) {
587588
CapturedPsID: 1,
588589
Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 1, 0, 0, 0, 0, 0, 0, 0}...),
589590
StmtType: "Select",
591+
PreparedStmt: "select \"?\"",
592+
Params: []any{int64(1)},
590593
Success: true,
591594
},
592595
{
@@ -599,6 +602,7 @@ func TestDecodeSingleLine(t *testing.T) {
599602
CapturedPsID: 1,
600603
Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{1, 0, 0, 0}...),
601604
StmtType: "Select",
605+
PreparedStmt: "select \"?\"",
602606
Success: true,
603607
},
604608
},
@@ -780,6 +784,7 @@ func TestDecodeMultiLines(t *testing.T) {
780784
CapturedPsID: 1,
781785
Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("select \"?\"")...),
782786
StmtType: "Select",
787+
PreparedStmt: "select \"?\"",
783788
Line: 1,
784789
Success: true,
785790
},
@@ -793,6 +798,8 @@ func TestDecodeMultiLines(t *testing.T) {
793798
CapturedPsID: 1,
794799
Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 1, 0, 0, 0, 0, 0, 0, 0}...),
795800
StmtType: "Select",
801+
PreparedStmt: "select \"?\"",
802+
Params: []any{int64(1)},
796803
Line: 1,
797804
Success: true,
798805
},
@@ -806,6 +813,7 @@ func TestDecodeMultiLines(t *testing.T) {
806813
CapturedPsID: 1,
807814
Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{1, 0, 0, 0}...),
808815
StmtType: "Select",
816+
PreparedStmt: "select \"?\"",
809817
Line: 1,
810818
Success: true,
811819
},
@@ -819,6 +827,7 @@ func TestDecodeMultiLines(t *testing.T) {
819827
CapturedPsID: 2,
820828
Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("select \"?\"")...),
821829
StmtType: "Select",
830+
PreparedStmt: "select \"?\"",
822831
Line: 2,
823832
Success: true,
824833
},
@@ -832,6 +841,8 @@ func TestDecodeMultiLines(t *testing.T) {
832841
CapturedPsID: 2,
833842
Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{2, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 1, 0, 0, 0, 0, 0, 0, 0}...),
834843
StmtType: "Select",
844+
PreparedStmt: "select \"?\"",
845+
Params: []any{int64(1)},
835846
Line: 2,
836847
Success: true,
837848
},
@@ -845,6 +856,7 @@ func TestDecodeMultiLines(t *testing.T) {
845856
CapturedPsID: 2,
846857
Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{2, 0, 0, 0}...),
847858
StmtType: "Select",
859+
PreparedStmt: "select \"?\"",
848860
Line: 2,
849861
Success: true,
850862
},
@@ -1003,6 +1015,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) {
10031015
CapturedPsID: 1,
10041016
Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT c FROM sbtest1 WHERE id=?")...),
10051017
StmtType: "Select",
1018+
PreparedStmt: "SELECT c FROM sbtest1 WHERE id=?",
10061019
Line: 1,
10071020
Success: true,
10081021
},
@@ -1016,6 +1029,8 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) {
10161029
CapturedPsID: 1,
10171030
Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xe8, 0xaf, 0x07, 0, 0, 0, 0, 0}...),
10181031
StmtType: "Select",
1032+
PreparedStmt: "SELECT c FROM sbtest1 WHERE id=?",
1033+
Params: []any{int64(503784)},
10191034
Line: 1,
10201035
Success: true,
10211036
},
@@ -1050,6 +1065,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) {
10501065
CapturedPsID: 1,
10511066
Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT c FROM sbtest1 WHERE id=?")...),
10521067
StmtType: "Select",
1068+
PreparedStmt: "SELECT c FROM sbtest1 WHERE id=?",
10531069
Line: 1,
10541070
Success: true,
10551071
},
@@ -1063,6 +1079,8 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) {
10631079
CapturedPsID: 1,
10641080
Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xe8, 0xaf, 0x07, 0, 0, 0, 0, 0}...),
10651081
StmtType: "Select",
1082+
PreparedStmt: "SELECT c FROM sbtest1 WHERE id=?",
1083+
Params: []any{int64(503784)},
10661084
Line: 1,
10671085
Success: true,
10681086
},
@@ -1076,6 +1094,8 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) {
10761094
CapturedPsID: 1,
10771095
Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xf9, 0xe4, 0x01, 0, 0, 0, 0, 0}...),
10781096
StmtType: "Select",
1097+
PreparedStmt: "SELECT c FROM sbtest1 WHERE id=?",
1098+
Params: []any{int64(124153)},
10791099
Line: 2,
10801100
Success: true,
10811101
},
@@ -1110,6 +1130,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) {
11101130
CapturedPsID: 1,
11111131
Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT c FROM sbtest1 WHERE id=?")...),
11121132
StmtType: "Select",
1133+
PreparedStmt: "SELECT c FROM sbtest1 WHERE id=?",
11131134
Line: 1,
11141135
Success: true,
11151136
},
@@ -1123,6 +1144,8 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) {
11231144
CapturedPsID: 1,
11241145
Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xe8, 0xaf, 0x07, 0, 0, 0, 0, 0}...),
11251146
StmtType: "Select",
1147+
PreparedStmt: "SELECT c FROM sbtest1 WHERE id=?",
1148+
Params: []any{int64(503784)},
11261149
Line: 1,
11271150
Success: true,
11281151
},
@@ -1149,6 +1172,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) {
11491172
CapturedPsID: 1,
11501173
Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT c FROM sbtest1 WHERE id=?")...),
11511174
StmtType: "Select",
1175+
PreparedStmt: "SELECT c FROM sbtest1 WHERE id=?",
11521176
Line: 3,
11531177
Success: true,
11541178
},
@@ -1162,6 +1186,8 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) {
11621186
CapturedPsID: 1,
11631187
Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xf9, 0xe4, 0x01, 0, 0, 0, 0, 0}...),
11641188
StmtType: "Select",
1189+
PreparedStmt: "SELECT c FROM sbtest1 WHERE id=?",
1190+
Params: []any{int64(124153)},
11651191
Line: 3,
11661192
Success: true,
11671193
},
@@ -1212,6 +1238,7 @@ func TestDecodeAuditLogInNeverMode(t *testing.T) {
12121238
CapturedPsID: 1,
12131239
Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT c FROM sbtest1 WHERE id=?")...),
12141240
StmtType: "Select",
1241+
PreparedStmt: "SELECT c FROM sbtest1 WHERE id=?",
12151242
Line: 1,
12161243
Success: true,
12171244
},
@@ -1225,6 +1252,8 @@ func TestDecodeAuditLogInNeverMode(t *testing.T) {
12251252
CapturedPsID: 1,
12261253
Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xe8, 0xaf, 0x07, 0, 0, 0, 0, 0}...),
12271254
StmtType: "Select",
1255+
PreparedStmt: "SELECT c FROM sbtest1 WHERE id=?",
1256+
Params: []any{int64(503784)},
12281257
Line: 1,
12291258
Success: true,
12301259
},
@@ -1238,6 +1267,8 @@ func TestDecodeAuditLogInNeverMode(t *testing.T) {
12381267
CapturedPsID: 1,
12391268
Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xe8, 0xaf, 0x07, 0, 0, 0, 0, 0}...),
12401269
StmtType: "Select",
1270+
PreparedStmt: "SELECT c FROM sbtest1 WHERE id=?",
1271+
Params: []any{int64(503784)},
12411272
Line: 2,
12421273
Success: true,
12431274
},

pkg/sqlreplay/conn/conn.go

Lines changed: 57 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,8 @@ func (c *conn) Run(ctx context.Context) {
182182
// If the backend is upgrading, the connections may drop but the QPS should not drop too much.
183183
if !connected {
184184
if err := c.backendConn.Connect(ctx, command.Value.CurDB); err != nil {
185-
c.lg.Debug("failed to connect backend", zap.String("db", command.Value.CurDB), zap.Error(err))
186-
c.replayStats.ExceptionCmds.Add(1)
187-
c.exceptionCh <- NewOtherException(err, c.upstreamConnID)
185+
c.lg.Info("failed to connect backend", zap.String("db", command.Value.CurDB), zap.Error(err))
186+
c.onDisconnected(err)
188187
continue
189188
}
190189
connected = true
@@ -201,32 +200,32 @@ func (c *conn) Run(ctx context.Context) {
201200
}
202201
curDB = command.Value.CurDB
203202
}
204-
if !c.updateExecuteStmt(command.Value) {
203+
// update psID, SQL, and params for the command.
204+
if err := c.updateExecuteStmt(command.Value); err != nil {
205205
c.replayStats.ExceptionCmds.Add(1)
206-
c.exceptionCh <- NewFailException(errors.Errorf("prepared statement ID %d not found", command.Value.CapturedPsID), command.Value)
206+
c.exceptionCh <- NewFailException(err, command.Value)
207207
continue
208208
}
209209
startTime := time.Now()
210-
if resp := c.backendConn.ExecuteCmd(ctx, command.Value.Payload); resp.Err != nil {
210+
resp := c.backendConn.ExecuteCmd(ctx, command.Value.Payload)
211+
latency := time.Since(startTime)
212+
if resp.Err != nil {
211213
if errors.Is(resp.Err, backend.ErrClosing) || pnet.IsDisconnectError(resp.Err) {
212-
c.replayStats.ExceptionCmds.Add(1)
213-
c.exceptionCh <- NewOtherException(resp.Err, c.upstreamConnID)
214-
c.lg.Debug("backend connection disconnected", zap.Error(resp.Err))
214+
c.onDisconnected(resp.Err)
215+
c.lg.Info("backend connection disconnected", zap.Error(resp.Err))
215216
connected = false
216217
curDB = ""
217218
continue
218219
}
219-
if c.updateCmdForExecuteStmt(command.Value) {
220-
c.replayStats.ExceptionCmds.Add(1)
221-
c.exceptionCh <- NewFailException(resp.Err, command.Value)
222-
}
220+
c.replayStats.ExceptionCmds.Add(1)
221+
c.exceptionCh <- NewFailException(resp.Err, command.Value)
223222
} else {
224223
c.updatePreparedStmts(command.Value.CapturedPsID, command.Value.Payload, resp)
225224
}
226225
c.execInfoCh <- ExecInfo{
227226
Command: command.Value,
228227
StartTime: startTime,
229-
CostTime: time.Since(startTime),
228+
CostTime: latency,
230229
}
231230
c.replayStats.ReplayedCmds.Add(1)
232231
}
@@ -255,34 +254,6 @@ func (c *conn) isReadOnly(command *cmd.Command) bool {
255254
return true
256255
}
257256

258-
// update the params and sql text for the ComStmtExecute for recording errors.
259-
func (c *conn) updateCmdForExecuteStmt(command *cmd.Command) bool {
260-
// updated before
261-
if command.PreparedStmt != "" {
262-
return true
263-
}
264-
switch command.Type {
265-
case pnet.ComStmtExecute, pnet.ComStmtClose, pnet.ComStmtSendLongData, pnet.ComStmtReset, pnet.ComStmtFetch:
266-
stmtID := binary.LittleEndian.Uint32(command.Payload[1:5])
267-
ps := c.preparedStmts[stmtID]
268-
if len(ps.text) == 0 {
269-
// Maybe the connection is reconnected after disconnection and the prepared statements are lost.
270-
return false
271-
}
272-
if command.Type == pnet.ComStmtExecute {
273-
_, args, _, err := pnet.ParseExecuteStmtRequest(command.Payload, ps.paramNum, ps.paramTypes)
274-
if err != nil {
275-
// Failing to parse the request is not critical, so don't return false.
276-
c.lg.Error("parsing ComExecuteStmt request failed", zap.Uint32("stmt_id", stmtID), zap.String("sql", ps.text),
277-
zap.Int("param_num", ps.paramNum), zap.ByteString("param_types", ps.paramTypes), zap.Error(err))
278-
}
279-
command.Params = args
280-
}
281-
command.PreparedStmt = ps.text
282-
}
283-
return true
284-
}
285-
286257
// maintain prepared statement info so that we can find its info when:
287258
// - Judge whether an EXECUTE command is readonly
288259
// - Get the error message when an EXECUTE command fails
@@ -311,9 +282,8 @@ func (c *conn) updatePreparedStmts(capturedPsID uint32, request []byte, resp Exe
311282
delete(c.preparedStmts, stmtID)
312283
delete(c.psIDMapping, capturedPsID)
313284
case pnet.ComChangeUser.Byte(), pnet.ComResetConnection.Byte():
314-
for stmtID := range c.preparedStmts {
315-
delete(c.preparedStmts, stmtID)
316-
}
285+
clear(c.preparedStmts)
286+
clear(c.psIDMapping)
317287
case pnet.ComQuery.Byte():
318288
if len(request[1:]) > len(setSessionStates) && strings.EqualFold(hack.String(request[1:len(setSessionStates)+1]), setSessionStates) {
319289
query := request[len(setSessionStates)+1:]
@@ -332,23 +302,53 @@ func (c *conn) updatePreparedStmts(capturedPsID uint32, request []byte, resp Exe
332302
}
333303
}
334304

335-
// Update the prepared statement ID in the EXECUTE/FETCH/RESET/SEND_LONG_DATA/CLOSE command.
305+
// Update the prepared statement ID and SQL text in the EXECUTE/FETCH/RESET/SEND_LONG_DATA/CLOSE command.
336306
// If the prepared statement is not found, maybe the previous PREPARE failed or the connection
337-
// is reconnected after disconnection, so return false and do not continue.
338-
func (c *conn) updateExecuteStmt(command *cmd.Command) bool {
339-
// Native traffic replay doesn't set the CapturedPsID yet.
340-
if command.CapturedPsID == 0 {
341-
return true
342-
}
307+
// is reconnected after disconnection, so return error and do not continue.
308+
func (c *conn) updateExecuteStmt(command *cmd.Command) error {
343309
switch command.Type {
344310
case pnet.ComStmtExecute, pnet.ComStmtFetch, pnet.ComStmtClose, pnet.ComStmtReset, pnet.ComStmtSendLongData:
345-
replayID, ok := c.psIDMapping[command.CapturedPsID]
346-
if !ok {
347-
return false
311+
default:
312+
return nil
313+
}
314+
315+
var replayPsID uint32
316+
if command.CapturedPsID != 0 {
317+
var ok bool
318+
if replayPsID, ok = c.psIDMapping[command.CapturedPsID]; !ok {
319+
// Maybe the connection is reconnected after disconnection and the prepared statements are lost.
320+
return errors.Errorf("prepared statement ID %d not found", command.CapturedPsID)
348321
}
349-
binary.LittleEndian.PutUint32(command.Payload[1:], replayID)
322+
binary.LittleEndian.PutUint32(command.Payload[1:], replayPsID)
323+
} else {
324+
// Native traffic replay doesn't set the CapturedPsID yet.
325+
replayPsID = binary.LittleEndian.Uint32(command.Payload[1:5])
350326
}
351-
return true
327+
328+
ps := c.preparedStmts[replayPsID]
329+
if len(ps.text) == 0 {
330+
return errors.Errorf("prepared statement text is empty. capturePsID: %d, replayPsID: %d", command.CapturedPsID, replayPsID)
331+
}
332+
// Actually the PreparedStmt and Params are already set for audit-log based replay.
333+
command.PreparedStmt = ps.text
334+
if command.Type == pnet.ComStmtExecute && command.Params == nil {
335+
// Native capture only contains the binary request. We may need the args to report errors.
336+
_, args, _, err := pnet.ParseExecuteStmtRequest(command.Payload, ps.paramNum, ps.paramTypes)
337+
if err != nil {
338+
// Failing to parse the request is not critical, so don't return false.
339+
c.lg.Error("parsing ComExecuteStmt request failed", zap.Uint32("replay_stmt_id", replayPsID), zap.String("sql", ps.text),
340+
zap.Int("param_num", ps.paramNum), zap.ByteString("param_types", ps.paramTypes), zap.Error(err))
341+
}
342+
command.Params = args
343+
}
344+
return nil
345+
}
346+
347+
func (c *conn) onDisconnected(err error) {
348+
c.replayStats.ExceptionCmds.Add(1)
349+
c.exceptionCh <- NewOtherException(err, c.upstreamConnID)
350+
clear(c.psIDMapping)
351+
clear(c.preparedStmts)
352352
}
353353

354354
// ExecuteCmd executes a command asynchronously by adding it to the list.

0 commit comments

Comments
 (0)