Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

variable: tidb_pipelined_dml_resource_policy #57352

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4030,10 +4030,13 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco
})

s.txn.changeToPending(&txnFuture{
future: future,
store: s.store,
txnScope: scope,
pipelined: s.usePipelinedDmlOrWarn(ctx),
future: future,
store: s.store,
txnScope: scope,
pipelined: s.usePipelinedDmlOrWarn(ctx),
pipelinedFlushConcurrency: s.GetSessionVars().PipelinedFlushConcurrency,
pipelinedResolveLockConcurrency: s.GetSessionVars().PipelinedResolveLockConcurrency,
pipelinedWriteThrottleRatio: s.GetSessionVars().PipelinedWriteThrottleRatio,
})
return nil
}
Expand Down
20 changes: 15 additions & 5 deletions pkg/session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,10 +670,13 @@ func (txnFailFuture) Wait() (uint64, error) {

// txnFuture is a promise, which promises to return a txn in future.
type txnFuture struct {
future oracle.Future
store kv.Storage
txnScope string
pipelined bool
future oracle.Future
store kv.Storage
txnScope string
pipelined bool
pipelinedFlushConcurrency int
pipelinedResolveLockConcurrency int
pipelinedWriteThrottleRatio float64
}

func (tf *txnFuture) wait() (kv.Transaction, error) {
Expand All @@ -690,7 +693,14 @@ func (tf *txnFuture) wait() (kv.Transaction, error) {
}

if tf.pipelined {
options = append(options, tikv.WithDefaultPipelinedTxn())
options = append(
options,
tikv.WithPipelinedTxn(
tf.pipelinedFlushConcurrency,
tf.pipelinedResolveLockConcurrency,
tf.pipelinedWriteThrottleRatio,
),
)
}

return tf.store.Begin(options...)
Expand Down
31 changes: 31 additions & 0 deletions pkg/sessionctx/vardef/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,9 @@ const (
TiDBRCWriteCheckTs = "tidb_rc_write_check_ts"
// TiDBCommitterConcurrency controls the number of running concurrent requests in the commit phase.
TiDBCommitterConcurrency = "tidb_committer_concurrency"
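// TiDBPipelinedDmlResourcePolicy selects the resource policy of pipelined DML: "performance",
// "conservation" or "custom{...}". The policy determines the flush concurrency, the resolve-lock
// concurrency and the write throttle ratio.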
TiDBPipelinedDmlResourcePolicy = "tidb_pipelined_dml_resource_policy"
// TiDBEnableBatchDML enables batch dml.
TiDBEnableBatchDML = "tidb_enable_batch_dml"
// TiDBStatsCacheMemQuota records stats cache quota
Expand Down Expand Up @@ -1236,6 +1239,24 @@ const (
MaxPreSplitRegions = 15
)

// Concurrency bounds and presets used by the pipelined DML resource policy.
const (
	// MinPipelinedDMLConcurrency is the smallest concurrency a custom policy may request.
	MinPipelinedDMLConcurrency = 1
	// MaxPipelinedDMLConcurrency is the largest concurrency a custom policy may request.
	MaxPipelinedDMLConcurrency = 8192

	// DefaultFlushConcurrency is the flush concurrency of the "performance" policy.
	// It is also the value a custom policy gets when it does not specify one.
	DefaultFlushConcurrency = 128
	// DefaultResolveConcurrency is the resolve_lock concurrency of the "performance" policy.
	// It is also the value a custom policy gets when it does not specify one.
	DefaultResolveConcurrency = 8

	// ConservationFlushConcurrency is the flush concurrency of the "conservation" policy.
	ConservationFlushConcurrency = 2
	// ConservationResolveConcurrency is the resolve_lock concurrency of the "conservation" policy.
	ConservationResolveConcurrency = 2
)

// Default TiDB system variable values.
const (
DefHostname = "localhost"
Expand Down Expand Up @@ -1432,6 +1453,7 @@ const (
DefTiDBQueryLogMaxLen = 4096
DefRequireSecureTransport = false
DefTiDBCommitterConcurrency = 128
DefTiDBPipelinedDmlResourcePolicy = StrategyPerformance
DefTiDBBatchDMLIgnoreError = false
DefTiDBMemQuotaAnalyze = -1
DefTiDBEnableAutoAnalyze = true
Expand Down Expand Up @@ -1887,6 +1909,15 @@ const (
// keep approximately 4 batched TSO requests running in parallel. This option tries to reduce the batch-waiting time
// by 3/4, at the expense of about 4 times the amount of TSO RPC calls.
TSOClientRPCModeParallelFast = "PARALLEL-FAST"

// StrategyPerformance is a choice of variable TiDBPipelinedDmlResourcePolicy,
// the best performance policy
StrategyPerformance = "performance"
// StrategyConservation is a choice of variable TiDBPipelinedDmlResourcePolicy,
// a rather conservative policy
StrategyConservation = "conservation"
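// StrategyCustom is a choice of variable TiDBPipelinedDmlResourcePolicy,
// a user-defined policy written as custom{key=value,...}, e.g. custom{concurrency=64,write_throttle_ratio=0.5}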
StrategyCustom = "custom"
)

// Global config name list.
Expand Down
21 changes: 18 additions & 3 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ type SessionVars struct {
Concurrency
MemQuota
BatchSize
PipelinedDMLConfig
// DMLBatchSize indicates the number of rows batch-committed for a statement.
// It will be used when using LOAD DATA or BatchInsert or BatchDelete is on.
DMLBatchSize int
Expand Down Expand Up @@ -2958,9 +2959,6 @@ type Concurrency struct {

// IdleTransactionTimeout indicates the maximum time duration a transaction could be idle, unit is second.
IdleTransactionTimeout int

// BulkDMLEnabled indicates whether to enable bulk DML in pipelined mode.
BulkDMLEnabled bool
}

// SetIndexLookupConcurrency set the number of concurrent index lookup worker.
Expand Down Expand Up @@ -3158,6 +3156,23 @@ type BatchSize struct {
MaxPagingSize int
}

// PipelinedDMLConfig groups the session-level settings of pipelined DML.
type PipelinedDMLConfig struct {
	// BulkDMLEnabled reports whether bulk DML in pipelined mode is turned on.
	BulkDMLEnabled bool

	// PipelinedFlushConcurrency is the number of concurrent workers used for pipelined flush.
	PipelinedFlushConcurrency int

	// PipelinedResolveLockConcurrency is the number of concurrent workers used for pipelined
	// resolve lock.
	PipelinedResolveLockConcurrency int

	// PipelinedWriteThrottleRatio throttles the flush process by inserting sleep intervals
	// between flushes, so that the storage layer is not overwhelmed.
	// The ratio is defined as: throttle_ratio = T_sleep / (T_sleep + T_flush)
	PipelinedWriteThrottleRatio float64
}

const (
// SlowLogRowPrefixStr is slow log row prefix.
SlowLogRowPrefixStr = "# "
Expand Down
113 changes: 113 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3442,6 +3442,13 @@ var defaultSysVars = []*SysVar{
return nil
},
},
{
Scope: vardef.ScopeGlobal | vardef.ScopeSession,
Name: vardef.TiDBPipelinedDmlResourcePolicy,
Value: vardef.DefTiDBPipelinedDmlResourcePolicy,
Type: vardef.TypeStr,
SetSession: setPipelinedDmlResourcePolicy,
},
}

// GlobalSystemVariableInitialValue gets the default value for a system variable including ones that are dynamically set (e.g. based on the store)
Expand Down Expand Up @@ -3483,3 +3490,109 @@ func setTiFlashComputeDispatchPolicy(s *SessionVars, val string) error {
s.TiFlashComputeDispatchPolicy = p
return nil
}

func setPipelinedDmlResourcePolicy(s *SessionVars, val string) error {
// ensure the value is trimmed and lowercased
val = strings.TrimSpace(val)
lowVal := strings.ToLower(val)
switch lowVal {
case vardef.StrategyPerformance:
s.PipelinedDMLConfig.PipelinedFlushConcurrency = vardef.DefaultFlushConcurrency
s.PipelinedDMLConfig.PipelinedResolveLockConcurrency = vardef.DefaultResolveConcurrency
s.PipelinedDMLConfig.PipelinedWriteThrottleRatio = 0
case vardef.StrategyConservation:
s.PipelinedDMLConfig.PipelinedFlushConcurrency = vardef.ConservationFlushConcurrency
s.PipelinedDMLConfig.PipelinedResolveLockConcurrency = vardef.ConservationResolveConcurrency
s.PipelinedDMLConfig.PipelinedWriteThrottleRatio = 0
default:
// Create a temporary config to hold new values to avoid partial application
newConfig := PipelinedDMLConfig{
PipelinedFlushConcurrency: vardef.DefaultFlushConcurrency,
PipelinedResolveLockConcurrency: vardef.DefaultResolveConcurrency,
PipelinedWriteThrottleRatio: 0,
}
Comment on lines +3509 to +3513
Copy link
Contributor

@you06 you06 Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
newConfig := PipelinedDMLConfig{
PipelinedFlushConcurrency: vardef.DefaultFlushConcurrency,
PipelinedResolveLockConcurrency: vardef.DefaultResolveConcurrency,
PipelinedWriteThrottleRatio: 0,
}
newConfig := PipelinedDMLConfig{
PipelinedFlushConcurrency: s.PipelinedFlushConcurrency,
PipelinedResolveLockConcurrency: s.PipelinedResolveLockConcurrency,
PipelinedWriteThrottleRatio: s.PipelinedWriteThrottleRatio,
}

Shall we keep the previous configuration if the user don't set it?

Unless some variables will be reset when user update another one.

set @@tidb_pipelined_dml_resource_policy = 'custom{concurrency=64}' -- PipelinedFlushConcurrency is 64
set @@tidb_pipelined_dml_resource_policy = 'custom{write_throttle_ratio=0.5}' -- PipelinedFlushConcurrency is reset to 128

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's good to just let the omitted fields be reset to the default value. Unless the actual variable may be different from show variables like "tidb_pipelined_dml_resource_policy".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some variables will be reset when user update another one

This is the intended behavior, as we're performing setting operations rather than updates, so the override behavior is appropriate.


// More flexible custom format validation
if !strings.HasPrefix(lowVal, vardef.StrategyCustom) {
return ErrWrongValueForVar.FastGenByArgs(vardef.TiDBPipelinedDmlResourcePolicy, val)
}

// Extract everything after "custom"
remaining := strings.TrimSpace(lowVal[len(vardef.StrategyCustom):])
if len(remaining) < 2 || !strings.HasPrefix(remaining, "{") || !strings.HasSuffix(remaining, "}") {
return ErrWrongValueForVar.FastGenByArgs(vardef.TiDBPipelinedDmlResourcePolicy, val)
}

// Extract and trim content between brackets
content := strings.TrimSpace(remaining[1 : len(remaining)-1])
if content == "" {
return ErrWrongValueForVar.FastGenByArgs(vardef.TiDBPipelinedDmlResourcePolicy, val)
}

// Split parameters
rawParams := strings.Split(content, ",")
for _, rawParam := range rawParams {
param := strings.TrimSpace(rawParam)
if param == "" {
return ErrWrongValueForVar.FastGenByArgs(vardef.TiDBPipelinedDmlResourcePolicy, val)
}

// Split key-values
parts := strings.FieldsFunc(param, func(r rune) bool {
return r == '=' || r == ':'
})

if len(parts) != 2 {
return ErrWrongValueForVar.FastGenByArgs(vardef.TiDBPipelinedDmlResourcePolicy, val)
}

key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])

switch key {
case "concurrency":
concurrency, err := strconv.ParseInt(value, 10, 64)
if err != nil || concurrency < vardef.MinPipelinedDMLConcurrency || concurrency > vardef.MaxPipelinedDMLConcurrency {
logutil.BgLogger().Warn(
"invalid concurrency value in pipelined DML resource policy",
zap.String("value", val),
zap.String("concurrency", value),
zap.Error(err),
)
return ErrWrongValueForVar.FastGenByArgs(vardef.TiDBPipelinedDmlResourcePolicy, val)
}
newConfig.PipelinedFlushConcurrency = int(concurrency)
case "resolve_concurrency":
concurrency, err := strconv.ParseInt(value, 10, 64)
if err != nil || concurrency < vardef.MinPipelinedDMLConcurrency || concurrency > vardef.MaxPipelinedDMLConcurrency {
logutil.BgLogger().Warn(
"invalid resolve_concurrency value in pipelined DML resource policy",
zap.String("value", val),
zap.String("resolve_concurrency", value),
zap.Error(err),
)
return ErrWrongValueForVar.FastGenByArgs(vardef.TiDBPipelinedDmlResourcePolicy, val)
}
newConfig.PipelinedResolveLockConcurrency = int(concurrency)
case "write_throttle_ratio":
ratio, err := strconv.ParseFloat(value, 64)
if err != nil || ratio < 0 || ratio >= 1 {
logutil.BgLogger().Warn(
"invalid write_throttle_ratio value in pipelined DML resource policy",
zap.String("value", val),
zap.String("write_throttle_ratio", value),
zap.Error(err),
)
return ErrWrongValueForVar.FastGenByArgs(vardef.TiDBPipelinedDmlResourcePolicy, val)
}
newConfig.PipelinedWriteThrottleRatio = ratio
default:
return ErrWrongValueForVar.FastGenByArgs(vardef.TiDBPipelinedDmlResourcePolicy, val)
}
}

// Only apply changes after all validation passed
s.PipelinedDMLConfig = newConfig
}
return nil
}
84 changes: 84 additions & 0 deletions tests/realtikvtest/pipelineddmltest/pipelineddml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,97 @@ func TestVariable(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

// Original test cases...
require.Equal(t, tk.Session().GetSessionVars().BulkDMLEnabled, false)
tk.MustExec("set session tidb_dml_type = bulk")
require.Equal(t, tk.Session().GetSessionVars().BulkDMLEnabled, true)
tk.MustExec("set session tidb_dml_type = standard")
require.Equal(t, tk.Session().GetSessionVars().BulkDMLEnabled, false)
// not supported yet.
tk.MustExecToErr("set session tidb_dml_type = bulk(10)")

// Basic policy tests
tk.MustExec("set tidb_pipelined_dml_resource_policy = 'PERFORMANCE'")
tk.MustQuery("select @@tidb_pipelined_dml_resource_policy").Check(testkit.Rows("PERFORMANCE"))
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, 128)
require.Equal(t, tk.Session().GetSessionVars().PipelinedResolveLockConcurrency, 8)
tk.MustExec("set @@tidb_pipelined_dml_resource_policy = 'conserVation'")
tk.MustQuery("select @@tidb_pipelined_dml_resource_policy").Check(testkit.Rows("conserVation"))
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, 2)
require.Equal(t, tk.Session().GetSessionVars().PipelinedResolveLockConcurrency, 2)
tk.MustExec("set @@tidb_pipelined_dml_resource_policy = 'Performance'")
tk.MustQuery("select @@tidb_pipelined_dml_resource_policy").Check(testkit.Rows("Performance"))
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, 128)
require.Equal(t, tk.Session().GetSessionVars().PipelinedResolveLockConcurrency, 8)

// Test custom configuration with valid values
tk.MustExec("set @@tidb_pipelined_dml_resource_policy = 'custom{concurrency=64}'")
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, 64)
require.Equal(t, tk.Session().GetSessionVars().PipelinedResolveLockConcurrency, 8)

tk.MustExec("set @@tidb_pipelined_dml_resource_policy = 'custom{write_throttle_ratio=0.5}'")
require.Equal(t, tk.Session().GetSessionVars().PipelinedWriteThrottleRatio, 0.5)

// Test multiple parameters
tk.MustExec("set @@tidb_pipelined_dml_resource_policy = 'custom{concurrency=32,write_throttle_ratio=0.3}'")
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, 32)
require.Equal(t, tk.Session().GetSessionVars().PipelinedWriteThrottleRatio, 0.3)

// Test different separators
tk.MustExec("set @@tidb_pipelined_dml_resource_policy = 'custom{concurrency:64,write_throttle_ratio:0.4}'")
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, 64)
require.Equal(t, tk.Session().GetSessionVars().PipelinedWriteThrottleRatio, 0.4)

// Test whitespace handling
tk.MustExec("set @@tidb_pipelined_dml_resource_policy = ' custom { concurrency = 64 , write_throttle_ratio = 0.4 } '")
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, 64)
require.Equal(t, tk.Session().GetSessionVars().PipelinedWriteThrottleRatio, 0.4)

// Test case insensitivity
tk.MustExec("set @@tidb_pipelined_dml_resource_policy = 'CUSTOM{CONCURRENCY=64,WRITE_THROTTLE_RATIO=0.4}'")
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, 64)
require.Equal(t, tk.Session().GetSessionVars().PipelinedWriteThrottleRatio, 0.4)

// Test boundary values
tk.MustExec("set @@tidb_pipelined_dml_resource_policy = 'custom{concurrency=1}'")
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, 1)
tk.MustExec("set @@tidb_pipelined_dml_resource_policy = 'custom{write_throttle_ratio=0.999}'")
require.Equal(t, tk.Session().GetSessionVars().PipelinedWriteThrottleRatio, 0.999)

// Test error cases and state consistency
origFlushConcurrency := tk.Session().GetSessionVars().PipelinedFlushConcurrency
origResolveConcurrency := tk.Session().GetSessionVars().PipelinedResolveLockConcurrency
origThrottleRatio := tk.Session().GetSessionVars().PipelinedWriteThrottleRatio

// Invalid format
tk.MustExecToErr("set @@tidb_pipelined_dml_resource_policy = 'custom'", ".*")
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, origFlushConcurrency)
require.Equal(t, tk.Session().GetSessionVars().PipelinedResolveLockConcurrency, origResolveConcurrency)
require.Equal(t, tk.Session().GetSessionVars().PipelinedWriteThrottleRatio, origThrottleRatio)

// Empty content
tk.MustExecToErr("set @@tidb_pipelined_dml_resource_policy = 'custom{}'", ".*")
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, origFlushConcurrency)

// Invalid parameter name
tk.MustExecToErr("set @@tidb_pipelined_dml_resource_policy = 'custom{unknown=1}'", ".*")
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, origFlushConcurrency)

// Out of range values
tk.MustExecToErr("set @@tidb_pipelined_dml_resource_policy = 'custom{concurrency=8193}'", ".*")
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, origFlushConcurrency)
tk.MustExecToErr("set @@tidb_pipelined_dml_resource_policy = 'custom{write_throttle_ratio=1.1}'", ".*")
require.Equal(t, tk.Session().GetSessionVars().PipelinedWriteThrottleRatio, origThrottleRatio)

// Malformed values
tk.MustExecToErr("set @@tidb_pipelined_dml_resource_policy = 'custom{concurrency=abc}'", ".*")
require.Equal(t, tk.Session().GetSessionVars().PipelinedFlushConcurrency, origFlushConcurrency)

// Test global scope
tk.MustExec("set global tidb_pipelined_dml_resource_policy = 'custom{concurrency=64,write_throttle_ratio=0.4}'")
tk2 := testkit.NewTestKit(t, store)
tk2.MustQuery("select @@global.tidb_pipelined_dml_resource_policy").Check(testkit.Rows("custom{concurrency=64,write_throttle_ratio=0.4}"))
}

// We limit this feature only for cases meet all the following conditions:
Expand Down