buffer_pool.go

  1/*
  2 *
  3 * Copyright 2024 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package mem
 20
 21import (
 22	"sort"
 23	"sync"
 24
 25	"google.golang.org/grpc/internal"
 26)
 27
// BufferPool is a pool of buffers that can be shared and reused, resulting in
// decreased memory allocation.
type BufferPool interface {
	// Get returns a buffer with specified length from the pool. The capacity
	// of the returned buffer may be larger than length.
	Get(length int) *[]byte

	// Put returns a buffer to the pool. An implementation may discard the
	// buffer instead of retaining it (NopBufferPool.Put, for example, does
	// nothing).
	Put(*[]byte)
}
 37
 38var defaultBufferPoolSizes = []int{
 39	256,
 40	4 << 10,  // 4KB (go page size)
 41	16 << 10, // 16KB (max HTTP/2 frame size used by gRPC)
 42	32 << 10, // 32KB (default buffer size for io.Copy)
 43	1 << 20,  // 1MB
 44}
 45
// defaultBufferPool is the pool returned by DefaultBufferPool. It is assigned
// in init and can be replaced through internal.SetDefaultBufferPoolForTesting.
var defaultBufferPool BufferPool
 47
 48func init() {
 49	defaultBufferPool = NewTieredBufferPool(defaultBufferPoolSizes...)
 50
 51	internal.SetDefaultBufferPoolForTesting = func(pool BufferPool) {
 52		defaultBufferPool = pool
 53	}
 54
 55	internal.SetBufferPoolingThresholdForTesting = func(threshold int) {
 56		bufferPoolingThreshold = threshold
 57	}
 58}
 59
// DefaultBufferPool returns the current default buffer pool. Unless it has
// been replaced through internal.SetDefaultBufferPoolForTesting, it is the
// BufferPool created in init with NewTieredBufferPool, using a set of default
// sizes (defaultBufferPoolSizes) chosen for expected workflows.
func DefaultBufferPool() BufferPool {
	return defaultBufferPool
}
 66
 67// NewTieredBufferPool returns a BufferPool implementation that uses multiple
 68// underlying pools of the given pool sizes.
 69func NewTieredBufferPool(poolSizes ...int) BufferPool {
 70	sort.Ints(poolSizes)
 71	pools := make([]*sizedBufferPool, len(poolSizes))
 72	for i, s := range poolSizes {
 73		pools[i] = newSizedBufferPool(s)
 74	}
 75	return &tieredBufferPool{
 76		sizedPools: pools,
 77	}
 78}
 79
// tieredBufferPool implements the BufferPool interface with multiple tiers of
// buffer pools for different sizes of buffers.
//
// sizedPools holds one pool per tier; NewTieredBufferPool fills it from sizes
// sorted in ascending order, which the binary search in getPool relies on.
// fallbackPool is used by getPool for sizes larger than every tier's
// defaultSize.
type tieredBufferPool struct {
	sizedPools   []*sizedBufferPool
	fallbackPool simpleBufferPool
}
 86
 87func (p *tieredBufferPool) Get(size int) *[]byte {
 88	return p.getPool(size).Get(size)
 89}
 90
 91func (p *tieredBufferPool) Put(buf *[]byte) {
 92	p.getPool(cap(*buf)).Put(buf)
 93}
 94
 95func (p *tieredBufferPool) getPool(size int) BufferPool {
 96	poolIdx := sort.Search(len(p.sizedPools), func(i int) bool {
 97		return p.sizedPools[i].defaultSize >= size
 98	})
 99
100	if poolIdx == len(p.sizedPools) {
101		return &p.fallbackPool
102	}
103
104	return p.sizedPools[poolIdx]
105}
106
// sizedBufferPool is a BufferPool whose buffers all share one fixed capacity,
// defaultSize. For example, HTTP/2 frames within gRPC have a default max size
// of 16kb, and a sizedBufferPool can be configured to hand out only buffers
// with a capacity of 16kb. It cannot serve requests larger than defaultSize:
// Get panics when asked for more. For that reason it is not meant to be used
// on its own; it is intended to sit inside a tieredBufferPool, which only
// calls Get when the requested size is at most defaultSize.
type sizedBufferPool struct {
	pool        sync.Pool
	defaultSize int
}

// Get takes a buffer from the pool, zeroes its entire capacity, and returns
// it resliced to length size. It panics if size exceeds the buffer's
// capacity.
func (p *sizedBufferPool) Get(size int) *[]byte {
	ptr := p.pool.Get().(*[]byte)
	whole := (*ptr)[:cap(*ptr)]
	clear(whole)
	*ptr = whole[:size]
	return ptr
}

// Put stores buf in the pool, provided its capacity is at least defaultSize.
// Smaller buffers are dropped, because a later Get would panic when reslicing
// them beyond their capacity.
func (p *sizedBufferPool) Put(buf *[]byte) {
	if cap(*buf) >= p.defaultSize {
		p.pool.Put(buf)
	}
}

// newSizedBufferPool creates a sizedBufferPool whose underlying sync.Pool
// allocates fresh buffers of exactly size bytes when it is empty.
func newSizedBufferPool(size int) *sizedBufferPool {
	p := &sizedBufferPool{defaultSize: size}
	p.pool.New = func() any {
		fresh := make([]byte, size)
		return &fresh
	}
	return p
}
149
// Compile-time assertion that *simpleBufferPool implements BufferPool.
var _ BufferPool = (*simpleBufferPool)(nil)
151
// simpleBufferPool is an implementation of the BufferPool interface that
// attempts to pool buffers with a sync.Pool. When Get is invoked, it tries to
// acquire a buffer from the pool but if that buffer is too small, it returns it
// to the pool and creates a new one.
//
// The zero value is ready to use. The sync.Pool has no New function, so an
// empty pool yields nil and Get falls through to a fresh allocation.
type simpleBufferPool struct {
	pool sync.Pool
}

// Get returns a zeroed buffer of length size. A pooled buffer is reused when
// its capacity is at least size; otherwise a new buffer of exactly size bytes
// is allocated. It panics if size is negative.
func (p *simpleBufferPool) Get(size int) *[]byte {
	bs, ok := p.pool.Get().(*[]byte)
	if ok && cap(*bs) >= size {
		// Zero the whole backing array before handing the buffer out again,
		// so that a reused buffer never exposes bytes written by its previous
		// user. This matches sizedBufferPool.Get and NopBufferPool.Get, which
		// also return zeroed memory.
		whole := (*bs)[:cap(*bs)]
		clear(whole)
		*bs = whole[:size]
		return bs
	}

	// A buffer was pulled from the pool, but it is too small. Put it back in
	// the pool and create one large enough.
	if ok {
		p.pool.Put(bs)
	}

	b := make([]byte, size)
	return &b
}

// Put stores buf in the pool unconditionally, whatever its capacity.
func (p *simpleBufferPool) Put(buf *[]byte) {
	p.pool.Put(buf)
}
180
// Compile-time assertion that NopBufferPool implements BufferPool.
var _ BufferPool = NopBufferPool{}
182
// NopBufferPool is a BufferPool that performs no pooling: every Get allocates
// a new buffer and every Put is a no-op.
type NopBufferPool struct{}

// Get returns a newly allocated buffer with the specified length.
func (NopBufferPool) Get(length int) *[]byte {
	buf := new([]byte)
	*buf = make([]byte, length)
	return buf
}

// Put does nothing; the buffer is not retained.
func (NopBufferPool) Put(*[]byte) {}