buffer_slice.go

  1/*
  2 *
  3 * Copyright 2024 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package mem
 20
 21import (
 22	"io"
 23)
 24
 25const (
 26	// 32 KiB is what io.Copy uses.
 27	readAllBufSize = 32 * 1024
 28)
 29
 30// BufferSlice offers a means to represent data that spans one or more Buffer
 31// instances. A BufferSlice is meant to be immutable after creation, and methods
 32// like Ref create and return copies of the slice. This is why all methods have
 33// value receivers rather than pointer receivers.
 34//
 35// Note that any of the methods that read the underlying buffers such as Ref,
 36// Len or CopyTo etc., will panic if any underlying buffers have already been
 37// freed. It is recommended to not directly interact with any of the underlying
 38// buffers directly, rather such interactions should be mediated through the
 39// various methods on this type.
 40//
 41// By convention, any APIs that return (mem.BufferSlice, error) should reduce
 42// the burden on the caller by never returning a mem.BufferSlice that needs to
 43// be freed if the error is non-nil, unless explicitly stated.
 44type BufferSlice []Buffer
 45
 46// Len returns the sum of the length of all the Buffers in this slice.
 47//
 48// # Warning
 49//
 50// Invoking the built-in len on a BufferSlice will return the number of buffers
 51// in the slice, and *not* the value returned by this function.
 52func (s BufferSlice) Len() int {
 53	var length int
 54	for _, b := range s {
 55		length += b.Len()
 56	}
 57	return length
 58}
 59
 60// Ref invokes Ref on each buffer in the slice.
 61func (s BufferSlice) Ref() {
 62	for _, b := range s {
 63		b.Ref()
 64	}
 65}
 66
 67// Free invokes Buffer.Free() on each Buffer in the slice.
 68func (s BufferSlice) Free() {
 69	for _, b := range s {
 70		b.Free()
 71	}
 72}
 73
 74// CopyTo copies each of the underlying Buffer's data into the given buffer,
 75// returning the number of bytes copied. Has the same semantics as the copy
 76// builtin in that it will copy as many bytes as it can, stopping when either dst
 77// is full or s runs out of data, returning the minimum of s.Len() and len(dst).
 78func (s BufferSlice) CopyTo(dst []byte) int {
 79	off := 0
 80	for _, b := range s {
 81		off += copy(dst[off:], b.ReadOnlyData())
 82	}
 83	return off
 84}
 85
 86// Materialize concatenates all the underlying Buffer's data into a single
 87// contiguous buffer using CopyTo.
 88func (s BufferSlice) Materialize() []byte {
 89	l := s.Len()
 90	if l == 0 {
 91		return nil
 92	}
 93	out := make([]byte, l)
 94	s.CopyTo(out)
 95	return out
 96}
 97
 98// MaterializeToBuffer functions like Materialize except that it writes the data
 99// to a single Buffer pulled from the given BufferPool.
100//
101// As a special case, if the input BufferSlice only actually has one Buffer, this
102// function simply increases the refcount before returning said Buffer. Freeing this
103// buffer won't release it until the BufferSlice is itself released.
104func (s BufferSlice) MaterializeToBuffer(pool BufferPool) Buffer {
105	if len(s) == 1 {
106		s[0].Ref()
107		return s[0]
108	}
109	sLen := s.Len()
110	if sLen == 0 {
111		return emptyBuffer{}
112	}
113	buf := pool.Get(sLen)
114	s.CopyTo(*buf)
115	return NewBuffer(buf, pool)
116}
117
118// Reader returns a new Reader for the input slice after taking references to
119// each underlying buffer.
120func (s BufferSlice) Reader() Reader {
121	s.Ref()
122	return &sliceReader{
123		data: s,
124		len:  s.Len(),
125	}
126}
127
128// Reader exposes a BufferSlice's data as an io.Reader, allowing it to interface
129// with other parts systems. It also provides an additional convenience method
130// Remaining(), which returns the number of unread bytes remaining in the slice.
131// Buffers will be freed as they are read.
132type Reader interface {
133	io.Reader
134	io.ByteReader
135	// Close frees the underlying BufferSlice and never returns an error. Subsequent
136	// calls to Read will return (0, io.EOF).
137	Close() error
138	// Remaining returns the number of unread bytes remaining in the slice.
139	Remaining() int
140}
141
142type sliceReader struct {
143	data BufferSlice
144	len  int
145	// The index into data[0].ReadOnlyData().
146	bufferIdx int
147}
148
149func (r *sliceReader) Remaining() int {
150	return r.len
151}
152
153func (r *sliceReader) Close() error {
154	r.data.Free()
155	r.data = nil
156	r.len = 0
157	return nil
158}
159
160func (r *sliceReader) freeFirstBufferIfEmpty() bool {
161	if len(r.data) == 0 || r.bufferIdx != len(r.data[0].ReadOnlyData()) {
162		return false
163	}
164
165	r.data[0].Free()
166	r.data = r.data[1:]
167	r.bufferIdx = 0
168	return true
169}
170
171func (r *sliceReader) Read(buf []byte) (n int, _ error) {
172	if r.len == 0 {
173		return 0, io.EOF
174	}
175
176	for len(buf) != 0 && r.len != 0 {
177		// Copy as much as possible from the first Buffer in the slice into the
178		// given byte slice.
179		data := r.data[0].ReadOnlyData()
180		copied := copy(buf, data[r.bufferIdx:])
181		r.len -= copied       // Reduce len by the number of bytes copied.
182		r.bufferIdx += copied // Increment the buffer index.
183		n += copied           // Increment the total number of bytes read.
184		buf = buf[copied:]    // Shrink the given byte slice.
185
186		// If we have copied all the data from the first Buffer, free it and advance to
187		// the next in the slice.
188		r.freeFirstBufferIfEmpty()
189	}
190
191	return n, nil
192}
193
194func (r *sliceReader) ReadByte() (byte, error) {
195	if r.len == 0 {
196		return 0, io.EOF
197	}
198
199	// There may be any number of empty buffers in the slice, clear them all until a
200	// non-empty buffer is reached. This is guaranteed to exit since r.len is not 0.
201	for r.freeFirstBufferIfEmpty() {
202	}
203
204	b := r.data[0].ReadOnlyData()[r.bufferIdx]
205	r.len--
206	r.bufferIdx++
207	// Free the first buffer in the slice if the last byte was read
208	r.freeFirstBufferIfEmpty()
209	return b, nil
210}
211
212var _ io.Writer = (*writer)(nil)
213
214type writer struct {
215	buffers *BufferSlice
216	pool    BufferPool
217}
218
219func (w *writer) Write(p []byte) (n int, err error) {
220	b := Copy(p, w.pool)
221	*w.buffers = append(*w.buffers, b)
222	return b.Len(), nil
223}
224
225// NewWriter wraps the given BufferSlice and BufferPool to implement the
226// io.Writer interface. Every call to Write copies the contents of the given
227// buffer into a new Buffer pulled from the given pool and the Buffer is
228// added to the given BufferSlice.
229func NewWriter(buffers *BufferSlice, pool BufferPool) io.Writer {
230	return &writer{buffers: buffers, pool: pool}
231}
232
233// ReadAll reads from r until an error or EOF and returns the data it read.
234// A successful call returns err == nil, not err == EOF. Because ReadAll is
235// defined to read from src until EOF, it does not treat an EOF from Read
236// as an error to be reported.
237//
238// Important: A failed call returns a non-nil error and may also return
239// partially read buffers. It is the responsibility of the caller to free the
240// BufferSlice returned, or its memory will not be reused.
241func ReadAll(r io.Reader, pool BufferPool) (BufferSlice, error) {
242	var result BufferSlice
243	if wt, ok := r.(io.WriterTo); ok {
244		// This is more optimal since wt knows the size of chunks it wants to
245		// write and, hence, we can allocate buffers of an optimal size to fit
246		// them. E.g. might be a single big chunk, and we wouldn't chop it
247		// into pieces.
248		w := NewWriter(&result, pool)
249		_, err := wt.WriteTo(w)
250		return result, err
251	}
252nextBuffer:
253	for {
254		buf := pool.Get(readAllBufSize)
255		// We asked for 32KiB but may have been given a bigger buffer.
256		// Use all of it if that's the case.
257		*buf = (*buf)[:cap(*buf)]
258		usedCap := 0
259		for {
260			n, err := r.Read((*buf)[usedCap:])
261			usedCap += n
262			if err != nil {
263				if usedCap == 0 {
264					// Nothing in this buf, put it back
265					pool.Put(buf)
266				} else {
267					*buf = (*buf)[:usedCap]
268					result = append(result, NewBuffer(buf, pool))
269				}
270				if err == io.EOF {
271					err = nil
272				}
273				return result, err
274			}
275			if len(*buf) == usedCap {
276				result = append(result, NewBuffer(buf, pool))
277				continue nextBuffer
278			}
279		}
280	}
281}