sink.go

  1/*
  2 *
  3 * Copyright 2018 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package binarylog
 20
 21import (
 22	"bufio"
 23	"encoding/binary"
 24	"io"
 25	"sync"
 26	"time"
 27
 28	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
 29	"google.golang.org/protobuf/proto"
 30)
 31
 32var (
 33	// DefaultSink is the sink where the logs will be written to. It's exported
 34	// for the binarylog package to update.
 35	DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
 36)
 37
 38// Sink writes log entry into the binary log sink.
 39//
 40// sink is a copy of the exported binarylog.Sink, to avoid circular dependency.
 41type Sink interface {
 42	// Write will be called to write the log entry into the sink.
 43	//
 44	// It should be thread-safe so it can be called in parallel.
 45	Write(*binlogpb.GrpcLogEntry) error
 46	// Close will be called when the Sink is replaced by a new Sink.
 47	Close() error
 48}
 49
 50type noopSink struct{}
 51
 52func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil }
 53func (ns *noopSink) Close() error                       { return nil }
 54
 55// newWriterSink creates a binary log sink with the given writer.
 56//
 57// Write() marshals the proto message and writes it to the given writer. Each
 58// message is prefixed with a 4 byte big endian unsigned integer as the length.
 59//
 60// No buffer is done, Close() doesn't try to close the writer.
 61func newWriterSink(w io.Writer) Sink {
 62	return &writerSink{out: w}
 63}
 64
 65type writerSink struct {
 66	out io.Writer
 67}
 68
 69func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error {
 70	b, err := proto.Marshal(e)
 71	if err != nil {
 72		grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
 73		return err
 74	}
 75	hdr := make([]byte, 4)
 76	binary.BigEndian.PutUint32(hdr, uint32(len(b)))
 77	if _, err := ws.out.Write(hdr); err != nil {
 78		return err
 79	}
 80	if _, err := ws.out.Write(b); err != nil {
 81		return err
 82	}
 83	return nil
 84}
 85
 86func (ws *writerSink) Close() error { return nil }
 87
 88type bufferedSink struct {
 89	mu             sync.Mutex
 90	closer         io.Closer
 91	out            Sink          // out is built on buf.
 92	buf            *bufio.Writer // buf is kept for flush.
 93	flusherStarted bool
 94
 95	writeTicker *time.Ticker
 96	done        chan struct{}
 97}
 98
 99func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error {
100	fs.mu.Lock()
101	defer fs.mu.Unlock()
102	if !fs.flusherStarted {
103		// Start the write loop when Write is called.
104		fs.startFlushGoroutine()
105		fs.flusherStarted = true
106	}
107	if err := fs.out.Write(e); err != nil {
108		return err
109	}
110	return nil
111}
112
113const (
114	bufFlushDuration = 60 * time.Second
115)
116
117func (fs *bufferedSink) startFlushGoroutine() {
118	fs.writeTicker = time.NewTicker(bufFlushDuration)
119	go func() {
120		for {
121			select {
122			case <-fs.done:
123				return
124			case <-fs.writeTicker.C:
125			}
126			fs.mu.Lock()
127			if err := fs.buf.Flush(); err != nil {
128				grpclogLogger.Warningf("failed to flush to Sink: %v", err)
129			}
130			fs.mu.Unlock()
131		}
132	}()
133}
134
135func (fs *bufferedSink) Close() error {
136	fs.mu.Lock()
137	defer fs.mu.Unlock()
138	if fs.writeTicker != nil {
139		fs.writeTicker.Stop()
140	}
141	close(fs.done)
142	if err := fs.buf.Flush(); err != nil {
143		grpclogLogger.Warningf("failed to flush to Sink: %v", err)
144	}
145	if err := fs.closer.Close(); err != nil {
146		grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err)
147	}
148	if err := fs.out.Close(); err != nil {
149		grpclogLogger.Warningf("failed to close the Sink: %v", err)
150	}
151	return nil
152}
153
154// NewBufferedSink creates a binary log sink with the given WriteCloser.
155//
156// Write() marshals the proto message and writes it to the given writer. Each
157// message is prefixed with a 4 byte big endian unsigned integer as the length.
158//
159// Content is kept in a buffer, and is flushed every 60 seconds.
160//
161// Close closes the WriteCloser.
162func NewBufferedSink(o io.WriteCloser) Sink {
163	bufW := bufio.NewWriter(o)
164	return &bufferedSink{
165		closer: o,
166		out:    newWriterSink(bufW),
167		buf:    bufW,
168		done:   make(chan struct{}),
169	}
170}