Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit f6986fb

Browse files
authoredFeb 17, 2023
Use buffer pool for decompressed buffer (#1063)
* Use buffer pool for decompressed buffer * Address review comments
1 parent dc0faf5 commit f6986fb

File tree

2 files changed

+10
-3
lines changed

2 files changed

+10
-3
lines changed
 

‎batch.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,16 @@ func (batch *Batch) close() (err error) {
7979

8080
batch.conn = nil
8181
batch.lock = nil
82+
8283
if batch.msgs != nil {
8384
batch.msgs.discard()
8485
}
8586

87+
if batch.msgs != nil && batch.msgs.decompressed != nil {
88+
releaseBuffer(batch.msgs.decompressed)
89+
batch.msgs.decompressed = nil
90+
}
91+
8692
if err = batch.err; errors.Is(batch.err, io.EOF) {
8793
err = nil
8894
}

‎message_reader.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type messageSetReader struct {
2222
// This is used to detect truncation of the response.
2323
lengthRemain int
2424

25-
decompressed bytes.Buffer
25+
decompressed *bytes.Buffer
2626
}
2727

2828
type readerStack struct {
@@ -87,6 +87,7 @@ func newMessageSetReader(reader *bufio.Reader, remain int) (*messageSetReader, e
8787
reader: reader,
8888
remain: remain,
8989
},
90+
decompressed: acquireBuffer(),
9091
}
9192
err := res.readHeader()
9293
return res, err
@@ -199,7 +200,7 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
199200
// Allocate a buffer of size 0, which gets capped at 16 bytes
200201
// by the bufio package. We are already reading buffered data
201202
// here, no need to reserve another 4KB buffer.
202-
reader: bufio.NewReaderSize(&r.decompressed, 0),
203+
reader: bufio.NewReaderSize(r.decompressed, 0),
203204
remain: r.decompressed.Len(),
204205
base: offset,
205206
parent: r.readerStack,
@@ -278,7 +279,7 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
278279
}
279280
r.remain -= batchRemain - int(limitReader.N)
280281
r.readerStack = &readerStack{
281-
reader: bufio.NewReaderSize(&r.decompressed, 0), // the new stack reads from the decompressed buffer
282+
reader: bufio.NewReaderSize(r.decompressed, 0), // the new stack reads from the decompressed buffer
282283
remain: r.decompressed.Len(),
283284
base: -1, // base is unused here
284285
parent: r.readerStack,

0 commit comments

Comments
 (0)
Please sign in to comment.