From 8ee5a1525d6b3bc765ea3e1e00cf60a35d7f9a7d Mon Sep 17 00:00:00 2001 From: Jeff Swenson Date: Fri, 21 Feb 2025 17:28:18 -0500 Subject: [PATCH 1/3] bulkmerge: add processors and DistSQL physical planning This commit introduces the initial implementation of bulkmerge processors for distributed merge operations. It adds three new processor types: MergeLoopback, BulkMerge, and MergeCoordinator, along with the DistSQL physical planning infrastructure to orchestrate them. The MergeLoopback processor runs on the coordinator node and generates initial tasks. The BulkMerge processors run on each SQL instance to perform merge operations. The MergeCoordinator collects results from all merge processors. This commit also updates the vectorized execution engine (execplan.go) to recognize the new processor cores, preventing panics when these processors are encountered. Informs #156580 Release note: None --- pkg/BUILD.bazel | 3 + pkg/sql/bulkmerge/BUILD.bazel | 61 +++++++++++ pkg/sql/bulkmerge/main_test.go | 30 ++++++ pkg/sql/bulkmerge/merge_coordinator.go | 87 ++++++++++++++++ pkg/sql/bulkmerge/merge_loopback.go | 74 ++++++++++++++ pkg/sql/bulkmerge/merge_planning.go | 96 +++++++++++++++++ pkg/sql/bulkmerge/merge_processor.go | 102 +++++++++++++++++++ pkg/sql/bulkmerge/merge_processor_test.go | 54 ++++++++++ pkg/sql/bulkmerge/merge_test.go | 76 ++++++++++++++ pkg/sql/colexec/colbuilder/execplan.go | 6 ++ pkg/sql/execinfrapb/flow_diagram.go | 12 +++ pkg/sql/execinfrapb/processors.proto | 5 +- pkg/sql/execinfrapb/processors_bulk_io.proto | 61 +++++++---- pkg/sql/physicalplan/BUILD.bazel | 3 + pkg/sql/physicalplan/routing.go | 92 +++++++++++++++++ pkg/sql/rowexec/processors.go | 56 ++++++++++ 16 files changed, 796 insertions(+), 22 deletions(-) create mode 100644 pkg/sql/bulkmerge/BUILD.bazel create mode 100644 pkg/sql/bulkmerge/main_test.go create mode 100644 pkg/sql/bulkmerge/merge_coordinator.go create mode 100644 pkg/sql/bulkmerge/merge_loopback.go create mode 100644 pkg/sql/bulkmerge/merge_planning.go create mode 100644 pkg/sql/bulkmerge/merge_processor.go create mode 100644 pkg/sql/bulkmerge/merge_processor_test.go create mode 100644 pkg/sql/bulkmerge/merge_test.go create mode 100644 pkg/sql/physicalplan/routing.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index ff79ee2fb8d8..ce9f11988696 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -397,6 +397,7 @@ ALL_TESTS = [ "//pkg/sql/auditlogging:auditlogging_test", "//pkg/sql/backfill:backfill_test", "//pkg/sql/bulkingest:bulkingest_test", + "//pkg/sql/bulkmerge:bulkmerge_test", "//pkg/sql/bulksst:bulksst_test", "//pkg/sql/bulkutil:bulkutil_test", "//pkg/sql/cacheutil:cacheutil_test", @@ -1885,6 +1886,8 @@ GO_TARGETS = [ "//pkg/sql/backfill:backfill_test", "//pkg/sql/bulkingest:bulkingest", "//pkg/sql/bulkingest:bulkingest_test", + "//pkg/sql/bulkmerge:bulkmerge", + "//pkg/sql/bulkmerge:bulkmerge_test", "//pkg/sql/bulksst:bulksst", "//pkg/sql/bulksst:bulksst_test", "//pkg/sql/bulkutil:bulkutil", diff --git a/pkg/sql/bulkmerge/BUILD.bazel b/pkg/sql/bulkmerge/BUILD.bazel new file mode 100644 index 000000000000..489bcd8ec3d5 --- /dev/null +++ b/pkg/sql/bulkmerge/BUILD.bazel @@ -0,0 +1,61 @@ +# Copyright 2025 The Cockroach Authors. +# +# Use of this software is governed by the CockroachDB Software License +# included in the /LICENSE file. 
+ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "bulkmerge", + srcs = [ + "merge_coordinator.go", + "merge_loopback.go", + "merge_planning.go", + "merge_processor.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/bulkmerge", + visibility = ["//visibility:public"], + deps = [ + "//pkg/base", + "//pkg/sql", + "//pkg/sql/execinfra", + "//pkg/sql/execinfrapb", + "//pkg/sql/physicalplan", + "//pkg/sql/rowenc", + "//pkg/sql/rowexec", + "//pkg/sql/sem/tree", + "//pkg/sql/types", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "bulkmerge_test", + srcs = [ + "main_test.go", + "merge_processor_test.go", + "merge_test.go", + ], + embed = [":bulkmerge"], + deps = [ + "//pkg/base", + "//pkg/kv/kvclient/kvtenant", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/security/username", + "//pkg/server", + "//pkg/settings/cluster", + "//pkg/sql", + "//pkg/sql/execinfra", + "//pkg/sql/execinfrapb", + "//pkg/sql/sem/eval", + "//pkg/sql/sem/tree", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/sql/bulkmerge/main_test.go b/pkg/sql/bulkmerge/main_test.go new file mode 100644 index 000000000000..aef1634520e5 --- /dev/null +++ b/pkg/sql/bulkmerge/main_test.go @@ -0,0 +1,30 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package bulkmerge + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +//go:generate ../util/leaktest/add-leaktest.sh *_test.go + +func TestMain(m *testing.M) { + securityassets.SetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + kvtenant.InitTestConnectorFactory() + os.Exit(m.Run()) +} diff --git a/pkg/sql/bulkmerge/merge_coordinator.go b/pkg/sql/bulkmerge/merge_coordinator.go new file mode 100644 index 000000000000..920d64c45247 --- /dev/null +++ b/pkg/sql/bulkmerge/merge_coordinator.go @@ -0,0 +1,87 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package bulkmerge + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowexec" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/errors" +) + +var ( + _ execinfra.Processor = &mergeCoordinator{} + _ execinfra.RowSource = &mergeCoordinator{} +) + +// Emits a single row on completion which is a protobuf containing the details +// of the merged SSTs. 
The protobuf is BulkMergeSpec_Output, which contains the +// list of output SSTs with their URIs and key ranges. +var mergeCoordinatorOutputTypes = []*types.T{ + types.Bytes, +} + +type mergeCoordinator struct { + execinfra.ProcessorBase + input execinfra.RowSource +} + +// Next implements execinfra.RowSource. +func (m *mergeCoordinator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { + for m.State == execinfra.StateRunning { + row, meta := m.input.Next() + switch { + case row == nil && meta == nil: + m.MoveToDraining(nil /* err */) + case meta != nil && meta.Err != nil: + m.MoveToDraining(meta.Err) + case meta != nil: + m.MoveToDraining(errors.Newf("unexpected meta: %v", meta)) + case row != nil: + base := *row[2].Datum.(*tree.DBytes) + return rowenc.EncDatumRow{ + rowenc.EncDatum{Datum: tree.NewDBytes(base + "->coordinator")}, + }, nil + } + } + return nil, m.DrainHelper() +} + +// Start implements execinfra.RowSource. +func (m *mergeCoordinator) Start(ctx context.Context) { + m.StartInternal(ctx, "mergeCoordinator") + m.input.Start(ctx) +} + +func init() { + rowexec.NewMergeCoordinatorProcessor = func( + ctx context.Context, + flow *execinfra.FlowCtx, + flowID int32, + spec execinfrapb.MergeCoordinatorSpec, + postSpec *execinfrapb.PostProcessSpec, + input execinfra.RowSource, + ) (execinfra.Processor, error) { + mc := &mergeCoordinator{ + input: input, + } + err := mc.Init( + ctx, mc, postSpec, mergeCoordinatorOutputTypes, flow, flowID, nil, + execinfra.ProcStateOpts{ + InputsToDrain: []execinfra.RowSource{input}, + }, + ) + if err != nil { + return nil, err + } + return mc, nil + } +} diff --git a/pkg/sql/bulkmerge/merge_loopback.go b/pkg/sql/bulkmerge/merge_loopback.go new file mode 100644 index 000000000000..8f2753a9e4a0 --- /dev/null +++ b/pkg/sql/bulkmerge/merge_loopback.go @@ -0,0 +1,74 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package bulkmerge + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowexec" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" +) + +var ( + _ execinfra.Processor = &mergeLoopback{} + _ execinfra.RowSource = &mergeLoopback{} +) + +var mergeLoopbackOutputTypes = []*types.T{ + // Span key for the range router. It encodes the destination + // processor's SQL instance ID. + types.Bytes, + // Task ID + types.Int4, +} + +type mergeLoopback struct { + execinfra.ProcessorBase + done bool +} + +// Next implements execinfra.RowSource. +func (m *mergeLoopback) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { + if m.done { + m.MoveToDraining(nil) + return nil, m.DrainHelper() + } + m.done = true + return rowenc.EncDatumRow{ + rowenc.EncDatum{Datum: tree.NewDBytes("loopback")}, + rowenc.EncDatum{Datum: tree.NewDInt(1)}, + }, nil +} + +// Start implements execinfra.RowSource. 
+func (m *mergeLoopback) Start(ctx context.Context) { + m.StartInternal(ctx, "mergeLoopback") + // TODO(jeffswenson): create the initial set of tasks +} + +func init() { + rowexec.NewMergeLoopbackProcessor = func( + ctx context.Context, + flow *execinfra.FlowCtx, + flowID int32, + spec execinfrapb.MergeLoopbackSpec, + postSpec *execinfrapb.PostProcessSpec, + ) (execinfra.Processor, error) { + ml := &mergeLoopback{} + err := ml.Init( + ctx, ml, postSpec, mergeLoopbackOutputTypes, flow, flowID, nil, + execinfra.ProcStateOpts{}, + ) + if err != nil { + return nil, err + } + return ml, nil + } +} diff --git a/pkg/sql/bulkmerge/merge_planning.go b/pkg/sql/bulkmerge/merge_planning.go new file mode 100644 index 000000000000..210e00260915 --- /dev/null +++ b/pkg/sql/bulkmerge/merge_planning.go @@ -0,0 +1,96 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package bulkmerge + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" + "github.com/cockroachdb/errors" +) + +func newBulkMergePlan( + ctx context.Context, execCtx sql.JobExecContext, +) (*sql.PhysicalPlan, *sql.PlanningCtx, error) { + // NOTE: This implementation is inspired by the physical plan created by + // restore in `pkg/backup/restore_processor_planning.go` + // TODO(mw5h): We need to be careful about mixed version clusters, so consider + // where we'll want to add a version gate. + planCtx, sqlInstanceIDs, err := execCtx.DistSQLPlanner().SetupAllNodesPlanning( + ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg()) + if err != nil { + return nil, nil, err + } + + plan := planCtx.NewPhysicalPlan() + // Use the gateway node as the coordinator, which is where the job was initiated. 
+	coordinatorID := plan.GatewaySQLInstanceID
+
+	router, err := physicalplan.MakeInstanceRouter(sqlInstanceIDs)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "unable to make instance router")
+	}
+
+	loopbackID := plan.AddProcessor(physicalplan.Processor{
+		SQLInstanceID: coordinatorID,
+		Spec: execinfrapb.ProcessorSpec{
+			Core: execinfrapb.ProcessorCoreUnion{
+				MergeLoopback: &execinfrapb.MergeLoopbackSpec{},
+			},
+			Post: execinfrapb.PostProcessSpec{},
+			Output: []execinfrapb.OutputRouterSpec{{
+				Type:            execinfrapb.OutputRouterSpec_BY_RANGE,
+				RangeRouterSpec: router,
+			}},
+			StageID:     plan.NewStageOnNodes([]base.SQLInstanceID{coordinatorID}),
+			ResultTypes: mergeLoopbackOutputTypes,
+		},
+	})
+
+	mergeStage := plan.NewStageOnNodes(sqlInstanceIDs)
+	for streamID, sqlInstanceID := range sqlInstanceIDs {
+		pIdx := plan.AddProcessor(physicalplan.Processor{
+			SQLInstanceID: sqlInstanceID,
+			Spec: execinfrapb.ProcessorSpec{
+				Input: []execinfrapb.InputSyncSpec{{
+					ColumnTypes: mergeLoopbackOutputTypes,
+				}},
+				Core: execinfrapb.ProcessorCoreUnion{
+					BulkMerge: &execinfrapb.BulkMergeSpec{
+						// TODO(jeffswenson): fill in the rest of the spec
+					},
+				},
+				Post: execinfrapb.PostProcessSpec{},
+				Output: []execinfrapb.OutputRouterSpec{{
+					Type: execinfrapb.OutputRouterSpec_PASS_THROUGH,
+				}},
+				StageID:     mergeStage,
+				ResultTypes: bulkMergeProcessorOutputTypes,
+			},
+		})
+		plan.Streams = append(plan.Streams, physicalplan.Stream{
+			SourceProcessor:  loopbackID,
+			SourceRouterSlot: streamID,
+			DestProcessor:    pIdx,
+			DestInput:        0,
+		})
+		plan.ResultRouters = append(plan.ResultRouters, pIdx)
+	}
+
+	plan.AddSingleGroupStage(ctx, coordinatorID, execinfrapb.ProcessorCoreUnion{
+		MergeCoordinator: &execinfrapb.MergeCoordinatorSpec{
+			// TODO(jeffswenson): fill in the rest of the spec
+		},
+	}, execinfrapb.PostProcessSpec{}, mergeCoordinatorOutputTypes, nil /* finalizeLastStageCb */)
+
+	plan.PlanToStreamColMap = []int{0} // Needed for FinalizePlan to populate ResultTypes
+	sql.FinalizePlan(ctx, planCtx, plan)
+
+	return plan, planCtx, nil
+}
diff --git a/pkg/sql/bulkmerge/merge_processor.go b/pkg/sql/bulkmerge/merge_processor.go
new file mode 100644
index 000000000000..972db364403d
--- /dev/null
+++ b/pkg/sql/bulkmerge/merge_processor.go
@@ -0,0 +1,102 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package bulkmerge
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
+	"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/errors"
+)
+
+var (
+	_ execinfra.Processor = &bulkMergeProcessor{}
+	_ execinfra.RowSource = &bulkMergeProcessor{}
+)
+
+// Output row format for the bulk merge processor. The third column contains
+// a marshaled BulkMergeSpec_Output protobuf with the list of output SSTs.
+var bulkMergeProcessorOutputTypes = []*types.T{
+	types.Int4,  // SQL Instance ID of the merge processor
+	types.Int4,  // Task ID
+	types.Bytes, // Encoded list of output SSTs (BulkMergeSpec_Output protobuf)
+}
+
+// bulkMergeProcessor accepts rows that include an assigned task id and emits
+// rows of the form (taskID, []output_sst), where output_sst names an SST
+// produced by the merge.
+// +// The task ids are used to pick output [start, end) ranges to merge from the +// spec.spans. +// +// Task n is to process the input range from [spans[n].Key, spans[n].EndKey). +type bulkMergeProcessor struct { + execinfra.ProcessorBase + spec execinfrapb.BulkMergeSpec + input execinfra.RowSource +} + +func newBulkMergeProcessor( + ctx context.Context, + flowCtx *execinfra.FlowCtx, + processorID int32, + spec execinfrapb.BulkMergeSpec, + post *execinfrapb.PostProcessSpec, + input execinfra.RowSource, +) (execinfra.Processor, error) { + mp := &bulkMergeProcessor{ + input: input, + spec: spec, + } + err := mp.Init( + ctx, mp, post, bulkMergeProcessorOutputTypes, flowCtx, processorID, nil, + execinfra.ProcStateOpts{ + InputsToDrain: []execinfra.RowSource{input}, + }, + ) + if err != nil { + return nil, err + } + return mp, nil +} + +// Next implements execinfra.RowSource. +func (m *bulkMergeProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { + for m.State == execinfra.StateRunning { + row, meta := m.input.Next() + switch { + case row == nil && meta == nil: + m.MoveToDraining(nil /* err */) + case meta != nil && meta.Err != nil: + m.MoveToDraining(meta.Err) + case meta != nil: + m.MoveToDraining(errors.Newf("unexpected meta: %v", meta)) + case row != nil: + base := *row[0].Datum.(*tree.DBytes) + return rowenc.EncDatumRow{ + rowenc.EncDatum{Datum: tree.NewDInt(1)}, // TODO(jeffswenson): SQL Instance ID + rowenc.EncDatum{Datum: tree.NewDInt(1)}, // TODO(jeffswenson): Task ID + rowenc.EncDatum{Datum: tree.NewDBytes(base + "->merge")}, // TODO(jeffswenson): output SST + }, nil + } + } + return nil, m.DrainHelper() +} + +// Start implements execinfra.RowSource. +func (m *bulkMergeProcessor) Start(ctx context.Context) { + m.StartInternal(ctx, "bulkMergeProcessor") + m.input.Start(ctx) +} + +func init() { + rowexec.NewBulkMergeProcessor = newBulkMergeProcessor +} diff --git a/pkg/sql/bulkmerge/merge_processor_test.go b/pkg/sql/bulkmerge/merge_processor_test.go new file mode 100644 index 000000000000..f74b324867ef --- /dev/null +++ b/pkg/sql/bulkmerge/merge_processor_test.go @@ -0,0 +1,54 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package bulkmerge + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +func TestBulkMergeProcessor(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + tc := testcluster.NewTestCluster(t, 1, base.TestClusterArgs{}) + tc.Start(t) + defer tc.Stopper().Stop(ctx) + + // Just testing that the package is set up correctly. 
+	sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])
+	sqlDB.Exec(t, `CREATE DATABASE test`)
+
+	st := cluster.MakeTestingClusterSettings()
+	evalCtx := eval.MakeTestingEvalContext(st)
+	defer evalCtx.Stop(ctx)
+
+	diskMonitor := execinfra.NewTestDiskMonitor(ctx, st)
+	defer diskMonitor.Stop(ctx)
+
+	flowCtx := &execinfra.FlowCtx{
+		EvalCtx: &evalCtx,
+		Mon:     evalCtx.TestingMon,
+		Cfg: &execinfra.ServerConfig{
+			Settings: st,
+		},
+		DiskMonitor: diskMonitor,
+	}
+	post := execinfrapb.PostProcessSpec{}
+
+	_, err := newBulkMergeProcessor(ctx, flowCtx, 0, execinfrapb.BulkMergeSpec{}, &post, nil)
+	require.NoError(t, err)
+}
diff --git a/pkg/sql/bulkmerge/merge_test.go b/pkg/sql/bulkmerge/merge_test.go
new file mode 100644
index 000000000000..e3de50916cb3
--- /dev/null
+++ b/pkg/sql/bulkmerge/merge_test.go
@@ -0,0 +1,76 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package bulkmerge
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/security/username"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/stretchr/testify/require"
+)
+
+func TestMergeProcessors(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// TODO(jeffswenson): start a three-node cluster to ensure each instance
+	// gets a processor.
+	ctx := context.Background()
+	srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
+	defer srv.Stopper().Stop(ctx)
+
+	s := srv.ApplicationLayer()
+
+	execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
+
+	jobExecCtx, cleanup := sql.MakeJobExecContext(
+		ctx, "test", username.RootUserName(), &sql.MemoryMetrics{}, &execCfg,
+	)
+	defer cleanup()
+
+	plan, planCtx, err := newBulkMergePlan(ctx, jobExecCtx)
+	require.NoError(t, err)
+	defer plan.Release()
+
+	require.Equal(t, plan.GetResultTypes(), mergeCoordinatorOutputTypes)
+
+	var result string
+	rowWriter := sql.NewCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
+		result = string(*row[0].(*tree.DBytes))
+		return nil
+	})
+
+	sqlReceiver := sql.MakeDistSQLReceiver(
+		ctx,
+		rowWriter,
+		tree.Rows,
+		execCfg.RangeDescriptorCache,
+		nil,
+		nil,
+		jobExecCtx.ExtendedEvalContext().Tracing)
+	defer sqlReceiver.Release()
+
+	evalCtxCopy := jobExecCtx.ExtendedEvalContext().Context.Copy()
+	jobExecCtx.DistSQLPlanner().Run(
+		ctx,
+		planCtx,
+		nil,
+		plan,
+		sqlReceiver,
+		evalCtxCopy,
+		nil,
+	)
+
+	require.NoError(t, rowWriter.Err())
+	require.Equal(t, result, "loopback->merge->coordinator")
+}
diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go
index e43d5260872a..bba96e04413d 100644
--- a/pkg/sql/colexec/colbuilder/execplan.go
+++ b/pkg/sql/colexec/colbuilder/execplan.go
@@ -341,6 +341,12 @@ func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCo
 	case core.VectorMutationSearch != nil:
 	case core.CompactBackups != nil:
 		return errCoreNotWorthWrapping
+	case core.BulkMerge != nil:
+		return errCoreNotWorthWrapping
+	case core.MergeCoordinator != nil:
+		return errCoreNotWorthWrapping
+	case core.MergeLoopback != nil:
+		return errCoreNotWorthWrapping
 	default:
 		err := errors.AssertionFailedf("unexpected processor core %q",
 			core)
 		if buildutil.CrdbTestBuild {
diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go
index 743295944c6e..8dd89175a71e 100644
--- a/pkg/sql/execinfrapb/flow_diagram.go
+++ b/pkg/sql/execinfrapb/flow_diagram.go
@@ -807,6 +807,18 @@ func (m *CompactBackupsSpec) summary() (string, []string) {
 	return "CompactBackupsSpec", details
 }
 
+func (m *BulkMergeSpec) summary() (string, []string) {
+	return "BulkMerge", nil
+}
+
+func (m *MergeCoordinatorSpec) summary() (string, []string) {
+	return "MergeCoordinator", nil
+}
+
+func (m *MergeLoopbackSpec) summary() (string, []string) {
+	return "MergeLoopback", nil
+}
+
 type diagramCell struct {
 	Title   string   `json:"title"`
 	Details []string `json:"details"`
diff --git a/pkg/sql/execinfrapb/processors.proto b/pkg/sql/execinfrapb/processors.proto
index 140bcaaae933..ad1dbe1683a2 100644
--- a/pkg/sql/execinfrapb/processors.proto
+++ b/pkg/sql/execinfrapb/processors.proto
@@ -134,9 +134,12 @@ message ProcessorCoreUnion {
   optional VectorMutationSearchSpec vectorMutationSearch = 48;
   optional CompactBackupsSpec compactBackups = 49;
   optional InspectSpec inspect = 50;
+  optional BulkMergeSpec bulkMerge = 51;
+  optional MergeCoordinatorSpec mergeCoordinator = 52;
+  optional MergeLoopbackSpec mergeLoopback = 53;
 
   reserved 6, 12, 14, 17, 18, 19, 20, 32;
-  // NEXT ID: 51.
+  // NEXT ID: 54.
 }
 
 // NoopCoreSpec indicates a "no-op" processor core. This is used when we just
diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto
index e1d0de42fedf..1795ac92c679 100644
--- a/pkg/sql/execinfrapb/processors_bulk_io.proto
+++ b/pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -184,6 +184,20 @@ message ReadImportDataSpec {
   // NEXTID: 21.
}
 
+message MergeCoordinatorSpec {
+  optional int64 task_count = 1 [(gogoproto.nullable) = false];
+  repeated bytes worker_sql_instance_ids = 2;
+  // NEXT ID: 3.
+}
+
+// MergeLoopback is scheduled on the same node as the MergeCoordinator: the
+// MergeCoordinator is the final processor in the flow and the MergeLoopback is
+// the first. Scheduling them on the same node allows the MergeCoordinator to
+// inject task rows back into the flow.
+message MergeLoopbackSpec {
+
+}
+
 message IngestStoppedSpec {
   optional int64 job_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID",
     (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/jobs/jobspb.JobID"];
@@ -591,27 +605,32 @@ message CompactBackupsSpec {
 }
 
 message BulkMergeSpec {
-  // SST represents metadata about a single SST file to be merged.
-  message SST {
-    optional string uri = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "URI"];
-    // start_key is the first key in the SST.
-    optional bytes start_key = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
-    // end_key is the last key in the SST.
-    optional bytes end_key = 3 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
-  }
-
-  // ssts is the list of input SSTs to merge.
-  repeated SST ssts = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "SSTs"];
-
-  // spans are the key ranges that should be merged. Each merge processor
-  // will be assigned one or more spans to merge.
-  repeated roachpb.Span spans = 2 [(gogoproto.nullable) = false];
-
-  // output_uri is a URI prefix like 'nodelocal://1//merger'. The processor
-  // should use this as the base for all output files.
-  optional string output_uri = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "OutputURI"];
-
-  // NEXT ID: 4.
+  // SST represents metadata about a single SST file to be merged.
+  message SST {
+    // uri is a fully qualified URI to the SST file. It will look something
+    // like 'nodelocal://2//input/0.sst'.
+    optional string uri = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "URI"];
+    // start_key is the first key in the SST.
+    optional bytes start_key = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
+    // end_key is the last key in the SST.
+    optional bytes end_key = 3 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
+  }
+
+  // ssts is the list of input SSTs to merge.
+  repeated SST ssts = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "SSTs"];
+
+  // spans are the merge tasks. E.g. merge task `n` merges all the input data that overlaps with
+  // [spans[n].Key, spans[n].EndKey).
+  repeated roachpb.Span spans = 2 [(gogoproto.nullable) = false];
+
+  // output_storage is the external storage connection where the merged output should be stored.
+  optional cloud.cloudpb.ExternalStorage output_storage = 3 [(gogoproto.nullable) = false];
+
+  message Output {
+    // output_ssts is the list of output SSTs.
+    repeated SST ssts = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "SSTs"];
+  }
+  // NEXT ID: 4.
 }
 
 // IndexBackfillMapProgress is emitted by map-stage processors to describe newly
diff --git a/pkg/sql/physicalplan/BUILD.bazel b/pkg/sql/physicalplan/BUILD.bazel
index 6405bfb99d12..bff3763b93e5 100644
--- a/pkg/sql/physicalplan/BUILD.bazel
+++ b/pkg/sql/physicalplan/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
         "expression.go",
         "fake_span_resolver.go",
         "physical_plan.go",
+        "routing.go",
         "span_resolver.go",
         "specs.go",
     ],
@@ -22,10 +23,12 @@ go_library(
         "//pkg/roachpb",
         "//pkg/rpc",
         "//pkg/settings/cluster",
+        "//pkg/sql/catalog/catenumpb",
         "//pkg/sql/catalog/colinfo",
         "//pkg/sql/execinfra",
         "//pkg/sql/execinfrapb",
         "//pkg/sql/physicalplan/replicaoracle",
+        "//pkg/sql/rowenc",
         "//pkg/sql/sem/eval",
         "//pkg/sql/sem/tree",
         "//pkg/sql/sem/tree/treebin",
diff --git a/pkg/sql/physicalplan/routing.go b/pkg/sql/physicalplan/routing.go
new file mode 100644
index 000000000000..6174d8483d26
--- /dev/null
+++ b/pkg/sql/physicalplan/routing.go
@@ -0,0 +1,92 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package physicalplan
+
+import (
+	"bytes"
+	"fmt"
+	"slices"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
+)
+
+func RoutingKeyForSQLInstance(sqlInstanceID base.SQLInstanceID) roachpb.Key {
+	return roachpb.Key(fmt.Sprintf("node%d", sqlInstanceID))
+}
+
+// RoutingDatumsForSQLInstance returns a pair of encoded datums representing the
+// start and end keys for routing data to a specific SQL instance. This is used
+// when setting up range-based routing in DistSQL physical plans.
+func RoutingDatumsForSQLInstance( + sqlInstanceID base.SQLInstanceID, +) (rowenc.EncDatum, rowenc.EncDatum) { + routingBytes := RoutingKeyForSQLInstance(sqlInstanceID) + startDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes))) + endDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes.Next()))) + return startDatum, endDatum +} + +// RoutingSpanForSQLInstance provides the encoded byte ranges to be used during +// DistSQL planning when setting up the output router for a specific SQL instance. +func RoutingSpanForSQLInstance(sqlInstanceID base.SQLInstanceID) ([]byte, []byte, error) { + var alloc tree.DatumAlloc + startDatum, endDatum := RoutingDatumsForSQLInstance(sqlInstanceID) + + var startBytes, endBytes []byte + startBytes, err := startDatum.Encode(types.Bytes, &alloc, catenumpb.DatumEncoding_ASCENDING_KEY, startBytes) + if err != nil { + return nil, nil, err + } + endBytes, err = endDatum.Encode(types.Bytes, &alloc, catenumpb.DatumEncoding_ASCENDING_KEY, endBytes) + if err != nil { + return nil, nil, err + } + return startBytes, endBytes, nil +} + +// MakeInstanceRouter creates a RangeRouterSpec that routes data to SQL instances +// based on routing keys. Each SQL instance gets assigned a span, and the router +// directs rows to the appropriate stream based on which span the routing key falls into. +func MakeInstanceRouter( + ids []base.SQLInstanceID, +) (execinfrapb.OutputRouterSpec_RangeRouterSpec, error) { + var zero execinfrapb.OutputRouterSpec_RangeRouterSpec + defaultStream := int32(0) + rangeRouterSpec := execinfrapb.OutputRouterSpec_RangeRouterSpec{ + Spans: nil, + DefaultDest: &defaultStream, + Encodings: []execinfrapb.OutputRouterSpec_RangeRouterSpec_ColumnEncoding{ + { + Column: 0, + Encoding: catenumpb.DatumEncoding_ASCENDING_KEY, + }, + }, + } + for stream, sqlInstanceID := range ids { + startBytes, endBytes, err := RoutingSpanForSQLInstance(sqlInstanceID) + if err != nil { + return zero, err + } + + span := execinfrapb.OutputRouterSpec_RangeRouterSpec_Span{ + Start: startBytes, + End: endBytes, + Stream: int32(stream), + } + rangeRouterSpec.Spans = append(rangeRouterSpec.Spans, span) + } + // The router expects the spans to be sorted. 
+ slices.SortFunc(rangeRouterSpec.Spans, func(a, b execinfrapb.OutputRouterSpec_RangeRouterSpec_Span) int { + return bytes.Compare(a.Start, b.Start) + }) + return rangeRouterSpec, nil +} diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go index c2542d728589..106ceab873e6 100644 --- a/pkg/sql/rowexec/processors.go +++ b/pkg/sql/rowexec/processors.go @@ -422,6 +422,36 @@ func NewProcessor( } return NewCompactBackupsProcessor(ctx, flowCtx, processorID, *core.CompactBackups, post) } + if core.BulkMerge != nil { + if err := checkNumIn(inputs, 1); err != nil { + return nil, err + } + if NewBulkMergeProcessor == nil { + return nil, errors.New("BulkMerge processor unimplemented") + } + return NewBulkMergeProcessor(ctx, flowCtx, processorID, *core.BulkMerge, post, inputs[0]) + } + if core.MergeCoordinator != nil { + if err := checkNumIn(inputs, 1); err != nil { + return nil, err + } + if NewMergeCoordinatorProcessor == nil { + return nil, errors.New("MergeCoordinator processor unimplemented") + } + return NewMergeCoordinatorProcessor( + ctx, flowCtx, processorID, *core.MergeCoordinator, post, inputs[0], + ) + } + if core.MergeLoopback != nil { + if err := checkNumIn(inputs, 0); err != nil { + return nil, err + } + if NewMergeLoopbackProcessor == nil { + return nil, errors.New("MergeLoopback processor unimplemented") + } + return NewMergeLoopbackProcessor(ctx, flowCtx, processorID, *core.MergeLoopback, post) + } + return nil, errors.Errorf("unsupported processor core %q", core) } @@ -434,6 +464,32 @@ var NewCloudStorageTestProcessor func(context.Context, *execinfra.FlowCtx, int32 // NewIngestStoppedProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization. var NewIngestStoppedProcessor func(context.Context, *execinfra.FlowCtx, int32, execinfrapb.IngestStoppedSpec, *execinfrapb.PostProcessSpec) (execinfra.Processor, error) +var NewBulkMergeProcessor func( + context.Context, + *execinfra.FlowCtx, + int32, + execinfrapb.BulkMergeSpec, + *execinfrapb.PostProcessSpec, + execinfra.RowSource, +) (execinfra.Processor, error) + +var NewMergeCoordinatorProcessor func( + context.Context, + *execinfra.FlowCtx, + int32, + execinfrapb.MergeCoordinatorSpec, + *execinfrapb.PostProcessSpec, + execinfra.RowSource, +) (execinfra.Processor, error) + +var NewMergeLoopbackProcessor func( + context.Context, + *execinfra.FlowCtx, + int32, + execinfrapb.MergeLoopbackSpec, + *execinfrapb.PostProcessSpec, +) (execinfra.Processor, error) + // NewBackupDataProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization. var NewBackupDataProcessor func(context.Context, *execinfra.FlowCtx, int32, execinfrapb.BackupDataSpec, *execinfrapb.PostProcessSpec) (execinfra.Processor, error) From 00d346806d615a1fbd0fcc311bc6cd46da24015c Mon Sep 17 00:00:00 2001 From: Matt White Date: Tue, 25 Nov 2025 10:01:58 -0800 Subject: [PATCH 2/3] backup: deduplicate routing logic with physicalplan package Previously, the backup package contained duplicate implementations of routingDatumsForSQLInstance and routingSpanForSQLInstance that were identical to functions recently added to pkg/sql/physicalplan. This commit removes these duplicates and uses the shared implementations: RoutingDatumsForSQLInstance and MakeInstanceRouter. This reduces code duplication and ensures consistent routing behavior across DistSQL physical planning for both backup/restore and bulk merge operations. 
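
For reference, both call sites now collapse to the shared helpers. A
condensed sketch (the variable names are the ones already present in the
surrounding code):

    // pkg/backup/restore_processor_planning.go
    rangeRouterSpec, err := physicalplan.MakeInstanceRouter(sqlInstanceIDs)
    if err != nil {
        return nil, nil, err
    }

    // pkg/backup/generative_split_and_scatter_processor.go
    routingDatum, _ = physicalplan.RoutingDatumsForSQLInstance(
        base.SQLInstanceID(scatteredEntry.node))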
Informs #156580 Release note: None --- pkg/backup/BUILD.bazel | 1 - .../generative_split_and_scatter_processor.go | 32 ++---------------- pkg/backup/restore_processor_planning.go | 33 ++----------------- 3 files changed, 5 insertions(+), 61 deletions(-) diff --git a/pkg/backup/BUILD.bazel b/pkg/backup/BUILD.bazel index 0527da171c97..4d7618eb2d8e 100644 --- a/pkg/backup/BUILD.bazel +++ b/pkg/backup/BUILD.bazel @@ -83,7 +83,6 @@ go_library( "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkeys", - "//pkg/sql/catalog/catenumpb", "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/dbdesc", diff --git a/pkg/backup/generative_split_and_scatter_processor.go b/pkg/backup/generative_split_and_scatter_processor.go index cce9070d0183..58ec9e6d4517 100644 --- a/pkg/backup/generative_split_and_scatter_processor.go +++ b/pkg/backup/generative_split_and_scatter_processor.go @@ -7,7 +7,6 @@ package backup import ( "context" - "fmt" "hash/fnv" "math/rand" "strings" @@ -22,9 +21,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -249,15 +248,6 @@ func (s dbSplitAndScatterer) findDestination(res *kvpb.AdminScatterResponse) roa return roachpb.NodeID(0) } -func routingDatumsForSQLInstance( - sqlInstanceID base.SQLInstanceID, -) (rowenc.EncDatum, rowenc.EncDatum) { - routingBytes := roachpb.Key(fmt.Sprintf("node%d", sqlInstanceID)) - startDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes))) - endDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes.Next()))) - return startDatum, endDatum -} - type entryNode struct { entry execinfrapb.RestoreSpanEntry node roachpb.NodeID @@ -393,7 +383,7 @@ func (gssp *generativeSplitAndScatterProcessor) Next() ( // The routing datums informs the router which output stream should be used. routingDatum, ok := gssp.routingDatumCache.getRoutingDatum(scatteredEntry.node) if !ok { - routingDatum, _ = routingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node)) + routingDatum, _ = physicalplan.RoutingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node)) gssp.routingDatumCache.putRoutingDatum(scatteredEntry.node, routingDatum) } @@ -745,24 +735,6 @@ var splitAndScatterOutputTypes = []*types.T{ types.Bytes, // RestoreDataEntry bytes } -// routingSpanForSQLInstance provides the mapping to be used during distsql planning -// when setting up the output router. 
-func routingSpanForSQLInstance(sqlInstanceID base.SQLInstanceID) ([]byte, []byte, error) { - var alloc tree.DatumAlloc - startDatum, endDatum := routingDatumsForSQLInstance(sqlInstanceID) - - startBytes, endBytes := make([]byte, 0), make([]byte, 0) - startBytes, err := startDatum.Encode(splitAndScatterOutputTypes[0], &alloc, catenumpb.DatumEncoding_ASCENDING_KEY, startBytes) - if err != nil { - return nil, nil, err - } - endBytes, err = endDatum.Encode(splitAndScatterOutputTypes[0], &alloc, catenumpb.DatumEncoding_ASCENDING_KEY, endBytes) - if err != nil { - return nil, nil, err - } - return startBytes, endBytes, nil -} - func init() { rowexec.NewGenerativeSplitAndScatterProcessor = newGenerativeSplitAndScatterProcessor } diff --git a/pkg/backup/restore_processor_planning.go b/pkg/backup/restore_processor_planning.go index bb274969cac9..2ae138029fb4 100644 --- a/pkg/backup/restore_processor_planning.go +++ b/pkg/backup/restore_processor_planning.go @@ -6,10 +6,8 @@ package backup import ( - "bytes" "context" "math" - "slices" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cloud" @@ -19,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -123,34 +120,10 @@ func distRestore( // Plan SplitAndScatter on the coordinator node. splitAndScatterStageID := p.NewStageOnNodes(sqlInstanceIDs) - defaultStream := int32(0) - rangeRouterSpec := execinfrapb.OutputRouterSpec_RangeRouterSpec{ - Spans: nil, - DefaultDest: &defaultStream, - Encodings: []execinfrapb.OutputRouterSpec_RangeRouterSpec_ColumnEncoding{ - { - Column: 0, - Encoding: catenumpb.DatumEncoding_ASCENDING_KEY, - }, - }, - } - for stream, sqlInstanceID := range sqlInstanceIDs { - startBytes, endBytes, err := routingSpanForSQLInstance(sqlInstanceID) - if err != nil { - return nil, nil, err - } - - span := execinfrapb.OutputRouterSpec_RangeRouterSpec_Span{ - Start: startBytes, - End: endBytes, - Stream: int32(stream), - } - rangeRouterSpec.Spans = append(rangeRouterSpec.Spans, span) + rangeRouterSpec, err := physicalplan.MakeInstanceRouter(sqlInstanceIDs) + if err != nil { + return nil, nil, err } - // The router expects the spans to be sorted. - slices.SortFunc(rangeRouterSpec.Spans, func(a, b execinfrapb.OutputRouterSpec_RangeRouterSpec_Span) int { - return bytes.Compare(a.Start, b.Start) - }) // TODO(pbardea): This not super principled. I just wanted something that // wasn't a constant and grew slower than linear with the length of From d96f8199745046909798d3c8f3f276df631e430f Mon Sep 17 00:00:00 2001 From: Matt White Date: Mon, 24 Nov 2025 14:53:40 -0800 Subject: [PATCH 3/3] physicalplan: set DefaultDest to nil to catch routing bugs Changes MakeInstanceRouter to set DefaultDest to nil instead of stream 0. When DefaultDest is nil, any routing key that doesn't match a span will produce an error rather than silently routing to an arbitrary stream. This helps catch coordination bugs early. Updates merge_loopback.go to generate a routing key for its SQL instance using physicalplan.RoutingDatumsForSQLInstance instead of a hardcoded "loopback" key. This ensures the routing key matches one of the spans in the router rather than relying on the DefaultDest fallback. 
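
To make the interaction concrete, here is a sketch of how the pieces line
up (the helper bodies live in pkg/sql/physicalplan/routing.go):

    // Instance 1's routing key is "node1", so the loopback's row falls in
    // the router span ["node1", "node1".Next()) and is delivered to the
    // stream for instance 1.
    key := physicalplan.RoutingKeyForSQLInstance(1) // roachpb.Key("node1")

    // With DefaultDest set to nil, a routing key that matches no span
    // (for example, one built for an instance missing from the span list)
    // now fails the flow with an error instead of silently landing on
    // stream 0.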
The test expectation is updated from "loopback->merge->coordinator" to "node1->merge->coordinator" to reflect the explicit routing behavior. Fixes #156580 Release note: None --- pkg/sql/bulkmerge/merge_loopback.go | 6 +++++- pkg/sql/bulkmerge/merge_test.go | 4 +++- pkg/sql/physicalplan/routing.go | 9 ++++++--- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/pkg/sql/bulkmerge/merge_loopback.go b/pkg/sql/bulkmerge/merge_loopback.go index 8f2753a9e4a0..54d200e4314c 100644 --- a/pkg/sql/bulkmerge/merge_loopback.go +++ b/pkg/sql/bulkmerge/merge_loopback.go @@ -10,6 +10,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -41,8 +42,11 @@ func (m *mergeLoopback) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadat return nil, m.DrainHelper() } m.done = true + // Generate a routing key for the current SQL instance (where this processor is running). + // This ensures the routing key matches one of the spans in the range router. + routingDatum, _ := physicalplan.RoutingDatumsForSQLInstance(m.FlowCtx.NodeID.SQLInstanceID()) return rowenc.EncDatumRow{ - rowenc.EncDatum{Datum: tree.NewDBytes("loopback")}, + routingDatum, rowenc.EncDatum{Datum: tree.NewDInt(1)}, }, nil } diff --git a/pkg/sql/bulkmerge/merge_test.go b/pkg/sql/bulkmerge/merge_test.go index e3de50916cb3..eff936952656 100644 --- a/pkg/sql/bulkmerge/merge_test.go +++ b/pkg/sql/bulkmerge/merge_test.go @@ -72,5 +72,7 @@ func TestMergeProcessors(t *testing.T) { ) require.NoError(t, rowWriter.Err()) - require.Equal(t, result, "loopback->merge->coordinator") + // The output includes the routing key (e.g., "node1") from the SQL instance + // where the merge loopback processor runs. + require.Equal(t, result, "node1->merge->coordinator") } diff --git a/pkg/sql/physicalplan/routing.go b/pkg/sql/physicalplan/routing.go index 6174d8483d26..f1db5c89c214 100644 --- a/pkg/sql/physicalplan/routing.go +++ b/pkg/sql/physicalplan/routing.go @@ -60,10 +60,13 @@ func MakeInstanceRouter( ids []base.SQLInstanceID, ) (execinfrapb.OutputRouterSpec_RangeRouterSpec, error) { var zero execinfrapb.OutputRouterSpec_RangeRouterSpec - defaultStream := int32(0) rangeRouterSpec := execinfrapb.OutputRouterSpec_RangeRouterSpec{ - Spans: nil, - DefaultDest: &defaultStream, + Spans: nil, + // DefaultDest is nil so that any routing key that doesn't match a span + // will produce an error. This ensures we catch coordination bugs where + // a routing key is generated for an instance not in the router's span list, + // rather than silently routing to an arbitrary instance. + DefaultDest: nil, Encodings: []execinfrapb.OutputRouterSpec_RangeRouterSpec_ColumnEncoding{ { Column: 0,
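
Taken together, the series establishes a single routing pattern for fan-out
stages: build the router once from the instance list, attach it to the
producing processor's output, and stamp each row with the destination
instance's routing datum. A condensed sketch of that pattern (destInstanceID
is a hypothetical destination; the other names come from the diffs above):

    // Planning side: one BY_RANGE output stream per SQL instance.
    router, err := physicalplan.MakeInstanceRouter(sqlInstanceIDs)
    if err != nil {
        return err
    }
    spec.Output = []execinfrapb.OutputRouterSpec{{
        Type:            execinfrapb.OutputRouterSpec_BY_RANGE,
        RangeRouterSpec: router,
    }}

    // Producer side: column 0 carries the routing key that selects the
    // destination stream.
    routingDatum, _ := physicalplan.RoutingDatumsForSQLInstance(destInstanceID)
    row := rowenc.EncDatumRow{routingDatum /* , payload columns... */}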