
Commit d071d61

craig[bot], jeffswenson, mw5h, miraradeva, and sumeerbhola committed
158341: distmerge: add merge processor scaffolding r=mw5h a=mw5h

#### bulkmerge: add processors and DistSQL physical planning

This commit introduces the initial implementation of bulkmerge processors for distributed merge operations. It adds three new processor types: MergeLoopback, BulkMerge, and MergeCoordinator, along with the DistSQL physical planning infrastructure to orchestrate them.

The MergeLoopback processor runs on the coordinator node and generates initial tasks. The BulkMerge processors run on each SQL instance to perform merge operations. The MergeCoordinator collects results from all merge processors.

This commit also updates the vectorized execution engine (execplan.go) to recognize the new processor cores, preventing panics when these processors are encountered.

Informs #156580

Release note: None

#### backup: deduplicate routing logic with physicalplan package

Previously, the backup package contained duplicate implementations of routingDatumsForSQLInstance and routingSpanForSQLInstance that were identical to functions recently added to pkg/sql/physicalplan. This commit removes these duplicates and uses the shared implementations: RoutingDatumsForSQLInstance and MakeInstanceRouter.

This reduces code duplication and ensures consistent routing behavior across DistSQL physical planning for both backup/restore and bulk merge operations.

Informs #156580

Release note: None

#### physicalplan: set DefaultDest to nil to catch routing bugs

Changes MakeInstanceRouter to set DefaultDest to nil instead of stream 0. When DefaultDest is nil, any routing key that doesn't match a span produces an error rather than silently routing to an arbitrary stream. This helps catch coordination bugs early.

Updates merge_loopback.go to generate a routing key for its SQL instance using physicalplan.RoutingDatumsForSQLInstance instead of a hardcoded "loopback" key. This ensures the routing key matches one of the spans in the router rather than relying on the DefaultDest fallback.

The test expectation is updated from "loopback->merge->coordinator" to "node1->merge->coordinator" to reflect the explicit routing behavior.

Fixes #156580

Release note: None

158596: kvnemesis: correctly verify the span config of the system range r=miraradeva a=miraradeva

Previously, kvnemesis verified the correct span config for some critical ranges: meta (r1), liveness (r2) and system (r3). However, as of 8844739, meta1 and meta2 start off as separate ranges, pushing the system range to r4. It's important for this range to be replicated correctly in kvnemesis to ensure it's available in the presence of network partitions. Unavailability of the system range can result in splits not being able to allocate new range IDs.

This commit bumps the max range ID used for span config verification from 3 to 4.

Informs: #158366

Release note: None

158606: mma: always incorporate RangeMsg.RangeLoad r=wenyihu6 a=sumeerbhola

Epic: CRDB-55052

Release note: None

158610: mma: tweak the comment of MakeStoreLoadMsg r=wenyihu6 a=sumeerbhola

This is mainly to clarify that we sum the load and capacity reported per store inside MMA, so tweaking the capacity calculation approach may not be possible in isolation.

Epic: CRDB-55052

Release note: None

Co-authored-by: Jeff Swenson <[email protected]>
Co-authored-by: Matt White <[email protected]>
Co-authored-by: Mira Radeva <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
5 parents fb7e7cc + d96f819 + 1072916 + 0dced9f + 494323e commit d071d61

25 files changed, +881 -135 lines changed
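
The "physicalplan: set DefaultDest to nil" commit above hinges on how a range router picks an output stream for a routing key. The following is a minimal, self-contained sketch of that lookup semantics, not the actual CockroachDB router code; the span type and pickStream function are illustrative names.

package main

import (
	"bytes"
	"fmt"
)

// span is an illustrative stand-in for a router span: routing keys in
// [start, end) are sent to the given output stream.
type span struct {
	start, end []byte
	stream     int32
}

// pickStream mimics the routing decision described in the commit message:
// with a nil default destination, an unmatched key is an error rather than
// being silently routed to an arbitrary stream.
func pickStream(spans []span, defaultDest *int32, key []byte) (int32, error) {
	for _, sp := range spans {
		if bytes.Compare(sp.start, key) <= 0 && bytes.Compare(key, sp.end) < 0 {
			return sp.stream, nil
		}
	}
	if defaultDest != nil {
		return *defaultDest, nil // old behavior: fall back to the default stream
	}
	return 0, fmt.Errorf("no span found for routing key %q", key)
}

func main() {
	spans := []span{
		{start: []byte("node1"), end: []byte("node1\x00"), stream: 0},
		{start: []byte("node2"), end: []byte("node2\x00"), stream: 1},
	}
	// A matching key routes to its instance's stream; an unknown key such as
	// the old hardcoded "loopback" now fails loudly instead of landing on stream 0.
	fmt.Println(pickStream(spans, nil, []byte("node2")))
	fmt.Println(pickStream(spans, nil, []byte("loopback")))
}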

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
@@ -397,6 +397,7 @@ ALL_TESTS = [
     "//pkg/sql/auditlogging:auditlogging_test",
     "//pkg/sql/backfill:backfill_test",
     "//pkg/sql/bulkingest:bulkingest_test",
+    "//pkg/sql/bulkmerge:bulkmerge_test",
     "//pkg/sql/bulksst:bulksst_test",
     "//pkg/sql/bulkutil:bulkutil_test",
     "//pkg/sql/cacheutil:cacheutil_test",
@@ -1885,6 +1886,8 @@ GO_TARGETS = [
     "//pkg/sql/backfill:backfill_test",
     "//pkg/sql/bulkingest:bulkingest",
     "//pkg/sql/bulkingest:bulkingest_test",
+    "//pkg/sql/bulkmerge:bulkmerge",
+    "//pkg/sql/bulkmerge:bulkmerge_test",
     "//pkg/sql/bulksst:bulksst",
     "//pkg/sql/bulksst:bulksst_test",
     "//pkg/sql/bulkutil:bulkutil",

pkg/backup/BUILD.bazel

Lines changed: 0 additions & 1 deletion
@@ -83,7 +83,6 @@ go_library(
         "//pkg/sql",
         "//pkg/sql/catalog",
         "//pkg/sql/catalog/catalogkeys",
-        "//pkg/sql/catalog/catenumpb",
         "//pkg/sql/catalog/catpb",
         "//pkg/sql/catalog/colinfo",
         "//pkg/sql/catalog/dbdesc",

pkg/backup/generative_split_and_scatter_processor.go

Lines changed: 2 additions & 30 deletions
@@ -7,7 +7,6 @@ package backup
 
 import (
 	"context"
-	"fmt"
 	"hash/fnv"
 	"math/rand"
 	"strings"
@@ -22,9 +21,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -249,15 +248,6 @@ func (s dbSplitAndScatterer) findDestination(res *kvpb.AdminScatterResponse) roa
 	return roachpb.NodeID(0)
 }
 
-func routingDatumsForSQLInstance(
-	sqlInstanceID base.SQLInstanceID,
-) (rowenc.EncDatum, rowenc.EncDatum) {
-	routingBytes := roachpb.Key(fmt.Sprintf("node%d", sqlInstanceID))
-	startDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes)))
-	endDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes.Next())))
-	return startDatum, endDatum
-}
-
 type entryNode struct {
 	entry execinfrapb.RestoreSpanEntry
 	node  roachpb.NodeID
@@ -393,7 +383,7 @@ func (gssp *generativeSplitAndScatterProcessor) Next() (
 		// The routing datums informs the router which output stream should be used.
 		routingDatum, ok := gssp.routingDatumCache.getRoutingDatum(scatteredEntry.node)
 		if !ok {
-			routingDatum, _ = routingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node))
+			routingDatum, _ = physicalplan.RoutingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node))
 			gssp.routingDatumCache.putRoutingDatum(scatteredEntry.node, routingDatum)
 		}
 
@@ -745,24 +735,6 @@ var splitAndScatterOutputTypes = []*types.T{
 	types.Bytes, // RestoreDataEntry bytes
 }
 
-// routingSpanForSQLInstance provides the mapping to be used during distsql planning
-// when setting up the output router.
-func routingSpanForSQLInstance(sqlInstanceID base.SQLInstanceID) ([]byte, []byte, error) {
-	var alloc tree.DatumAlloc
-	startDatum, endDatum := routingDatumsForSQLInstance(sqlInstanceID)
-
-	startBytes, endBytes := make([]byte, 0), make([]byte, 0)
-	startBytes, err := startDatum.Encode(splitAndScatterOutputTypes[0], &alloc, catenumpb.DatumEncoding_ASCENDING_KEY, startBytes)
-	if err != nil {
-		return nil, nil, err
-	}
-	endBytes, err = endDatum.Encode(splitAndScatterOutputTypes[0], &alloc, catenumpb.DatumEncoding_ASCENDING_KEY, endBytes)
-	if err != nil {
-		return nil, nil, err
-	}
-	return startBytes, endBytes, nil
-}
-
 func init() {
 	rowexec.NewGenerativeSplitAndScatterProcessor = newGenerativeSplitAndScatterProcessor
 }
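
Per the commit message, the helper deleted above was identical to the one now shared from pkg/sql/physicalplan, so RoutingDatumsForSQLInstance presumably looks like the reconstruction below (the exported name is confirmed by the call site in the diff; the body is inferred from the deleted duplicate and uses the same repo packages as that file's imports).

// Reconstructed sketch of pkg/sql/physicalplan's shared helper, based on the
// duplicate removed from the backup package above.
func RoutingDatumsForSQLInstance(
	sqlInstanceID base.SQLInstanceID,
) (rowenc.EncDatum, rowenc.EncDatum) {
	// Each SQL instance is identified by a synthetic "node<id>" key; the
	// range router sends a row to the stream whose [start, end) span
	// contains the row's routing datum.
	routingBytes := roachpb.Key(fmt.Sprintf("node%d", sqlInstanceID))
	startDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes)))
	endDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes.Next())))
	return startDatum, endDatum
}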

pkg/backup/restore_processor_planning.go

Lines changed: 3 additions & 30 deletions
@@ -6,10 +6,8 @@
 package backup
 
 import (
-	"bytes"
 	"context"
 	"math"
-	"slices"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/cloud"
@@ -19,7 +17,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -123,34 +120,10 @@ func distRestore(
 	// Plan SplitAndScatter on the coordinator node.
 	splitAndScatterStageID := p.NewStageOnNodes(sqlInstanceIDs)
 
-	defaultStream := int32(0)
-	rangeRouterSpec := execinfrapb.OutputRouterSpec_RangeRouterSpec{
-		Spans:       nil,
-		DefaultDest: &defaultStream,
-		Encodings: []execinfrapb.OutputRouterSpec_RangeRouterSpec_ColumnEncoding{
-			{
-				Column:   0,
-				Encoding: catenumpb.DatumEncoding_ASCENDING_KEY,
-			},
-		},
-	}
-	for stream, sqlInstanceID := range sqlInstanceIDs {
-		startBytes, endBytes, err := routingSpanForSQLInstance(sqlInstanceID)
-		if err != nil {
-			return nil, nil, err
-		}
-
-		span := execinfrapb.OutputRouterSpec_RangeRouterSpec_Span{
-			Start:  startBytes,
-			End:    endBytes,
-			Stream: int32(stream),
-		}
-		rangeRouterSpec.Spans = append(rangeRouterSpec.Spans, span)
+	rangeRouterSpec, err := physicalplan.MakeInstanceRouter(sqlInstanceIDs)
+	if err != nil {
+		return nil, nil, err
 	}
-	// The router expects the spans to be sorted.
-	slices.SortFunc(rangeRouterSpec.Spans, func(a, b execinfrapb.OutputRouterSpec_RangeRouterSpec_Span) int {
-		return bytes.Compare(a.Start, b.Start)
-	})
 
 	// TODO(pbardea): This not super principled. I just wanted something that
 	// wasn't a constant and grew slower than linear with the length of
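
The inline code deleted above shows what physicalplan.MakeInstanceRouter now centralizes. A hedged sketch of that shared helper follows, reconstructed from the removed code plus the "set DefaultDest to nil" commit; the exact exported signature and the internal routingSpanForSQLInstance-style helper it calls are assumptions.

// Hedged reconstruction of physicalplan.MakeInstanceRouter: one routing span
// per SQL instance, sorted, with no default destination.
func MakeInstanceRouter(
	sqlInstanceIDs []base.SQLInstanceID,
) (execinfrapb.OutputRouterSpec_RangeRouterSpec, error) {
	spec := execinfrapb.OutputRouterSpec_RangeRouterSpec{
		// DefaultDest is nil: a routing key that matches no span produces an
		// error instead of silently falling back to stream 0.
		DefaultDest: nil,
		Encodings: []execinfrapb.OutputRouterSpec_RangeRouterSpec_ColumnEncoding{
			{Column: 0, Encoding: catenumpb.DatumEncoding_ASCENDING_KEY},
		},
	}
	for stream, sqlInstanceID := range sqlInstanceIDs {
		// One [start, end) key span per instance, derived from its "node<id>" key.
		startBytes, endBytes, err := routingSpanForSQLInstance(sqlInstanceID)
		if err != nil {
			return execinfrapb.OutputRouterSpec_RangeRouterSpec{}, err
		}
		spec.Spans = append(spec.Spans, execinfrapb.OutputRouterSpec_RangeRouterSpec_Span{
			Start:  startBytes,
			End:    endBytes,
			Stream: int32(stream),
		})
	}
	// The range router expects its spans in sorted order.
	slices.SortFunc(spec.Spans, func(a, b execinfrapb.OutputRouterSpec_RangeRouterSpec_Span) int {
		return bytes.Compare(a.Start, b.Start)
	})
	return spec, nil
}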

pkg/kv/kvnemesis/kvnemesis_test.go

Lines changed: 3 additions & 1 deletion
@@ -851,7 +851,9 @@ func setAndVerifyZoneConfigs(
 			Key:    desc.StartKey.AsRawKey(),
 			EndKey: desc.EndKey.AsRawKey(),
 		}
-		if replicaSpan.Overlaps(dataSpan) || desc.RangeID <= 3 {
+		// Ranges 1-4 are the ranges we constrained above
+		// (1: meta1, 2: meta2, 3: liveness, 4: system).
+		if replicaSpan.Overlaps(dataSpan) || desc.RangeID <= 4 {
 			overlappingReplicas = append(overlappingReplicas, replica)
 		}
 		return true // continue

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 10 additions & 7 deletions
@@ -1370,6 +1370,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
 		} else if rs.localRangeOwner != msg.StoreID {
 			rs.localRangeOwner = msg.StoreID
 		}
+		rs.load = rangeMsg.RangeLoad
 		if !rangeMsg.MaybeSpanConfIsPopulated && len(rs.pendingChanges) == 0 {
 			// Common case: no pending changes, and span config not provided.
 			//
@@ -1397,16 +1398,18 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
 				mayHaveDiverged = true
 			}
 			if !mayHaveDiverged {
+				// This is the common case where there were no pending changes, and no
+				// span config provided and no replicas changes. We don't need to do
+				// any more processing of this RangeMsg.
 				continue
 			}
 			// Else fall through and do the expensive work.
 		}
+		// Slow path, which reconstructs everything about the range.
+
 		// Set the range state and store state to match the range message state
 		// initially. The pending changes which are not enacted in the range
 		// message are handled and added back below.
-		if rangeMsg.MaybeSpanConfIsPopulated {
-			rs.load = rangeMsg.RangeLoad
-		}
 		for _, replica := range rs.replicas {
 			ss := cs.stores[replica.StoreID]
 			if ss == nil {
@@ -1548,10 +1551,10 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
 			}
 			rs.conf = normSpanConfig
 		}
-		// NB: Always recompute the analyzed range constraints for any range,
-		// assuming the leaseholder wouldn't have sent the message if there was no
-		// change, or we noticed a divergence in membership above and fell through
-		// here.
+		// Ensure (later) recomputation of the analyzed range constraints for the
+		// range, by clearing the existing analyzed constraints. This is done
+		// since in this slow path the span config or the replicas may have
+		// changed.
 		rs.clearAnalyzedConstraints()
 	}
 	// Remove ranges for which this is the localRangeOwner, but for which it is

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go

Lines changed: 1 addition & 1 deletion
@@ -187,7 +187,7 @@ func parseStoreLeaseholderMsg(t *testing.T, in string) StoreLeaseholderMsg {
 			case "raft-cpu":
 				rMsg.RangeLoad.RaftCPU = LoadValue(parseInt(t, parts[1]))
 				rMsg.MaybeSpanConfIsPopulated = true
-			case "not-populated":
+			case "span-config-not-populated":
 				notPopulatedOverride = true
 			default:
 				t.Fatalf("unknown argument: %s", parts[0])

pkg/kv/kvserver/allocator/mmaprototype/messages.go

Lines changed: 11 additions & 11 deletions
@@ -42,21 +42,21 @@ type StoreLeaseholderMsg struct {
 }
 
 // RangeMsg is generated by the leaseholder store (and part of
-// StoreLeaseholderMsg). If there is any change for that range, the full
-// information for that range is provided (and MaybeSpanConfIsPopulated is set
-// to true). This is also the case for a new leaseholder since it does not know
-// whether something has changed since the last leaseholder informed the
-// allocator. A tiny change to the RangeLoad (decided by the caller) will not
-// cause the fields to be populated.
+// StoreLeaseholderMsg). All fields except for MaybeSpanConf are always
+// populated. The MaybeSpanConf is populated if there is a possible change in
+// the SpanConfig (including when this is a new leaseholder for the range),
+// and is indicated by MaybeSpanConfIsPopulated being true. The optional
+// population is done to avoid the expensive processing of a SpanConfig by
+// MMA, on every message for the range.
 //
-// To ensure that the allocator does not lose synchronization with the current
-// set of replicas, due to spurious changes (we had one undiagnosed example
-// where the allocator was spuriously told that a lease was transferred away),
-// the Replicas field is always populated).
+// The main reason we always populate Replicas is to ensure that the allocator
+// does not lose synchronization with the current set of replicas, due to
+// spurious changes (we had one undiagnosed example where the allocator was
+// spuriously told that a lease was transferred away).
 type RangeMsg struct {
 	roachpb.RangeID
 	Replicas                 []StoreIDAndReplicaState
+	RangeLoad                RangeLoad
 	MaybeSpanConfIsPopulated bool
 	MaybeSpanConf            roachpb.SpanConfig
-	RangeLoad                RangeLoad
 }
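
To make the updated contract concrete, a caller building a RangeMsg would, under this comment, do something like the following. The surrounding variables (rangeID, replicas, load, conf, confMayHaveChanged) are illustrative and not from this commit; only the struct fields come from the diff above.

	// Hypothetical construction of a RangeMsg following the updated comment.
	msg := RangeMsg{
		RangeID:   rangeID,
		Replicas:  replicas, // always populated, to avoid losing sync with the replica set
		RangeLoad: load,     // always populated; MMA now incorporates it unconditionally
	}
	if confMayHaveChanged {
		// Only pay the cost of span-config processing when something may have changed.
		msg.MaybeSpanConfIsPopulated = true
		msg.MaybeSpanConf = conf
	}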

pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/multiple_ranges.txt

Lines changed: 19 additions & 15 deletions
@@ -51,19 +51,19 @@ store-load-msg
 store-id=2 node-id=2 load=[45,40,50] capacity=[100,100,50] secondary-load=1 load-time=0s
 ----
 
-# StoreLeaseholderMsg, with empty information, since the range states have not changed.
+# StoreLeaseholderMsg, with no span configs for the ranges.
 store-leaseholder-msg
 store-id=1
-range-id=1 not-populated
+range-id=1 load=[10,10,20] raft-cpu=5 span-config-not-populated
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
 store-id=2 replica-id=2 type=VOTER_FULL
-range-id=2 not-populated
+range-id=2 load=[20,10,15] raft-cpu=10 span-config-not-populated
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
 store-id=2 replica-id=2 type=VOTER_FULL
-range-id=3 not-populated
+range-id=3 load=[30,10,10] raft-cpu=25 span-config-not-populated
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
 store-id=2 replica-id=2 type=VOTER_FULL
-range-id=4 not-populated
+range-id=4 load=[40,10,5] raft-cpu=5 span-config-not-populated
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
 store-id=2 replica-id=2 type=VOTER_FULL
 ----
@@ -76,28 +76,32 @@ store-id=1 node-id=1 status=ok accepting all reported=[cpu:100, write-bandwidth:
 store-id=2 node-id=2 status=ok accepting all reported=[cpu:45, write-bandwidth:40, byte-size:50] adjusted=[cpu:45, write-bandwidth:40, byte-size:50] node-reported-cpu=45 node-adjusted-cpu=45 seq=2
 top-k-ranges (local-store-id=1) dim=ByteSize: r1 r2
 
-# StoreLeaseholderMsg not containing r1 and r4 since no longer the leaseholder.
+# StoreLeaseholderMsg not containing r1 and r4 since no longer the
+# leaseholder. Change the load of r2, while MaybeSpanConfIsPopulated=false,
+# to ensure that we notice it.
 store-leaseholder-msg
 store-id=1
-range-id=2 not-populated
+range-id=2 load=[40,20,5] raft-cpu=10 span-config-not-populated
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
 store-id=2 replica-id=2 type=VOTER_FULL
-range-id=3 not-populated
+range-id=3 load=[30,10,10] raft-cpu=25 span-config-not-populated
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
 store-id=2 replica-id=2 type=VOTER_FULL
 ----
 
-# r1 and r4 no longer mentioned in the top-k.
+# r1 and r4 no longer mentioned in the top-k. And the top-k ordering has
+# changed for both stores since r2 is now more than r3 in terms of CPU,
+# but less than r3 in terms of ByteSize.
 get-load-info
 ----
 store-id=1 node-id=1 status=ok accepting all reported=[cpu:100, write-bandwidth:40, byte-size:50] adjusted=[cpu:100, write-bandwidth:40, byte-size:50] node-reported-cpu=100 node-adjusted-cpu=100 seq=1
-top-k-ranges (local-store-id=1) dim=CPURate: r3 r2
+top-k-ranges (local-store-id=1) dim=CPURate: r2 r3
 store-id=2 node-id=2 status=ok accepting all reported=[cpu:45, write-bandwidth:40, byte-size:50] adjusted=[cpu:45, write-bandwidth:40, byte-size:50] node-reported-cpu=45 node-adjusted-cpu=45 seq=2
-top-k-ranges (local-store-id=1) dim=ByteSize: r2 r3
+top-k-ranges (local-store-id=1) dim=ByteSize: r3 r2
 
 ranges
 ----
-range-id=2 local-store=1 load=[cpu:20, write-bandwidth:10, byte-size:15] raft-cpu=10
+range-id=2 local-store=1 load=[cpu:40, write-bandwidth:20, byte-size:5] raft-cpu=10
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
 store-id=2 replica-id=2 type=VOTER_FULL
 range-id=3 local-store=1 load=[cpu:30, write-bandwidth:10, byte-size:10] raft-cpu=25
@@ -109,17 +113,17 @@ range-id=3 local-store=1 load=[cpu:30, write-bandwidth:10, byte-size:10] raft-cp
 # of replicas, and the other the replica-id of one of the store's replicas.
 store-leaseholder-msg
 store-id=1
-range-id=2 not-populated
+range-id=2 load=[40,20,30] raft-cpu=10 span-config-not-populated
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
-range-id=3 not-populated
+range-id=3 load=[30,10,10] raft-cpu=25 span-config-not-populated
 store-id=2 replica-id=5 type=VOTER_FULL
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
 ----
 
 # The ranges reflect the latest state from the StoreLeaseholderMsg.
 ranges
 ----
-range-id=2 local-store=1 load=[cpu:20, write-bandwidth:10, byte-size:15] raft-cpu=10
+range-id=2 local-store=1 load=[cpu:40, write-bandwidth:20, byte-size:30] raft-cpu=10
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
 range-id=3 local-store=1 load=[cpu:30, write-bandwidth:10, byte-size:10] raft-cpu=25
 store-id=2 replica-id=5 type=VOTER_FULL
