Skip to content

Commit b467894

Browse files
craig[bot]dt
andcommitted
Merge #158361
158361: crosscluster/logical: add CPU pacing to batch handling r=dt a=dt Release note (performance improvement): More of the CPU usage of LDR jobs is subject to background job admission control limits. Epic: none. Co-authored-by: David Taylor <[email protected]>
2 parents 7ac9a86 + 0e00af4 commit b467894

File tree

4 files changed

+23
-0
lines changed

4 files changed

+23
-0
lines changed

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ go_library(
9292
"//pkg/sql/syntheticprivilege",
9393
"//pkg/sql/types",
9494
"//pkg/storage",
95+
"//pkg/util/admission",
9596
"//pkg/util/admission/admissionpb",
9697
"//pkg/util/buildutil",
9798
"//pkg/util/bulk",

pkg/crosscluster/logical/logical_replication_writer_processor.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
2222
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2323
"github.com/cockroachdb/cockroach/pkg/keys"
24+
kvbulk "github.com/cockroachdb/cockroach/pkg/kv/bulk"
2425
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
2526
"github.com/cockroachdb/cockroach/pkg/roachpb"
2627
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -40,6 +41,7 @@ import (
4041
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
4142
"github.com/cockroachdb/cockroach/pkg/sql/stats"
4243
"github.com/cockroachdb/cockroach/pkg/sql/types"
44+
"github.com/cockroachdb/cockroach/pkg/util/admission"
4345
"github.com/cockroachdb/cockroach/pkg/util/bulk"
4446
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
4547
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -148,6 +150,8 @@ type logicalReplicationWriterProcessor struct {
148150
dupeCount int64
149151
seenEvery log.EveryN
150152
retryEvery log.EveryN
153+
154+
pacer *admission.Pacer
151155
}
152156

153157
var (
@@ -229,6 +233,7 @@ func newLogicalReplicationWriterProcessor(
229233
metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics),
230234
seenEvery: log.Every(1 * time.Minute),
231235
retryEvery: log.Every(1 * time.Minute),
236+
pacer: kvbulk.NewCPUPacer(ctx, flowCtx.Cfg.DB.KV(), useLowPriority),
232237
}
233238
lrw.purgatory = purgatory{
234239
deadline: func() time.Duration { return retryQueueAgeLimit.Get(&flowCtx.Cfg.Settings.SV) },

pkg/crosscluster/logical/lww_kv_processor.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1414
"github.com/cockroachdb/cockroach/pkg/keys"
1515
"github.com/cockroachdb/cockroach/pkg/kv"
16+
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
1617
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1718
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
1819
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -29,6 +30,7 @@ import (
2930
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
3031
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
3132
"github.com/cockroachdb/cockroach/pkg/sql/stats"
33+
"github.com/cockroachdb/cockroach/pkg/util/admission"
3234
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
3335
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3436
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -47,6 +49,8 @@ type kvRowProcessor struct {
4749
dstBySrc map[descpb.ID]descpb.ID
4850
writers map[descpb.ID]*kvTableWriter
4951

52+
pacer *admission.Pacer
53+
5054
failureInjector
5155
}
5256

@@ -79,6 +83,7 @@ func newKVRowProcessor(
7983
dstBySrc: dstBySrc,
8084
writers: make(map[descpb.ID]*kvTableWriter, len(procConfigByDestID)),
8185
decoder: decoder,
86+
pacer: bulk.NewCPUPacer(ctx, cfg.DB.KV(), useLowPriority),
8287
}
8388
return p, nil
8489
}
@@ -119,6 +124,10 @@ func (p *kvRowProcessor) HandleBatch(
119124
ctx, sp := tracing.ChildSpan(ctx, "kvRowProcessor.HandleBatch")
120125
defer sp.Finish()
121126

127+
if _, err := p.pacer.Pace(ctx); err != nil {
128+
return batchStats{}, err
129+
}
130+
122131
if len(batch) == 1 {
123132
stats := batchStats{}
124133
if p.spec.Discard == jobspb.LogicalReplicationDetails_DiscardAllDeletes && len(batch[0].KeyValue.Value.RawBytes) == 0 {

pkg/crosscluster/logical/sql_crud_writer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
1212
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
13+
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
1314
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
1415
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1516
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -18,6 +19,7 @@ import (
1819
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
1920
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
2021
"github.com/cockroachdb/cockroach/pkg/sql/stats"
22+
"github.com/cockroachdb/cockroach/pkg/util/admission"
2123
"github.com/cockroachdb/cockroach/pkg/util/tracing"
2224
)
2325

@@ -32,6 +34,7 @@ type sqlCrudWriter struct {
3234
handlers map[descpb.ID]*tableHandler
3335
settings *cluster.Settings
3436
discard jobspb.LogicalReplicationDetails_Discard
37+
pacer *admission.Pacer
3538
}
3639

3740
var _ BatchHandler = &sqlCrudWriter{}
@@ -81,6 +84,7 @@ func newCrudSqlWriter(
8184
handlers: handlers,
8285
settings: evalCtx.Settings,
8386
discard: discard,
87+
pacer: bulk.NewCPUPacer(ctx, cfg.DB.KV(), useLowPriority),
8488
}, nil
8589
}
8690

@@ -90,6 +94,10 @@ func (c *sqlCrudWriter) HandleBatch(
9094
ctx, sp := tracing.ChildSpan(ctx, "crudBatcher.HandleBatch")
9195
defer sp.Finish()
9296

97+
if _, err := c.pacer.Pace(ctx); err != nil {
98+
return batchStats{}, err
99+
}
100+
93101
sortedEvents, err := c.decoder.decodeAndCoalesceEvents(ctx, batch, c.discard)
94102
if err != nil {
95103
return batchStats{}, err

0 commit comments

Comments
 (0)