channelmap.go

  1/*
  2 *
  3 * Copyright 2018 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package channelz
 20
 21import (
 22	"fmt"
 23	"sort"
 24	"sync"
 25	"time"
 26)
 27
 28// entry represents a node in the channelz database.
 29type entry interface {
 30	// addChild adds a child e, whose channelz id is id to child list
 31	addChild(id int64, e entry)
 32	// deleteChild deletes a child with channelz id to be id from child list
 33	deleteChild(id int64)
 34	// triggerDelete tries to delete self from channelz database. However, if
 35	// child list is not empty, then deletion from the database is on hold until
 36	// the last child is deleted from database.
 37	triggerDelete()
 38	// deleteSelfIfReady check whether triggerDelete() has been called before,
 39	// and whether child list is now empty. If both conditions are met, then
 40	// delete self from database.
 41	deleteSelfIfReady()
 42	// getParentID returns parent ID of the entry. 0 value parent ID means no parent.
 43	getParentID() int64
 44	Entity
 45}
 46
 47// channelMap is the storage data structure for channelz.
 48//
 49// Methods of channelMap can be divided into two categories with respect to
 50// locking.
 51//
 52// 1. Methods acquire the global lock.
 53// 2. Methods that can only be called when global lock is held.
 54//
 55// A second type of method need always to be called inside a first type of method.
 56type channelMap struct {
 57	mu               sync.RWMutex
 58	topLevelChannels map[int64]struct{}
 59	channels         map[int64]*Channel
 60	subChannels      map[int64]*SubChannel
 61	sockets          map[int64]*Socket
 62	servers          map[int64]*Server
 63}
 64
 65func newChannelMap() *channelMap {
 66	return &channelMap{
 67		topLevelChannels: make(map[int64]struct{}),
 68		channels:         make(map[int64]*Channel),
 69		subChannels:      make(map[int64]*SubChannel),
 70		sockets:          make(map[int64]*Socket),
 71		servers:          make(map[int64]*Server),
 72	}
 73}
 74
 75func (c *channelMap) addServer(id int64, s *Server) {
 76	c.mu.Lock()
 77	defer c.mu.Unlock()
 78	s.cm = c
 79	c.servers[id] = s
 80}
 81
 82func (c *channelMap) addChannel(id int64, cn *Channel, isTopChannel bool, pid int64) {
 83	c.mu.Lock()
 84	defer c.mu.Unlock()
 85	cn.trace.cm = c
 86	c.channels[id] = cn
 87	if isTopChannel {
 88		c.topLevelChannels[id] = struct{}{}
 89	} else if p := c.channels[pid]; p != nil {
 90		p.addChild(id, cn)
 91	} else {
 92		logger.Infof("channel %d references invalid parent ID %d", id, pid)
 93	}
 94}
 95
 96func (c *channelMap) addSubChannel(id int64, sc *SubChannel, pid int64) {
 97	c.mu.Lock()
 98	defer c.mu.Unlock()
 99	sc.trace.cm = c
100	c.subChannels[id] = sc
101	if p := c.channels[pid]; p != nil {
102		p.addChild(id, sc)
103	} else {
104		logger.Infof("subchannel %d references invalid parent ID %d", id, pid)
105	}
106}
107
108func (c *channelMap) addSocket(s *Socket) {
109	c.mu.Lock()
110	defer c.mu.Unlock()
111	s.cm = c
112	c.sockets[s.ID] = s
113	if s.Parent == nil {
114		logger.Infof("normal socket %d has no parent", s.ID)
115	}
116	s.Parent.(entry).addChild(s.ID, s)
117}
118
119// removeEntry triggers the removal of an entry, which may not indeed delete the
120// entry, if it has to wait on the deletion of its children and until no other
121// entity's channel trace references it.  It may lead to a chain of entry
122// deletion. For example, deleting the last socket of a gracefully shutting down
123// server will lead to the server being also deleted.
124func (c *channelMap) removeEntry(id int64) {
125	c.mu.Lock()
126	defer c.mu.Unlock()
127	c.findEntry(id).triggerDelete()
128}
129
130// tracedChannel represents tracing operations which are present on both
131// channels and subChannels.
132type tracedChannel interface {
133	getChannelTrace() *ChannelTrace
134	incrTraceRefCount()
135	decrTraceRefCount()
136	getRefName() string
137}
138
139// c.mu must be held by the caller
140func (c *channelMap) decrTraceRefCount(id int64) {
141	e := c.findEntry(id)
142	if v, ok := e.(tracedChannel); ok {
143		v.decrTraceRefCount()
144		e.deleteSelfIfReady()
145	}
146}
147
148// c.mu must be held by the caller.
149func (c *channelMap) findEntry(id int64) entry {
150	if v, ok := c.channels[id]; ok {
151		return v
152	}
153	if v, ok := c.subChannels[id]; ok {
154		return v
155	}
156	if v, ok := c.servers[id]; ok {
157		return v
158	}
159	if v, ok := c.sockets[id]; ok {
160		return v
161	}
162	return &dummyEntry{idNotFound: id}
163}
164
165// c.mu must be held by the caller
166//
167// deleteEntry deletes an entry from the channelMap. Before calling this method,
168// caller must check this entry is ready to be deleted, i.e removeEntry() has
169// been called on it, and no children still exist.
170func (c *channelMap) deleteEntry(id int64) entry {
171	if v, ok := c.sockets[id]; ok {
172		delete(c.sockets, id)
173		return v
174	}
175	if v, ok := c.subChannels[id]; ok {
176		delete(c.subChannels, id)
177		return v
178	}
179	if v, ok := c.channels[id]; ok {
180		delete(c.channels, id)
181		delete(c.topLevelChannels, id)
182		return v
183	}
184	if v, ok := c.servers[id]; ok {
185		delete(c.servers, id)
186		return v
187	}
188	return &dummyEntry{idNotFound: id}
189}
190
191func (c *channelMap) traceEvent(id int64, desc *TraceEvent) {
192	c.mu.Lock()
193	defer c.mu.Unlock()
194	child := c.findEntry(id)
195	childTC, ok := child.(tracedChannel)
196	if !ok {
197		return
198	}
199	childTC.getChannelTrace().append(&traceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
200	if desc.Parent != nil {
201		parent := c.findEntry(child.getParentID())
202		var chanType RefChannelType
203		switch child.(type) {
204		case *Channel:
205			chanType = RefChannel
206		case *SubChannel:
207			chanType = RefSubChannel
208		}
209		if parentTC, ok := parent.(tracedChannel); ok {
210			parentTC.getChannelTrace().append(&traceEvent{
211				Desc:      desc.Parent.Desc,
212				Severity:  desc.Parent.Severity,
213				Timestamp: time.Now(),
214				RefID:     id,
215				RefName:   childTC.getRefName(),
216				RefType:   chanType,
217			})
218			childTC.incrTraceRefCount()
219		}
220	}
221}
222
// int64Slice adapts a []int64 to sort.Interface, ordering values ascending.
type int64Slice []int64

// Len reports the number of elements in the slice.
func (ids int64Slice) Len() int {
	return len(ids)
}

// Swap exchanges the elements at positions i and j.
func (ids int64Slice) Swap(i, j int) {
	tmp := ids[i]
	ids[i] = ids[j]
	ids[j] = tmp
}

// Less reports whether the element at i is strictly smaller than the one at j.
func (ids int64Slice) Less(i, j int) bool {
	return ids[j] > ids[i]
}
228
// copyMap returns a new map holding the same key/value pairs as m. The
// result is always non-nil, even for a nil or empty m, and it shares no
// storage with m, so later writes to either map do not affect the other.
func copyMap(m map[int64]string) map[int64]string {
	// The final size is known up front; pre-sizing avoids incremental growth.
	n := make(map[int64]string, len(m))
	for k, v := range m {
		n[k] = v
	}
	return n
}
236
237func (c *channelMap) getTopChannels(id int64, maxResults int) ([]*Channel, bool) {
238	if maxResults <= 0 {
239		maxResults = EntriesPerPage
240	}
241	c.mu.RLock()
242	defer c.mu.RUnlock()
243	l := int64(len(c.topLevelChannels))
244	ids := make([]int64, 0, l)
245
246	for k := range c.topLevelChannels {
247		ids = append(ids, k)
248	}
249	sort.Sort(int64Slice(ids))
250	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
251	end := true
252	var t []*Channel
253	for _, v := range ids[idx:] {
254		if len(t) == maxResults {
255			end = false
256			break
257		}
258		if cn, ok := c.channels[v]; ok {
259			t = append(t, cn)
260		}
261	}
262	return t, end
263}
264
265func (c *channelMap) getServers(id int64, maxResults int) ([]*Server, bool) {
266	if maxResults <= 0 {
267		maxResults = EntriesPerPage
268	}
269	c.mu.RLock()
270	defer c.mu.RUnlock()
271	ids := make([]int64, 0, len(c.servers))
272	for k := range c.servers {
273		ids = append(ids, k)
274	}
275	sort.Sort(int64Slice(ids))
276	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
277	end := true
278	var s []*Server
279	for _, v := range ids[idx:] {
280		if len(s) == maxResults {
281			end = false
282			break
283		}
284		if svr, ok := c.servers[v]; ok {
285			s = append(s, svr)
286		}
287	}
288	return s, end
289}
290
291func (c *channelMap) getServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) {
292	if maxResults <= 0 {
293		maxResults = EntriesPerPage
294	}
295	c.mu.RLock()
296	defer c.mu.RUnlock()
297	svr, ok := c.servers[id]
298	if !ok {
299		// server with id doesn't exist.
300		return nil, true
301	}
302	svrskts := svr.sockets
303	ids := make([]int64, 0, len(svrskts))
304	sks := make([]*Socket, 0, min(len(svrskts), maxResults))
305	for k := range svrskts {
306		ids = append(ids, k)
307	}
308	sort.Sort(int64Slice(ids))
309	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
310	end := true
311	for _, v := range ids[idx:] {
312		if len(sks) == maxResults {
313			end = false
314			break
315		}
316		if ns, ok := c.sockets[v]; ok {
317			sks = append(sks, ns)
318		}
319	}
320	return sks, end
321}
322
323func (c *channelMap) getChannel(id int64) *Channel {
324	c.mu.RLock()
325	defer c.mu.RUnlock()
326	return c.channels[id]
327}
328
329func (c *channelMap) getSubChannel(id int64) *SubChannel {
330	c.mu.RLock()
331	defer c.mu.RUnlock()
332	return c.subChannels[id]
333}
334
335func (c *channelMap) getSocket(id int64) *Socket {
336	c.mu.RLock()
337	defer c.mu.RUnlock()
338	return c.sockets[id]
339}
340
341func (c *channelMap) getServer(id int64) *Server {
342	c.mu.RLock()
343	defer c.mu.RUnlock()
344	return c.servers[id]
345}
346
347type dummyEntry struct {
348	// dummyEntry is a fake entry to handle entry not found case.
349	idNotFound int64
350	Entity
351}
352
353func (d *dummyEntry) String() string {
354	return fmt.Sprintf("non-existent entity #%d", d.idNotFound)
355}
356
357func (d *dummyEntry) ID() int64 { return d.idNotFound }
358
359func (d *dummyEntry) addChild(id int64, e entry) {
360	// Note: It is possible for a normal program to reach here under race
361	// condition.  For example, there could be a race between ClientConn.Close()
362	// info being propagated to addrConn and http2Client. ClientConn.Close()
363	// cancel the context and result in http2Client to error. The error info is
364	// then caught by transport monitor and before addrConn.tearDown() is called
365	// in side ClientConn.Close(). Therefore, the addrConn will create a new
366	// transport. And when registering the new transport in channelz, its parent
367	// addrConn could have already been torn down and deleted from channelz
368	// tracking, and thus reach the code here.
369	logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
370}
371
372func (d *dummyEntry) deleteChild(id int64) {
373	// It is possible for a normal program to reach here under race condition.
374	// Refer to the example described in addChild().
375	logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
376}
377
378func (d *dummyEntry) triggerDelete() {
379	logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
380}
381
382func (*dummyEntry) deleteSelfIfReady() {
383	// code should not reach here. deleteSelfIfReady is always called on an existing entry.
384}
385
386func (*dummyEntry) getParentID() int64 {
387	return 0
388}
389
// Entity is implemented by all channelz types.
type Entity interface {
	fmt.Stringer
	// isEntity is unexported, so only types declared in this package can
	// satisfy Entity.
	isEntity()
	// id returns an int64 identifier; presumably the entity's channelz ID
	// (confirm against the implementations).
	id() int64
}