@@ -24,6 +24,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"slices"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -1568,10 +1569,27 @@ func (h *handlerImpl) GetReplicationMessages(
 		return nil, constants.ErrShuttingDown
 	}
 
-	var wg sync.WaitGroup
-	wg.Add(len(request.Tokens))
-	result := new(sync.Map)
+	msgs := h.getReplicationShardMessages(ctx, request)
+	response := h.buildGetReplicationMessagesResponse(metricsScope, msgs)
 
+	h.GetLogger().Debug("GetReplicationMessages succeeded.")
+	return response, nil
+}
+
+// getReplicationShardMessages gets replication messages from all the shards of the request.
+// It queries the replication tasks of each shard in parallel
+// and returns the collected shard messages (in completion order; callers sort them as needed).
+func (h *handlerImpl) getReplicationShardMessages(
+	ctx context.Context,
+	request *types.GetReplicationMessagesRequest,
+) []replicationShardMessages {
+	var (
+		wg      sync.WaitGroup
+		mx      sync.Mutex
+		results = make([]replicationShardMessages, 0, len(request.Tokens))
+	)
+
+	wg.Add(len(request.Tokens))
 	for _, token := range request.Tokens {
 		go func(token *types.ReplicationToken) {
 			defer wg.Done()
@@ -1581,7 +1599,7 @@ func (h *handlerImpl) GetReplicationMessages(
 				h.GetLogger().Warn("History engine not found for shard", tag.Error(err))
 				return
 			}
-			tasks, err := engine.GetReplicationMessages(
+			msgs, err := engine.GetReplicationMessages(
 				ctx,
 				request.GetClusterName(),
 				token.GetLastRetrievedMessageID(),
@@ -1591,42 +1609,112 @@ func (h *handlerImpl) GetReplicationMessages(
 				return
 			}
 
-			result.Store(token.GetShardID(), tasks)
+			mx.Lock()
+			defer mx.Unlock()
+
+			results = append(results, replicationShardMessages{
+				ReplicationMessages:  msgs,
+				shardID:              token.GetShardID(),
+				size:                 proto.FromReplicationMessages(msgs).Size(),
+				earliestCreationTime: msgs.GetEarliestCreationTime(),
+			})
 		}(token)
 	}
 
 	wg.Wait()
+	return results
+}
 
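As a side note, here is a minimal standalone sketch of the fan-out/gather pattern used by getReplicationShardMessages (hypothetical names, simplified payloads): one goroutine per input, each result appended under a mutex, and the caller blocking on the WaitGroup. Results arrive in completion order, which is why the caller sorts them afterwards.

// fanout_sketch.go — hypothetical illustration, not part of the change.
package main

import (
	"fmt"
	"sync"
)

func fanOutGather(inputs []int) []int {
	var (
		wg      sync.WaitGroup
		mx      sync.Mutex
		results = make([]int, 0, len(inputs))
	)

	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in int) {
			defer wg.Done()
			out := in * in // stand-in for the per-shard engine query

			mx.Lock()
			defer mx.Unlock()
			results = append(results, out)
		}(in)
	}

	wg.Wait()
	return results // completion order, not input order
}

func main() {
	fmt.Println(fanOutGather([]int{1, 2, 3, 4}))
}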
-	responseSize := 0
-	maxResponseSize := h.config.MaxResponseSize
-
-	messagesByShard := make(map[int32]*types.ReplicationMessages)
-	result.Range(func(key, value interface{}) bool {
-		shardID := key.(int32)
-		tasks := value.(*types.ReplicationMessages)
+// buildGetReplicationMessagesResponse builds a new GetReplicationMessagesResponse from the shard results.
+// The response can be partial if the total size of the response exceeds the max size.
+// In this case, the responses with the oldest replication tasks are returned.
+func (h *handlerImpl) buildGetReplicationMessagesResponse(metricsScope metrics.Scope, msgs []replicationShardMessages) *types.GetReplicationMessagesResponse {
+	// Shards with large messages can cause the response to exceed the max size.
+	// In this case, we need to skip some shard messages to keep the resulting response size within the limit.
+	// To prevent replication lag in the future, we should return the messages with the oldest replication tasks.
+	// So we sort the shard messages by the earliest creation time of their replication tasks.
+	// If the earliest creation times are the same, we compare the sizes of the messages.
+	// This ensures that shards with the oldest replication tasks are processed first.
+	sortReplicationShardMessages(msgs)
+
+	var (
+		responseSize    = 0
+		maxResponseSize = h.config.MaxResponseSize
+		messagesByShard = make(map[int32]*types.ReplicationMessages, len(msgs))
+	)
 
-		size := proto.FromReplicationMessages(tasks).Size()
-		if (responseSize + size) >= maxResponseSize {
-			metricsScope.Tagged(metrics.ShardIDTag(int(shardID))).IncCounter(metrics.ReplicationMessageTooLargePerShard)
+	for _, m := range msgs {
+		if (responseSize + m.size) >= maxResponseSize {
+			metricsScope.Tagged(metrics.ShardIDTag(int(m.shardID))).IncCounter(metrics.ReplicationMessageTooLargePerShard)
 
 			// Log shards that did not fit for debugging purposes
 			h.GetLogger().Warn("Replication messages did not fit in the response (history host)",
-				tag.ShardID(int(shardID)),
-				tag.ResponseSize(size),
+				tag.ShardID(int(m.shardID)),
+				tag.ResponseSize(m.size),
 				tag.ResponseTotalSize(responseSize),
 				tag.ResponseMaxSize(maxResponseSize),
 			)
-		} else {
-			responseSize += size
-			messagesByShard[shardID] = tasks
+
+			continue
 		}
+		responseSize += m.size
+		messagesByShard[m.shardID] = m.ReplicationMessages
+	}
+	return &types.GetReplicationMessagesResponse{MessagesByShard: messagesByShard}
+}
 
-		return true
-	})
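For intuition, here is a hypothetical self-contained sketch of the same greedy size cap: any item that would push the running total past the limit is skipped (not truncated), and because the loop continues, later smaller items may still fit.

// greedycap_sketch.go — hypothetical illustration, not part of the change.
package main

import "fmt"

// greedyCap keeps items while the running total stays under max,
// skipping any item that would cross the limit.
func greedyCap(sizes []int, max int) []int {
	total := 0
	kept := make([]int, 0, len(sizes))
	for _, s := range sizes {
		if total+s >= max {
			continue // skip; a later, smaller item may still fit
		}
		total += s
		kept = append(kept, s)
	}
	return kept
}

func main() {
	// With a cap of 100, the 70-byte item is skipped but the trailing 20 still fits.
	fmt.Println(greedyCap([]int{30, 40, 70, 20}, 100)) // [30 40 20]
}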
+// replicationShardMessages wraps types.ReplicationMessages
+// and carries some metadata of the ReplicationMessages.
+type replicationShardMessages struct {
+	*types.ReplicationMessages
+	// shardID of the ReplicationMessages
+	shardID int32
+	// size of the proto payload of the ReplicationMessages
+	size int
+	// earliestCreationTime of the ReplicationMessages
+	earliestCreationTime *int64
+}
 
-	h.GetLogger().Debug("GetReplicationMessages succeeded.")
+// sortReplicationShardMessages sorts the shard messages by the earliest creation time of their replication tasks.
+func sortReplicationShardMessages(msgs []replicationShardMessages) {
+	slices.SortStableFunc(msgs, cmpReplicationShardMessages)
+}
+
+// cmpReplicationShardMessages compares two replicationShardMessages
+// by the earliest creation time of their replication tasks.
+// It can be used as a comparison func for slices.SortStableFunc.
+// Messages with a nil earliestCreationTime sort to the end of the slice;
+// otherwise the earliestCreationTime values are compared,
+// and if they are equal, the sizes of the responses are compared.
+func cmpReplicationShardMessages(a, b replicationShardMessages) int {
+	// both nil: treat as equal to keep the comparator consistent
+	if a.earliestCreationTime == nil && b.earliestCreationTime == nil {
+		return 0
+	}
+	// a > b
+	if a.earliestCreationTime == nil {
+		return 1
+	}
+	// a < b
+	if b.earliestCreationTime == nil {
+		return -1
+	}
+
+	// if both are not nil, compare the creation times
+	if *a.earliestCreationTime < *b.earliestCreationTime {
+		return -1
+	}
+
+	if *a.earliestCreationTime > *b.earliestCreationTime {
+		return 1
+	}
+
+	// if the creation times are equal, compare the sizes
+	if a.size < b.size {
+		return -1
+	}
+
+	if a.size > b.size {
+		return 1
+	}
 
-	return &types.GetReplicationMessagesResponse{MessagesByShard: messagesByShard}, nil
+	return 0
 }
 
 // GetDLQReplicationMessages is called by remote peers to get replicated messages for DLQ merging
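To make the resulting order concrete, here is a hypothetical standalone demonstration of the comparator's ordering (simplified struct and field names mirroring the diff; cmp.Compare stands in for the explicit comparisons): the oldest tasks sort first, ties break by size, and entries without tasks go last.

// cmp_sketch.go — hypothetical illustration, not part of the change.
package main

import (
	"cmp"
	"fmt"
	"slices"
)

type shardMsgs struct {
	shardID              int32
	size                 int
	earliestCreationTime *int64
}

func ptr(v int64) *int64 { return &v }

func cmpShardMsgs(a, b shardMsgs) int {
	switch {
	case a.earliestCreationTime == nil && b.earliestCreationTime == nil:
		return 0
	case a.earliestCreationTime == nil:
		return 1 // nil creation times sort last
	case b.earliestCreationTime == nil:
		return -1
	}
	if c := cmp.Compare(*a.earliestCreationTime, *b.earliestCreationTime); c != 0 {
		return c
	}
	return cmp.Compare(a.size, b.size) // tie-break on payload size
}

func main() {
	msgs := []shardMsgs{
		{shardID: 1, size: 50, earliestCreationTime: ptr(200)},
		{shardID: 2, size: 10, earliestCreationTime: ptr(100)},
		{shardID: 3, size: 5, earliestCreationTime: nil},
		{shardID: 4, size: 5, earliestCreationTime: ptr(200)},
	}
	slices.SortStableFunc(msgs, cmpShardMsgs)
	for _, m := range msgs {
		fmt.Println(m.shardID) // prints 2, 4, 1, 3
	}
}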