Skip to content

Commit 28f5b36

Browse files
authored
Merge branch 'master' into dep-updates-dec2023
2 parents d77e619 + aa010a0 commit 28f5b36

11 files changed

+180
-6
lines changed

.github/CODEOWNERS

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
* @dm-2 @rashiq @meiji163 @timvaillancourt
1+
* @rashiq @meiji163 @timvaillancourt

doc/command-line-flags.md

+3
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,9 @@ But RocksDB currently lacks a few features support compared to InnoDB:
261261

262262
When `--storage-engine=rocksdb`, `gh-ost` will make some changes necessary (e.g. sets isolation level to `READ_COMMITTED`) to support RocksDB.
263263

264+
### charset
265+
The default charset for the database connection is utf8mb4, utf8, latin1. The ability to specify character set and collation is supported, eg: utf8mb4_general_ci,utf8_general_ci,latin1.
266+
264267
### test-on-replica
265268

266269
Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [`testing-on-replica`](testing-on-replica.md)

doc/interactive-commands.md

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
1717
- `help`: shows a brief list of available commands
1818
- `status`: returns a detailed status summary of migration progress and configuration
1919
- `sup`: returns a brief status summary of migration progress
20+
- `cpu-profile`: returns a base64-encoded [`runtime/pprof`](https://pkg.go.dev/runtime/pprof) CPU profile using a duration, default: `30s`. Comma-separated options `gzip` and/or `block` (blocked profile) may follow the profile duration
2021
- `coordinates`: returns recent (though not exactly up to date) binary log coordinates of the inspected server
2122
- `applier`: returns the hostname of the applier
2223
- `inspector`: returns the hostname of the inspector

go/base/context.go

+9
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,15 @@ func (this *MigrationContext) SetConnectionConfig(storageEngine string) error {
305305
return nil
306306
}
307307

308+
func (this *MigrationContext) SetConnectionCharset(charset string) {
309+
if charset == "" {
310+
charset = "utf8mb4,utf8,latin1"
311+
}
312+
313+
this.InspectorConnectionConfig.Charset = charset
314+
this.ApplierConnectionConfig.Charset = charset
315+
}
316+
308317
func getSafeTableName(baseName string, suffix string) string {
309318
name := fmt.Sprintf("_%s_%s", baseName, suffix)
310319
if len(name) <= mysql.MaxTableNameLength {

go/cmd/gh-ost/main.go

+3
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func main() {
5757
flag.StringVar(&migrationContext.CliMasterPassword, "master-password", "", "MySQL password on master, if different from that on replica. Requires --assume-master-host")
5858
flag.StringVar(&migrationContext.ConfigFile, "conf", "", "Config file")
5959
askPass := flag.Bool("ask-pass", false, "prompt for MySQL password")
60+
charset := flag.String("charset", "utf8mb4,utf8,latin1", "The default charset for the database connection is utf8mb4, utf8, latin1.")
6061

6162
flag.BoolVar(&migrationContext.UseTLS, "ssl", false, "Enable SSL encrypted connections to MySQL hosts")
6263
flag.StringVar(&migrationContext.TLSCACertificate, "ssl-ca", "", "CA certificate in PEM format for TLS connections to MySQL hosts. Requires --ssl")
@@ -191,6 +192,8 @@ func main() {
191192
migrationContext.Log.Fatale(err)
192193
}
193194

195+
migrationContext.SetConnectionCharset(*charset)
196+
194197
if migrationContext.AlterStatement == "" {
195198
log.Fatal("--alter must be provided and statement must not be empty")
196199
}

go/logic/migrator.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -1042,7 +1042,14 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
10421042
)
10431043
w := io.MultiWriter(writers...)
10441044
fmt.Fprintln(w, status)
1045-
this.migrationContext.Log.Infof(status)
1045+
1046+
// This "hack" is required here because the underlying logging library
1047+
// github.com/outbrain/golib/log provides two functions Info and Infof; but the arguments of
1048+
// both these functions are eventually redirected to the same function, which internally calls
1049+
// fmt.Sprintf. So, the argument of every function called on the DefaultLogger object
1050+
// migrationContext.Log will eventually pass through fmt.Sprintf, and thus the '%' character
1051+
// needs to be escaped.
1052+
this.migrationContext.Log.Info(strings.Replace(status, "%", "%%", 1))
10461053

10471054
hooksStatusIntervalSec := this.migrationContext.HooksStatusIntervalSec
10481055
if hooksStatusIntervalSec > 0 && elapsedSeconds%hooksStatusIntervalSec == 0 {

go/logic/server.go

+69
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,30 @@ package logic
77

88
import (
99
"bufio"
10+
"bytes"
11+
"compress/gzip"
12+
"encoding/base64"
13+
"errors"
1014
"fmt"
1115
"io"
1216
"net"
1317
"os"
18+
"runtime"
19+
"runtime/pprof"
1420
"strconv"
1521
"strings"
1622
"sync/atomic"
23+
"time"
1724

1825
"github.com/github/gh-ost/go/base"
1926
)
2027

28+
var (
29+
ErrCPUProfilingBadOption = errors.New("unrecognized cpu profiling option")
30+
ErrCPUProfilingInProgress = errors.New("cpu profiling already in progress")
31+
defaultCPUProfileDuration = time.Second * 30
32+
)
33+
2134
type printStatusFunc func(PrintStatusRule, io.Writer)
2235

2336
// Server listens for requests on a socket file or via TCP
@@ -27,6 +40,7 @@ type Server struct {
2740
tcpListener net.Listener
2841
hooksExecutor *HooksExecutor
2942
printStatus printStatusFunc
43+
isCPUProfiling int64
3044
}
3145

3246
func NewServer(migrationContext *base.MigrationContext, hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server {
@@ -37,6 +51,54 @@ func NewServer(migrationContext *base.MigrationContext, hooksExecutor *HooksExec
3751
}
3852
}
3953

54+
func (this *Server) runCPUProfile(args string) (io.Reader, error) {
55+
duration := defaultCPUProfileDuration
56+
57+
var err error
58+
var blockProfile, useGzip bool
59+
if args != "" {
60+
s := strings.Split(args, ",")
61+
// a duration string must be the 1st field, if any
62+
if duration, err = time.ParseDuration(s[0]); err != nil {
63+
return nil, err
64+
}
65+
for _, arg := range s[1:] {
66+
switch arg {
67+
case "block", "blocked", "blocking":
68+
blockProfile = true
69+
case "gzip":
70+
useGzip = true
71+
default:
72+
return nil, ErrCPUProfilingBadOption
73+
}
74+
}
75+
}
76+
77+
if atomic.LoadInt64(&this.isCPUProfiling) > 0 {
78+
return nil, ErrCPUProfilingInProgress
79+
}
80+
atomic.StoreInt64(&this.isCPUProfiling, 1)
81+
defer atomic.StoreInt64(&this.isCPUProfiling, 0)
82+
83+
var buf bytes.Buffer
84+
var writer io.Writer = &buf
85+
if blockProfile {
86+
runtime.SetBlockProfileRate(1)
87+
defer runtime.SetBlockProfileRate(0)
88+
}
89+
if useGzip {
90+
writer = gzip.NewWriter(writer)
91+
}
92+
if err = pprof.StartCPUProfile(writer); err != nil {
93+
return nil, err
94+
}
95+
96+
time.Sleep(duration)
97+
pprof.StopCPUProfile()
98+
this.migrationContext.Log.Infof("Captured %d byte runtime/pprof CPU profile (gzip=%v)", buf.Len(), useGzip)
99+
return &buf, nil
100+
}
101+
40102
func (this *Server) BindSocketFile() (err error) {
41103
if this.migrationContext.ServeSocketFile == "" {
42104
return nil
@@ -144,6 +206,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr
144206
fmt.Fprint(writer, `available commands:
145207
status # Print a detailed status message
146208
sup # Print a short status message
209+
cpu-profile=<options> # Print a base64-encoded runtime/pprof CPU profile using a duration, default: 30s. Comma-separated options 'gzip' and/or 'block' (blocked profile) may follow the profile duration
147210
coordinates # Print the currently inspected coordinates
148211
applier # Print the hostname of the applier
149212
inspector # Print the hostname of the inspector
@@ -169,6 +232,12 @@ help # This message
169232
return ForcePrintStatusOnlyRule, nil
170233
case "info", "status":
171234
return ForcePrintStatusAndHintRule, nil
235+
case "cpu-profile":
236+
cpuProfile, err := this.runCPUProfile(arg)
237+
if err == nil {
238+
fmt.Fprint(base64.NewEncoder(base64.StdEncoding, writer), cpuProfile)
239+
}
240+
return NoPrintStatusRule, err
172241
case "coordinates":
173242
{
174243
if argIsQuestion || arg == "" {

go/logic/server_test.go

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package logic
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/github/gh-ost/go/base"
8+
"github.com/openark/golib/tests"
9+
)
10+
11+
func TestServerRunCPUProfile(t *testing.T) {
12+
t.Parallel()
13+
14+
t.Run("failed already running", func(t *testing.T) {
15+
s := &Server{isCPUProfiling: 1}
16+
profile, err := s.runCPUProfile("15ms")
17+
tests.S(t).ExpectEquals(err, ErrCPUProfilingInProgress)
18+
tests.S(t).ExpectEquals(profile, nil)
19+
})
20+
21+
t.Run("failed bad duration", func(t *testing.T) {
22+
s := &Server{isCPUProfiling: 0}
23+
profile, err := s.runCPUProfile("should-fail")
24+
tests.S(t).ExpectNotNil(err)
25+
tests.S(t).ExpectEquals(profile, nil)
26+
})
27+
28+
t.Run("failed bad option", func(t *testing.T) {
29+
s := &Server{isCPUProfiling: 0}
30+
profile, err := s.runCPUProfile("10ms,badoption")
31+
tests.S(t).ExpectEquals(err, ErrCPUProfilingBadOption)
32+
tests.S(t).ExpectEquals(profile, nil)
33+
})
34+
35+
t.Run("success", func(t *testing.T) {
36+
s := &Server{
37+
isCPUProfiling: 0,
38+
migrationContext: base.NewMigrationContext(),
39+
}
40+
defaultCPUProfileDuration = time.Millisecond * 10
41+
profile, err := s.runCPUProfile("")
42+
tests.S(t).ExpectNil(err)
43+
tests.S(t).ExpectNotEquals(profile, nil)
44+
tests.S(t).ExpectEquals(s.isCPUProfiling, int64(0))
45+
})
46+
47+
t.Run("success with block", func(t *testing.T) {
48+
s := &Server{
49+
isCPUProfiling: 0,
50+
migrationContext: base.NewMigrationContext(),
51+
}
52+
profile, err := s.runCPUProfile("10ms,block")
53+
tests.S(t).ExpectNil(err)
54+
tests.S(t).ExpectNotEquals(profile, nil)
55+
tests.S(t).ExpectEquals(s.isCPUProfiling, int64(0))
56+
})
57+
58+
t.Run("success with block and gzip", func(t *testing.T) {
59+
s := &Server{
60+
isCPUProfiling: 0,
61+
migrationContext: base.NewMigrationContext(),
62+
}
63+
profile, err := s.runCPUProfile("10ms,block,gzip")
64+
tests.S(t).ExpectNil(err)
65+
tests.S(t).ExpectNotEquals(profile, nil)
66+
tests.S(t).ExpectEquals(s.isCPUProfiling, int64(0))
67+
})
68+
}

go/logic/streamer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
193193
} else {
194194
successiveFailures = 0
195195
}
196-
if successiveFailures > this.migrationContext.MaxRetries() {
196+
if successiveFailures >= this.migrationContext.MaxRetries() {
197197
return fmt.Errorf("%d successive failures in streamer reconnect at coordinates %+v", successiveFailures, this.GetReconnectBinlogCoordinates())
198198
}
199199

go/mysql/connection.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type ConnectionConfig struct {
3030
tlsConfig *tls.Config
3131
Timeout float64
3232
TransactionIsolation string
33+
Charset string
3334
}
3435

3536
func NewConnectionConfig() *ConnectionConfig {
@@ -49,6 +50,7 @@ func (this *ConnectionConfig) DuplicateCredentials(key InstanceKey) *ConnectionC
4950
tlsConfig: this.tlsConfig,
5051
Timeout: this.Timeout,
5152
TransactionIsolation: this.TransactionIsolation,
53+
Charset: this.Charset,
5254
}
5355
config.ImpliedKey = &config.Key
5456
return config
@@ -122,10 +124,15 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string {
122124
if this.tlsConfig != nil {
123125
tlsOption = TLS_CONFIG_KEY
124126
}
127+
128+
if this.Charset == "" {
129+
this.Charset = "utf8mb4,utf8,latin1"
130+
}
131+
125132
connectionParams := []string{
126133
"autocommit=true",
127-
"charset=utf8mb4,utf8,latin1",
128134
"interpolateParams=true",
135+
fmt.Sprintf("charset=%s", this.Charset),
129136
fmt.Sprintf("tls=%s", tlsOption),
130137
fmt.Sprintf("transaction_isolation=%q", this.TransactionIsolation),
131138
fmt.Sprintf("timeout=%fs", this.Timeout),

go/mysql/connection_test.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func TestNewConnectionConfig(t *testing.T) {
3030
test.S(t).ExpectEquals(c.User, "")
3131
test.S(t).ExpectEquals(c.Password, "")
3232
test.S(t).ExpectEquals(c.TransactionIsolation, "")
33+
test.S(t).ExpectEquals(c.Charset, "")
3334
}
3435

3536
func TestDuplicateCredentials(t *testing.T) {
@@ -42,6 +43,7 @@ func TestDuplicateCredentials(t *testing.T) {
4243
ServerName: "feathers",
4344
}
4445
c.TransactionIsolation = transactionIsolation
46+
c.Charset = "utf8mb4"
4547

4648
dup := c.DuplicateCredentials(InstanceKey{Hostname: "otherhost", Port: 3310})
4749
test.S(t).ExpectEquals(dup.Key.Hostname, "otherhost")
@@ -52,6 +54,7 @@ func TestDuplicateCredentials(t *testing.T) {
5254
test.S(t).ExpectEquals(dup.Password, "penguin")
5355
test.S(t).ExpectEquals(dup.tlsConfig, c.tlsConfig)
5456
test.S(t).ExpectEquals(dup.TransactionIsolation, c.TransactionIsolation)
57+
test.S(t).ExpectEquals(dup.Charset, c.Charset)
5558
}
5659

5760
func TestDuplicate(t *testing.T) {
@@ -60,6 +63,7 @@ func TestDuplicate(t *testing.T) {
6063
c.User = "gromit"
6164
c.Password = "penguin"
6265
c.TransactionIsolation = transactionIsolation
66+
c.Charset = "utf8mb4"
6367

6468
dup := c.Duplicate()
6569
test.S(t).ExpectEquals(dup.Key.Hostname, "myhost")
@@ -69,6 +73,7 @@ func TestDuplicate(t *testing.T) {
6973
test.S(t).ExpectEquals(dup.User, "gromit")
7074
test.S(t).ExpectEquals(dup.Password, "penguin")
7175
test.S(t).ExpectEquals(dup.TransactionIsolation, transactionIsolation)
76+
test.S(t).ExpectEquals(dup.Charset, "utf8mb4")
7277
}
7378

7479
func TestGetDBUri(t *testing.T) {
@@ -78,9 +83,10 @@ func TestGetDBUri(t *testing.T) {
7883
c.Password = "penguin"
7984
c.Timeout = 1.2345
8085
c.TransactionIsolation = transactionIsolation
86+
c.Charset = "utf8mb4,utf8,latin1"
8187

8288
uri := c.GetDBUri("test")
83-
test.S(t).ExpectEquals(uri, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&charset=utf8mb4,utf8,latin1&interpolateParams=true&tls=false&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`)
89+
test.S(t).ExpectEquals(uri, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&charset=utf8mb4,utf8,latin1&tls=false&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`)
8490
}
8591

8692
func TestGetDBUriWithTLSSetup(t *testing.T) {
@@ -91,7 +97,8 @@ func TestGetDBUriWithTLSSetup(t *testing.T) {
9197
c.Timeout = 1.2345
9298
c.tlsConfig = &tls.Config{}
9399
c.TransactionIsolation = transactionIsolation
100+
c.Charset = "utf8mb4_general_ci,utf8_general_ci,latin1"
94101

95102
uri := c.GetDBUri("test")
96-
test.S(t).ExpectEquals(uri, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&charset=utf8mb4,utf8,latin1&interpolateParams=true&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`)
103+
test.S(t).ExpectEquals(uri, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&charset=utf8mb4_general_ci,utf8_general_ci,latin1&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`)
97104
}

0 commit comments

Comments
 (0)