Skip to content

Commit 57ca2cc

Browse files
committed
fix(usenet): optimize memory by using lazy buffer allocation and pooling in BufferedPipe
1 parent 116829d commit 57ca2cc

File tree

1 file changed

+64
-18
lines changed

1 file changed

+64
-18
lines changed

internal/utils/bufpipe.go

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,36 @@ import (
88

99
var (
1010
ErrClosedPipe = errors.New("bufpipe: closed pipe")
11+
// Global pool for 1MB buffers to reduce GC pressure and memory spikes
12+
bufferPool = sync.Pool{
13+
New: func() any {
14+
b := make([]byte, 1024*1024)
15+
return &b
16+
},
17+
}
1118
)
1219

1320
// BufferedPipe is a thread-safe pipe with an internal buffer.
1421
// It allows the writer to continue writing as long as there is space in the buffer,
1522
// and the reader to read available data.
23+
// Memory optimization: The buffer is only allocated from a pool on the first Write call.
1624
type BufferedPipe struct {
17-
buf []byte
25+
buf *[]byte
1826
head int
1927
tail int
2028
closed bool
2129
err error
2230
cond *sync.Cond
2331
mu sync.Mutex
32+
33+
// Track if buffer was returned to pool
34+
released bool
2435
}
2536

37+
// NewBufferedPipe creates a new buffered pipe. The size parameter is currently
38+
// ignored to maintain compatibility with the fixed-size buffer pool (1MB).
2639
func NewBufferedPipe(size int) *BufferedPipe {
27-
p := &BufferedPipe{
28-
buf: make([]byte, size),
29-
}
40+
p := &BufferedPipe{}
3041
p.cond = sync.NewCond(&p.mu)
3142
return p
3243
}
@@ -40,27 +51,33 @@ func (p *BufferedPipe) Write(data []byte) (n int, err error) {
4051
return n, ErrClosedPipe
4152
}
4253

54+
// Lazy allocation: only get a buffer from the pool when we actually start writing
55+
if p.buf == nil {
56+
p.buf = bufferPool.Get().(*[]byte)
57+
}
58+
4359
free := p.freeSpace()
4460
if free == 0 {
4561
p.cond.Wait()
4662
continue
4763
}
4864

65+
buf := *p.buf
4966
chunkSize := len(data) - n
5067
if chunkSize > free {
5168
chunkSize = free
5269
}
5370

54-
// Calculate copy length up to the end of the buffer
71+
// Calculate copy length up to the end of the buffer (linear part)
5572
end := p.tail + chunkSize
56-
if end > len(p.buf) {
57-
end = len(p.buf)
73+
if end > len(buf) {
74+
end = len(buf)
5875
}
5976

6077
copyLen := end - p.tail
61-
copy(p.buf[p.tail:], data[n:n+copyLen])
78+
copy(buf[p.tail:], data[n:n+copyLen])
6279

63-
p.tail = (p.tail + copyLen) % len(p.buf)
80+
p.tail = (p.tail + copyLen) % len(buf)
6481
n += copyLen
6582

6683
p.cond.Broadcast()
@@ -89,20 +106,28 @@ func (p *BufferedPipe) Read(data []byte) (n int, err error) {
89106
n = available
90107
}
91108

92-
// Calculate copy length up to the end of the buffer
109+
buf := *p.buf
110+
// Calculate copy length up to the end of the buffer (linear part)
93111
end := p.head + n
94-
if end > len(p.buf) {
95-
end = len(p.buf)
112+
if end > len(buf) {
113+
end = len(buf)
96114
}
97115

98116
copyLen := end - p.head
99-
copy(data, p.buf[p.head:end])
117+
copy(data, buf[p.head:end])
100118

119+
// If there's more to read (wrapped around), copy from the beginning
101120
if n > copyLen {
102-
copy(data[copyLen:], p.buf[:n-copyLen])
121+
copy(data[copyLen:], buf[:n-copyLen])
122+
}
123+
124+
p.head = (p.head + n) % len(buf)
125+
126+
// If we just read the last byte and the pipe is closed, we can release the buffer
127+
if p.closed && p.availableData() == 0 {
128+
p.releaseBuffer()
103129
}
104130

105-
p.head = (p.head + n) % len(p.buf)
106131
p.cond.Broadcast()
107132

108133
return n, nil
@@ -120,24 +145,45 @@ func (p *BufferedPipe) CloseWithError(err error) error {
120145
}
121146
p.closed = true
122147
p.err = err
148+
149+
// If buffer is empty or was never allocated, we can release it now
150+
if p.availableData() == 0 {
151+
p.releaseBuffer()
152+
}
153+
123154
p.cond.Broadcast()
124155
return nil
125156
}
126157

158+
func (p *BufferedPipe) releaseBuffer() {
159+
if p.buf != nil && !p.released {
160+
bufferPool.Put(p.buf)
161+
p.buf = nil
162+
p.released = true
163+
}
164+
}
165+
127166
func (p *BufferedPipe) availableData() int {
167+
if p.buf == nil {
168+
return 0
169+
}
170+
bufLen := len(*p.buf)
128171
if p.tail >= p.head {
129172
return p.tail - p.head
130173
}
131-
return len(p.buf) - p.head + p.tail
174+
return bufLen - p.head + p.tail
132175
}
133176

134177
func (p *BufferedPipe) freeSpace() int {
135-
return len(p.buf) - p.availableData() - 1
178+
if p.buf == nil {
179+
return 1024*1024 - 1 // Default size minus 1 for head/tail boundary
180+
}
181+
return len(*p.buf) - p.availableData() - 1
136182
}
137183

138184
func (p *BufferedPipe) getErr() error {
139185
if p.err == nil {
140186
return io.EOF
141187
}
142188
return p.err
143-
}
189+
}

0 commit comments

Comments
 (0)