method_logger.go

  1/*
  2 *
  3 * Copyright 2018 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package binarylog
 20
 21import (
 22	"context"
 23	"net"
 24	"strings"
 25	"sync/atomic"
 26	"time"
 27
 28	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
 29	"google.golang.org/grpc/metadata"
 30	"google.golang.org/grpc/status"
 31	"google.golang.org/protobuf/proto"
 32	"google.golang.org/protobuf/types/known/durationpb"
 33	"google.golang.org/protobuf/types/known/timestamppb"
 34)
 35
 36type callIDGenerator struct {
 37	id uint64
 38}
 39
 40func (g *callIDGenerator) next() uint64 {
 41	id := atomic.AddUint64(&g.id, 1)
 42	return id
 43}
 44
 45// reset is for testing only, and doesn't need to be thread safe.
 46func (g *callIDGenerator) reset() {
 47	g.id = 0
 48}
 49
 50var idGen callIDGenerator
 51
// MethodLogger is the sub-logger for each method.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
type MethodLogger interface {
	// Log records the event described by the given LogEntryConfig. The
	// context is passed through to the implementation.
	Log(context.Context, LogEntryConfig)
}
 59
 60// TruncatingMethodLogger is a method logger that truncates headers and messages
 61// based on configured fields.
 62type TruncatingMethodLogger struct {
 63	headerMaxLen, messageMaxLen uint64
 64
 65	callID          uint64
 66	idWithinCallGen *callIDGenerator
 67
 68	sink Sink // TODO(blog): make this pluggable.
 69}
 70
 71// NewTruncatingMethodLogger returns a new truncating method logger.
 72//
 73// This is used in the 1.0 release of gcp/observability, and thus must not be
 74// deleted or changed.
 75func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
 76	return &TruncatingMethodLogger{
 77		headerMaxLen:  h,
 78		messageMaxLen: m,
 79
 80		callID:          idGen.next(),
 81		idWithinCallGen: &callIDGenerator{},
 82
 83		sink: DefaultSink, // TODO(blog): make it pluggable.
 84	}
 85}
 86
 87// Build is an internal only method for building the proto message out of the
 88// input event. It's made public to enable other library to reuse as much logic
 89// in TruncatingMethodLogger as possible.
 90func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {
 91	m := c.toProto()
 92	timestamp := timestamppb.Now()
 93	m.Timestamp = timestamp
 94	m.CallId = ml.callID
 95	m.SequenceIdWithinCall = ml.idWithinCallGen.next()
 96
 97	switch pay := m.Payload.(type) {
 98	case *binlogpb.GrpcLogEntry_ClientHeader:
 99		m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
100	case *binlogpb.GrpcLogEntry_ServerHeader:
101		m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
102	case *binlogpb.GrpcLogEntry_Message:
103		m.PayloadTruncated = ml.truncateMessage(pay.Message)
104	}
105	return m
106}
107
108// Log creates a proto binary log entry, and logs it to the sink.
109func (ml *TruncatingMethodLogger) Log(_ context.Context, c LogEntryConfig) {
110	ml.sink.Write(ml.Build(c))
111}
112
113func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) {
114	if ml.headerMaxLen == maxUInt {
115		return false
116	}
117	var (
118		bytesLimit = ml.headerMaxLen
119		index      int
120	)
121	// At the end of the loop, index will be the first entry where the total
122	// size is greater than the limit:
123	//
124	// len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
125	for ; index < len(mdPb.Entry); index++ {
126		entry := mdPb.Entry[index]
127		if entry.Key == "grpc-trace-bin" {
128			// "grpc-trace-bin" is a special key. It's kept in the log entry,
129			// but not counted towards the size limit.
130			continue
131		}
132		currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue()))
133		if currentEntryLen > bytesLimit {
134			break
135		}
136		bytesLimit -= currentEntryLen
137	}
138	truncated = index < len(mdPb.Entry)
139	mdPb.Entry = mdPb.Entry[:index]
140	return truncated
141}
142
143func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) {
144	if ml.messageMaxLen == maxUInt {
145		return false
146	}
147	if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
148		return false
149	}
150	msgPb.Data = msgPb.Data[:ml.messageMaxLen]
151	return true
152}
153
// LogEntryConfig represents the configuration for binary log entry.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
type LogEntryConfig interface {
	// toProto converts the config into a log entry proto. Because the method
	// is unexported, only types in this package can implement the interface.
	toProto() *binlogpb.GrpcLogEntry
}
161
162// ClientHeader configs the binary log entry to be a ClientHeader entry.
163type ClientHeader struct {
164	OnClientSide bool
165	Header       metadata.MD
166	MethodName   string
167	Authority    string
168	Timeout      time.Duration
169	// PeerAddr is required only when it's on server side.
170	PeerAddr net.Addr
171}
172
173func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {
174	// This function doesn't need to set all the fields (e.g. seq ID). The Log
175	// function will set the fields when necessary.
176	clientHeader := &binlogpb.ClientHeader{
177		Metadata:   mdToMetadataProto(c.Header),
178		MethodName: c.MethodName,
179		Authority:  c.Authority,
180	}
181	if c.Timeout > 0 {
182		clientHeader.Timeout = durationpb.New(c.Timeout)
183	}
184	ret := &binlogpb.GrpcLogEntry{
185		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
186		Payload: &binlogpb.GrpcLogEntry_ClientHeader{
187			ClientHeader: clientHeader,
188		},
189	}
190	if c.OnClientSide {
191		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
192	} else {
193		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
194	}
195	if c.PeerAddr != nil {
196		ret.Peer = addrToProto(c.PeerAddr)
197	}
198	return ret
199}
200
201// ServerHeader configs the binary log entry to be a ServerHeader entry.
202type ServerHeader struct {
203	OnClientSide bool
204	Header       metadata.MD
205	// PeerAddr is required only when it's on client side.
206	PeerAddr net.Addr
207}
208
209func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry {
210	ret := &binlogpb.GrpcLogEntry{
211		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
212		Payload: &binlogpb.GrpcLogEntry_ServerHeader{
213			ServerHeader: &binlogpb.ServerHeader{
214				Metadata: mdToMetadataProto(c.Header),
215			},
216		},
217	}
218	if c.OnClientSide {
219		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
220	} else {
221		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
222	}
223	if c.PeerAddr != nil {
224		ret.Peer = addrToProto(c.PeerAddr)
225	}
226	return ret
227}
228
229// ClientMessage configs the binary log entry to be a ClientMessage entry.
230type ClientMessage struct {
231	OnClientSide bool
232	// Message can be a proto.Message or []byte. Other messages formats are not
233	// supported.
234	Message any
235}
236
237func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry {
238	var (
239		data []byte
240		err  error
241	)
242	if m, ok := c.Message.(proto.Message); ok {
243		data, err = proto.Marshal(m)
244		if err != nil {
245			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
246		}
247	} else if b, ok := c.Message.([]byte); ok {
248		data = b
249	} else {
250		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
251	}
252	ret := &binlogpb.GrpcLogEntry{
253		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
254		Payload: &binlogpb.GrpcLogEntry_Message{
255			Message: &binlogpb.Message{
256				Length: uint32(len(data)),
257				Data:   data,
258			},
259		},
260	}
261	if c.OnClientSide {
262		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
263	} else {
264		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
265	}
266	return ret
267}
268
269// ServerMessage configs the binary log entry to be a ServerMessage entry.
270type ServerMessage struct {
271	OnClientSide bool
272	// Message can be a proto.Message or []byte. Other messages formats are not
273	// supported.
274	Message any
275}
276
277func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry {
278	var (
279		data []byte
280		err  error
281	)
282	if m, ok := c.Message.(proto.Message); ok {
283		data, err = proto.Marshal(m)
284		if err != nil {
285			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
286		}
287	} else if b, ok := c.Message.([]byte); ok {
288		data = b
289	} else {
290		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
291	}
292	ret := &binlogpb.GrpcLogEntry{
293		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
294		Payload: &binlogpb.GrpcLogEntry_Message{
295			Message: &binlogpb.Message{
296				Length: uint32(len(data)),
297				Data:   data,
298			},
299		},
300	}
301	if c.OnClientSide {
302		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
303	} else {
304		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
305	}
306	return ret
307}
308
309// ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
310type ClientHalfClose struct {
311	OnClientSide bool
312}
313
314func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry {
315	ret := &binlogpb.GrpcLogEntry{
316		Type:    binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
317		Payload: nil, // No payload here.
318	}
319	if c.OnClientSide {
320		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
321	} else {
322		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
323	}
324	return ret
325}
326
327// ServerTrailer configs the binary log entry to be a ServerTrailer entry.
328type ServerTrailer struct {
329	OnClientSide bool
330	Trailer      metadata.MD
331	// Err is the status error.
332	Err error
333	// PeerAddr is required only when it's on client side and the RPC is trailer
334	// only.
335	PeerAddr net.Addr
336}
337
338func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry {
339	st, ok := status.FromError(c.Err)
340	if !ok {
341		grpclogLogger.Info("binarylogging: error in trailer is not a status error")
342	}
343	var (
344		detailsBytes []byte
345		err          error
346	)
347	stProto := st.Proto()
348	if stProto != nil && len(stProto.Details) != 0 {
349		detailsBytes, err = proto.Marshal(stProto)
350		if err != nil {
351			grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
352		}
353	}
354	ret := &binlogpb.GrpcLogEntry{
355		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
356		Payload: &binlogpb.GrpcLogEntry_Trailer{
357			Trailer: &binlogpb.Trailer{
358				Metadata:      mdToMetadataProto(c.Trailer),
359				StatusCode:    uint32(st.Code()),
360				StatusMessage: st.Message(),
361				StatusDetails: detailsBytes,
362			},
363		},
364	}
365	if c.OnClientSide {
366		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
367	} else {
368		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
369	}
370	if c.PeerAddr != nil {
371		ret.Peer = addrToProto(c.PeerAddr)
372	}
373	return ret
374}
375
376// Cancel configs the binary log entry to be a Cancel entry.
377type Cancel struct {
378	OnClientSide bool
379}
380
381func (c *Cancel) toProto() *binlogpb.GrpcLogEntry {
382	ret := &binlogpb.GrpcLogEntry{
383		Type:    binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
384		Payload: nil,
385	}
386	if c.OnClientSide {
387		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
388	} else {
389		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
390	}
391	return ret
392}
393
// metadataKeyOmit reports whether the metadata entry with this key should be
// left out of the log. A fixed set of keys and every key with the "grpc-"
// prefix are omitted, with the single exception of "grpc-trace-bin".
func metadataKeyOmit(key string) bool {
	// grpc-trace-bin is special because it's visible to users, so it is kept
	// despite its "grpc-" prefix.
	if key == "grpc-trace-bin" {
		return false
	}
	switch key {
	case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
		return true
	default:
		return strings.HasPrefix(key, "grpc-")
	}
}
405
406func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata {
407	ret := &binlogpb.Metadata{}
408	for k, vv := range md {
409		if metadataKeyOmit(k) {
410			continue
411		}
412		for _, v := range vv {
413			ret.Entry = append(ret.Entry,
414				&binlogpb.MetadataEntry{
415					Key:   k,
416					Value: []byte(v),
417				},
418			)
419		}
420	}
421	return ret
422}
423
424func addrToProto(addr net.Addr) *binlogpb.Address {
425	ret := &binlogpb.Address{}
426	switch a := addr.(type) {
427	case *net.TCPAddr:
428		if a.IP.To4() != nil {
429			ret.Type = binlogpb.Address_TYPE_IPV4
430		} else if a.IP.To16() != nil {
431			ret.Type = binlogpb.Address_TYPE_IPV6
432		} else {
433			ret.Type = binlogpb.Address_TYPE_UNKNOWN
434			// Do not set address and port fields.
435			break
436		}
437		ret.Address = a.IP.String()
438		ret.IpPort = uint32(a.Port)
439	case *net.UnixAddr:
440		ret.Type = binlogpb.Address_TYPE_UNIX
441		ret.Address = a.String()
442	default:
443		ret.Type = binlogpb.Address_TYPE_UNKNOWN
444	}
445	return ret
446}