writesched_roundrobin.go

  1// Copyright 2023 The Go Authors. All rights reserved.
  2// Use of this source code is governed by a BSD-style
  3// license that can be found in the LICENSE file.
  4
  5package http2
  6
  7import (
  8	"fmt"
  9	"math"
 10)
 11
 12type roundRobinWriteScheduler struct {
 13	// control contains control frames (SETTINGS, PING, etc.).
 14	control writeQueue
 15
 16	// streams maps stream ID to a queue.
 17	streams map[uint32]*writeQueue
 18
 19	// stream queues are stored in a circular linked list.
 20	// head is the next stream to write, or nil if there are no streams open.
 21	head *writeQueue
 22
 23	// pool of empty queues for reuse.
 24	queuePool writeQueuePool
 25}
 26
 27// newRoundRobinWriteScheduler constructs a new write scheduler.
 28// The round robin scheduler priorizes control frames
 29// like SETTINGS and PING over DATA frames.
 30// When there are no control frames to send, it performs a round-robin
 31// selection from the ready streams.
 32func newRoundRobinWriteScheduler() WriteScheduler {
 33	ws := &roundRobinWriteScheduler{
 34		streams: make(map[uint32]*writeQueue),
 35	}
 36	return ws
 37}
 38
 39func (ws *roundRobinWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
 40	if ws.streams[streamID] != nil {
 41		panic(fmt.Errorf("stream %d already opened", streamID))
 42	}
 43	q := ws.queuePool.get()
 44	ws.streams[streamID] = q
 45	if ws.head == nil {
 46		ws.head = q
 47		q.next = q
 48		q.prev = q
 49	} else {
 50		// Queues are stored in a ring.
 51		// Insert the new stream before ws.head, putting it at the end of the list.
 52		q.prev = ws.head.prev
 53		q.next = ws.head
 54		q.prev.next = q
 55		q.next.prev = q
 56	}
 57}
 58
 59func (ws *roundRobinWriteScheduler) CloseStream(streamID uint32) {
 60	q := ws.streams[streamID]
 61	if q == nil {
 62		return
 63	}
 64	if q.next == q {
 65		// This was the only open stream.
 66		ws.head = nil
 67	} else {
 68		q.prev.next = q.next
 69		q.next.prev = q.prev
 70		if ws.head == q {
 71			ws.head = q.next
 72		}
 73	}
 74	delete(ws.streams, streamID)
 75	ws.queuePool.put(q)
 76}
 77
 78func (ws *roundRobinWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {}
 79
 80func (ws *roundRobinWriteScheduler) Push(wr FrameWriteRequest) {
 81	if wr.isControl() {
 82		ws.control.push(wr)
 83		return
 84	}
 85	q := ws.streams[wr.StreamID()]
 86	if q == nil {
 87		// This is a closed stream.
 88		// wr should not be a HEADERS or DATA frame.
 89		// We push the request onto the control queue.
 90		if wr.DataSize() > 0 {
 91			panic("add DATA on non-open stream")
 92		}
 93		ws.control.push(wr)
 94		return
 95	}
 96	q.push(wr)
 97}
 98
 99func (ws *roundRobinWriteScheduler) Pop() (FrameWriteRequest, bool) {
100	// Control and RST_STREAM frames first.
101	if !ws.control.empty() {
102		return ws.control.shift(), true
103	}
104	if ws.head == nil {
105		return FrameWriteRequest{}, false
106	}
107	q := ws.head
108	for {
109		if wr, ok := q.consume(math.MaxInt32); ok {
110			ws.head = q.next
111			return wr, true
112		}
113		q = q.next
114		if q == ws.head {
115			break
116		}
117	}
118	return FrameWriteRequest{}, false
119}