Skip to content

Commit 5de1e7f

Browse files
committed
physicalplan: set DefaultDest to nil to catch routing bugs
Changes MakeInstanceRouter to set DefaultDest to nil instead of stream 0. When DefaultDest is nil, any routing key that doesn't match a span will produce an error rather than silently routing to an arbitrary stream. This helps catch coordination bugs early. Updates merge_loopback.go to generate a routing key for its SQL instance using physicalplan.RoutingDatumsForSQLInstance instead of a hardcoded "loopback" key. This ensures the routing key matches one of the spans in the router rather than relying on the DefaultDest fallback. The test expectation is updated from "loopback->merge->coordinator" to "node1->merge->coordinator" to reflect the explicit routing behavior. Fixes #156580 Release note: None
1 parent d5b6bdc commit 5de1e7f

File tree

3 files changed

+14
-5
lines changed

3 files changed

+14
-5
lines changed

pkg/sql/bulkmerge/merge_loopback.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
1212
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
13+
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
1314
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
1415
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
1516
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -41,8 +42,11 @@ func (m *mergeLoopback) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadat
4142
return nil, m.DrainHelper()
4243
}
4344
m.done = true
45+
// Generate a routing key for the current SQL instance (where this processor is running).
46+
// This ensures the routing key matches one of the spans in the range router.
47+
routingDatum, _ := physicalplan.RoutingDatumsForSQLInstance(m.FlowCtx.NodeID.SQLInstanceID())
4448
return rowenc.EncDatumRow{
45-
rowenc.EncDatum{Datum: tree.NewDBytes("loopback")},
49+
routingDatum,
4650
rowenc.EncDatum{Datum: tree.NewDInt(1)},
4751
}, nil
4852
}

pkg/sql/bulkmerge/merge_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,7 @@ func TestMergeProcessors(t *testing.T) {
7272
)
7373

7474
require.NoError(t, rowWriter.Err())
75-
require.Equal(t, result, "loopback->merge->coordinator")
75+
// The output includes the routing key (e.g., "node1") from the SQL instance
76+
// where the merge loopback processor runs.
77+
require.Equal(t, result, "node1->merge->coordinator")
7678
}

pkg/sql/physicalplan/routing.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,13 @@ func MakeInstanceRouter(
5656
ids []base.SQLInstanceID,
5757
) (execinfrapb.OutputRouterSpec_RangeRouterSpec, error) {
5858
var zero execinfrapb.OutputRouterSpec_RangeRouterSpec
59-
defaultStream := int32(0)
6059
rangeRouterSpec := execinfrapb.OutputRouterSpec_RangeRouterSpec{
61-
Spans: nil,
62-
DefaultDest: &defaultStream,
60+
Spans: nil,
61+
// DefaultDest is nil so that any routing key that doesn't match a span
62+
// will produce an error. This ensures we catch coordination bugs where
63+
// a routing key is generated for an instance not in the router's span list,
64+
// rather than silently routing to an arbitrary instance.
65+
DefaultDest: nil,
6366
Encodings: []execinfrapb.OutputRouterSpec_RangeRouterSpec_ColumnEncoding{
6467
{
6568
Column: 0,

0 commit comments

Comments
 (0)