@@ -4,12 +4,10 @@ import (
4
4
"encoding/binary"
5
5
"errors"
6
6
"fmt"
7
+ "github.com/valyala/bytebufferpool"
7
8
"hash/crc32"
8
9
"io"
9
10
"os"
10
- "sync"
11
-
12
- "github.com/valyala/bytebufferpool"
13
11
)
14
12
15
13
type ChunkType = byte
@@ -53,7 +51,7 @@ type segment struct {
53
51
currentBlockSize uint32
54
52
closed bool
55
53
header []byte
56
- blockPool sync. Pool
54
+ cachedBlock * blockAndHeader
57
55
}
58
56
59
57
// segmentReader is used to iterate over all the data in the segment file.
@@ -67,8 +65,9 @@ type segmentReader struct {
67
65
68
66
// blockAndHeader holds a block buffer and a chunk header buffer that are reused across reads.
69
67
type blockAndHeader struct {
70
- block []byte
71
- header []byte
68
+ block []byte
69
+ header []byte
70
+ blockNumber int64
72
71
}
73
72
74
73
// ChunkPosition represents the position of a chunk in a segment file.
@@ -101,23 +100,23 @@ func openSegmentFile(dirPath, extName string, id uint32) (*segment, error) {
101
100
panic (fmt .Errorf ("seek to the end of segment file %d%s failed: %v" , id , extName , err ))
102
101
}
103
102
103
+ // init cached block
104
+ bh := & blockAndHeader {
105
+ block : make ([]byte , blockSize ),
106
+ header : make ([]byte , chunkHeaderSize ),
107
+ blockNumber : - 1 ,
108
+ }
109
+
104
110
return & segment {
105
111
id : id ,
106
112
fd : fd ,
107
113
header : make ([]byte , chunkHeaderSize ),
108
- blockPool : sync.Pool {New : newBlockAndHeader },
109
114
currentBlockNumber : uint32 (offset / blockSize ),
110
115
currentBlockSize : uint32 (offset % blockSize ),
116
+ cachedBlock : bh ,
111
117
}, nil
112
118
}
113
119
114
- func newBlockAndHeader () interface {} {
115
- return & blockAndHeader {
116
- block : make ([]byte , blockSize ),
117
- header : make ([]byte , chunkHeaderSize ),
118
- }
119
- }
120
-
121
120
// NewReader creates a new segment reader.
122
121
// You can call Next to get the next chunk data,
123
122
// and io.EOF will be returned when there is no data.
@@ -356,6 +355,8 @@ func (seg *segment) writeChunkBuffer(buf *bytebufferpool.ByteBuffer) error {
356
355
return err
357
356
}
358
357
358
+ // invalidate the cached block: its contents may be stale after a write.
359
+ seg .cachedBlock .blockNumber = - 1
359
360
return nil
360
361
}
361
362
@@ -372,13 +373,10 @@ func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte,
372
373
373
374
var (
374
375
result []byte
375
- bh = seg .blockPool . Get ().( * blockAndHeader )
376
+ bh = seg .cachedBlock
376
377
segSize = seg .Size ()
377
378
nextChunk = & ChunkPosition {SegmentId : seg .id }
378
379
)
379
- defer func () {
380
- seg .blockPool .Put (bh )
381
- }()
382
380
383
381
for {
384
382
size := int64 (blockSize )
@@ -391,10 +389,18 @@ func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte,
391
389
return nil , nil , io .EOF
392
390
}
393
391
394
- // cache miss, read block from the segment file
395
- _ , err := seg .fd .ReadAt (bh .block [0 :size ], offset )
396
- if err != nil {
397
- return nil , nil , err
392
+ // There are two cases in which we must read the block from the file:
393
+ // 1. the requested block is not the cached one
394
+ // 2. new writes were appended to the block, and the block
395
+ // is still smaller than 32KB, so it must be read again to pick up the new writes.
396
+ if seg .cachedBlock .blockNumber != int64 (blockNumber ) || size != blockSize {
397
+ // read block from segment file at the specified offset.
398
+ _ , err := seg .fd .ReadAt (bh .block [0 :size ], offset )
399
+ if err != nil {
400
+ return nil , nil , err
401
+ }
402
+ // remember the block
403
+ bh .blockNumber = int64 (blockNumber )
398
404
}
399
405
400
406
// header
0 commit comments