1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package channelz
20
21import (
22 "fmt"
23 "sort"
24 "sync"
25 "time"
26)
27
// entry represents a node in the channelz database.
type entry interface {
	// addChild adds the child e, whose channelz ID is id, to the child list.
	addChild(id int64, e entry)
	// deleteChild deletes the child whose channelz ID is id from the child
	// list.
	deleteChild(id int64)
	// triggerDelete tries to delete self from the channelz database. However,
	// if the child list is not empty, deletion from the database is put on
	// hold until the last child has been deleted from the database.
	triggerDelete()
	// deleteSelfIfReady checks whether triggerDelete() has been called before
	// and whether the child list is now empty. If both conditions are met, it
	// deletes self from the database.
	deleteSelfIfReady()
	// getParentID returns the parent ID of the entry. A parent ID of 0 means
	// the entry has no parent.
	getParentID() int64
	Entity
}
46
// channelMap is the storage data structure for channelz.
//
// Methods of channelMap fall into two categories with respect to locking:
//
//  1. Methods that acquire the global lock (mu) themselves.
//  2. Methods that may only be called while the global lock is already held.
//
// A method of the second category must always be called from inside a method
// of the first category.
type channelMap struct {
	// mu is the global lock guarding every map below.
	mu sync.RWMutex
	// topLevelChannels is the set of IDs of channels that were registered as
	// top-level channels (see addChannel).
	topLevelChannels map[int64]struct{}
	// channels holds every registered channel, top-level or nested, by ID.
	channels map[int64]*Channel
	// subChannels holds every registered subchannel by ID.
	subChannels map[int64]*SubChannel
	// sockets holds every registered socket by ID.
	sockets map[int64]*Socket
	// servers holds every registered server by ID.
	servers map[int64]*Server
}
64
65func newChannelMap() *channelMap {
66 return &channelMap{
67 topLevelChannels: make(map[int64]struct{}),
68 channels: make(map[int64]*Channel),
69 subChannels: make(map[int64]*SubChannel),
70 sockets: make(map[int64]*Socket),
71 servers: make(map[int64]*Server),
72 }
73}
74
// addServer registers the server s under the channelz ID id and points s back
// at this channelMap. It acquires c.mu.
func (c *channelMap) addServer(id int64, s *Server) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Give the server a handle on the database it is stored in.
	s.cm = c
	c.servers[id] = s
}
81
82func (c *channelMap) addChannel(id int64, cn *Channel, isTopChannel bool, pid int64) {
83 c.mu.Lock()
84 defer c.mu.Unlock()
85 cn.trace.cm = c
86 c.channels[id] = cn
87 if isTopChannel {
88 c.topLevelChannels[id] = struct{}{}
89 } else if p := c.channels[pid]; p != nil {
90 p.addChild(id, cn)
91 } else {
92 logger.Infof("channel %d references invalid parent ID %d", id, pid)
93 }
94}
95
96func (c *channelMap) addSubChannel(id int64, sc *SubChannel, pid int64) {
97 c.mu.Lock()
98 defer c.mu.Unlock()
99 sc.trace.cm = c
100 c.subChannels[id] = sc
101 if p := c.channels[pid]; p != nil {
102 p.addChild(id, sc)
103 } else {
104 logger.Infof("subchannel %d references invalid parent ID %d", id, pid)
105 }
106}
107
108func (c *channelMap) addSocket(s *Socket) {
109 c.mu.Lock()
110 defer c.mu.Unlock()
111 s.cm = c
112 c.sockets[s.ID] = s
113 if s.Parent == nil {
114 logger.Infof("normal socket %d has no parent", s.ID)
115 }
116 s.Parent.(entry).addChild(s.ID, s)
117}
118
119// removeEntry triggers the removal of an entry, which may not indeed delete the
120// entry, if it has to wait on the deletion of its children and until no other
121// entity's channel trace references it. It may lead to a chain of entry
122// deletion. For example, deleting the last socket of a gracefully shutting down
123// server will lead to the server being also deleted.
124func (c *channelMap) removeEntry(id int64) {
125 c.mu.Lock()
126 defer c.mu.Unlock()
127 c.findEntry(id).triggerDelete()
128}
129
// tracedChannel represents tracing operations which are present on both
// channels and subChannels.
type tracedChannel interface {
	// getChannelTrace returns the trace that events for this entity are
	// appended to.
	getChannelTrace() *ChannelTrace
	// incrTraceRefCount is called by channelMap.traceEvent after an event
	// referencing this entity has been appended to its parent's trace.
	incrTraceRefCount()
	// decrTraceRefCount is called by channelMap.decrTraceRefCount, which then
	// re-checks whether the entity can be deleted.
	decrTraceRefCount()
	// getRefName returns the name recorded as RefName in parent trace events
	// that reference this entity.
	getRefName() string
}
138
139// c.mu must be held by the caller
140func (c *channelMap) decrTraceRefCount(id int64) {
141 e := c.findEntry(id)
142 if v, ok := e.(tracedChannel); ok {
143 v.decrTraceRefCount()
144 e.deleteSelfIfReady()
145 }
146}
147
148// c.mu must be held by the caller.
149func (c *channelMap) findEntry(id int64) entry {
150 if v, ok := c.channels[id]; ok {
151 return v
152 }
153 if v, ok := c.subChannels[id]; ok {
154 return v
155 }
156 if v, ok := c.servers[id]; ok {
157 return v
158 }
159 if v, ok := c.sockets[id]; ok {
160 return v
161 }
162 return &dummyEntry{idNotFound: id}
163}
164
165// c.mu must be held by the caller
166//
167// deleteEntry deletes an entry from the channelMap. Before calling this method,
168// caller must check this entry is ready to be deleted, i.e removeEntry() has
169// been called on it, and no children still exist.
170func (c *channelMap) deleteEntry(id int64) entry {
171 if v, ok := c.sockets[id]; ok {
172 delete(c.sockets, id)
173 return v
174 }
175 if v, ok := c.subChannels[id]; ok {
176 delete(c.subChannels, id)
177 return v
178 }
179 if v, ok := c.channels[id]; ok {
180 delete(c.channels, id)
181 delete(c.topLevelChannels, id)
182 return v
183 }
184 if v, ok := c.servers[id]; ok {
185 delete(c.servers, id)
186 return v
187 }
188 return &dummyEntry{idNotFound: id}
189}
190
191func (c *channelMap) traceEvent(id int64, desc *TraceEvent) {
192 c.mu.Lock()
193 defer c.mu.Unlock()
194 child := c.findEntry(id)
195 childTC, ok := child.(tracedChannel)
196 if !ok {
197 return
198 }
199 childTC.getChannelTrace().append(&traceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
200 if desc.Parent != nil {
201 parent := c.findEntry(child.getParentID())
202 var chanType RefChannelType
203 switch child.(type) {
204 case *Channel:
205 chanType = RefChannel
206 case *SubChannel:
207 chanType = RefSubChannel
208 }
209 if parentTC, ok := parent.(tracedChannel); ok {
210 parentTC.getChannelTrace().append(&traceEvent{
211 Desc: desc.Parent.Desc,
212 Severity: desc.Parent.Severity,
213 Timestamp: time.Now(),
214 RefID: id,
215 RefName: childTC.getRefName(),
216 RefType: chanType,
217 })
218 childTC.incrTraceRefCount()
219 }
220 }
221}
222
// int64Slice is a slice of int64 that provides the Len, Swap and Less methods
// needed to sort it in ascending order with sort.Sort.
type int64Slice []int64

// Len reports the number of elements in the slice.
func (ids int64Slice) Len() int {
	return len(ids)
}

// Swap exchanges the elements at positions a and b.
func (ids int64Slice) Swap(a, b int) {
	ids[a], ids[b] = ids[b], ids[a]
}

// Less reports whether the element at position a is smaller than the one at
// position b.
func (ids int64Slice) Less(a, b int) bool {
	return ids[a] < ids[b]
}
228
// copyMap returns a new map holding the same key/value pairs as m. The result
// is always non-nil, even when m is nil or empty, and shares no storage with
// m, so changes to one are not visible through the other.
func copyMap(m map[int64]string) map[int64]string {
	// The final size is known up front; sizing the map here avoids
	// incremental growth while copying.
	n := make(map[int64]string, len(m))
	for k, v := range m {
		n[k] = v
	}
	return n
}
236
237func (c *channelMap) getTopChannels(id int64, maxResults int) ([]*Channel, bool) {
238 if maxResults <= 0 {
239 maxResults = EntriesPerPage
240 }
241 c.mu.RLock()
242 defer c.mu.RUnlock()
243 l := int64(len(c.topLevelChannels))
244 ids := make([]int64, 0, l)
245
246 for k := range c.topLevelChannels {
247 ids = append(ids, k)
248 }
249 sort.Sort(int64Slice(ids))
250 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
251 end := true
252 var t []*Channel
253 for _, v := range ids[idx:] {
254 if len(t) == maxResults {
255 end = false
256 break
257 }
258 if cn, ok := c.channels[v]; ok {
259 t = append(t, cn)
260 }
261 }
262 return t, end
263}
264
265func (c *channelMap) getServers(id int64, maxResults int) ([]*Server, bool) {
266 if maxResults <= 0 {
267 maxResults = EntriesPerPage
268 }
269 c.mu.RLock()
270 defer c.mu.RUnlock()
271 ids := make([]int64, 0, len(c.servers))
272 for k := range c.servers {
273 ids = append(ids, k)
274 }
275 sort.Sort(int64Slice(ids))
276 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
277 end := true
278 var s []*Server
279 for _, v := range ids[idx:] {
280 if len(s) == maxResults {
281 end = false
282 break
283 }
284 if svr, ok := c.servers[v]; ok {
285 s = append(s, svr)
286 }
287 }
288 return s, end
289}
290
291func (c *channelMap) getServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) {
292 if maxResults <= 0 {
293 maxResults = EntriesPerPage
294 }
295 c.mu.RLock()
296 defer c.mu.RUnlock()
297 svr, ok := c.servers[id]
298 if !ok {
299 // server with id doesn't exist.
300 return nil, true
301 }
302 svrskts := svr.sockets
303 ids := make([]int64, 0, len(svrskts))
304 sks := make([]*Socket, 0, min(len(svrskts), maxResults))
305 for k := range svrskts {
306 ids = append(ids, k)
307 }
308 sort.Sort(int64Slice(ids))
309 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
310 end := true
311 for _, v := range ids[idx:] {
312 if len(sks) == maxResults {
313 end = false
314 break
315 }
316 if ns, ok := c.sockets[v]; ok {
317 sks = append(sks, ns)
318 }
319 }
320 return sks, end
321}
322
323func (c *channelMap) getChannel(id int64) *Channel {
324 c.mu.RLock()
325 defer c.mu.RUnlock()
326 return c.channels[id]
327}
328
329func (c *channelMap) getSubChannel(id int64) *SubChannel {
330 c.mu.RLock()
331 defer c.mu.RUnlock()
332 return c.subChannels[id]
333}
334
335func (c *channelMap) getSocket(id int64) *Socket {
336 c.mu.RLock()
337 defer c.mu.RUnlock()
338 return c.sockets[id]
339}
340
341func (c *channelMap) getServer(id int64) *Server {
342 c.mu.RLock()
343 defer c.mu.RUnlock()
344 return c.servers[id]
345}
346
// dummyEntry is a fake entry to handle the entry-not-found case. findEntry and
// deleteEntry return one instead of nil so that callers can invoke entry
// methods on the result without a nil check.
type dummyEntry struct {
	// idNotFound is the channelz ID that was looked up without success.
	idNotFound int64
	// Entity is embedded so that dummyEntry satisfies the Entity part of the
	// entry interface. findEntry and deleteEntry leave it nil.
	Entity
}
352
353func (d *dummyEntry) String() string {
354 return fmt.Sprintf("non-existent entity #%d", d.idNotFound)
355}
356
357func (d *dummyEntry) ID() int64 { return d.idNotFound }
358
359func (d *dummyEntry) addChild(id int64, e entry) {
360 // Note: It is possible for a normal program to reach here under race
361 // condition. For example, there could be a race between ClientConn.Close()
362 // info being propagated to addrConn and http2Client. ClientConn.Close()
363 // cancel the context and result in http2Client to error. The error info is
364 // then caught by transport monitor and before addrConn.tearDown() is called
365 // in side ClientConn.Close(). Therefore, the addrConn will create a new
366 // transport. And when registering the new transport in channelz, its parent
367 // addrConn could have already been torn down and deleted from channelz
368 // tracking, and thus reach the code here.
369 logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
370}
371
372func (d *dummyEntry) deleteChild(id int64) {
373 // It is possible for a normal program to reach here under race condition.
374 // Refer to the example described in addChild().
375 logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
376}
377
378func (d *dummyEntry) triggerDelete() {
379 logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
380}
381
// deleteSelfIfReady is a no-op; it exists so that dummyEntry satisfies the
// entry interface.
func (*dummyEntry) deleteSelfIfReady() {
	// Code should not reach here: deleteSelfIfReady is always called on an
	// existing entry.
}
385
386func (*dummyEntry) getParentID() int64 {
387 return 0
388}
389
// Entity is implemented by all channelz types.
type Entity interface {
	// isEntity is unexported, so only types in this package can implement
	// Entity directly.
	isEntity()
	// String (from fmt.Stringer) gives a printable description of the entity.
	fmt.Stringer
	// id returns an int64 identifier — presumably the entity's channelz ID;
	// confirm against the implementations.
	id() int64
}