databuffer.go

  1// Copyright 2014 The Go Authors. All rights reserved.
  2// Use of this source code is governed by a BSD-style
  3// license that can be found in the LICENSE file.
  4
  5package http2
  6
  7import (
  8	"errors"
  9	"fmt"
 10	"sync"
 11)
 12
 13// Buffer chunks are allocated from a pool to reduce pressure on GC.
 14// The maximum wasted space per dataBuffer is 2x the largest size class,
 15// which happens when the dataBuffer has multiple chunks and there is
 16// one unread byte in both the first and last chunks. We use a few size
 17// classes to minimize overheads for servers that typically receive very
 18// small request bodies.
 19//
 20// TODO: Benchmark to determine if the pools are necessary. The GC may have
 21// improved enough that we can instead allocate chunks like this:
 22// make([]byte, max(16<<10, expectedBytesRemaining))
 23var dataChunkPools = [...]sync.Pool{
 24	{New: func() interface{} { return new([1 << 10]byte) }},
 25	{New: func() interface{} { return new([2 << 10]byte) }},
 26	{New: func() interface{} { return new([4 << 10]byte) }},
 27	{New: func() interface{} { return new([8 << 10]byte) }},
 28	{New: func() interface{} { return new([16 << 10]byte) }},
 29}
 30
 31func getDataBufferChunk(size int64) []byte {
 32	switch {
 33	case size <= 1<<10:
 34		return dataChunkPools[0].Get().(*[1 << 10]byte)[:]
 35	case size <= 2<<10:
 36		return dataChunkPools[1].Get().(*[2 << 10]byte)[:]
 37	case size <= 4<<10:
 38		return dataChunkPools[2].Get().(*[4 << 10]byte)[:]
 39	case size <= 8<<10:
 40		return dataChunkPools[3].Get().(*[8 << 10]byte)[:]
 41	default:
 42		return dataChunkPools[4].Get().(*[16 << 10]byte)[:]
 43	}
 44}
 45
 46func putDataBufferChunk(p []byte) {
 47	switch len(p) {
 48	case 1 << 10:
 49		dataChunkPools[0].Put((*[1 << 10]byte)(p))
 50	case 2 << 10:
 51		dataChunkPools[1].Put((*[2 << 10]byte)(p))
 52	case 4 << 10:
 53		dataChunkPools[2].Put((*[4 << 10]byte)(p))
 54	case 8 << 10:
 55		dataChunkPools[3].Put((*[8 << 10]byte)(p))
 56	case 16 << 10:
 57		dataChunkPools[4].Put((*[16 << 10]byte)(p))
 58	default:
 59		panic(fmt.Sprintf("unexpected buffer len=%v", len(p)))
 60	}
 61}
 62
 63// dataBuffer is an io.ReadWriter backed by a list of data chunks.
 64// Each dataBuffer is used to read DATA frames on a single stream.
 65// The buffer is divided into chunks so the server can limit the
 66// total memory used by a single connection without limiting the
 67// request body size on any single stream.
 68type dataBuffer struct {
 69	chunks   [][]byte
 70	r        int   // next byte to read is chunks[0][r]
 71	w        int   // next byte to write is chunks[len(chunks)-1][w]
 72	size     int   // total buffered bytes
 73	expected int64 // we expect at least this many bytes in future Write calls (ignored if <= 0)
 74}
 75
 76var errReadEmpty = errors.New("read from empty dataBuffer")
 77
 78// Read copies bytes from the buffer into p.
 79// It is an error to read when no data is available.
 80func (b *dataBuffer) Read(p []byte) (int, error) {
 81	if b.size == 0 {
 82		return 0, errReadEmpty
 83	}
 84	var ntotal int
 85	for len(p) > 0 && b.size > 0 {
 86		readFrom := b.bytesFromFirstChunk()
 87		n := copy(p, readFrom)
 88		p = p[n:]
 89		ntotal += n
 90		b.r += n
 91		b.size -= n
 92		// If the first chunk has been consumed, advance to the next chunk.
 93		if b.r == len(b.chunks[0]) {
 94			putDataBufferChunk(b.chunks[0])
 95			end := len(b.chunks) - 1
 96			copy(b.chunks[:end], b.chunks[1:])
 97			b.chunks[end] = nil
 98			b.chunks = b.chunks[:end]
 99			b.r = 0
100		}
101	}
102	return ntotal, nil
103}
104
105func (b *dataBuffer) bytesFromFirstChunk() []byte {
106	if len(b.chunks) == 1 {
107		return b.chunks[0][b.r:b.w]
108	}
109	return b.chunks[0][b.r:]
110}
111
112// Len returns the number of bytes of the unread portion of the buffer.
113func (b *dataBuffer) Len() int {
114	return b.size
115}
116
117// Write appends p to the buffer.
118func (b *dataBuffer) Write(p []byte) (int, error) {
119	ntotal := len(p)
120	for len(p) > 0 {
121		// If the last chunk is empty, allocate a new chunk. Try to allocate
122		// enough to fully copy p plus any additional bytes we expect to
123		// receive. However, this may allocate less than len(p).
124		want := int64(len(p))
125		if b.expected > want {
126			want = b.expected
127		}
128		chunk := b.lastChunkOrAlloc(want)
129		n := copy(chunk[b.w:], p)
130		p = p[n:]
131		b.w += n
132		b.size += n
133		b.expected -= int64(n)
134	}
135	return ntotal, nil
136}
137
138func (b *dataBuffer) lastChunkOrAlloc(want int64) []byte {
139	if len(b.chunks) != 0 {
140		last := b.chunks[len(b.chunks)-1]
141		if b.w < len(last) {
142			return last
143		}
144	}
145	chunk := getDataBufferChunk(want)
146	b.chunks = append(b.chunks, chunk)
147	b.w = 0
148	return chunk
149}