1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package binarylog
20
21import (
22 "context"
23 "net"
24 "strings"
25 "sync/atomic"
26 "time"
27
28 binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
29 "google.golang.org/grpc/metadata"
30 "google.golang.org/grpc/status"
31 "google.golang.org/protobuf/proto"
32 "google.golang.org/protobuf/types/known/durationpb"
33 "google.golang.org/protobuf/types/known/timestamppb"
34)
35
// callIDGenerator hands out monotonically increasing IDs. The zero value is
// ready to use, and the first ID returned by next is 1.
type callIDGenerator struct {
	id uint64 // Last ID handed out; advanced atomically by next.
}

// next atomically advances the counter and returns the new value.
func (g *callIDGenerator) next() uint64 {
	return atomic.AddUint64(&g.id, 1)
}

// reset puts the generator back into its zero state using a plain,
// non-atomic store. It is for testing only, and doesn't need to be thread
// safe.
func (g *callIDGenerator) reset() {
	*g = callIDGenerator{}
}

// idGen is the package-level generator that NewTruncatingMethodLogger draws
// call IDs from.
var idGen callIDGenerator
51
// MethodLogger is the sub-logger for each method.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
type MethodLogger interface {
	// Log is handed a LogEntryConfig describing one event. The
	// TruncatingMethodLogger implementation in this file converts it into a
	// proto log entry and writes that entry to its sink.
	Log(context.Context, LogEntryConfig)
}
59
60// TruncatingMethodLogger is a method logger that truncates headers and messages
61// based on configured fields.
62type TruncatingMethodLogger struct {
63 headerMaxLen, messageMaxLen uint64
64
65 callID uint64
66 idWithinCallGen *callIDGenerator
67
68 sink Sink // TODO(blog): make this pluggable.
69}
70
71// NewTruncatingMethodLogger returns a new truncating method logger.
72//
73// This is used in the 1.0 release of gcp/observability, and thus must not be
74// deleted or changed.
75func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
76 return &TruncatingMethodLogger{
77 headerMaxLen: h,
78 messageMaxLen: m,
79
80 callID: idGen.next(),
81 idWithinCallGen: &callIDGenerator{},
82
83 sink: DefaultSink, // TODO(blog): make it pluggable.
84 }
85}
86
87// Build is an internal only method for building the proto message out of the
88// input event. It's made public to enable other library to reuse as much logic
89// in TruncatingMethodLogger as possible.
90func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {
91 m := c.toProto()
92 timestamp := timestamppb.Now()
93 m.Timestamp = timestamp
94 m.CallId = ml.callID
95 m.SequenceIdWithinCall = ml.idWithinCallGen.next()
96
97 switch pay := m.Payload.(type) {
98 case *binlogpb.GrpcLogEntry_ClientHeader:
99 m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
100 case *binlogpb.GrpcLogEntry_ServerHeader:
101 m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
102 case *binlogpb.GrpcLogEntry_Message:
103 m.PayloadTruncated = ml.truncateMessage(pay.Message)
104 }
105 return m
106}
107
108// Log creates a proto binary log entry, and logs it to the sink.
109func (ml *TruncatingMethodLogger) Log(_ context.Context, c LogEntryConfig) {
110 ml.sink.Write(ml.Build(c))
111}
112
113func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) {
114 if ml.headerMaxLen == maxUInt {
115 return false
116 }
117 var (
118 bytesLimit = ml.headerMaxLen
119 index int
120 )
121 // At the end of the loop, index will be the first entry where the total
122 // size is greater than the limit:
123 //
124 // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
125 for ; index < len(mdPb.Entry); index++ {
126 entry := mdPb.Entry[index]
127 if entry.Key == "grpc-trace-bin" {
128 // "grpc-trace-bin" is a special key. It's kept in the log entry,
129 // but not counted towards the size limit.
130 continue
131 }
132 currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue()))
133 if currentEntryLen > bytesLimit {
134 break
135 }
136 bytesLimit -= currentEntryLen
137 }
138 truncated = index < len(mdPb.Entry)
139 mdPb.Entry = mdPb.Entry[:index]
140 return truncated
141}
142
143func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) {
144 if ml.messageMaxLen == maxUInt {
145 return false
146 }
147 if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
148 return false
149 }
150 msgPb.Data = msgPb.Data[:ml.messageMaxLen]
151 return true
152}
153
// LogEntryConfig represents the configuration for binary log entry.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
type LogEntryConfig interface {
	// toProto converts the config into a GrpcLogEntry. The method is
	// unexported, so only types declared in this package can satisfy
	// LogEntryConfig.
	toProto() *binlogpb.GrpcLogEntry
}
161
162// ClientHeader configs the binary log entry to be a ClientHeader entry.
163type ClientHeader struct {
164 OnClientSide bool
165 Header metadata.MD
166 MethodName string
167 Authority string
168 Timeout time.Duration
169 // PeerAddr is required only when it's on server side.
170 PeerAddr net.Addr
171}
172
173func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {
174 // This function doesn't need to set all the fields (e.g. seq ID). The Log
175 // function will set the fields when necessary.
176 clientHeader := &binlogpb.ClientHeader{
177 Metadata: mdToMetadataProto(c.Header),
178 MethodName: c.MethodName,
179 Authority: c.Authority,
180 }
181 if c.Timeout > 0 {
182 clientHeader.Timeout = durationpb.New(c.Timeout)
183 }
184 ret := &binlogpb.GrpcLogEntry{
185 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
186 Payload: &binlogpb.GrpcLogEntry_ClientHeader{
187 ClientHeader: clientHeader,
188 },
189 }
190 if c.OnClientSide {
191 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
192 } else {
193 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
194 }
195 if c.PeerAddr != nil {
196 ret.Peer = addrToProto(c.PeerAddr)
197 }
198 return ret
199}
200
201// ServerHeader configs the binary log entry to be a ServerHeader entry.
202type ServerHeader struct {
203 OnClientSide bool
204 Header metadata.MD
205 // PeerAddr is required only when it's on client side.
206 PeerAddr net.Addr
207}
208
209func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry {
210 ret := &binlogpb.GrpcLogEntry{
211 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
212 Payload: &binlogpb.GrpcLogEntry_ServerHeader{
213 ServerHeader: &binlogpb.ServerHeader{
214 Metadata: mdToMetadataProto(c.Header),
215 },
216 },
217 }
218 if c.OnClientSide {
219 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
220 } else {
221 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
222 }
223 if c.PeerAddr != nil {
224 ret.Peer = addrToProto(c.PeerAddr)
225 }
226 return ret
227}
228
229// ClientMessage configs the binary log entry to be a ClientMessage entry.
230type ClientMessage struct {
231 OnClientSide bool
232 // Message can be a proto.Message or []byte. Other messages formats are not
233 // supported.
234 Message any
235}
236
237func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry {
238 var (
239 data []byte
240 err error
241 )
242 if m, ok := c.Message.(proto.Message); ok {
243 data, err = proto.Marshal(m)
244 if err != nil {
245 grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
246 }
247 } else if b, ok := c.Message.([]byte); ok {
248 data = b
249 } else {
250 grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
251 }
252 ret := &binlogpb.GrpcLogEntry{
253 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
254 Payload: &binlogpb.GrpcLogEntry_Message{
255 Message: &binlogpb.Message{
256 Length: uint32(len(data)),
257 Data: data,
258 },
259 },
260 }
261 if c.OnClientSide {
262 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
263 } else {
264 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
265 }
266 return ret
267}
268
269// ServerMessage configs the binary log entry to be a ServerMessage entry.
270type ServerMessage struct {
271 OnClientSide bool
272 // Message can be a proto.Message or []byte. Other messages formats are not
273 // supported.
274 Message any
275}
276
277func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry {
278 var (
279 data []byte
280 err error
281 )
282 if m, ok := c.Message.(proto.Message); ok {
283 data, err = proto.Marshal(m)
284 if err != nil {
285 grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
286 }
287 } else if b, ok := c.Message.([]byte); ok {
288 data = b
289 } else {
290 grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
291 }
292 ret := &binlogpb.GrpcLogEntry{
293 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
294 Payload: &binlogpb.GrpcLogEntry_Message{
295 Message: &binlogpb.Message{
296 Length: uint32(len(data)),
297 Data: data,
298 },
299 },
300 }
301 if c.OnClientSide {
302 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
303 } else {
304 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
305 }
306 return ret
307}
308
309// ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
310type ClientHalfClose struct {
311 OnClientSide bool
312}
313
314func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry {
315 ret := &binlogpb.GrpcLogEntry{
316 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
317 Payload: nil, // No payload here.
318 }
319 if c.OnClientSide {
320 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
321 } else {
322 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
323 }
324 return ret
325}
326
327// ServerTrailer configs the binary log entry to be a ServerTrailer entry.
328type ServerTrailer struct {
329 OnClientSide bool
330 Trailer metadata.MD
331 // Err is the status error.
332 Err error
333 // PeerAddr is required only when it's on client side and the RPC is trailer
334 // only.
335 PeerAddr net.Addr
336}
337
338func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry {
339 st, ok := status.FromError(c.Err)
340 if !ok {
341 grpclogLogger.Info("binarylogging: error in trailer is not a status error")
342 }
343 var (
344 detailsBytes []byte
345 err error
346 )
347 stProto := st.Proto()
348 if stProto != nil && len(stProto.Details) != 0 {
349 detailsBytes, err = proto.Marshal(stProto)
350 if err != nil {
351 grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
352 }
353 }
354 ret := &binlogpb.GrpcLogEntry{
355 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
356 Payload: &binlogpb.GrpcLogEntry_Trailer{
357 Trailer: &binlogpb.Trailer{
358 Metadata: mdToMetadataProto(c.Trailer),
359 StatusCode: uint32(st.Code()),
360 StatusMessage: st.Message(),
361 StatusDetails: detailsBytes,
362 },
363 },
364 }
365 if c.OnClientSide {
366 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
367 } else {
368 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
369 }
370 if c.PeerAddr != nil {
371 ret.Peer = addrToProto(c.PeerAddr)
372 }
373 return ret
374}
375
376// Cancel configs the binary log entry to be a Cancel entry.
377type Cancel struct {
378 OnClientSide bool
379}
380
381func (c *Cancel) toProto() *binlogpb.GrpcLogEntry {
382 ret := &binlogpb.GrpcLogEntry{
383 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
384 Payload: nil,
385 }
386 if c.OnClientSide {
387 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
388 } else {
389 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
390 }
391 return ret
392}
393
// metadataKeyOmit reports whether the metadata entry with this key must be
// left out of the log. A fixed set of keys and every key with the "grpc-"
// prefix are omitted; the comparison is exact and case sensitive.
func metadataKeyOmit(key string) bool {
	// grpc-trace-bin is special because it's visible to users, so it is
	// exempt from the "grpc-" prefix rule below.
	if key == "grpc-trace-bin" {
		return false
	}
	switch key {
	case "lb-token",
		":path",
		":authority",
		"content-encoding",
		"content-type",
		"user-agent",
		"te":
		return true
	default:
		return strings.HasPrefix(key, "grpc-")
	}
}
405
406func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata {
407 ret := &binlogpb.Metadata{}
408 for k, vv := range md {
409 if metadataKeyOmit(k) {
410 continue
411 }
412 for _, v := range vv {
413 ret.Entry = append(ret.Entry,
414 &binlogpb.MetadataEntry{
415 Key: k,
416 Value: []byte(v),
417 },
418 )
419 }
420 }
421 return ret
422}
423
424func addrToProto(addr net.Addr) *binlogpb.Address {
425 ret := &binlogpb.Address{}
426 switch a := addr.(type) {
427 case *net.TCPAddr:
428 if a.IP.To4() != nil {
429 ret.Type = binlogpb.Address_TYPE_IPV4
430 } else if a.IP.To16() != nil {
431 ret.Type = binlogpb.Address_TYPE_IPV6
432 } else {
433 ret.Type = binlogpb.Address_TYPE_UNKNOWN
434 // Do not set address and port fields.
435 break
436 }
437 ret.Address = a.IP.String()
438 ret.IpPort = uint32(a.Port)
439 case *net.UnixAddr:
440 ret.Type = binlogpb.Address_TYPE_UNIX
441 ret.Address = a.String()
442 default:
443 ret.Type = binlogpb.Address_TYPE_UNKNOWN
444 }
445 return ret
446}