1// Package buffer implements a buffer for serialization, consisting of a chain of []byte-s to
2// reduce copying and to allow reuse of individual chunks.
3package buffer
4
5import (
6 "io"
7 "net"
8 "sync"
9)
10
11// PoolConfig contains configuration for the allocation and reuse strategy.
12type PoolConfig struct {
13 StartSize int // Minimum chunk size that is allocated.
14 PooledSize int // Minimum chunk size that is reused, reusing chunks too small will result in overhead.
15 MaxSize int // Maximum chunk size that will be allocated.
16}
17
18var config = PoolConfig{
19 StartSize: 128,
20 PooledSize: 512,
21 MaxSize: 32768,
22}
23
24// Reuse pool: chunk size -> pool.
25var buffers = map[int]*sync.Pool{}
26
27func initBuffers() {
28 for l := config.PooledSize; l <= config.MaxSize; l *= 2 {
29 buffers[l] = new(sync.Pool)
30 }
31}
32
33func init() {
34 initBuffers()
35}
36
37// Init sets up a non-default pooling and allocation strategy. Should be run before serialization is done.
38func Init(cfg PoolConfig) {
39 config = cfg
40 initBuffers()
41}
42
43// putBuf puts a chunk to reuse pool if it can be reused.
44func putBuf(buf []byte) {
45 size := cap(buf)
46 if size < config.PooledSize {
47 return
48 }
49 if c := buffers[size]; c != nil {
50 c.Put(buf[:0])
51 }
52}
53
54// getBuf gets a chunk from reuse pool or creates a new one if reuse failed.
55func getBuf(size int) []byte {
56 if size >= config.PooledSize {
57 if c := buffers[size]; c != nil {
58 v := c.Get()
59 if v != nil {
60 return v.([]byte)
61 }
62 }
63 }
64 return make([]byte, 0, size)
65}
66
67// Buffer is a buffer optimized for serialization without extra copying.
68type Buffer struct {
69
70 // Buf is the current chunk that can be used for serialization.
71 Buf []byte
72
73 toPool []byte
74 bufs [][]byte
75}
76
77// EnsureSpace makes sure that the current chunk contains at least s free bytes,
78// possibly creating a new chunk.
79func (b *Buffer) EnsureSpace(s int) {
80 if cap(b.Buf)-len(b.Buf) < s {
81 b.ensureSpaceSlow(s)
82 }
83}
84
85func (b *Buffer) ensureSpaceSlow(s int) {
86 l := len(b.Buf)
87 if l > 0 {
88 if cap(b.toPool) != cap(b.Buf) {
89 // Chunk was reallocated, toPool can be pooled.
90 putBuf(b.toPool)
91 }
92 if cap(b.bufs) == 0 {
93 b.bufs = make([][]byte, 0, 8)
94 }
95 b.bufs = append(b.bufs, b.Buf)
96 l = cap(b.toPool) * 2
97 } else {
98 l = config.StartSize
99 }
100
101 if l > config.MaxSize {
102 l = config.MaxSize
103 }
104 b.Buf = getBuf(l)
105 b.toPool = b.Buf
106}
107
108// AppendByte appends a single byte to buffer.
109func (b *Buffer) AppendByte(data byte) {
110 b.EnsureSpace(1)
111 b.Buf = append(b.Buf, data)
112}
113
114// AppendBytes appends a byte slice to buffer.
115func (b *Buffer) AppendBytes(data []byte) {
116 if len(data) <= cap(b.Buf)-len(b.Buf) {
117 b.Buf = append(b.Buf, data...) // fast path
118 } else {
119 b.appendBytesSlow(data)
120 }
121}
122
123func (b *Buffer) appendBytesSlow(data []byte) {
124 for len(data) > 0 {
125 b.EnsureSpace(1)
126
127 sz := cap(b.Buf) - len(b.Buf)
128 if sz > len(data) {
129 sz = len(data)
130 }
131
132 b.Buf = append(b.Buf, data[:sz]...)
133 data = data[sz:]
134 }
135}
136
137// AppendString appends a string to buffer.
138func (b *Buffer) AppendString(data string) {
139 if len(data) <= cap(b.Buf)-len(b.Buf) {
140 b.Buf = append(b.Buf, data...) // fast path
141 } else {
142 b.appendStringSlow(data)
143 }
144}
145
146func (b *Buffer) appendStringSlow(data string) {
147 for len(data) > 0 {
148 b.EnsureSpace(1)
149
150 sz := cap(b.Buf) - len(b.Buf)
151 if sz > len(data) {
152 sz = len(data)
153 }
154
155 b.Buf = append(b.Buf, data[:sz]...)
156 data = data[sz:]
157 }
158}
159
160// Size computes the size of a buffer by adding sizes of every chunk.
161func (b *Buffer) Size() int {
162 size := len(b.Buf)
163 for _, buf := range b.bufs {
164 size += len(buf)
165 }
166 return size
167}
168
169// DumpTo outputs the contents of a buffer to a writer and resets the buffer.
170func (b *Buffer) DumpTo(w io.Writer) (written int, err error) {
171 bufs := net.Buffers(b.bufs)
172 if len(b.Buf) > 0 {
173 bufs = append(bufs, b.Buf)
174 }
175 n, err := bufs.WriteTo(w)
176
177 for _, buf := range b.bufs {
178 putBuf(buf)
179 }
180 putBuf(b.toPool)
181
182 b.bufs = nil
183 b.Buf = nil
184 b.toPool = nil
185
186 return int(n), err
187}
188
189// BuildBytes creates a single byte slice with all the contents of the buffer. Data is
190// copied if it does not fit in a single chunk. You can optionally provide one byte
191// slice as argument that it will try to reuse.
192func (b *Buffer) BuildBytes(reuse ...[]byte) []byte {
193 if len(b.bufs) == 0 {
194 ret := b.Buf
195 b.toPool = nil
196 b.Buf = nil
197 return ret
198 }
199
200 var ret []byte
201 size := b.Size()
202
203 // If we got a buffer as argument and it is big enough, reuse it.
204 if len(reuse) == 1 && cap(reuse[0]) >= size {
205 ret = reuse[0][:0]
206 } else {
207 ret = make([]byte, 0, size)
208 }
209 for _, buf := range b.bufs {
210 ret = append(ret, buf...)
211 putBuf(buf)
212 }
213
214 ret = append(ret, b.Buf...)
215 putBuf(b.toPool)
216
217 b.bufs = nil
218 b.toPool = nil
219 b.Buf = nil
220
221 return ret
222}
223
224type readCloser struct {
225 offset int
226 bufs [][]byte
227}
228
229func (r *readCloser) Read(p []byte) (n int, err error) {
230 for _, buf := range r.bufs {
231 // Copy as much as we can.
232 x := copy(p[n:], buf[r.offset:])
233 n += x // Increment how much we filled.
234
235 // Did we empty the whole buffer?
236 if r.offset+x == len(buf) {
237 // On to the next buffer.
238 r.offset = 0
239 r.bufs = r.bufs[1:]
240
241 // We can release this buffer.
242 putBuf(buf)
243 } else {
244 r.offset += x
245 }
246
247 if n == len(p) {
248 break
249 }
250 }
251 // No buffers left or nothing read?
252 if len(r.bufs) == 0 {
253 err = io.EOF
254 }
255 return
256}
257
258func (r *readCloser) Close() error {
259 // Release all remaining buffers.
260 for _, buf := range r.bufs {
261 putBuf(buf)
262 }
263 // In case Close gets called multiple times.
264 r.bufs = nil
265
266 return nil
267}
268
269// ReadCloser creates an io.ReadCloser with all the contents of the buffer.
270func (b *Buffer) ReadCloser() io.ReadCloser {
271 ret := &readCloser{0, append(b.bufs, b.Buf)}
272
273 b.bufs = nil
274 b.toPool = nil
275 b.Buf = nil
276
277 return ret
278}