Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete txn scope code #1587

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ type Config struct {
OpenTracingEnable bool
Path string
EnableForwarding bool
TxnScope string
EnableAsyncCommit bool
Enable1PC bool
// FIXME: rename
TxnScope string
EnableAsyncCommit bool
Enable1PC bool
// RegionsRefreshInterval indicates the interval of loading regions info, the unit is second, if RegionsRefreshInterval == 0, it will be disabled.
RegionsRefreshInterval uint64
// EnablePreload indicates whether to preload region info when initializing the client.
Expand Down
25 changes: 1 addition & 24 deletions integration_tests/1pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (s *testOnePCSuite) Test1PCIsolation() {
// Make `txn`'s commitTs more likely to be less than `txn2`'s startTs if there's bug in commitTs
// calculation.
for i := 0; i < 10; i++ {
_, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
_, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
}

Expand Down Expand Up @@ -244,29 +244,6 @@ func (s *testOnePCSuite) Test1PCLinearizability() {
s.Less(commitTS2, commitTS1)
}

func (s *testOnePCSuite) Test1PCWithMultiDC() {
// It requires setting placement rules to run with TiKV
if *withTiKV {
return
}

localTxn := s.begin1PC()
err := localTxn.Set([]byte("a"), []byte("a1"))
localTxn.SetScope("bj")
s.Nil(err)
err = localTxn.Commit(context.Background())
s.Nil(err)
s.False(localTxn.GetCommitter().IsOnePC())

globalTxn := s.begin1PC()
err = globalTxn.Set([]byte("b"), []byte("b1"))
globalTxn.SetScope(oracle.GlobalTxnScope)
s.Nil(err)
err = globalTxn.Commit(context.Background())
s.Nil(err)
s.True(globalTxn.GetCommitter().IsOnePC())
}

func (s *testOnePCSuite) TestTxnCommitCounter() {
initial := metrics.GetTxnCommitCounter()

Expand Down
10 changes: 5 additions & 5 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (s *testCommitterSuite) TestPrewriteRollback() {
err = committer.PrewriteAllMutations(ctx)
s.Nil(err)
}
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
committer.SetCommitTS(commitTS)
err = committer.CommitMutations(ctx)
Expand Down Expand Up @@ -733,7 +733,7 @@ func (s *testCommitterSuite) TestPessimisticTTL() {
err = txn.LockKeys(context.Background(), lockCtx, key2)
s.Nil(err)
lockInfo := s.getLockInfo(key)
msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl, &oracle.Option{})
s.GreaterOrEqual(msBeforeLockExpired, int64(100))

lr := s.store.NewLockResolver()
Expand All @@ -746,7 +746,7 @@ func (s *testCommitterSuite) TestPessimisticTTL() {
check := func() bool {
lockInfoNew := s.getLockInfo(key)
if lockInfoNew.LockTtl > lockInfo.LockTtl {
currentTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{})
s.Nil(err)
// Check that the TTL is update to a reasonable range.
expire := oracle.ExtractPhysical(txn.StartTS()) + int64(lockInfoNew.LockTtl)
Expand Down Expand Up @@ -1583,7 +1583,7 @@ func (s *testCommitterSuite) TestAggressiveLockingResetTTLManager() {
s.True(txn.GetCommitter().IsTTLRunning())

// Get a new ts as the new forUpdateTS.
forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.NoError(err)
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
Expand Down Expand Up @@ -1662,7 +1662,7 @@ func (s *testCommitterSuite) testAggressiveLockingResetPrimaryAndTTLManagerAfter
return
}

forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.NoError(err)
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
key := []byte("k1")
Expand Down
39 changes: 7 additions & 32 deletions integration_tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(keys, values [][]byte, pr
s.Nil(err)

if commitPrimary {
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
tpc.SetCommitTS(commitTS)
err = tpc.CommitMutations(ctx)
Expand All @@ -257,23 +257,23 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() {
s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, []byte("z"), []byte("z"), false)
lock := s.mustGetLock([]byte("z"))
lock.UseAsyncCommit = true
ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
var lockutil txnlock.LockProbe
status := lockutil.NewLockStatus(nil, true, ts)

resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
err = resolver.ResolveAsyncCommitLock(s.bo, lock, status)
s.Nil(err)
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
status, err = resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true, false, nil)
s.Nil(err)
s.True(status.IsCommitted())
s.Equal(status.CommitTS(), ts)

// One key is committed (i), one key is locked (a). Should get committed.
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
commitTs := ts + 10

Expand Down Expand Up @@ -350,7 +350,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() {
s.Equal(gotResolve, int64(1))

// One key has been rolled back (b), one is locked (a). Should be rolled back.
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
commitTs = ts + 10

Expand Down Expand Up @@ -400,7 +400,7 @@ func (s *testAsyncCommitSuite) TestRepeatableRead() {
txn1.Set([]byte("k1"), []byte("v2"))

for i := 0; i < 20; i++ {
_, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
_, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
}

Expand Down Expand Up @@ -445,31 +445,6 @@ func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() {
s.Less(commitTS2, commitTS1)
}

// TestAsyncCommitWithMultiDC tests that async commit can only be enabled in global transactions
func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC() {
// It requires setting placement rules to run with TiKV
if *withTiKV {
return
}

localTxn := s.beginAsyncCommit()
err := localTxn.Set([]byte("a"), []byte("a1"))
localTxn.SetScope("bj")
s.Nil(err)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
err = localTxn.Commit(ctx)
s.Nil(err)
s.False(localTxn.IsAsyncCommit())

globalTxn := s.beginAsyncCommit()
err = globalTxn.Set([]byte("b"), []byte("b1"))
globalTxn.SetScope(oracle.GlobalTxnScope)
s.Nil(err)
err = globalTxn.Commit(ctx)
s.Nil(err)
s.True(globalTxn.IsAsyncCommit())
}

func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit() {
keys := [][]byte{[]byte("k0"), []byte("k1")}
values := [][]byte{[]byte("v00"), []byte("v10")}
Expand Down Expand Up @@ -620,7 +595,7 @@ func (s *testAsyncCommitSuite) TestRollbackAsyncCommitEnforcesFallback() {
lock := s.mustGetLock([]byte("a"))
resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
for {
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
status, err := resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("a"), currentTS, currentTS, false, false, nil)
s.Nil(err)
Expand Down
20 changes: 10 additions & 10 deletions integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, ttl
s.Nil(err)

if commitPrimary {
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
tpc.SetCommitTS(commitTS)
err = tpc.CommitMutations(ctx)
Expand Down Expand Up @@ -262,7 +262,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL() {

bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
lr := s.store.NewLockResolver()
callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{})
s.Nil(err)

// Check the lock TTL of a transaction.
Expand Down Expand Up @@ -324,7 +324,7 @@ func (s *testLockSuite) TestCheckTxnStatus() {
s.prewriteTxnWithTTL(txn, 1000)

o := s.store.GetOracle()
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
s.Greater(currentTS, txn.StartTS())

Expand All @@ -351,7 +351,7 @@ func (s *testLockSuite) TestCheckTxnStatus() {
s.Equal(timeBeforeExpire, int64(0))

// Then call getTxnStatus again and check the lock status.
currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
status, err = s.store.NewLockResolver().GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, false, nil)
s.Nil(err)
Expand Down Expand Up @@ -383,7 +383,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() {
s.Nil(err)

o := s.store.GetOracle()
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
resolver := s.store.NewLockResolver()
Expand Down Expand Up @@ -412,7 +412,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() {
s.Nil(committer.CleanupMutations(context.Background()))

// Call getTxnStatusFromLock to cover TxnNotFound and retry timeout.
startTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
startTS, err := o.GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
lock = &txnkv.Lock{
Key: []byte("second"),
Expand Down Expand Up @@ -538,9 +538,9 @@ func (s *testLockSuite) TestBatchResolveLocks() {
}

// Locks may not expired
msBeforeLockExpired := s.store.GetOracle().UntilExpired(locks[0].TxnID, locks[1].TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
msBeforeLockExpired := s.store.GetOracle().UntilExpired(locks[0].TxnID, locks[1].TTL, &oracle.Option{})
s.Greater(msBeforeLockExpired, int64(0))
msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL, &oracle.Option{})
s.Greater(msBeforeLockExpired, int64(0))

lr := s.store.NewLockResolver()
Expand Down Expand Up @@ -961,10 +961,10 @@ func (s *testLockSuite) TestResolveLocksForRead() {
s.Nil(committer.PrewriteAllMutations(ctx))
committer.SetPrimaryKey([]byte("k66"))

readStartTS, err = s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
readStartTS, err = s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)

commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
s.Greater(commitTS, readStartTS)
committer.SetCommitTS(commitTS)
Expand Down
2 changes: 1 addition & 1 deletion internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -1809,7 +1809,7 @@ func (s *RegionRequestSender) validateReadTS(ctx context.Context, req *tikvrpc.R
default:
return nil
}
return s.readTSValidator.ValidateReadTS(ctx, readTS, req.StaleRead, &oracle.Option{TxnScope: req.TxnScope})
return s.readTSValidator.ValidateReadTS(ctx, readTS, req.StaleRead, &oracle.Option{})
}

func patchRequestSource(req *tikvrpc.Request, replicaType string) {
Expand Down
2 changes: 1 addition & 1 deletion internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestValidateReadTS()
}

getTS := func() uint64 {
ts, err := o.GetTimestamp(s.bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
ts, err := o.GetTimestamp(s.bo.GetCtx(), &oracle.Option{})
s.NoError(err)
return ts
}
Expand Down
10 changes: 0 additions & 10 deletions internal/locate/store_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,16 +299,6 @@ func (s *Store) IsStoreMatch(stores []uint64) bool {
return false
}

// GetLabelValue returns the value of the label
func (s *Store) GetLabelValue(key string) (string, bool) {
for _, label := range s.labels {
if label.Key == key {
return label.Value, true
}
}
return "", false
}

// IsSameLabels returns whether the store have the same labels with target labels
func (s *Store) IsSameLabels(labels []*metapb.StoreLabel) bool {
if len(s.labels) != len(labels) {
Expand Down
4 changes: 2 additions & 2 deletions oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ import (
)

// Option represents available options for the oracle.Oracle.
// TODO: remove this struct
type Option struct {
TxnScope string
}

// Oracle is the interface that provides strictly ascending timestamps.
Expand All @@ -58,7 +58,7 @@ type Oracle interface {
// WARNING: This method does not guarantee whether the generated timestamp is legal for accessing the data.
// Neither is it safe to use it for verifying the legality of another calculated timestamp.
// Be sure to validate the timestamp before using it to access the data.
GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (uint64, error)
GetStaleTimestamp(ctx context.Context, prevSecond uint64) (uint64, error)
IsExpired(lockTimestamp, TTL uint64, opt *Option) bool
UntilExpired(lockTimeStamp, TTL uint64, opt *Option) int64
Close()
Expand Down
5 changes: 2 additions & 3 deletions oracle/oracles/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ func StartTsUpdateLoop(o oracle.Oracle, ctx context.Context, wg *sync.WaitGroup)
func SetEmptyPDOracleLastTs(oc oracle.Oracle, ts uint64) {
switch o := oc.(type) {
case *pdOracle:
lastTSInterface, _ := o.lastTSMap.LoadOrStore(oracle.GlobalTxnScope, &atomic.Pointer[lastTSO]{})
lastTSPointer := lastTSInterface.(*atomic.Pointer[lastTSO])
lastTSPointer.Store(&lastTSO{tso: ts, arrival: oracle.GetTimeFromTS(ts)})
o.lastTS = &atomic.Pointer[lastTSO]{}
o.lastTS.Store(&lastTSO{tso: ts, arrival: oracle.GetTimeFromTS(ts)})
}
}
2 changes: 1 addition & 1 deletion oracle/oracles/local.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 TiKV Authors

Check failure on line 1 in oracle/oracles/local.go

View workflow job for this annotation

GitHub Actions / golangci

: # github.com/tikv/client-go/v2/oracle/oracles [github.com/tikv/client-go/v2/oracle/oracles.test]
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -118,7 +118,7 @@
}

// GetStaleTimestamp return physical
func (l *localOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (ts uint64, err error) {
func (l *localOracle) GetStaleTimestamp(ctx context.Context, prevSecond uint64) (ts uint64, err error) {
return oracle.GoTimeToTS(time.Now().Add(-time.Second * time.Duration(prevSecond))), nil
}

Expand Down
2 changes: 1 addition & 1 deletion oracle/oracles/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (o *MockOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, e
}

// GetStaleTimestamp implements oracle.Oracle interface.
func (o *MockOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (ts uint64, err error) {
func (o *MockOracle) GetStaleTimestamp(ctx context.Context, prevSecond uint64) (ts uint64, err error) {
return oracle.GoTimeToTS(time.Now().Add(-time.Second * time.Duration(prevSecond))), nil
}

Expand Down
Loading
Loading