buffers.go

  1/*
  2 *
  3 * Copyright 2024 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19// Package mem provides utilities that facilitate memory reuse in byte slices
 20// that are used as buffers.
 21//
 22// # Experimental
 23//
 24// Notice: All APIs in this package are EXPERIMENTAL and may be changed or
 25// removed in a later release.
 26package mem
 27
 28import (
 29	"fmt"
 30	"sync"
 31	"sync/atomic"
 32)
 33
 34// A Buffer represents a reference counted piece of data (in bytes) that can be
 35// acquired by a call to NewBuffer() or Copy(). A reference to a Buffer may be
 36// released by calling Free(), which invokes the free function given at creation
 37// only after all references are released.
 38//
 39// Note that a Buffer is not safe for concurrent access and instead each
 40// goroutine should use its own reference to the data, which can be acquired via
 41// a call to Ref().
 42//
 43// Attempts to access the underlying data after releasing the reference to the
 44// Buffer will panic.
 45type Buffer interface {
 46	// ReadOnlyData returns the underlying byte slice. Note that it is undefined
 47	// behavior to modify the contents of this slice in any way.
 48	ReadOnlyData() []byte
 49	// Ref increases the reference counter for this Buffer.
 50	Ref()
 51	// Free decrements this Buffer's reference counter and frees the underlying
 52	// byte slice if the counter reaches 0 as a result of this call.
 53	Free()
 54	// Len returns the Buffer's size.
 55	Len() int
 56
 57	split(n int) (left, right Buffer)
 58	read(buf []byte) (int, Buffer)
 59}
 60
 61var (
 62	bufferPoolingThreshold = 1 << 10
 63
 64	bufferObjectPool = sync.Pool{New: func() any { return new(buffer) }}
 65	refObjectPool    = sync.Pool{New: func() any { return new(atomic.Int32) }}
 66)
 67
 68// IsBelowBufferPoolingThreshold returns true if the given size is less than or
 69// equal to the threshold for buffer pooling. This is used to determine whether
 70// to pool buffers or allocate them directly.
 71func IsBelowBufferPoolingThreshold(size int) bool {
 72	return size <= bufferPoolingThreshold
 73}
 74
 75type buffer struct {
 76	origData *[]byte
 77	data     []byte
 78	refs     *atomic.Int32
 79	pool     BufferPool
 80}
 81
 82func newBuffer() *buffer {
 83	return bufferObjectPool.Get().(*buffer)
 84}
 85
 86// NewBuffer creates a new Buffer from the given data, initializing the reference
 87// counter to 1. The data will then be returned to the given pool when all
 88// references to the returned Buffer are released. As a special case to avoid
 89// additional allocations, if the given buffer pool is nil, the returned buffer
 90// will be a "no-op" Buffer where invoking Buffer.Free() does nothing and the
 91// underlying data is never freed.
 92//
 93// Note that the backing array of the given data is not copied.
 94func NewBuffer(data *[]byte, pool BufferPool) Buffer {
 95	// Use the buffer's capacity instead of the length, otherwise buffers may
 96	// not be reused under certain conditions. For example, if a large buffer
 97	// is acquired from the pool, but fewer bytes than the buffering threshold
 98	// are written to it, the buffer will not be returned to the pool.
 99	if pool == nil || IsBelowBufferPoolingThreshold(cap(*data)) {
100		return (SliceBuffer)(*data)
101	}
102	b := newBuffer()
103	b.origData = data
104	b.data = *data
105	b.pool = pool
106	b.refs = refObjectPool.Get().(*atomic.Int32)
107	b.refs.Add(1)
108	return b
109}
110
111// Copy creates a new Buffer from the given data, initializing the reference
112// counter to 1.
113//
114// It acquires a []byte from the given pool and copies over the backing array
115// of the given data. The []byte acquired from the pool is returned to the
116// pool when all references to the returned Buffer are released.
117func Copy(data []byte, pool BufferPool) Buffer {
118	if IsBelowBufferPoolingThreshold(len(data)) {
119		buf := make(SliceBuffer, len(data))
120		copy(buf, data)
121		return buf
122	}
123
124	buf := pool.Get(len(data))
125	copy(*buf, data)
126	return NewBuffer(buf, pool)
127}
128
129func (b *buffer) ReadOnlyData() []byte {
130	if b.refs == nil {
131		panic("Cannot read freed buffer")
132	}
133	return b.data
134}
135
136func (b *buffer) Ref() {
137	if b.refs == nil {
138		panic("Cannot ref freed buffer")
139	}
140	b.refs.Add(1)
141}
142
143func (b *buffer) Free() {
144	if b.refs == nil {
145		panic("Cannot free freed buffer")
146	}
147
148	refs := b.refs.Add(-1)
149	switch {
150	case refs > 0:
151		return
152	case refs == 0:
153		if b.pool != nil {
154			b.pool.Put(b.origData)
155		}
156
157		refObjectPool.Put(b.refs)
158		b.origData = nil
159		b.data = nil
160		b.refs = nil
161		b.pool = nil
162		bufferObjectPool.Put(b)
163	default:
164		panic("Cannot free freed buffer")
165	}
166}
167
168func (b *buffer) Len() int {
169	return len(b.ReadOnlyData())
170}
171
172func (b *buffer) split(n int) (Buffer, Buffer) {
173	if b.refs == nil {
174		panic("Cannot split freed buffer")
175	}
176
177	b.refs.Add(1)
178	split := newBuffer()
179	split.origData = b.origData
180	split.data = b.data[n:]
181	split.refs = b.refs
182	split.pool = b.pool
183
184	b.data = b.data[:n]
185
186	return b, split
187}
188
189func (b *buffer) read(buf []byte) (int, Buffer) {
190	if b.refs == nil {
191		panic("Cannot read freed buffer")
192	}
193
194	n := copy(buf, b.data)
195	if n == len(b.data) {
196		b.Free()
197		return n, nil
198	}
199
200	b.data = b.data[n:]
201	return n, b
202}
203
204func (b *buffer) String() string {
205	return fmt.Sprintf("mem.Buffer(%p, data: %p, length: %d)", b, b.ReadOnlyData(), len(b.ReadOnlyData()))
206}
207
208// ReadUnsafe reads bytes from the given Buffer into the provided slice.
209// It does not perform safety checks.
210func ReadUnsafe(dst []byte, buf Buffer) (int, Buffer) {
211	return buf.read(dst)
212}
213
214// SplitUnsafe modifies the receiver to point to the first n bytes while it
215// returns a new reference to the remaining bytes. The returned Buffer
216// functions just like a normal reference acquired using Ref().
217func SplitUnsafe(buf Buffer, n int) (left, right Buffer) {
218	return buf.split(n)
219}
220
221type emptyBuffer struct{}
222
223func (e emptyBuffer) ReadOnlyData() []byte {
224	return nil
225}
226
227func (e emptyBuffer) Ref()  {}
228func (e emptyBuffer) Free() {}
229
230func (e emptyBuffer) Len() int {
231	return 0
232}
233
234func (e emptyBuffer) split(int) (left, right Buffer) {
235	return e, e
236}
237
238func (e emptyBuffer) read([]byte) (int, Buffer) {
239	return 0, e
240}
241
242// SliceBuffer is a Buffer implementation that wraps a byte slice. It provides
243// methods for reading, splitting, and managing the byte slice.
244type SliceBuffer []byte
245
246// ReadOnlyData returns the byte slice.
247func (s SliceBuffer) ReadOnlyData() []byte { return s }
248
249// Ref is a noop implementation of Ref.
250func (s SliceBuffer) Ref() {}
251
252// Free is a noop implementation of Free.
253func (s SliceBuffer) Free() {}
254
255// Len is a noop implementation of Len.
256func (s SliceBuffer) Len() int { return len(s) }
257
258func (s SliceBuffer) split(n int) (left, right Buffer) {
259	return s[:n], s[n:]
260}
261
262func (s SliceBuffer) read(buf []byte) (int, Buffer) {
263	n := copy(buf, s)
264	if n == len(s) {
265		return n, nil
266	}
267	return n, s[n:]
268}