@@ -52,11 +52,12 @@ type Applier struct {
52
52
rowCopyCompleteFlag int64
53
53
// copyRowsQueue should not be buffered; if buffered some non-damaging but
54
54
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
55
- copyRowsQueue chan * dumpEntry
56
- applyDataEntryQueue chan * binlog.BinlogEntry
57
- applyBinlogTxQueue chan * binlog.BinlogTx
58
- applyBinlogGroupTxQueue chan []* binlog.BinlogTx
59
- lastAppliedBinlogTx * binlog.BinlogTx
55
+ copyRowsQueue chan * dumpEntry
56
+ applyDataEntryQueue chan * binlog.BinlogEntry
57
+ applyGrouDataEntrypQueue chan []* binlog.BinlogEntry
58
+ applyBinlogTxQueue chan * binlog.BinlogTx
59
+ applyBinlogGroupTxQueue chan []* binlog.BinlogTx
60
+ lastAppliedBinlogTx * binlog.BinlogTx
60
61
61
62
natsConn * gonats.Conn
62
63
waitCh chan * models.WaitResult
@@ -84,6 +85,7 @@ func NewApplier(subject, tp string, cfg *config.MySQLDriverConfig, logger *log.L
84
85
allEventsUpToLockProcessed : make (chan string ),
85
86
copyRowsQueue : make (chan * dumpEntry , cfg .ReplChanBufferSize ),
86
87
applyDataEntryQueue : make (chan * binlog.BinlogEntry , cfg .ReplChanBufferSize ),
88
+ applyGrouDataEntrypQueue : make (chan []* binlog.BinlogEntry , cfg .ReplChanBufferSize ),
87
89
applyBinlogTxQueue : make (chan * binlog.BinlogTx , cfg .ReplChanBufferSize ),
88
90
applyBinlogGroupTxQueue : make (chan []* binlog.BinlogTx , cfg .ReplChanBufferSize ),
89
91
waitCh : make (chan * models.WaitResult , 1 ),
@@ -224,8 +226,9 @@ func (a *Applier) Run() {
224
226
if err != nil {
225
227
a .onError (TaskStateDead , err )
226
228
}
229
+
227
230
for {
228
- if completeFlag != "" {
231
+ if completeFlag != "" && a . rowCopyCompleteFlag == 1 {
229
232
switch completeFlag {
230
233
case "0" :
231
234
a .onError (TaskStateComplete , nil )
@@ -638,6 +641,7 @@ func (a *Applier) executeWriteFuncs() {
638
641
if rowCount == string (a .applyRowCount ) {
639
642
a .logger .Printf ("mysql.applier: Rows copy complete.number of rows:%d" , a .applyRowCount )
640
643
a .rowCopyComplete <- true
644
+ atomic .StoreInt64 (& a .rowCopyCompleteFlag , 1 )
641
645
break
642
646
}
643
647
if a .shutdown {
@@ -651,17 +655,31 @@ func (a *Applier) executeWriteFuncs() {
651
655
OUTER:
652
656
for {
653
657
select {
654
- case binlogEntry := <- a .applyDataEntryQueue :
658
+ case groupEntry := <- a .applyGrouDataEntrypQueue :
655
659
{
656
- if nil == binlogEntry {
660
+ if len ( groupEntry ) == 0 {
657
661
continue
658
662
}
659
663
/*if a.mysqlContext.MySQLServerUuid == binlogEntry.Coordinates.OSID {
660
664
continue
661
665
}*/
662
- if err := a .ApplyBinlogEvent (a .db , binlogEntry ); err != nil {
663
- a .onError (TaskStateDead , err )
664
- break OUTER
666
+
667
+ for idx , binlogEntry := range groupEntry {
668
+ dbApplier = a .dbs [idx % a .mysqlContext .ParallelWorkers ]
669
+ //go func(entry *binlog.BinlogEntry) {
670
+ //a.wg.Add(1)
671
+ if err := a .ApplyBinlogEvent (dbApplier , binlogEntry ); err != nil {
672
+ a .onError (TaskStateDead , err )
673
+ }
674
+ //a.wg.Done()
675
+ //}(binlogEntry)
676
+ }
677
+ //a.wg.Wait() // Waiting for all goroutines to finish
678
+
679
+ //a.logger.Debugf("mysql.applier: apply binlogEntry: %+v", groupEntry[len(groupEntry)-1].Coordinates.GNO)
680
+
681
+ if ! a .shutdown {
682
+ a .mysqlContext .Gtid = fmt .Sprintf ("%s:1-%d" , groupEntry [len (groupEntry )- 1 ].Coordinates .SID , groupEntry [len (groupEntry )- 1 ].Coordinates .GNO )
665
683
}
666
684
}
667
685
case groupTx := <- a .applyBinlogGroupTxQueue :
@@ -743,7 +761,7 @@ func (a *Applier) initiateStreaming() error {
743
761
if err := Decode (m .Data , & binlogEntry ); err != nil {
744
762
a .onError (TaskStateDead , err )
745
763
}
746
- //a.logger.Debugf("mysql.applier: received binlogEntry: %+v", binlogEntry.Coordinates.GNO)
764
+ //a.logger.Debugf("mysql.applier: received binlogEntry GNO : %+v,LastCommitted:%+v ", binlogEntry.Coordinates.GNO,binlogEntry.Coordinates.LastCommitted )
747
765
a .applyDataEntryQueue <- binlogEntry
748
766
a .currentCoordinates .RetrievedGtidSet = fmt .Sprintf ("%s:%d" , binlogEntry .Coordinates .SID , binlogEntry .Coordinates .GNO )
749
767
@@ -754,6 +772,48 @@ func (a *Applier) initiateStreaming() error {
754
772
if err != nil {
755
773
return err
756
774
}
775
+
776
+ go func () {
777
+ var lastCommitted int64
778
+ //timeout := time.After(100 * time.Millisecond)
779
+ groupEntry := []* binlog.BinlogEntry {}
780
+ OUTER:
781
+ for {
782
+ select {
783
+ case binlogEntry := <- a .applyDataEntryQueue :
784
+ if nil == binlogEntry {
785
+ continue
786
+ }
787
+ /*if a.mysqlContext.MySQLServerUuid == binlogTx.SID {
788
+ continue
789
+ }*/
790
+ if a .mysqlContext .ParallelWorkers <= 1 {
791
+ if err := a .ApplyBinlogEvent (a .dbs [0 ], binlogEntry ); err != nil {
792
+ a .onError (TaskStateDead , err )
793
+ break OUTER
794
+ }
795
+ } else {
796
+ if binlogEntry .Coordinates .LastCommitted == lastCommitted {
797
+ groupEntry = append (groupEntry , binlogEntry )
798
+ } else {
799
+ if len (groupEntry ) != 0 {
800
+ a .applyGrouDataEntrypQueue <- groupEntry
801
+ groupEntry = []* binlog.BinlogEntry {}
802
+ }
803
+ groupEntry = append (groupEntry , binlogEntry )
804
+ }
805
+ lastCommitted = binlogEntry .Coordinates .LastCommitted
806
+ }
807
+ case <- time .After (100 * time .Millisecond ):
808
+ if len (groupEntry ) != 0 {
809
+ a .applyGrouDataEntrypQueue <- groupEntry
810
+ groupEntry = []* binlog.BinlogEntry {}
811
+ }
812
+ case <- a .shutdownCh :
813
+ break OUTER
814
+ }
815
+ }
816
+ }()
757
817
} else {
758
818
_ , err := a .natsConn .Subscribe (fmt .Sprintf ("%s_incr" , a .subject ), func (m * gonats.Msg ) {
759
819
var binlogTx []* binlog.BinlogTx
@@ -1477,33 +1537,79 @@ func (a *Applier) getSharedColumns(originalColumns, columns *umconf.ColumnList,
1477
1537
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
1478
1538
// event entry on the original table.
1479
1539
func (a * Applier ) buildDMLEventQuery (dmlEvent binlog.DataEvent ) (query string , args []interface {}, rowsDelta int64 , err error ) {
1480
- destTableColumns , _ , err := a .InspectTableColumnsAndUniqueKeys (dmlEvent .DatabaseName , dmlEvent .TableName )
1481
- if err != nil {
1482
- return "" , args , 0 , err
1540
+ /*var destTableColumns *umconf.ColumnList
1541
+ if len(a.mysqlContext.ReplicateDoDb) ==0 {
1542
+ tableColumns, _, err := a.InspectTableColumnsAndUniqueKeys(dmlEvent.DatabaseName, dmlEvent.TableName)
1543
+ if err != nil {
1544
+ return "", args, 0, err
1545
+ }
1546
+ tb:=&config.Table{
1547
+ TableSchema:dmlEvent.DatabaseName,
1548
+ TableName:dmlEvent.TableName,
1549
+ OriginalTableColumns:tableColumns,
1550
+ }
1551
+ db:= &config.DataSource{
1552
+ TableSchema:dmlEvent.DatabaseName,
1553
+ Tables:[]*config.Table{tb},
1554
+ }
1555
+ a.mysqlContext.ReplicateDoDb = append(a.mysqlContext.ReplicateDoDb,db)
1556
+ }else{
1557
+ L:
1558
+ for _,db:=range a.mysqlContext.ReplicateDoDb{
1559
+ if db.TableSchema != dmlEvent.DatabaseName {
1560
+ continue
1561
+ }
1562
+ for _,tb:=range db.Tables {
1563
+ if tb.TableName == dmlEvent.TableName && tb.OriginalTableColumns!=nil{
1564
+ break L
1565
+ }
1566
+ }
1567
+ tableColumns, _, err := a.InspectTableColumnsAndUniqueKeys(dmlEvent.DatabaseName, dmlEvent.TableName)
1568
+ if err != nil {
1569
+ return "", args, 0, err
1570
+ }
1571
+ tb:=&config.Table{
1572
+ TableSchema:dmlEvent.DatabaseName,
1573
+ TableName:dmlEvent.TableName,
1574
+ OriginalTableColumns:tableColumns,
1575
+ }
1576
+ db.Tables = append(db.Tables,tb)
1577
+ }
1483
1578
}
1484
- /*_, err = getSharedUniqueKeys(destTableUniqueKeys, destTableUniqueKeys)
1485
- if err != nil {
1486
- return "", args, 0, err
1487
- }*/
1488
- /*if len(sharedUniqueKeys) == 0 {
1489
- return "", args, 0, fmt.Errorf("No shared unique key can be found after ALTER! Bailing out")
1579
+ for _,db:=range a.mysqlContext.ReplicateDoDb{
1580
+ if db.TableSchema == dmlEvent.DatabaseName {
1581
+ for _,tb:=range db.Tables {
1582
+ if tb.TableName == dmlEvent.TableName {
1583
+ destTableColumns = tb.OriginalTableColumns
1584
+ }
1585
+ }
1586
+ }
1587
+ }
1588
+ if destTableColumns ==nil{
1589
+ destTableColumns, _, err = a.InspectTableColumnsAndUniqueKeys(dmlEvent.DatabaseName, dmlEvent.TableName)
1590
+ if err != nil {
1591
+ return "", args, 0, err
1592
+ }
1490
1593
}*/
1491
- sharedColumns , mappedSharedColumns := a .getSharedColumns (destTableColumns , destTableColumns , a .parser .GetNonTrivialRenames ())
1492
- //a.logger.Printf("mysql.applier: shared columns are %s", sharedColumns)
1594
+
1493
1595
switch dmlEvent .DML {
1494
1596
case binlog .DeleteDML :
1495
1597
{
1496
- query , uniqueKeyArgs , err := sql .BuildDMLDeleteQuery (dmlEvent .DatabaseName , dmlEvent .TableName , destTableColumns , destTableColumns , dmlEvent .WhereColumnValues .GetAbstractValues ())
1598
+ tableColumns , err := base .GetTableColumns (a .db , dmlEvent .DatabaseName , dmlEvent .TableName )
1599
+ query , uniqueKeyArgs , err := sql .BuildDMLDeleteQuery (dmlEvent .DatabaseName , dmlEvent .TableName , tableColumns , dmlEvent .WhereColumnValues .GetAbstractValues ())
1497
1600
return query , uniqueKeyArgs , - 1 , err
1498
1601
}
1499
1602
case binlog .InsertDML :
1500
1603
{
1501
- query , sharedArgs , err := sql .BuildDMLInsertQuery (dmlEvent .DatabaseName , dmlEvent .TableName , destTableColumns , sharedColumns , mappedSharedColumns , dmlEvent .NewColumnValues .GetAbstractValues ())
1604
+ //query, sharedArgs,err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, dmlEvent.TableName,dmlEvent.ColumnCount,dmlEvent.NewColumnValues)
1605
+ tableColumns , err := base .GetTableColumns (a .db , dmlEvent .DatabaseName , dmlEvent .TableName )
1606
+ query , sharedArgs , err := sql .BuildDMLInsertQuery (dmlEvent .DatabaseName , dmlEvent .TableName , tableColumns , tableColumns , tableColumns , dmlEvent .NewColumnValues )
1502
1607
return query , sharedArgs , 1 , err
1503
1608
}
1504
1609
case binlog .UpdateDML :
1505
1610
{
1506
- query , sharedArgs , uniqueKeyArgs , err := sql .BuildDMLUpdateQuery (dmlEvent .DatabaseName , dmlEvent .TableName , destTableColumns , sharedColumns , mappedSharedColumns , destTableColumns , dmlEvent .NewColumnValues .GetAbstractValues (), dmlEvent .WhereColumnValues .GetAbstractValues ())
1611
+ tableColumns , err := base .GetTableColumns (a .db , dmlEvent .DatabaseName , dmlEvent .TableName )
1612
+ query , sharedArgs , uniqueKeyArgs , err := sql .BuildDMLUpdateQuery (dmlEvent .DatabaseName , dmlEvent .TableName , tableColumns , tableColumns , tableColumns , tableColumns , dmlEvent .NewColumnValues [0 ].GetAbstractValues (), dmlEvent .WhereColumnValues .GetAbstractValues ())
1507
1613
args = append (args , sharedArgs ... )
1508
1614
args = append (args , uniqueKeyArgs ... )
1509
1615
return query , args , 0 , err
@@ -1513,18 +1619,19 @@ func (a *Applier) buildDMLEventQuery(dmlEvent binlog.DataEvent) (query string, a
1513
1619
}
1514
1620
1515
1621
// ApplyEventQueries applies multiple DML queries onto the dest table
1516
- func (a * Applier ) ApplyBinlogEvent (db * gosql .DB , binlogEntry * binlog.BinlogEntry ) error {
1622
+ func (a * Applier ) ApplyBinlogEvent (dbApplier * sql .DB , binlogEntry * binlog.BinlogEntry ) error {
1517
1623
var totalDelta int64
1518
1624
1519
- interval , err := base .SelectGtidExecuted (db , binlogEntry .Coordinates .SID , binlogEntry .Coordinates .GNO )
1625
+ interval , err := base .SelectGtidExecuted (dbApplier . Db , binlogEntry .Coordinates .SID , binlogEntry .Coordinates .GNO )
1520
1626
if err != nil {
1521
1627
return err
1522
1628
}
1523
1629
if interval == "" {
1524
1630
return nil
1525
1631
}
1526
1632
1527
- tx , err := db .Begin ()
1633
+ dbApplier .DbMutex .Lock ()
1634
+ tx , err := dbApplier .Db .Begin ()
1528
1635
if err != nil {
1529
1636
return err
1530
1637
}
@@ -1538,6 +1645,7 @@ func (a *Applier) ApplyBinlogEvent(db *gosql.DB, binlogEntry *binlog.BinlogEntry
1538
1645
a .currentCoordinates .ExecutedGtidSet = fmt .Sprintf ("%s:%d" , binlogEntry .Coordinates .SID , binlogEntry .Coordinates .GNO )
1539
1646
a .mysqlContext .Gtid = fmt .Sprintf ("%s:1-%d" , binlogEntry .Coordinates .SID , binlogEntry .Coordinates .GNO )
1540
1647
}
1648
+ dbApplier .DbMutex .Unlock ()
1541
1649
}()
1542
1650
sessionQuery := `SET @@session.foreign_key_checks = 0`
1543
1651
if _ , err := tx .Exec (sessionQuery ); err != nil {
@@ -1573,6 +1681,15 @@ func (a *Applier) ApplyBinlogEvent(db *gosql.DB, binlogEntry *binlog.BinlogEntry
1573
1681
a .logger .Warnf ("mysql.applier: Ignore error: %v" , err )
1574
1682
}
1575
1683
}
1684
+ /*for _,db:=range a.mysqlContext.ReplicateDoDb{
1685
+ for _,tb:=range db.Tables {
1686
+ tableColumns, _, err := a.InspectTableColumnsAndUniqueKeys(tb.TableSchema, tb.TableName)
1687
+ if err != nil {
1688
+ return err
1689
+ }
1690
+ tb.OriginalTableColumns = tableColumns
1691
+ }
1692
+ }*/
1576
1693
default :
1577
1694
query , args , rowDelta , err := a .buildDMLEventQuery (event )
1578
1695
if err != nil {
0 commit comments