Skip to content

Commit 415b73e

Browse files
authored
cmd, sqlreplay: add service mode and support to set address in replay config (#971)
Signed-off-by: Yang Keao <[email protected]>
1 parent 1c16be2 commit 415b73e

File tree

8 files changed

+168
-88
lines changed

8 files changed

+168
-88
lines changed

cmd/replayer/main.go

Lines changed: 36 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,13 @@ import (
1212
"syscall"
1313
"time"
1414

15-
"github.com/go-mysql-org/go-mysql/mysql"
1615
"github.com/pingcap/tiproxy/lib/config"
1716
"github.com/pingcap/tiproxy/lib/util/cmd"
18-
"github.com/pingcap/tiproxy/pkg/balance/router"
1917
"github.com/pingcap/tiproxy/pkg/manager/cert"
2018
"github.com/pingcap/tiproxy/pkg/manager/id"
2119
"github.com/pingcap/tiproxy/pkg/manager/logger"
2220
"github.com/pingcap/tiproxy/pkg/manager/memory"
2321
"github.com/pingcap/tiproxy/pkg/proxy/backend"
24-
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
2522
"github.com/pingcap/tiproxy/pkg/server/api"
2623
replaycmd "github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
2724
mgrrp "github.com/pingcap/tiproxy/pkg/sqlreplay/manager"
@@ -60,6 +57,7 @@ func main() {
6057
replayerCount := rootCmd.PersistentFlags().Uint64("replayer-count", 1, "the total number of replayer instances running concurrently. Used only when dynamic-input is enabled.")
6158
replayerIndex := rootCmd.PersistentFlags().Uint64("replayer-index", 0, "the index of this replayer instance. Used only when dynamic-input is enabled.")
6259
outputPath := rootCmd.PersistentFlags().String("output-path", "", "the file path to store replayed sql. Empty indicates do not output replayed sql.")
60+
serviceMode := rootCmd.PersistentFlags().Bool("service-mode", false, "run replayer in service mode")
6361

6462
rootCmd.RunE = func(cmd *cobra.Command, _ []string) error {
6563
// set up general managers
@@ -84,9 +82,9 @@ func main() {
8482
memMgr.Start(context.Background())
8583

8684
// create replay job manager
87-
hsHandler := newStaticHandshakeHandler(*addr)
85+
hsHandler := backend.NewStaticHandshakeHandler(*addr)
8886
idMgr := id.NewIDManager()
89-
r := mgrrp.NewJobManager(lg, cfg, &nopCertManager{}, idMgr, hsHandler)
87+
r := mgrrp.NewJobManager(lg, cfg, &nopCertManager{}, idMgr, hsHandler, true)
9088

9189
// start api server
9290
mgrs := api.Managers{
@@ -105,6 +103,7 @@ func main() {
105103

106104
// set up signal handler
107105
ctx, cancel := context.WithCancel(context.Background())
106+
closeCh := make(chan struct{})
108107
go func() {
109108
sc := make(chan os.Signal, 1)
110109
signal.Notify(sc,
@@ -118,34 +117,41 @@ func main() {
118117
r.Stop(mgrrp.CancelConfig{Type: mgrrp.Replay})
119118
case <-ctx.Done():
120119
}
120+
121+
close(closeCh)
121122
}()
122123

123-
// start replay
124-
replayCfg := replay.ReplayConfig{
125-
Input: *input,
126-
Speed: *speed,
127-
Username: *username,
128-
Password: *password,
129-
Format: *format,
130-
ReadOnly: *readonly,
131-
StartTime: time.Now(),
132-
CommandStartTime: *cmdStartTime,
133-
CommandEndTime: *cmdEndTime,
134-
IgnoreErrs: *ignoreErrs,
135-
BufSize: *bufSize,
136-
PSCloseStrategy: replaycmd.PSCloseStrategy(*psCloseStrategy),
137-
DryRun: *dryRun,
138-
CheckPointFilePath: *checkPointFilePath,
139-
DynamicInput: *dynamicInput,
140-
ReplayerCount: *replayerCount,
141-
ReplayerIndex: *replayerIndex,
142-
OutputPath: *outputPath,
143-
}
144-
if err := r.StartReplay(replayCfg); err != nil {
145-
cancel()
146-
return err
124+
if *serviceMode {
125+
// In this case, we didn't start any replay job. Just need to wait for the signal to exit.
126+
<-closeCh
127+
} else {
128+
// start replay
129+
replayCfg := replay.ReplayConfig{
130+
Input: *input,
131+
Speed: *speed,
132+
Username: *username,
133+
Password: *password,
134+
Format: *format,
135+
ReadOnly: *readonly,
136+
StartTime: time.Now(),
137+
CommandStartTime: *cmdStartTime,
138+
CommandEndTime: *cmdEndTime,
139+
IgnoreErrs: *ignoreErrs,
140+
BufSize: *bufSize,
141+
PSCloseStrategy: replaycmd.PSCloseStrategy(*psCloseStrategy),
142+
DryRun: *dryRun,
143+
CheckPointFilePath: *checkPointFilePath,
144+
DynamicInput: *dynamicInput,
145+
ReplayerCount: *replayerCount,
146+
ReplayerIndex: *replayerIndex,
147+
OutputPath: *outputPath,
148+
}
149+
if err := r.StartReplay(replayCfg); err != nil {
150+
cancel()
151+
return err
152+
}
153+
r.Wait()
147154
}
148-
r.Wait()
149155

150156
cancel()
151157
r.Close()
@@ -166,46 +172,6 @@ func (c *nopCertManager) SQLTLS() *tls.Config {
166172
return nil
167173
}
168174

169-
type staticHandshakeHandler struct {
170-
rt router.Router
171-
}
172-
173-
func newStaticHandshakeHandler(addr string) *staticHandshakeHandler {
174-
return &staticHandshakeHandler{
175-
rt: router.NewStaticRouter([]string{addr}),
176-
}
177-
}
178-
179-
func (handler *staticHandshakeHandler) HandleHandshakeResp(backend.ConnContext, *pnet.HandshakeResp) error {
180-
return nil
181-
}
182-
183-
func (handler *staticHandshakeHandler) HandleHandshakeErr(backend.ConnContext, *mysql.MyError) bool {
184-
return false
185-
}
186-
187-
func (handler *staticHandshakeHandler) GetRouter(backend.ConnContext, *pnet.HandshakeResp) (router.Router, error) {
188-
return handler.rt, nil
189-
}
190-
191-
func (handler *staticHandshakeHandler) OnHandshake(backend.ConnContext, string, error, backend.ErrorSource) {
192-
}
193-
194-
func (handler *staticHandshakeHandler) OnTraffic(backend.ConnContext) {
195-
}
196-
197-
func (handler *staticHandshakeHandler) OnConnClose(backend.ConnContext, backend.ErrorSource) error {
198-
return nil
199-
}
200-
201-
func (handler *staticHandshakeHandler) GetCapability() pnet.Capability {
202-
return backend.SupportedServerCapabilities
203-
}
204-
205-
func (handler *staticHandshakeHandler) GetServerVersion() string {
206-
return pnet.ServerVersion
207-
}
208-
209175
var _ api.ConfigManager = (*nopConfigManager)(nil)
210176

211177
type nopConfigManager struct {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package backend
5+
6+
import (
7+
"github.com/go-mysql-org/go-mysql/mysql"
8+
"github.com/pingcap/tiproxy/pkg/balance/router"
9+
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
10+
)
11+
12+
// StaticHandshakeHandler always returns a static router.
13+
var _ HandshakeHandler = (*StaticHandshakeHandler)(nil)
14+
15+
type StaticHandshakeHandler struct {
16+
rt router.Router
17+
}
18+
19+
// NewStaticHandshakeHandler creates a StaticHandshakeHandler.
20+
func NewStaticHandshakeHandler(addr string) *StaticHandshakeHandler {
21+
return &StaticHandshakeHandler{
22+
rt: router.NewStaticRouter([]string{addr}),
23+
}
24+
}
25+
26+
func (handler *StaticHandshakeHandler) HandleHandshakeResp(ConnContext, *pnet.HandshakeResp) error {
27+
return nil
28+
}
29+
30+
func (handler *StaticHandshakeHandler) HandleHandshakeErr(ConnContext, *mysql.MyError) bool {
31+
return false
32+
}
33+
34+
func (handler *StaticHandshakeHandler) GetRouter(ConnContext, *pnet.HandshakeResp) (router.Router, error) {
35+
return handler.rt, nil
36+
}
37+
38+
func (handler *StaticHandshakeHandler) OnHandshake(ConnContext, string, error, ErrorSource) {
39+
}
40+
41+
func (handler *StaticHandshakeHandler) OnTraffic(ConnContext) {
42+
}
43+
44+
func (handler *StaticHandshakeHandler) OnConnClose(ConnContext, ErrorSource) error {
45+
return nil
46+
}
47+
48+
func (handler *StaticHandshakeHandler) GetCapability() pnet.Capability {
49+
return SupportedServerCapabilities
50+
}
51+
52+
func (handler *StaticHandshakeHandler) GetServerVersion() string {
53+
return pnet.ServerVersion
54+
}

pkg/server/api/traffic.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ func (h *Server) TrafficReplay(c *gin.Context) {
154154
cfg.ReplayerIndex = replayerIndex
155155
}
156156
cfg.OutputPath = c.PostForm("outputpath")
157+
cfg.Addr = c.PostForm("addr")
157158

158159
if err := h.mgr.ReplayJobMgr.StartReplay(cfg); err != nil {
159160
c.String(http.StatusInternalServerError, err.Error())

pkg/server/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error)
173173

174174
// setup capture and replay job manager
175175
{
176-
srv.replay = mgrrp.NewJobManager(lg.Named("replay"), srv.configManager.GetConfig(), srv.certManager, idMgr, hsHandler)
176+
srv.replay = mgrrp.NewJobManager(lg.Named("replay"), srv.configManager.GetConfig(), srv.certManager, idMgr, hsHandler, false)
177177
}
178178

179179
{

pkg/sqlreplay/manager/job.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ type replayJob4Marshal struct {
135135
Format string `json:"format,omitempty"`
136136
Speed float64 `json:"speed,omitempty"`
137137
ReadOnly bool `json:"readonly,omitempty"`
138+
Addr string `json:"addr,omitempty"`
138139
}
139140

140141
func (job *replayJob) Type() JobType {
@@ -152,6 +153,7 @@ func (job *replayJob) MarshalJSON() ([]byte, error) {
152153
Speed: job.cfg.Speed,
153154
ReadOnly: job.cfg.ReadOnly,
154155
Format: job.cfg.Format,
156+
Addr: job.cfg.Addr,
155157
}
156158
return json.Marshal(r)
157159
}

pkg/sqlreplay/manager/manager.go

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,23 +46,25 @@ type JobManager interface {
4646
var _ JobManager = (*jobManager)(nil)
4747

4848
type jobManager struct {
49-
jobHistory []Job
50-
capture capture.Capture
51-
replay replay.Replay
52-
hsHandler backend.HandshakeHandler
53-
certManager CertManager
54-
cfg *config.Config
55-
lg *zap.Logger
49+
jobHistory []Job
50+
capture capture.Capture
51+
replay replay.Replay
52+
hsHandler backend.HandshakeHandler
53+
certManager CertManager
54+
cfg *config.Config
55+
lg *zap.Logger
56+
isStandalonePlayer bool
5657
}
5758

58-
func NewJobManager(lg *zap.Logger, cfg *config.Config, certMgr CertManager, idMgr *id.IDManager, hsHandler backend.HandshakeHandler) *jobManager {
59+
func NewJobManager(lg *zap.Logger, cfg *config.Config, certMgr CertManager, idMgr *id.IDManager, hsHandler backend.HandshakeHandler, isStandalonePlayer bool) *jobManager {
5960
return &jobManager{
60-
lg: lg,
61-
capture: capture.NewCapture(lg.Named("capture")),
62-
replay: replay.NewReplay(lg.Named("replay"), idMgr),
63-
hsHandler: hsHandler,
64-
cfg: cfg,
65-
certManager: certMgr,
61+
lg: lg,
62+
capture: capture.NewCapture(lg.Named("capture")),
63+
replay: replay.NewReplay(lg.Named("replay"), idMgr),
64+
hsHandler: hsHandler,
65+
cfg: cfg,
66+
certManager: certMgr,
67+
isStandalonePlayer: isStandalonePlayer,
6668
}
6769
}
6870

@@ -123,6 +125,15 @@ func (jm *jobManager) StartReplay(cfg replay.ReplayConfig) error {
123125
if running != nil {
124126
return errors.Errorf("a job is running: %s", running.String())
125127
}
128+
129+
if len(cfg.Addr) > 0 {
130+
if !jm.isStandalonePlayer {
131+
return errors.Errorf("Addr is not allowed in replay config in a TiProxy node")
132+
}
133+
// override the hsHandler
134+
jm.hsHandler = backend.NewStaticHandshakeHandler(cfg.Addr)
135+
}
136+
126137
newJob := &replayJob{
127138
job: job{
128139
// cfg.StartTime may act as the job ID in a TiProxy cluster.

pkg/sqlreplay/manager/manager_test.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
)
1818

1919
func TestStartAndStop(t *testing.T) {
20-
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, id.NewIDManager(), nil)
20+
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, id.NewIDManager(), nil, false)
2121
defer mgr.Close()
2222
cpt, rep := &mockCapture{}, &mockReplay{}
2323
mgr.capture, mgr.replay = cpt, rep
@@ -64,7 +64,7 @@ func TestMarshalJobHistory(t *testing.T) {
6464
require.NoError(t, err)
6565
endTime, err := time.Parse("2006-01-02 15:04:05", "2020-01-01 02:01:01")
6666
require.NoError(t, err)
67-
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, id.NewIDManager(), nil)
67+
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, id.NewIDManager(), nil, false)
6868
mgr.jobHistory = []Job{
6969
&captureJob{
7070
job: job{
@@ -104,6 +104,20 @@ func TestMarshalJobHistory(t *testing.T) {
104104
},
105105
lastCmdTs: endTime,
106106
},
107+
&replayJob{
108+
job: job{
109+
startTime: startTime,
110+
endTime: endTime,
111+
progress: 0,
112+
done: true,
113+
},
114+
cfg: replay.ReplayConfig{
115+
Input: "/tmp/traffic",
116+
Username: "root",
117+
Addr: "127.0.0.100:10000",
118+
},
119+
lastCmdTs: endTime,
120+
},
107121
}
108122
t.Log(mgr.Jobs())
109123
require.Equal(t, `[
@@ -136,12 +150,23 @@ func TestMarshalJobHistory(t *testing.T) {
136150
"input": "/tmp/traffic",
137151
"username": "root",
138152
"speed": 0.5
153+
},
154+
{
155+
"type": "replay",
156+
"status": "done",
157+
"start_time": "2020-01-01T00:00:00Z",
158+
"end_time": "2020-01-01T02:01:01Z",
159+
"progress": "0%",
160+
"last_cmd_ts": "2020-01-01T02:01:01Z",
161+
"input": "/tmp/traffic",
162+
"username": "root",
163+
"addr": "127.0.0.100:10000"
139164
}
140165
]`, mgr.Jobs())
141166
}
142167

143168
func TestHistoryLen(t *testing.T) {
144-
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, nil, nil)
169+
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, nil, nil, false)
145170
defer mgr.Close()
146171
cpt, rep := &mockCapture{}, &mockReplay{}
147172
mgr.capture, mgr.replay = cpt, rep
@@ -154,3 +179,22 @@ func TestHistoryLen(t *testing.T) {
154179
require.Len(t, mgr.jobHistory, expectedLen)
155180
}
156181
}
182+
183+
func TestAllowAddrOnlyForStandaloneService(t *testing.T) {
184+
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, id.NewIDManager(), nil, false)
185+
defer mgr.Close()
186+
187+
err := mgr.StartReplay(replay.ReplayConfig{
188+
Addr: "127.0.0.100:10000",
189+
})
190+
require.ErrorContains(t, err, "Addr is not allowed in replay config in a TiProxy node")
191+
192+
mgr2 := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, id.NewIDManager(), nil, true)
193+
defer mgr2.Close()
194+
mgr2.capture, mgr2.replay = &mockCapture{}, &mockReplay{}
195+
196+
err = mgr2.StartReplay(replay.ReplayConfig{
197+
Addr: "127.0.0.100:10000",
198+
})
199+
require.NoError(t, err)
200+
}

pkg/sqlreplay/replay/replay.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ type ReplayConfig struct {
120120
ReplayerIndex uint64
121121
// OutputPath is the path to output replayed sql.
122122
OutputPath string
123+
// Addr is the downstream address to connect to
124+
Addr string
123125
// the following fields are for testing
124126
readers []cmd.LineReader
125127
report report.Report

0 commit comments

Comments
 (0)