Skip to content

Conversation

@mw5h
Copy link
Contributor

@mw5h mw5h commented Nov 25, 2025

bulkmerge: add processors and DistSQL physical planning

This commit introduces the initial implementation of bulkmerge processors
for distributed merge operations. It adds three new processor types:
MergeLoopback, BulkMerge, and MergeCoordinator, along with the DistSQL
physical planning infrastructure to orchestrate them.

The MergeLoopback processor runs on the coordinator node and generates
initial tasks. The BulkMerge processors run on each SQL instance to
perform merge operations. The MergeCoordinator collects results from
all merge processors.

This commit also updates the vectorized execution engine (execplan.go)
to recognize the new processor cores, preventing panics when these
processors are encountered.

Informs #156580
Release note: None

backup: deduplicate routing logic with physicalplan package

Previously, the backup package contained duplicate implementations of
routingDatumsForSQLInstance and routingSpanForSQLInstance that were
identical to functions recently added to pkg/sql/physicalplan. This
commit removes these duplicates and uses the shared implementations:
RoutingDatumsForSQLInstance and MakeInstanceRouter.

This reduces code duplication and ensures consistent routing behavior
across DistSQL physical planning for both backup/restore and bulk merge
operations.

Informs #156580
Release note: None

physicalplan: set DefaultDest to nil to catch routing bugs

Changes MakeInstanceRouter to set DefaultDest to nil instead of stream 0.
When DefaultDest is nil, any routing key that doesn't match a span will
produce an error rather than silently routing to an arbitrary stream.
This helps catch coordination bugs early.

Updates merge_loopback.go to generate a routing key for its SQL instance
using physicalplan.RoutingDatumsForSQLInstance instead of a hardcoded
"loopback" key. This ensures the routing key matches one of the spans in
the router rather than relying on the DefaultDest fallback.

The test expectation is updated from "loopback->merge->coordinator" to
"node1->merge->coordinator" to reflect the explicit routing behavior.

Fixes #156580
Release note: None

@mw5h mw5h requested review from a team as code owners November 25, 2025 18:26
@mw5h mw5h requested review from jeffswenson, mgartner and spilchen and removed request for a team November 25, 2025 18:26
@cockroach-teamcity
Copy link
Member

This change is Reviewable

@mw5h mw5h force-pushed the distmerge-scaffolding branch from d5946a1 to 17749c1 Compare November 25, 2025 18:57
Copy link
Collaborator

@jeffswenson jeffswenson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

repeated roachpb.Span spans = 2 [(gogoproto.nullable) = false];
// spans are the merge tasks. E.g.the merge task `n` is to merge all the input data that overlaps with
// [spans[n].Key, spans[n].EndKey)
repeated roachpb.Span spans = 2 [(gogoproto.nullable) = false];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the formatting is inconsistent in BulkMergeSpec's definition. It seems to be a mix of tabs and spaces.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/me shakes fist at Claude

@mw5h mw5h force-pushed the distmerge-scaffolding branch 2 times, most recently from ffc184b to 5de1e7f Compare November 25, 2025 19:41
Copy link
Contributor

@spilchen spilchen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@spilchen reviewed 8 of 17 files at r1, 2 of 6 files at r4.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jeffswenson and @mgartner)


pkg/sql/execinfrapb/processors_bulk_io.proto line 619 at r4 (raw file):

    optional string uri = 1 [(gogoproto.nullable) = false];
    // start_key is the first key in the SST.
    optional bytes start_key = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];

I see you changes the start_key/end_key from string to bytes. This was a miss in my earlier PR. There are a bunch of other changes needed for that. See #158290. I think I'll go ahead and merge that to unblock this PR.

@mw5h mw5h force-pushed the distmerge-scaffolding branch from 5de1e7f to 827768f Compare November 25, 2025 22:16
Copy link
Member

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jeffswenson, @mgartner, @mw5h, and @spilchen)


pkg/sql/execinfrapb/processors_bulk_io.proto line 188 at r7 (raw file):

