Skip to content

Commit 962dd2e

Browse files
ekexiumAilinKid
authored andcommitted
fix: check kill signal against 0 (pingcap#50029)
ref pingcap#49643
1 parent 846fb38 commit 962dd2e

File tree

3 files changed

+32
-8
lines changed

3 files changed

+32
-8
lines changed

pkg/executor/internal/mpp/local_mpp_coordinator.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -668,10 +668,17 @@ func (c *localMppCoordinator) nextImpl(ctx context.Context) (resp *mppResponse,
668668
case resp, ok = <-c.respChan:
669669
return
670670
case <-ticker.C:
671-
if c.vars != nil && c.vars.Killed != nil && atomic.LoadUint32(c.vars.Killed) == 1 {
672-
err = derr.ErrQueryInterrupted
673-
exit = true
674-
return
671+
if c.vars != nil && c.vars.Killed != nil {
672+
killed := atomic.LoadUint32(c.vars.Killed)
673+
if killed != 0 {
674+
logutil.Logger(ctx).Info(
675+
"a killed signal is received",
676+
zap.Uint32("signal", killed),
677+
)
678+
err = derr.ErrQueryInterrupted
679+
exit = true
680+
return
681+
}
675682
}
676683
case <-c.finishCh:
677684
exit = true

pkg/store/copr/batch_coprocessor.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -1167,7 +1167,12 @@ func (b *batchCopIterator) recvFromRespCh(ctx context.Context) (resp *batchCopRe
11671167
case resp, ok = <-b.respChan:
11681168
return
11691169
case <-ticker.C:
1170-
if atomic.LoadUint32(b.vars.Killed) == 1 {
1170+
killed := atomic.LoadUint32(b.vars.Killed)
1171+
if killed != 0 {
1172+
logutil.Logger(ctx).Info(
1173+
"a killed signal is received",
1174+
zap.Uint32("signal", killed),
1175+
)
11711176
resp = &batchCopResponse{err: derr.ErrQueryInterrupted}
11721177
ok = true
11731178
return

pkg/store/copr/coprocessor.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -937,7 +937,12 @@ func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan *copRes
937937
exit = true
938938
return
939939
case <-ticker.C:
940-
if atomic.LoadUint32(it.vars.Killed) == 1 {
940+
killed := atomic.LoadUint32(it.vars.Killed)
941+
if killed != 0 {
942+
logutil.Logger(ctx).Info(
943+
"a killed signal is received",
944+
zap.Uint32("signal", killed),
945+
)
941946
resp = &copResponse{err: derr.ErrQueryInterrupted}
942947
ok = true
943948
return
@@ -1862,8 +1867,15 @@ func (worker *copIteratorWorker) calculateRemain(ranges *KeyRanges, split *copro
18621867

18631868
// finished checks the flags and finished channel, it tells whether the worker is finished.
18641869
func (worker *copIteratorWorker) finished() bool {
1865-
if worker.vars != nil && worker.vars.Killed != nil && atomic.LoadUint32(worker.vars.Killed) == 1 {
1866-
return true
1870+
if worker.vars != nil && worker.vars.Killed != nil {
1871+
killed := atomic.LoadUint32(worker.vars.Killed)
1872+
if killed != 0 {
1873+
logutil.BgLogger().Info(
1874+
"a killed signal is received in copIteratorWorker",
1875+
zap.Uint32("signal", killed),
1876+
)
1877+
return true
1878+
}
18671879
}
18681880
select {
18691881
case <-worker.finishCh:

0 commit comments

Comments
 (0)