pool.go

  1// Package buffer implements a buffer for serialization, consisting of a chain of []byte-s to
  2// reduce copying and to allow reuse of individual chunks.
  3package buffer
  4
  5import (
  6	"io"
  7	"net"
  8	"sync"
  9)
 10
 11// PoolConfig contains configuration for the allocation and reuse strategy.
 12type PoolConfig struct {
 13	StartSize  int // Minimum chunk size that is allocated.
 14	PooledSize int // Minimum chunk size that is reused, reusing chunks too small will result in overhead.
 15	MaxSize    int // Maximum chunk size that will be allocated.
 16}
 17
 18var config = PoolConfig{
 19	StartSize:  128,
 20	PooledSize: 512,
 21	MaxSize:    32768,
 22}
 23
 24// Reuse pool: chunk size -> pool.
 25var buffers = map[int]*sync.Pool{}
 26
 27func initBuffers() {
 28	for l := config.PooledSize; l <= config.MaxSize; l *= 2 {
 29		buffers[l] = new(sync.Pool)
 30	}
 31}
 32
 33func init() {
 34	initBuffers()
 35}
 36
 37// Init sets up a non-default pooling and allocation strategy. Should be run before serialization is done.
 38func Init(cfg PoolConfig) {
 39	config = cfg
 40	initBuffers()
 41}
 42
 43// putBuf puts a chunk to reuse pool if it can be reused.
 44func putBuf(buf []byte) {
 45	size := cap(buf)
 46	if size < config.PooledSize {
 47		return
 48	}
 49	if c := buffers[size]; c != nil {
 50		c.Put(buf[:0])
 51	}
 52}
 53
 54// getBuf gets a chunk from reuse pool or creates a new one if reuse failed.
 55func getBuf(size int) []byte {
 56	if size >= config.PooledSize {
 57		if c := buffers[size]; c != nil {
 58			v := c.Get()
 59			if v != nil {
 60				return v.([]byte)
 61			}
 62		}
 63	}
 64	return make([]byte, 0, size)
 65}
 66
 67// Buffer is a buffer optimized for serialization without extra copying.
 68type Buffer struct {
 69
 70	// Buf is the current chunk that can be used for serialization.
 71	Buf []byte
 72
 73	toPool []byte
 74	bufs   [][]byte
 75}
 76
 77// EnsureSpace makes sure that the current chunk contains at least s free bytes,
 78// possibly creating a new chunk.
 79func (b *Buffer) EnsureSpace(s int) {
 80	if cap(b.Buf)-len(b.Buf) < s {
 81		b.ensureSpaceSlow(s)
 82	}
 83}
 84
 85func (b *Buffer) ensureSpaceSlow(s int) {
 86	l := len(b.Buf)
 87	if l > 0 {
 88		if cap(b.toPool) != cap(b.Buf) {
 89			// Chunk was reallocated, toPool can be pooled.
 90			putBuf(b.toPool)
 91		}
 92		if cap(b.bufs) == 0 {
 93			b.bufs = make([][]byte, 0, 8)
 94		}
 95		b.bufs = append(b.bufs, b.Buf)
 96		l = cap(b.toPool) * 2
 97	} else {
 98		l = config.StartSize
 99	}
100
101	if l > config.MaxSize {
102		l = config.MaxSize
103	}
104	b.Buf = getBuf(l)
105	b.toPool = b.Buf
106}
107
108// AppendByte appends a single byte to buffer.
109func (b *Buffer) AppendByte(data byte) {
110	b.EnsureSpace(1)
111	b.Buf = append(b.Buf, data)
112}
113
114// AppendBytes appends a byte slice to buffer.
115func (b *Buffer) AppendBytes(data []byte) {
116	if len(data) <= cap(b.Buf)-len(b.Buf) {
117		b.Buf = append(b.Buf, data...) // fast path
118	} else {
119		b.appendBytesSlow(data)
120	}
121}
122
123func (b *Buffer) appendBytesSlow(data []byte) {
124	for len(data) > 0 {
125		b.EnsureSpace(1)
126
127		sz := cap(b.Buf) - len(b.Buf)
128		if sz > len(data) {
129			sz = len(data)
130		}
131
132		b.Buf = append(b.Buf, data[:sz]...)
133		data = data[sz:]
134	}
135}
136
137// AppendString appends a string to buffer.
138func (b *Buffer) AppendString(data string) {
139	if len(data) <= cap(b.Buf)-len(b.Buf) {
140		b.Buf = append(b.Buf, data...) // fast path
141	} else {
142		b.appendStringSlow(data)
143	}
144}
145
146func (b *Buffer) appendStringSlow(data string) {
147	for len(data) > 0 {
148		b.EnsureSpace(1)
149
150		sz := cap(b.Buf) - len(b.Buf)
151		if sz > len(data) {
152			sz = len(data)
153		}
154
155		b.Buf = append(b.Buf, data[:sz]...)
156		data = data[sz:]
157	}
158}
159
160// Size computes the size of a buffer by adding sizes of every chunk.
161func (b *Buffer) Size() int {
162	size := len(b.Buf)
163	for _, buf := range b.bufs {
164		size += len(buf)
165	}
166	return size
167}
168
169// DumpTo outputs the contents of a buffer to a writer and resets the buffer.
170func (b *Buffer) DumpTo(w io.Writer) (written int, err error) {
171	bufs := net.Buffers(b.bufs)
172	if len(b.Buf) > 0 {
173		bufs = append(bufs, b.Buf)
174	}
175	n, err := bufs.WriteTo(w)
176
177	for _, buf := range b.bufs {
178		putBuf(buf)
179	}
180	putBuf(b.toPool)
181
182	b.bufs = nil
183	b.Buf = nil
184	b.toPool = nil
185
186	return int(n), err
187}
188
189// BuildBytes creates a single byte slice with all the contents of the buffer. Data is
190// copied if it does not fit in a single chunk. You can optionally provide one byte
191// slice as argument that it will try to reuse.
192func (b *Buffer) BuildBytes(reuse ...[]byte) []byte {
193	if len(b.bufs) == 0 {
194		ret := b.Buf
195		b.toPool = nil
196		b.Buf = nil
197		return ret
198	}
199
200	var ret []byte
201	size := b.Size()
202
203	// If we got a buffer as argument and it is big enough, reuse it.
204	if len(reuse) == 1 && cap(reuse[0]) >= size {
205		ret = reuse[0][:0]
206	} else {
207		ret = make([]byte, 0, size)
208	}
209	for _, buf := range b.bufs {
210		ret = append(ret, buf...)
211		putBuf(buf)
212	}
213
214	ret = append(ret, b.Buf...)
215	putBuf(b.toPool)
216
217	b.bufs = nil
218	b.toPool = nil
219	b.Buf = nil
220
221	return ret
222}
223
224type readCloser struct {
225	offset int
226	bufs   [][]byte
227}
228
229func (r *readCloser) Read(p []byte) (n int, err error) {
230	for _, buf := range r.bufs {
231		// Copy as much as we can.
232		x := copy(p[n:], buf[r.offset:])
233		n += x // Increment how much we filled.
234
235		// Did we empty the whole buffer?
236		if r.offset+x == len(buf) {
237			// On to the next buffer.
238			r.offset = 0
239			r.bufs = r.bufs[1:]
240
241			// We can release this buffer.
242			putBuf(buf)
243		} else {
244			r.offset += x
245		}
246
247		if n == len(p) {
248			break
249		}
250	}
251	// No buffers left or nothing read?
252	if len(r.bufs) == 0 {
253		err = io.EOF
254	}
255	return
256}
257
258func (r *readCloser) Close() error {
259	// Release all remaining buffers.
260	for _, buf := range r.bufs {
261		putBuf(buf)
262	}
263	// In case Close gets called multiple times.
264	r.bufs = nil
265
266	return nil
267}
268
269// ReadCloser creates an io.ReadCloser with all the contents of the buffer.
270func (b *Buffer) ReadCloser() io.ReadCloser {
271	ret := &readCloser{0, append(b.bufs, b.Buf)}
272
273	b.bufs = nil
274	b.toPool = nil
275	b.Buf = nil
276
277	return ret
278}