1// Copyright 2023 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package http2
6
7import (
8 "fmt"
9 "math"
10)
11
12type roundRobinWriteScheduler struct {
13 // control contains control frames (SETTINGS, PING, etc.).
14 control writeQueue
15
16 // streams maps stream ID to a queue.
17 streams map[uint32]*writeQueue
18
19 // stream queues are stored in a circular linked list.
20 // head is the next stream to write, or nil if there are no streams open.
21 head *writeQueue
22
23 // pool of empty queues for reuse.
24 queuePool writeQueuePool
25}
26
27// newRoundRobinWriteScheduler constructs a new write scheduler.
28// The round robin scheduler priorizes control frames
29// like SETTINGS and PING over DATA frames.
30// When there are no control frames to send, it performs a round-robin
31// selection from the ready streams.
32func newRoundRobinWriteScheduler() WriteScheduler {
33 ws := &roundRobinWriteScheduler{
34 streams: make(map[uint32]*writeQueue),
35 }
36 return ws
37}
38
39func (ws *roundRobinWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
40 if ws.streams[streamID] != nil {
41 panic(fmt.Errorf("stream %d already opened", streamID))
42 }
43 q := ws.queuePool.get()
44 ws.streams[streamID] = q
45 if ws.head == nil {
46 ws.head = q
47 q.next = q
48 q.prev = q
49 } else {
50 // Queues are stored in a ring.
51 // Insert the new stream before ws.head, putting it at the end of the list.
52 q.prev = ws.head.prev
53 q.next = ws.head
54 q.prev.next = q
55 q.next.prev = q
56 }
57}
58
59func (ws *roundRobinWriteScheduler) CloseStream(streamID uint32) {
60 q := ws.streams[streamID]
61 if q == nil {
62 return
63 }
64 if q.next == q {
65 // This was the only open stream.
66 ws.head = nil
67 } else {
68 q.prev.next = q.next
69 q.next.prev = q.prev
70 if ws.head == q {
71 ws.head = q.next
72 }
73 }
74 delete(ws.streams, streamID)
75 ws.queuePool.put(q)
76}
77
78func (ws *roundRobinWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {}
79
80func (ws *roundRobinWriteScheduler) Push(wr FrameWriteRequest) {
81 if wr.isControl() {
82 ws.control.push(wr)
83 return
84 }
85 q := ws.streams[wr.StreamID()]
86 if q == nil {
87 // This is a closed stream.
88 // wr should not be a HEADERS or DATA frame.
89 // We push the request onto the control queue.
90 if wr.DataSize() > 0 {
91 panic("add DATA on non-open stream")
92 }
93 ws.control.push(wr)
94 return
95 }
96 q.push(wr)
97}
98
99func (ws *roundRobinWriteScheduler) Pop() (FrameWriteRequest, bool) {
100 // Control and RST_STREAM frames first.
101 if !ws.control.empty() {
102 return ws.control.shift(), true
103 }
104 if ws.head == nil {
105 return FrameWriteRequest{}, false
106 }
107 q := ws.head
108 for {
109 if wr, ok := q.consume(math.MaxInt32); ok {
110 ws.head = q.next
111 return wr, true
112 }
113 q = q.next
114 if q == ws.head {
115 break
116 }
117 }
118 return FrameWriteRequest{}, false
119}