@@ -22,7 +22,8 @@ import (
22
22
"github.com/openark/golib/sqlutils"
23
23
)
24
24
25
- const startSlavePostWaitMilliseconds = 500 * time .Millisecond
25
+ const startReplicationPostWait = 250 * time .Millisecond
26
+ const startReplicationMaxWait = 2 * time .Second
26
27
27
28
// Inspector reads data from the read-MySQL-server (typically a replica, but can be the master)
28
29
// It is used for gaining initial status and structure, and later also follow up on progress and changelog
@@ -302,12 +303,50 @@ func (this *Inspector) restartReplication() error {
302
303
if startError != nil {
303
304
return startError
304
305
}
305
- time .Sleep (startSlavePostWaitMilliseconds )
306
+
307
+ // loop until replication is running unless we hit a max timeout.
308
+ startTime := time .Now ()
309
+ for {
310
+ replicationRunning , err := this .validateReplicationRestarted ()
311
+ if err != nil {
312
+ return fmt .Errorf ("Failed to validate if replication had been restarted: %w" , err )
313
+ }
314
+ if replicationRunning {
315
+ break
316
+ }
317
+ if time .Since (startTime ) > startReplicationMaxWait {
318
+ return fmt .Errorf ("Replication did not restart within the maximum wait time of %s" , startReplicationMaxWait )
319
+ }
320
+ this .migrationContext .Log .Debugf ("Replication not yet restarted, waiting..." )
321
+ time .Sleep (startReplicationPostWait )
322
+ }
306
323
307
324
this .migrationContext .Log .Debugf ("Replication restarted" )
308
325
return nil
309
326
}
310
327
328
+ // validateReplicationRestarted checks that the Slave_IO_Running and Slave_SQL_Running are both 'Yes'
329
+ // returns true if both are 'Yes', false otherwise
330
+ func (this * Inspector ) validateReplicationRestarted () (bool , error ) {
331
+ errNotRunning := fmt .Errorf ("Replication not running on %s" , this .connectionConfig .Key .String ())
332
+ query := `show /* gh-ost */ slave status`
333
+ err := sqlutils .QueryRowsMap (this .db , query , func (rowMap sqlutils.RowMap ) error {
334
+ if rowMap .GetString ("Slave_IO_Running" ) != "Yes" || rowMap .GetString ("Slave_SQL_Running" ) != "Yes" {
335
+ return errNotRunning
336
+ }
337
+ return nil
338
+ })
339
+
340
+ if err != nil {
341
+ // If the error is that replication is not running, return that and not an error
342
+ if errors .Is (err , errNotRunning ) {
343
+ return false , nil
344
+ }
345
+ return false , err
346
+ }
347
+ return true , nil
348
+ }
349
+
311
350
// applyBinlogFormat sets ROW binlog format and restarts replication to make
312
351
// the replication thread apply it.
313
352
func (this * Inspector ) applyBinlogFormat () error {
0 commit comments