// MergeLoopback is scheduled on the same node as the MergeCoordinator.
// MegeCoordinator is the final processor in the flow and MergeLoopback is the

nit: s/Mege/Merge/.


pkg/sql/execinfrapb/processors_bulk_io.proto line 603 at r7 (raw file):

message BulkMergeSpec {
  // ssts is the list of input SSTs to merge.
  repeated SST ssts = 1 [(gogoproto.nullable) = false];

nit: would be nice to give this customname of SSTs. Ditto in output proto below.


pkg/sql/bulkmerge/merge_test.go line 53 at r7 (raw file):

	})

	sqlReciever := sql.MakeDistSQLReceiver(

nit: s/Reciever/Receiver/.


pkg/sql/physicalplan/routing.go line 44 at r7 (raw file):

	startDatum, endDatum := RoutingDatumsForSQLInstance(sqlInstanceID)

	startBytes, endBytes := make([]byte, 0), make([]byte, 0)

nit: this seems unusual - we can just pass nil as appendTo argument of EncDatum.Encode.


pkg/sql/bulkmerge/merge_planning.go line 23 at r7 (raw file):

	// NOTE: This implementation is inspired by the physical plan created by
	// restore in `pkg/backup/restore_processor_planning.go`
	planCtx, sqlInstanceIDs, err := execCtx.DistSQLPlanner().SetupAllNodesPlanning(

We'll need to be careful about mixed-version state where the coordinator node is already running CRDB version supporting the bulk merge, but other nodes in the cluster do not yet. We probably just want to add a version gate somewhere, so maybe lets leave a TODO for now.


pkg/sql/bulkmerge/merge_processor.go line 33 at r7 (raw file):

// bulkMergeProcessor accepts rows that include an assigned task id and emits
// rows that are (taskID, []ouput_sst) where output_sst is the name of SSTs

nit: s/ouput/output/. Also something is off with "rpo the merged output" below.

@mw5h mw5h force-pushed the distmerge-scaffolding branch from 827768f to 220af80 Compare November 27, 2025 01:02
jeffswenson and others added 3 commits November 26, 2025 19:32
This commit introduces the initial implementation of bulkmerge processors
for distributed merge operations. It adds three new processor types:
MergeLoopback, BulkMerge, and MergeCoordinator, along with the DistSQL
physical planning infrastructure to orchestrate them.

The MergeLoopback processor runs on the coordinator node and generates
initial tasks. The BulkMerge processors run on each SQL instance to
perform merge operations. The MergeCoordinator collects results from
all merge processors.

This commit also updates the vectorized execution engine (execplan.go)
to recognize the new processor cores, preventing panics when these
processors are encountered.

Informs cockroachdb#156580
Release note: None
Previously, the backup package contained duplicate implementations of
routingDatumsForSQLInstance and routingSpanForSQLInstance that were
identical to functions recently added to pkg/sql/physicalplan. This
commit removes these duplicates and uses the shared implementations:
RoutingDatumsForSQLInstance and MakeInstanceRouter.

This reduces code duplication and ensures consistent routing behavior
across DistSQL physical planning for both backup/restore and bulk merge
operations.

Informs cockroachdb#156580
Release note: None
Changes MakeInstanceRouter to set DefaultDest to nil instead of stream 0.
When DefaultDest is nil, any routing key that doesn't match a span will
produce an error rather than silently routing to an arbitrary stream.
This helps catch coordination bugs early.

Updates merge_loopback.go to generate a routing key for its SQL instance
using physicalplan.RoutingDatumsForSQLInstance instead of a hardcoded
"loopback" key. This ensures the routing key matches one of the spans in
the router rather than relying on the DefaultDest fallback.

The test expectation is updated from "loopback->merge->coordinator" to
"node1->merge->coordinator" to reflect the explicit routing behavior.

Fixes cockroachdb#156580
Release note: None
@mw5h mw5h force-pushed the distmerge-scaffolding branch from 220af80 to 7e30e1b Compare November 27, 2025 03:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

sql/bulkmerge: add merge processor scaffolding and DistSQL flow

5 participants