Skip to content

Commit dad4cd0

Browse files
jeffswensonmw5h
authored andcommitted
bulkmerge: add processors and DistSQL physical planning
This commit introduces the initial implementation of bulkmerge processors for distributed merge operations. It adds three new processor types: MergeLoopback, BulkMerge, and MergeCoordinator, along with the DistSQL physical planning infrastructure to orchestrate them. The MergeLoopback processor runs on the coordinator node and generates initial tasks. The BulkMerge processors run on each SQL instance to perform merge operations. The MergeCoordinator collects results from all merge processors. This commit also updates the vectorized execution engine (execplan.go) to recognize the new processor cores, preventing panics when these processors are encountered. Informs #156580 Release note: None
1 parent 5d7e8e4 commit dad4cd0

File tree

16 files changed

+776
-22
lines changed

16 files changed

+776
-22
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,7 @@ ALL_TESTS = [
397397
"//pkg/sql/auditlogging:auditlogging_test",
398398
"//pkg/sql/backfill:backfill_test",
399399
"//pkg/sql/bulkingest:bulkingest_test",
400+
"//pkg/sql/bulkmerge:bulkmerge_test",
400401
"//pkg/sql/bulksst:bulksst_test",
401402
"//pkg/sql/bulkutil:bulkutil_test",
402403
"//pkg/sql/cacheutil:cacheutil_test",
@@ -1885,6 +1886,8 @@ GO_TARGETS = [
18851886
"//pkg/sql/backfill:backfill_test",
18861887
"//pkg/sql/bulkingest:bulkingest",
18871888
"//pkg/sql/bulkingest:bulkingest_test",
1889+
"//pkg/sql/bulkmerge:bulkmerge",
1890+
"//pkg/sql/bulkmerge:bulkmerge_test",
18881891
"//pkg/sql/bulksst:bulksst",
18891892
"//pkg/sql/bulksst:bulksst_test",
18901893
"//pkg/sql/bulkutil:bulkutil",

pkg/sql/bulkmerge/BUILD.bazel

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Copyright 2025 The Cockroach Authors.
2+
#
3+
# Use of this software is governed by the CockroachDB Software License
4+
# included in the /LICENSE file.
5+
6+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
7+
8+
go_library(
9+
name = "bulkmerge",
10+
srcs = [
11+
"merge_coordinator.go",
12+
"merge_loopback.go",
13+
"merge_planning.go",
14+
"merge_processor.go",
15+
],
16+
importpath = "github.com/cockroachdb/cockroach/pkg/sql/bulkmerge",
17+
visibility = ["//visibility:public"],
18+
deps = [
19+
"//pkg/base",
20+
"//pkg/sql",
21+
"//pkg/sql/execinfra",
22+
"//pkg/sql/execinfrapb",
23+
"//pkg/sql/physicalplan",
24+
"//pkg/sql/rowenc",
25+
"//pkg/sql/rowexec",
26+
"//pkg/sql/sem/tree",
27+
"//pkg/sql/types",
28+
"@com_github_cockroachdb_errors//:errors",
29+
],
30+
)
31+
32+
go_test(
33+
name = "bulkmerge_test",
34+
srcs = [
35+
"main_test.go",
36+
"merge_processor_test.go",
37+
"merge_test.go",
38+
],
39+
embed = [":bulkmerge"],
40+
deps = [
41+
"//pkg/base",
42+
"//pkg/kv/kvclient/kvtenant",
43+
"//pkg/security/securityassets",
44+
"//pkg/security/securitytest",
45+
"//pkg/security/username",
46+
"//pkg/server",
47+
"//pkg/sql",
48+
"//pkg/sql/sem/tree",
49+
"//pkg/testutils/serverutils",
50+
"//pkg/testutils/sqlutils",
51+
"//pkg/testutils/testcluster",
52+
"//pkg/util/leaktest",
53+
"//pkg/util/log",
54+
"//pkg/util/randutil",
55+
"@com_github_stretchr_testify//require",
56+
],
57+
)

pkg/sql/bulkmerge/main_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulkmerge
7+
8+
import (
9+
"os"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
13+
"github.com/cockroachdb/cockroach/pkg/security/securityassets"
14+
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
15+
"github.com/cockroachdb/cockroach/pkg/server"
16+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
17+
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
18+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
19+
)
20+
21+
//go:generate ../util/leaktest/add-leaktest.sh *_test.go
22+
23+
func TestMain(m *testing.M) {
24+
securityassets.SetLoader(securitytest.EmbeddedAssets)
25+
randutil.SeedForTests()
26+
serverutils.InitTestServerFactory(server.TestServerFactory)
27+
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
28+
kvtenant.InitTestConnectorFactory()
29+
os.Exit(m.Run())
30+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulkmerge
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
12+
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
13+
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
14+
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
15+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
16+
"github.com/cockroachdb/cockroach/pkg/sql/types"
17+
"github.com/cockroachdb/errors"
18+
)
19+
20+
var (
21+
_ execinfra.Processor = &mergeCoordinator{}
22+
_ execinfra.RowSource = &mergeCoordinator{}
23+
)
24+
25+
// Emits a single row on completion which is a protobuf containing the details
26+
// of the merged SSTs.
27+
// TODO(jeffswenson): define the protobuf
28+
var mergeCoordinatorOutputTypes = []*types.T{
29+
types.Bytes,
30+
}
31+
32+
type mergeCoordinator struct {
33+
execinfra.ProcessorBase
34+
input execinfra.RowSource
35+
}
36+
37+
// Next implements execinfra.RowSource.
38+
func (m *mergeCoordinator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
39+
for m.State == execinfra.StateRunning {
40+
row, meta := m.input.Next()
41+
switch {
42+
case row == nil && meta == nil:
43+
m.MoveToDraining(nil /* err */)
44+
break
45+
case meta != nil && meta.Err != nil:
46+
m.MoveToDraining(meta.Err)
47+
break
48+
case meta != nil:
49+
m.MoveToDraining(errors.Newf("unexpected meta: %v", meta))
50+
break
51+
case row != nil:
52+
base := *row[2].Datum.(*tree.DBytes)
53+
return rowenc.EncDatumRow{
54+
rowenc.EncDatum{Datum: tree.NewDBytes(base + "->coordinator")},
55+
}, nil
56+
}
57+
}
58+
return nil, m.DrainHelper()
59+
}
60+
61+
// Start implements execinfra.RowSource.
62+
func (m *mergeCoordinator) Start(ctx context.Context) {
63+
m.StartInternal(ctx, "mergeCoordinator")
64+
m.input.Start(ctx)
65+
}
66+
67+
func init() {
68+
rowexec.NewMergeCoordinatorProcessor = func(
69+
ctx context.Context,
70+
flow *execinfra.FlowCtx,
71+
flowID int32,
72+
spec execinfrapb.MergeCoordinatorSpec,
73+
postSpec *execinfrapb.PostProcessSpec,
74+
input execinfra.RowSource,
75+
) (execinfra.Processor, error) {
76+
mc := &mergeCoordinator{
77+
input: input,
78+
}
79+
err := mc.Init(
80+
ctx, mc, postSpec, mergeCoordinatorOutputTypes, flow, flowID, nil,
81+
execinfra.ProcStateOpts{
82+
InputsToDrain: []execinfra.RowSource{input},
83+
},
84+
)
85+
if err != nil {
86+
return nil, err
87+
}
88+
return mc, nil
89+
}
90+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulkmerge
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
12+
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
13+
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
14+
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
15+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
16+
"github.com/cockroachdb/cockroach/pkg/sql/types"
17+
)
18+
19+
var (
20+
_ execinfra.Processor = &mergeLoopback{}
21+
_ execinfra.RowSource = &mergeLoopback{}
22+
)
23+
24+
var mergeLoopbackOutputTypes = []*types.T{
25+
// Span key for the range router. It encodes the destination
26+
// processor's SQL instance ID.
27+
types.Bytes,
28+
// Task ID
29+
types.Int4,
30+
}
31+
32+
type mergeLoopback struct {
33+
execinfra.ProcessorBase
34+
done bool
35+
}
36+
37+
// Next implements execinfra.RowSource.
38+
func (m *mergeLoopback) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
39+
if m.done {
40+
m.MoveToDraining(nil)
41+
return nil, m.DrainHelper()
42+
}
43+
m.done = true
44+
return rowenc.EncDatumRow{
45+
rowenc.EncDatum{Datum: tree.NewDBytes("loopback")},
46+
rowenc.EncDatum{Datum: tree.NewDInt(1)},
47+
}, nil
48+
}
49+
50+
// Start implements execinfra.RowSource.
51+
func (m *mergeLoopback) Start(ctx context.Context) {
52+
m.StartInternal(ctx, "mergeLoopback")
53+
// TODO(jeffswenson): create the initial set of tasks
54+
}
55+
56+
func init() {
57+
rowexec.NewMergeLoopbackProcessor = func(
58+
ctx context.Context,
59+
flow *execinfra.FlowCtx,
60+
flowID int32,
61+
spec execinfrapb.MergeLoopbackSpec,
62+
postSpec *execinfrapb.PostProcessSpec,
63+
) (execinfra.Processor, error) {
64+
ml := &mergeLoopback{}
65+
err := ml.Init(
66+
ctx, ml, postSpec, mergeLoopbackOutputTypes, flow, flowID, nil,
67+
execinfra.ProcStateOpts{},
68+
)
69+
if err != nil {
70+
return nil, err
71+
}
72+
return ml, nil
73+
}
74+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulkmerge
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/base"
12+
"github.com/cockroachdb/cockroach/pkg/sql"
13+
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
14+
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
15+
"github.com/cockroachdb/errors"
16+
)
17+
18+
func newBulkMergePlan(
19+
ctx context.Context, execCtx sql.JobExecContext,
20+
) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
21+
// NOTE: This implementation is inspired by the physical plan created by
22+
// restore in `pkg/backup/restore_processor_planning.go`
23+
planCtx, sqlInstanceIDs, err := execCtx.DistSQLPlanner().SetupAllNodesPlanning(
24+
ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg())
25+
if err != nil {
26+
return nil, nil, err
27+
}
28+
29+
plan := planCtx.NewPhysicalPlan()
30+
// Use the gateway node as the coordinator, which is where the job was initiated.
31+
coordinatorID := plan.GatewaySQLInstanceID
32+
33+
router, err := physicalplan.MakeInstanceRouter(sqlInstanceIDs)
34+
if err != nil {
35+
return nil, nil, errors.Wrap(err, "unable to make instance router")
36+
}
37+
38+
loopbackID := plan.AddProcessor(physicalplan.Processor{
39+
SQLInstanceID: coordinatorID,
40+
Spec: execinfrapb.ProcessorSpec{
41+
Core: execinfrapb.ProcessorCoreUnion{
42+
MergeLoopback: &execinfrapb.MergeLoopbackSpec{},
43+
},
44+
Post: execinfrapb.PostProcessSpec{},
45+
Output: []execinfrapb.OutputRouterSpec{{
46+
Type: execinfrapb.OutputRouterSpec_BY_RANGE,
47+
RangeRouterSpec: router,
48+
}},
49+
StageID: plan.NewStageOnNodes([]base.SQLInstanceID{coordinatorID}),
50+
ResultTypes: mergeLoopbackOutputTypes,
51+
},
52+
})
53+
54+
mergeStage := plan.NewStageOnNodes(sqlInstanceIDs)
55+
for streamID, sqlInstanceID := range sqlInstanceIDs {
56+
pIdx := plan.AddProcessor(physicalplan.Processor{
57+
SQLInstanceID: sqlInstanceID,
58+
Spec: execinfrapb.ProcessorSpec{
59+
Input: []execinfrapb.InputSyncSpec{{
60+
ColumnTypes: mergeLoopbackOutputTypes,
61+
}},
62+
Core: execinfrapb.ProcessorCoreUnion{
63+
BulkMerge: &execinfrapb.BulkMergeSpec{
64+
// TODO(jeffswenson): fill in the rest of the spec
65+
},
66+
},
67+
Post: execinfrapb.PostProcessSpec{},
68+
Output: []execinfrapb.OutputRouterSpec{{
69+
Type: execinfrapb.OutputRouterSpec_PASS_THROUGH,
70+
}},
71+
StageID: mergeStage,
72+
ResultTypes: bulkMergeProcessorOutputTypes,
73+
},
74+
})
75+
plan.Streams = append(plan.Streams, physicalplan.Stream{
76+
SourceProcessor: loopbackID,
77+
SourceRouterSlot: streamID,
78+
DestProcessor: pIdx,
79+
DestInput: 0,
80+
})
81+
plan.ResultRouters = append(plan.ResultRouters, pIdx)
82+
}
83+
84+
plan.AddSingleGroupStage(ctx, coordinatorID, execinfrapb.ProcessorCoreUnion{
85+
MergeCoordinator: &execinfrapb.MergeCoordinatorSpec{
86+
// TODO fill in the rest of the spec
87+
},
88+
}, execinfrapb.PostProcessSpec{}, mergeCoordinatorOutputTypes, nil /* finalizeLastStageCb */)
89+
90+
plan.PlanToStreamColMap = []int{0} // Needed for FinalizePlan to populate ResultTypes
91+
sql.FinalizePlan(ctx, planCtx, plan)
92+
93+
return plan, planCtx, nil
94+
}

0 commit comments

Comments
 (0)