flowcontrol.go

  1/*
  2 *
  3 * Copyright 2014 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package transport
 20
 21import (
 22	"fmt"
 23	"math"
 24	"sync"
 25	"sync/atomic"
 26)
 27
// writeQuota is a soft limit on the amount of data a stream can
// schedule before some of it is written out.
type writeQuota struct {
	// quota is the remaining budget in bytes. It is read and modified with
	// atomic operations and may go negative: get only checks that it is
	// positive before subtracting the full requested size.
	quota int32
	// ch is the wake-up channel. get blocks receiving from it while quota is
	// less than or equal to zero; realReplenish sends on it when quota goes
	// positive again.
	ch chan struct{}
	// done is triggered in the error case; get returns errStreamDone when it
	// becomes readable.
	done <-chan struct{}
	// replenish is called by loopyWriter to give quota back.
	// It is implemented as a field so that it can be updated
	// by tests.
	replenish func(n int)
}
 42
 43func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
 44	w := &writeQuota{
 45		quota: sz,
 46		ch:    make(chan struct{}, 1),
 47		done:  done,
 48	}
 49	w.replenish = w.realReplenish
 50	return w
 51}
 52
 53func (w *writeQuota) get(sz int32) error {
 54	for {
 55		if atomic.LoadInt32(&w.quota) > 0 {
 56			atomic.AddInt32(&w.quota, -sz)
 57			return nil
 58		}
 59		select {
 60		case <-w.ch:
 61			continue
 62		case <-w.done:
 63			return errStreamDone
 64		}
 65	}
 66}
 67
 68func (w *writeQuota) realReplenish(n int) {
 69	sz := int32(n)
 70	a := atomic.AddInt32(&w.quota, sz)
 71	b := a - sz
 72	if b <= 0 && a > 0 {
 73		select {
 74		case w.ch <- struct{}{}:
 75		default:
 76		}
 77	}
 78}
 79
 80type trInFlow struct {
 81	limit               uint32
 82	unacked             uint32
 83	effectiveWindowSize uint32
 84}
 85
 86func (f *trInFlow) newLimit(n uint32) uint32 {
 87	d := n - f.limit
 88	f.limit = n
 89	f.updateEffectiveWindowSize()
 90	return d
 91}
 92
 93func (f *trInFlow) onData(n uint32) uint32 {
 94	f.unacked += n
 95	if f.unacked < f.limit/4 {
 96		f.updateEffectiveWindowSize()
 97		return 0
 98	}
 99	return f.reset()
100}
101
102func (f *trInFlow) reset() uint32 {
103	w := f.unacked
104	f.unacked = 0
105	f.updateEffectiveWindowSize()
106	return w
107}
108
109func (f *trInFlow) updateEffectiveWindowSize() {
110	atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked)
111}
112
113func (f *trInFlow) getSize() uint32 {
114	return atomic.LoadUint32(&f.effectiveWindowSize)
115}
116
// TODO(mmukhi): Simplify this code.
// inFlow deals with inbound flow control.
type inFlow struct {
	// mu guards all the fields below; every method of inFlow takes it before
	// touching them.
	mu sync.Mutex
	// The inbound flow control limit for pending data.
	limit uint32
	// pendingData is the overall data which has been received but not yet
	// consumed by the application. onData adds to it and onRead subtracts
	// from it.
	pendingData uint32
	// The amount of data the application has consumed but for which grpc has
	// not yet sent a window update. Used to reduce window update frequency:
	// onRead only returns it once it reaches a quarter of limit.
	pendingUpdate uint32
	// delta is the extra window update given by the receiver when an
	// application is reading data bigger in size than the inFlow limit. It is
	// set by maybeAdjust and consumed by onRead.
	delta uint32
}
133
134// newLimit updates the inflow window to a new value n.
135// It assumes that n is always greater than the old limit.
136func (f *inFlow) newLimit(n uint32) {
137	f.mu.Lock()
138	f.limit = n
139	f.mu.Unlock()
140}
141
142func (f *inFlow) maybeAdjust(n uint32) uint32 {
143	if n > uint32(math.MaxInt32) {
144		n = uint32(math.MaxInt32)
145	}
146	f.mu.Lock()
147	defer f.mu.Unlock()
148	// estSenderQuota is the receiver's view of the maximum number of bytes the sender
149	// can send without a window update.
150	estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
151	// estUntransmittedData is the maximum number of bytes the sends might not have put
152	// on the wire yet. A value of 0 or less means that we have already received all or
153	// more bytes than the application is requesting to read.
154	estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
155	// This implies that unless we send a window update, the sender won't be able to send all the bytes
156	// for this message. Therefore we must send an update over the limit since there's an active read
157	// request from the application.
158	if estUntransmittedData > estSenderQuota {
159		// Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.
160		if f.limit+n > maxWindowSize {
161			f.delta = maxWindowSize - f.limit
162		} else {
163			// Send a window update for the whole message and not just the difference between
164			// estUntransmittedData and estSenderQuota. This will be helpful in case the message
165			// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
166			f.delta = n
167		}
168		return f.delta
169	}
170	return 0
171}
172
173// onData is invoked when some data frame is received. It updates pendingData.
174func (f *inFlow) onData(n uint32) error {
175	f.mu.Lock()
176	f.pendingData += n
177	if f.pendingData+f.pendingUpdate > f.limit+f.delta {
178		limit := f.limit
179		rcvd := f.pendingData + f.pendingUpdate
180		f.mu.Unlock()
181		return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
182	}
183	f.mu.Unlock()
184	return nil
185}
186
187// onRead is invoked when the application reads the data. It returns the window size
188// to be sent to the peer.
189func (f *inFlow) onRead(n uint32) uint32 {
190	f.mu.Lock()
191	if f.pendingData == 0 {
192		f.mu.Unlock()
193		return 0
194	}
195	f.pendingData -= n
196	if n > f.delta {
197		n -= f.delta
198		f.delta = 0
199	} else {
200		f.delta -= n
201		n = 0
202	}
203	f.pendingUpdate += n
204	if f.pendingUpdate >= f.limit/4 {
205		wu := f.pendingUpdate
206		f.pendingUpdate = 0
207		f.mu.Unlock()
208		return wu
209	}
210	f.mu.Unlock()
211	return 0
212}