Skip to content

Commit 6a103f0

Browse files
committed
backup: deduplicate routing logic with physicalplan package
Previously, the backup package contained duplicate implementations of routingDatumsForSQLInstance and routingSpanForSQLInstance that were identical to functions recently added to pkg/sql/physicalplan. This commit removes these duplicates and uses the shared implementations: RoutingDatumsForSQLInstance and MakeInstanceRouter. This reduces code duplication and ensures consistent routing behavior across DistSQL physical planning for both backup/restore and bulk merge operations. Informs #156580 Release note: None
1 parent dad4cd0 commit 6a103f0

File tree

2 files changed

+5
-58
lines changed

2 files changed

+5
-58
lines changed

pkg/backup/generative_split_and_scatter_processor.go

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package backup
77

88
import (
99
"context"
10-
"fmt"
1110
"hash/fnv"
1211
"math/rand"
1312
"strings"
@@ -22,9 +21,9 @@ import (
2221
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
2322
"github.com/cockroachdb/cockroach/pkg/roachpb"
2423
"github.com/cockroachdb/cockroach/pkg/sql"
25-
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
2624
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
2725
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
26+
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
2827
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
2928
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
3029
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -249,14 +248,6 @@ func (s dbSplitAndScatterer) findDestination(res *kvpb.AdminScatterResponse) roa
249248
return roachpb.NodeID(0)
250249
}
251250

252-
func routingDatumsForSQLInstance(
253-
sqlInstanceID base.SQLInstanceID,
254-
) (rowenc.EncDatum, rowenc.EncDatum) {
255-
routingBytes := roachpb.Key(fmt.Sprintf("node%d", sqlInstanceID))
256-
startDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes)))
257-
endDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes.Next())))
258-
return startDatum, endDatum
259-
}
260251

261252
type entryNode struct {
262253
entry execinfrapb.RestoreSpanEntry
@@ -393,7 +384,7 @@ func (gssp *generativeSplitAndScatterProcessor) Next() (
393384
// The routing datums informs the router which output stream should be used.
394385
routingDatum, ok := gssp.routingDatumCache.getRoutingDatum(scatteredEntry.node)
395386
if !ok {
396-
routingDatum, _ = routingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node))
387+
routingDatum, _ = physicalplan.RoutingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node))
397388
gssp.routingDatumCache.putRoutingDatum(scatteredEntry.node, routingDatum)
398389
}
399390

@@ -745,23 +736,6 @@ var splitAndScatterOutputTypes = []*types.T{
745736
types.Bytes, // RestoreDataEntry bytes
746737
}
747738

748-
// routingSpanForSQLInstance provides the mapping to be used during distsql planning
749-
// when setting up the output router.
750-
func routingSpanForSQLInstance(sqlInstanceID base.SQLInstanceID) ([]byte, []byte, error) {
751-
var alloc tree.DatumAlloc
752-
startDatum, endDatum := routingDatumsForSQLInstance(sqlInstanceID)
753-
754-
startBytes, endBytes := make([]byte, 0), make([]byte, 0)
755-
startBytes, err := startDatum.Encode(splitAndScatterOutputTypes[0], &alloc, catenumpb.DatumEncoding_ASCENDING_KEY, startBytes)
756-
if err != nil {
757-
return nil, nil, err
758-
}
759-
endBytes, err = endDatum.Encode(splitAndScatterOutputTypes[0], &alloc, catenumpb.DatumEncoding_ASCENDING_KEY, endBytes)
760-
if err != nil {
761-
return nil, nil, err
762-
}
763-
return startBytes, endBytes, nil
764-
}
765739

766740
func init() {
767741
rowexec.NewGenerativeSplitAndScatterProcessor = newGenerativeSplitAndScatterProcessor

pkg/backup/restore_processor_planning.go

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@
66
package backup
77

88
import (
9-
"bytes"
109
"context"
1110
"math"
12-
"slices"
1311

1412
"github.com/cockroachdb/cockroach/pkg/base"
1513
"github.com/cockroachdb/cockroach/pkg/cloud"
@@ -19,7 +17,6 @@ import (
1917
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
2018
"github.com/cockroachdb/cockroach/pkg/roachpb"
2119
"github.com/cockroachdb/cockroach/pkg/sql"
22-
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
2320
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
2421
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
2522
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -123,34 +120,10 @@ func distRestore(
123120
// Plan SplitAndScatter on the coordinator node.
124121
splitAndScatterStageID := p.NewStageOnNodes(sqlInstanceIDs)
125122

126-
defaultStream := int32(0)
127-
rangeRouterSpec := execinfrapb.OutputRouterSpec_RangeRouterSpec{
128-
Spans: nil,
129-
DefaultDest: &defaultStream,
130-
Encodings: []execinfrapb.OutputRouterSpec_RangeRouterSpec_ColumnEncoding{
131-
{
132-
Column: 0,
133-
Encoding: catenumpb.DatumEncoding_ASCENDING_KEY,
134-
},
135-
},
136-
}
137-
for stream, sqlInstanceID := range sqlInstanceIDs {
138-
startBytes, endBytes, err := routingSpanForSQLInstance(sqlInstanceID)
139-
if err != nil {
140-
return nil, nil, err
141-
}
142-
143-
span := execinfrapb.OutputRouterSpec_RangeRouterSpec_Span{
144-
Start: startBytes,
145-
End: endBytes,
146-
Stream: int32(stream),
147-
}
148-
rangeRouterSpec.Spans = append(rangeRouterSpec.Spans, span)
123+
rangeRouterSpec, err := physicalplan.MakeInstanceRouter(sqlInstanceIDs)
124+
if err != nil {
125+
return nil, nil, err
149126
}
150-
// The router expects the spans to be sorted.
151-
slices.SortFunc(rangeRouterSpec.Spans, func(a, b execinfrapb.OutputRouterSpec_RangeRouterSpec_Span) int {
152-
return bytes.Compare(a.Start, b.Start)
153-
})
154127

155128
// TODO(pbardea): This not super principled. I just wanted something that
156129
// wasn't a constant and grew slower than linear with the length of

0 commit comments

Comments
 (0)