pipe.go

  1// Copyright 2014 The Go Authors. All rights reserved.
  2// Use of this source code is governed by a BSD-style
  3// license that can be found in the LICENSE file.
  4
  5package http2
  6
  7import (
  8	"errors"
  9	"io"
 10	"sync"
 11)
 12
 13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
 14// io.Pipe except there are no PipeReader/PipeWriter halves, and the
 15// underlying buffer is an interface. (io.Pipe is always unbuffered)
 16type pipe struct {
 17	mu       sync.Mutex
 18	c        sync.Cond     // c.L lazily initialized to &p.mu
 19	b        pipeBuffer    // nil when done reading
 20	unread   int           // bytes unread when done
 21	err      error         // read error once empty. non-nil means closed.
 22	breakErr error         // immediate read error (caller doesn't see rest of b)
 23	donec    chan struct{} // closed on error
 24	readFn   func()        // optional code to run in Read before error
 25}
 26
 27type pipeBuffer interface {
 28	Len() int
 29	io.Writer
 30	io.Reader
 31}
 32
 33// setBuffer initializes the pipe buffer.
 34// It has no effect if the pipe is already closed.
 35func (p *pipe) setBuffer(b pipeBuffer) {
 36	p.mu.Lock()
 37	defer p.mu.Unlock()
 38	if p.err != nil || p.breakErr != nil {
 39		return
 40	}
 41	p.b = b
 42}
 43
 44func (p *pipe) Len() int {
 45	p.mu.Lock()
 46	defer p.mu.Unlock()
 47	if p.b == nil {
 48		return p.unread
 49	}
 50	return p.b.Len()
 51}
 52
 53// Read waits until data is available and copies bytes
 54// from the buffer into p.
 55func (p *pipe) Read(d []byte) (n int, err error) {
 56	p.mu.Lock()
 57	defer p.mu.Unlock()
 58	if p.c.L == nil {
 59		p.c.L = &p.mu
 60	}
 61	for {
 62		if p.breakErr != nil {
 63			return 0, p.breakErr
 64		}
 65		if p.b != nil && p.b.Len() > 0 {
 66			return p.b.Read(d)
 67		}
 68		if p.err != nil {
 69			if p.readFn != nil {
 70				p.readFn()     // e.g. copy trailers
 71				p.readFn = nil // not sticky like p.err
 72			}
 73			p.b = nil
 74			return 0, p.err
 75		}
 76		p.c.Wait()
 77	}
 78}
 79
 80var (
 81	errClosedPipeWrite        = errors.New("write on closed buffer")
 82	errUninitializedPipeWrite = errors.New("write on uninitialized buffer")
 83)
 84
 85// Write copies bytes from p into the buffer and wakes a reader.
 86// It is an error to write more data than the buffer can hold.
 87func (p *pipe) Write(d []byte) (n int, err error) {
 88	p.mu.Lock()
 89	defer p.mu.Unlock()
 90	if p.c.L == nil {
 91		p.c.L = &p.mu
 92	}
 93	defer p.c.Signal()
 94	if p.err != nil || p.breakErr != nil {
 95		return 0, errClosedPipeWrite
 96	}
 97	// pipe.setBuffer is never invoked, leaving the buffer uninitialized.
 98	// We shouldn't try to write to an uninitialized pipe,
 99	// but returning an error is better than panicking.
100	if p.b == nil {
101		return 0, errUninitializedPipeWrite
102	}
103	return p.b.Write(d)
104}
105
106// CloseWithError causes the next Read (waking up a current blocked
107// Read if needed) to return the provided err after all data has been
108// read.
109//
110// The error must be non-nil.
111func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
112
113// BreakWithError causes the next Read (waking up a current blocked
114// Read if needed) to return the provided err immediately, without
115// waiting for unread data.
116func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
117
118// closeWithErrorAndCode is like CloseWithError but also sets some code to run
119// in the caller's goroutine before returning the error.
120func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
121
122func (p *pipe) closeWithError(dst *error, err error, fn func()) {
123	if err == nil {
124		panic("err must be non-nil")
125	}
126	p.mu.Lock()
127	defer p.mu.Unlock()
128	if p.c.L == nil {
129		p.c.L = &p.mu
130	}
131	defer p.c.Signal()
132	if *dst != nil {
133		// Already been done.
134		return
135	}
136	p.readFn = fn
137	if dst == &p.breakErr {
138		if p.b != nil {
139			p.unread += p.b.Len()
140		}
141		p.b = nil
142	}
143	*dst = err
144	p.closeDoneLocked()
145}
146
147// requires p.mu be held.
148func (p *pipe) closeDoneLocked() {
149	if p.donec == nil {
150		return
151	}
152	// Close if unclosed. This isn't racy since we always
153	// hold p.mu while closing.
154	select {
155	case <-p.donec:
156	default:
157		close(p.donec)
158	}
159}
160
161// Err returns the error (if any) first set by BreakWithError or CloseWithError.
162func (p *pipe) Err() error {
163	p.mu.Lock()
164	defer p.mu.Unlock()
165	if p.breakErr != nil {
166		return p.breakErr
167	}
168	return p.err
169}
170
171// Done returns a channel which is closed if and when this pipe is closed
172// with CloseWithError.
173func (p *pipe) Done() <-chan struct{} {
174	p.mu.Lock()
175	defer p.mu.Unlock()
176	if p.donec == nil {
177		p.donec = make(chan struct{})
178		if p.err != nil || p.breakErr != nil {
179			// Already hit an error.
180			p.closeDoneLocked()
181		}
182	}
183	return p.donec
184}