Skip to content
This repository was archived by the owner on Dec 9, 2024. It is now read-only.

Commit 31a04ec

Browse files
committed
add write stress
Signed-off-by: Liqi Geng <[email protected]>
1 parent 2725790 commit 31a04ec

File tree

3 files changed

+272
-1
lines changed

3 files changed

+272
-1
lines changed

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ default: tidy fmt lint build
1717

1818
build: tidb pocket tpcc ledger txn-rand-pessimistic on-dup sqllogic block-writer \
1919
region-available deadlock-detector crud bank bank2 abtest cdc-pocket tiflash-pocket vbank \
20-
read-stress rawkv-linearizability tiflash-abtest tiflash-cdc follower-read
20+
read-stress rawkv-linearizability tiflash-abtest tiflash-cdc follower-read write-stress
2121

2222
tidb:
2323
$(GOBUILD) $(GOMOD) -o bin/chaos-tidb cmd/tidb/main.go
@@ -97,6 +97,9 @@ tiflash-cdc:
9797
follower-read:
9898
$(GOBUILD) $(GOMOD) -o bin/follower-read cmd/follower-read/*.go
9999

100+
write-stress:
101+
$(GOBUILD) $(GOMOD) -o bin/write-stress cmd/write-stress/*.go
102+
100103
fmt: groupimports
101104
go fmt ./...
102105

cmd/write-stress/main.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2020 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package main
15+
16+
import (
17+
"context"
18+
"flag"
19+
20+
// use mysql
21+
_ "github.com/go-sql-driver/mysql"
22+
23+
test_infra "github.com/pingcap/tipocket/pkg/test-infra"
24+
writestress "github.com/pingcap/tipocket/tests/write-stress"
25+
26+
"github.com/pingcap/tipocket/cmd/util"
27+
"github.com/pingcap/tipocket/pkg/cluster"
28+
"github.com/pingcap/tipocket/pkg/control"
29+
"github.com/pingcap/tipocket/pkg/test-infra/fixture"
30+
)
31+
32+
var (
33+
dataNum = flag.Int("dataNum", 2000, "the number of data(the unit is 10 thoudstand)")
34+
concurrency = flag.Int("concurrency", 400, "concurrency of worker")
35+
batch = flag.Int("batch", 100, "batch of insert sql")
36+
)
37+
38+
func main() {
39+
flag.Parse()
40+
cfg := control.Config{
41+
Mode: control.ModeSelfScheduled,
42+
ClientCount: 1,
43+
RunTime: fixture.Context.RunTime,
44+
RunRound: 1,
45+
}
46+
kvs := []string{"127.0.0.1:20160", "127.0.0.1:20162", "127.0.0.1:20161"}
47+
suit := util.Suit{
48+
Config: &cfg,
49+
//Provisioner: cluster.NewK8sProvisioner(),
50+
Provisioner: cluster.NewLocalClusterProvisioner([]string{"127.0.0.1:4000"}, []string{"127.0.0.1:2379"}, kvs),
51+
ClientCreator: writestress.ClientCreator{Cfg: &writestress.Config{
52+
DataNum: *dataNum,
53+
Concurrency: *concurrency,
54+
Batch: *batch,
55+
}},
56+
NemesisGens: util.ParseNemesisGenerators(fixture.Context.Nemesis),
57+
ClusterDefs: test_infra.NewDefaultCluster(fixture.Context.Namespace, fixture.Context.Namespace,
58+
fixture.Context.TiDBClusterConfig),
59+
}
60+
suit.Run(context.Background())
61+
}

tests/write-stress/write_stress.go

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
package writestress
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"encoding/binary"
7+
"fmt"
8+
"math/rand"
9+
"sync"
10+
"time"
11+
12+
"github.com/juju/errors"
13+
"github.com/ngaut/log"
14+
15+
"github.com/pingcap/tipocket/pkg/cluster/types"
16+
"github.com/pingcap/tipocket/pkg/core"
17+
"github.com/pingcap/tipocket/util"
18+
)
19+
20+
// Table schema comes from the bank of pufa `tmp_jieb_instmnt_daily`
21+
// CREATE TABLE `tmp_jieb_instmnt_daily` (
22+
// `ID` bigint(20) DEFAULT NULL COMMENT '主键ID',
23+
// `TABLE_ID` int(11) NOT NULL COMMENT '分库ID',
24+
// `FILE_DATE` char(8) NOT NULL COMMENT '文件日期',
25+
// `CONTRACT_NO` varchar(128) NOT NULL COMMENT '借据号',
26+
// `SETTLE_DATE` char(8) NOT NULL COMMENT '减免会计日期',
27+
// `TERM_NO` int(11) NOT NULL COMMENT '期次号',
28+
//
29+
// `INPT_DATE` char(8) DEFAULT NULL COMMENT '录入日期',
30+
// `INPT_TIME` varchar(20) DEFAULT NULL COMMENT '录入时间',
31+
// `RCRD_ST_CODE` varchar(1) DEFAULT NULL COMMENT '记录状态代码',
32+
// UNIQUE KEY `TMP_JIEB_INSTMNT_DAILY_IDX1` (`CONTRACT_NO`,`TERM_NO`),
33+
// KEY `TMP_JIEB_INSTMNT_DAILY_IDX2` (`TABLE_ID`,`CONTRACT_NO`)
34+
// ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=5 PRE_SPLIT_REGIONS=5 */ COMMENT='借呗日终(分期)信息临时表';
35+
const (
36+
stmtDrop = `DROP TABLE IF EXISTS write_stress`
37+
stmtCreate = `
38+
CREATE TABLE write_stress (
39+
TABLE_ID int(11) NOT NULL COMMENT '分库ID',
40+
CONTRACT_NO varchar(128) NOT NULL COMMENT '借据号',
41+
TERM_NO int(11) NOT NULL COMMENT '期次号',
42+
NOUSE char(60) NOT NULL COMMENT '填充位',
43+
44+
UNIQUE KEY TMP_JIEB_INSTMNT_DAILY_IDX1 (CONTRACT_NO, TERM_NO),
45+
KEY TMP_JIEB_INSTMNT_DAILY_IDX2 (TABLE_ID, CONTRACT_NO)
46+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
47+
`
48+
)
49+
50+
// Config is for writestressClient
51+
type Config struct {
52+
DataNum int `toml:"dataNum"`
53+
Concurrency int `toml:"concurrency"`
54+
Batch int `toml:"batch"`
55+
}
56+
57+
// ClientCreator creates writestressClient
58+
type ClientCreator struct {
59+
Cfg *Config
60+
}
61+
62+
// Create ...
63+
func (l ClientCreator) Create(node types.ClientNode) core.Client {
64+
return &writestressClient{
65+
Config: l.Cfg,
66+
}
67+
}
68+
69+
// ledgerClient simulates a complete record of financial transactions over the
70+
// life of a bank (or other company).
71+
type writestressClient struct {
72+
*Config
73+
db *sql.DB
74+
contract_ids [][]byte
75+
}
76+
77+
func (c *writestressClient) SetUp(ctx context.Context, nodes []types.ClientNode, idx int) error {
78+
if idx != 0 {
79+
return nil
80+
}
81+
82+
var err error
83+
node := nodes[idx]
84+
dsn := fmt.Sprintf("root@tcp(%s:%d)/test", node.IP, node.Port)
85+
86+
log.Infof("start to init...")
87+
c.db, err = util.OpenDB(dsn, c.Concurrency)
88+
if err != nil {
89+
return err
90+
}
91+
defer func() {
92+
log.Infof("init end...")
93+
}()
94+
95+
if _, err := c.db.Exec(stmtDrop); err != nil {
96+
log.Fatalf("execute statement %s error %v", stmtDrop, err)
97+
}
98+
99+
if _, err := c.db.Exec(stmtCreate); err != nil {
100+
log.Fatalf("execute statement %s error %v", stmtCreate, err)
101+
}
102+
103+
return nil
104+
}
105+
106+
func (c *writestressClient) TearDown(ctx context.Context, nodes []types.ClientNode, idx int) error {
107+
return nil
108+
}
109+
110+
func (c *writestressClient) Invoke(ctx context.Context, node types.ClientNode, r interface{}) core.UnknownResponse {
111+
panic("implement me")
112+
}
113+
114+
func (c *writestressClient) NextRequest() interface{} {
115+
panic("implement me")
116+
}
117+
118+
func (c *writestressClient) DumpState(ctx context.Context) (interface{}, error) {
119+
panic("implement me")
120+
}
121+
122+
func (c *writestressClient) Start(ctx context.Context, cfg interface{}, clientNodes []types.ClientNode) error {
123+
log.Infof("start to test...")
124+
defer func() {
125+
log.Infof("test end...")
126+
}()
127+
c.contract_ids = make([][]byte, c.DataNum)
128+
timeUnix := time.Now().Unix()
129+
count := uint8(0)
130+
b := make([]byte, 8)
131+
for i := 0; i < c.DataNum; i++ {
132+
// "abcd" + timestamp(8 bit) + count(8 bit)
133+
c.contract_ids[i] = append(c.contract_ids[i], []byte("abcd")...)
134+
binary.LittleEndian.PutUint64(b, uint64(timeUnix))
135+
c.contract_ids[i] = append(c.contract_ids[i], b...)
136+
binary.LittleEndian.PutUint64(b, uint64(count))
137+
c.contract_ids[i] = append(c.contract_ids[i], b...)
138+
139+
count++
140+
if count == 0 {
141+
timeUnix++
142+
}
143+
}
144+
145+
var wg sync.WaitGroup
146+
for i := 0; i < c.Concurrency; i++ {
147+
wg.Add(1)
148+
go func(i int) {
149+
defer wg.Done()
150+
for {
151+
select {
152+
case <-ctx.Done():
153+
return
154+
default:
155+
}
156+
if err := c.ExecuteInsert(c.db, i); err != nil {
157+
log.Fatalf("exec failed %v", err)
158+
}
159+
}
160+
}(i)
161+
}
162+
163+
wg.Wait()
164+
return nil
165+
}
166+
167+
// ExecuteInsert is run case
168+
func (c *writestressClient) ExecuteInsert(db *sql.DB, pos int) error {
169+
num := c.Config.DataNum * 10000 / c.Config.Concurrency
170+
171+
tx, err := db.Begin()
172+
if err != nil {
173+
return errors.Trace(err)
174+
}
175+
defer tx.Rollback()
176+
str := make([]byte, 50)
177+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
178+
for i := 0; i < num/c.Config.Batch; i++ {
179+
n := num*pos + i*c.Config.Batch
180+
if n >= c.DataNum {
181+
break
182+
}
183+
query := fmt.Sprintf(`INSERT INTO write_stress (TABLE_ID, CONTRACT_NO, TERM_NO, NOUSE) VALUES `)
184+
for j := 0; j < c.Config.Batch; j++ {
185+
n := num*pos + i*c.Config.Batch + j
186+
if n >= c.DataNum {
187+
break
188+
}
189+
contract_id := c.contract_ids[n]
190+
util.RandString(str, rnd)
191+
if j != 0 {
192+
query += ","
193+
}
194+
query += fmt.Sprintf(`(%v, %v, %v, %v)`, rnd.Uint32()%960+1, string(contract_id[:]), rnd.Uint32()%36+1, string(str[:]))
195+
}
196+
fmt.Println(query)
197+
if _, err := tx.Exec(query); err != nil {
198+
return errors.Trace(err)
199+
}
200+
}
201+
202+
if err := tx.Commit(); err != nil {
203+
return errors.Trace(err)
204+
}
205+
206+
return nil
207+
}

0 commit comments

Comments
 (0)