tail.go

  1// Copyright (c) 2019 FOSS contributors of https://github.com/nxadm/tail
  2// Copyright (c) 2015 HPE Software Inc. All rights reserved.
  3// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
  4
  5//nxadm/tail provides a Go library that emulates the features of the BSD `tail`
  6//program. The library comes with full support for truncation/move detection as
  7//it is designed to work with log rotation tools. The library works on all
  8//operating systems supported by Go, including POSIX systems like Linux and
  9//*BSD, and MS Windows. Go 1.9 is the oldest compiler release supported.
 10package tail
 11
 12import (
 13	"bufio"
 14	"errors"
 15	"fmt"
 16	"io"
 17	"io/ioutil"
 18	"log"
 19	"os"
 20	"strings"
 21	"sync"
 22	"time"
 23
 24	"github.com/nxadm/tail/ratelimiter"
 25	"github.com/nxadm/tail/util"
 26	"github.com/nxadm/tail/watch"
 27	"gopkg.in/tomb.v1"
 28)
 29
 30var (
 31	// ErrStop is returned when the tail of a file has been marked to be stopped.
 32	ErrStop = errors.New("tail should now stop")
 33)
 34
 35type Line struct {
 36	Text     string    // The contents of the file
 37	Num      int       // The line number
 38	SeekInfo SeekInfo  // SeekInfo
 39	Time     time.Time // Present time
 40	Err      error     // Error from tail
 41}
 42
 43// Deprecated: this function is no longer used internally and it has little of no
 44// use in the API. As such, it will be removed from the API in a future major
 45// release.
 46//
 47// NewLine returns a * pointer to a Line struct.
 48func NewLine(text string, lineNum int) *Line {
 49	return &Line{text, lineNum, SeekInfo{}, time.Now(), nil}
 50}
 51
 52// SeekInfo represents arguments to io.Seek. See: https://golang.org/pkg/io/#SectionReader.Seek
 53type SeekInfo struct {
 54	Offset int64
 55	Whence int
 56}
 57
 58type logger interface {
 59	Fatal(v ...interface{})
 60	Fatalf(format string, v ...interface{})
 61	Fatalln(v ...interface{})
 62	Panic(v ...interface{})
 63	Panicf(format string, v ...interface{})
 64	Panicln(v ...interface{})
 65	Print(v ...interface{})
 66	Printf(format string, v ...interface{})
 67	Println(v ...interface{})
 68}
 69
 70// Config is used to specify how a file must be tailed.
 71type Config struct {
 72	// File-specifc
 73	Location  *SeekInfo // Tail from this location. If nil, start at the beginning of the file
 74	ReOpen    bool      // Reopen recreated files (tail -F)
 75	MustExist bool      // Fail early if the file does not exist
 76	Poll      bool      // Poll for file changes instead of using the default inotify
 77	Pipe      bool      // The file is a named pipe (mkfifo)
 78
 79	// Generic IO
 80	Follow        bool // Continue looking for new lines (tail -f)
 81	MaxLineSize   int  // If non-zero, split longer lines into multiple lines
 82	CompleteLines bool // Only return complete lines (that end with "\n" or EOF when Follow is false)
 83
 84	// Optionally, use a ratelimiter (e.g. created by the ratelimiter/NewLeakyBucket function)
 85	RateLimiter *ratelimiter.LeakyBucket
 86
 87	// Optionally use a Logger. When nil, the Logger is set to tail.DefaultLogger.
 88	// To disable logging, set it to tail.DiscardingLogger
 89	Logger logger
 90}
 91
 92type Tail struct {
 93	Filename string     // The filename
 94	Lines    chan *Line // A consumable channel of *Line
 95	Config              // Tail.Configuration
 96
 97	file    *os.File
 98	reader  *bufio.Reader
 99	lineNum int
100
101	lineBuf *strings.Builder
102
103	watcher watch.FileWatcher
104	changes *watch.FileChanges
105
106	tomb.Tomb // provides: Done, Kill, Dying
107
108	lk sync.Mutex
109}
110
111var (
112	// DefaultLogger logs to os.Stderr and it is used when Config.Logger == nil
113	DefaultLogger = log.New(os.Stderr, "", log.LstdFlags)
114	// DiscardingLogger can be used to disable logging output
115	DiscardingLogger = log.New(ioutil.Discard, "", 0)
116)
117
118// TailFile begins tailing the file. And returns a pointer to a Tail struct
119// and an error. An output stream is made available via the Tail.Lines
120// channel (e.g. to be looped and printed). To handle errors during tailing,
121// after finishing reading from the Lines channel, invoke the `Wait` or `Err`
122// method on the returned *Tail.
123func TailFile(filename string, config Config) (*Tail, error) {
124	if config.ReOpen && !config.Follow {
125		util.Fatal("cannot set ReOpen without Follow.")
126	}
127
128	t := &Tail{
129		Filename: filename,
130		Lines:    make(chan *Line),
131		Config:   config,
132	}
133
134	if config.CompleteLines {
135		t.lineBuf = new(strings.Builder)
136	}
137
138	// when Logger was not specified in config, use default logger
139	if t.Logger == nil {
140		t.Logger = DefaultLogger
141	}
142
143	if t.Poll {
144		t.watcher = watch.NewPollingFileWatcher(filename)
145	} else {
146		t.watcher = watch.NewInotifyFileWatcher(filename)
147	}
148
149	if t.MustExist {
150		var err error
151		t.file, err = OpenFile(t.Filename)
152		if err != nil {
153			return nil, err
154		}
155	}
156
157	go t.tailFileSync()
158
159	return t, nil
160}
161
162// Tell returns the file's current position, like stdio's ftell() and an error.
163// Beware that this value may not be completely accurate because one line from
164// the chan(tail.Lines) may have been read already.
165func (tail *Tail) Tell() (offset int64, err error) {
166	if tail.file == nil {
167		return
168	}
169	offset, err = tail.file.Seek(0, io.SeekCurrent)
170	if err != nil {
171		return
172	}
173
174	tail.lk.Lock()
175	defer tail.lk.Unlock()
176	if tail.reader == nil {
177		return
178	}
179
180	offset -= int64(tail.reader.Buffered())
181	return
182}
183
184// Stop stops the tailing activity.
185func (tail *Tail) Stop() error {
186	tail.Kill(nil)
187	return tail.Wait()
188}
189
190// StopAtEOF stops tailing as soon as the end of the file is reached. The function
191// returns an error,
192func (tail *Tail) StopAtEOF() error {
193	tail.Kill(errStopAtEOF)
194	return tail.Wait()
195}
196
197var errStopAtEOF = errors.New("tail: stop at eof")
198
199func (tail *Tail) close() {
200	close(tail.Lines)
201	tail.closeFile()
202}
203
204func (tail *Tail) closeFile() {
205	if tail.file != nil {
206		tail.file.Close()
207		tail.file = nil
208	}
209}
210
211func (tail *Tail) reopen() error {
212	if tail.lineBuf != nil {
213		tail.lineBuf.Reset()
214	}
215	tail.closeFile()
216	tail.lineNum = 0
217	for {
218		var err error
219		tail.file, err = OpenFile(tail.Filename)
220		if err != nil {
221			if os.IsNotExist(err) {
222				tail.Logger.Printf("Waiting for %s to appear...", tail.Filename)
223				if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil {
224					if err == tomb.ErrDying {
225						return err
226					}
227					return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
228				}
229				continue
230			}
231			return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err)
232		}
233		break
234	}
235	return nil
236}
237
238func (tail *Tail) readLine() (string, error) {
239	tail.lk.Lock()
240	line, err := tail.reader.ReadString('\n')
241	tail.lk.Unlock()
242
243	newlineEnding := strings.HasSuffix(line, "\n")
244	line = strings.TrimRight(line, "\n")
245
246	// if we don't have to handle incomplete lines, we can return the line as-is
247	if !tail.Config.CompleteLines {
248		// Note ReadString "returns the data read before the error" in
249		// case of an error, including EOF, so we return it as is. The
250		// caller is expected to process it if err is EOF.
251		return line, err
252	}
253
254	if _, err := tail.lineBuf.WriteString(line); err != nil {
255		return line, err
256	}
257
258	if newlineEnding {
259		line = tail.lineBuf.String()
260		tail.lineBuf.Reset()
261		return line, nil
262	} else {
263		if tail.Config.Follow {
264			line = ""
265		}
266		return line, io.EOF
267	}
268}
269
270func (tail *Tail) tailFileSync() {
271	defer tail.Done()
272	defer tail.close()
273
274	if !tail.MustExist {
275		// deferred first open.
276		err := tail.reopen()
277		if err != nil {
278			if err != tomb.ErrDying {
279				tail.Kill(err)
280			}
281			return
282		}
283	}
284
285	// Seek to requested location on first open of the file.
286	if tail.Location != nil {
287		_, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence)
288		if err != nil {
289			tail.Killf("Seek error on %s: %s", tail.Filename, err)
290			return
291		}
292	}
293
294	tail.openReader()
295
296	// Read line by line.
297	for {
298		// do not seek in named pipes
299		if !tail.Pipe {
300			// grab the position in case we need to back up in the event of a half-line
301			if _, err := tail.Tell(); err != nil {
302				tail.Kill(err)
303				return
304			}
305		}
306
307		line, err := tail.readLine()
308
309		// Process `line` even if err is EOF.
310		if err == nil {
311			cooloff := !tail.sendLine(line)
312			if cooloff {
313				// Wait a second before seeking till the end of
314				// file when rate limit is reached.
315				msg := ("Too much log activity; waiting a second before resuming tailing")
316				offset, _ := tail.Tell()
317				tail.Lines <- &Line{msg, tail.lineNum, SeekInfo{Offset: offset}, time.Now(), errors.New(msg)}
318				select {
319				case <-time.After(time.Second):
320				case <-tail.Dying():
321					return
322				}
323				if err := tail.seekEnd(); err != nil {
324					tail.Kill(err)
325					return
326				}
327			}
328		} else if err == io.EOF {
329			if !tail.Follow {
330				if line != "" {
331					tail.sendLine(line)
332				}
333				return
334			}
335
336			if tail.Follow && line != "" {
337				tail.sendLine(line)
338				if err := tail.seekEnd(); err != nil {
339					tail.Kill(err)
340					return
341				}
342			}
343
344			// When EOF is reached, wait for more data to become
345			// available. Wait strategy is based on the `tail.watcher`
346			// implementation (inotify or polling).
347			err := tail.waitForChanges()
348			if err != nil {
349				if err != ErrStop {
350					tail.Kill(err)
351				}
352				return
353			}
354		} else {
355			// non-EOF error
356			tail.Killf("Error reading %s: %s", tail.Filename, err)
357			return
358		}
359
360		select {
361		case <-tail.Dying():
362			if tail.Err() == errStopAtEOF {
363				continue
364			}
365			return
366		default:
367		}
368	}
369}
370
371// waitForChanges waits until the file has been appended, deleted,
372// moved or truncated. When moved or deleted - the file will be
373// reopened if ReOpen is true. Truncated files are always reopened.
374func (tail *Tail) waitForChanges() error {
375	if tail.changes == nil {
376		pos, err := tail.file.Seek(0, io.SeekCurrent)
377		if err != nil {
378			return err
379		}
380		tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
381		if err != nil {
382			return err
383		}
384	}
385
386	select {
387	case <-tail.changes.Modified:
388		return nil
389	case <-tail.changes.Deleted:
390		tail.changes = nil
391		if tail.ReOpen {
392			// XXX: we must not log from a library.
393			tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
394			if err := tail.reopen(); err != nil {
395				return err
396			}
397			tail.Logger.Printf("Successfully reopened %s", tail.Filename)
398			tail.openReader()
399			return nil
400		}
401		tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
402		return ErrStop
403	case <-tail.changes.Truncated:
404		// Always reopen truncated files (Follow is true)
405		tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename)
406		if err := tail.reopen(); err != nil {
407			return err
408		}
409		tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename)
410		tail.openReader()
411		return nil
412	case <-tail.Dying():
413		return ErrStop
414	}
415}
416
417func (tail *Tail) openReader() {
418	tail.lk.Lock()
419	if tail.MaxLineSize > 0 {
420		// add 2 to account for newline characters
421		tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2)
422	} else {
423		tail.reader = bufio.NewReader(tail.file)
424	}
425	tail.lk.Unlock()
426}
427
428func (tail *Tail) seekEnd() error {
429	return tail.seekTo(SeekInfo{Offset: 0, Whence: io.SeekEnd})
430}
431
432func (tail *Tail) seekTo(pos SeekInfo) error {
433	_, err := tail.file.Seek(pos.Offset, pos.Whence)
434	if err != nil {
435		return fmt.Errorf("Seek error on %s: %s", tail.Filename, err)
436	}
437	// Reset the read buffer whenever the file is re-seek'ed
438	tail.reader.Reset(tail.file)
439	return nil
440}
441
442// sendLine sends the line(s) to Lines channel, splitting longer lines
443// if necessary. Return false if rate limit is reached.
444func (tail *Tail) sendLine(line string) bool {
445	now := time.Now()
446	lines := []string{line}
447
448	// Split longer lines
449	if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
450		lines = util.PartitionString(line, tail.MaxLineSize)
451	}
452
453	for _, line := range lines {
454		tail.lineNum++
455		offset, _ := tail.Tell()
456		select {
457		case tail.Lines <- &Line{line, tail.lineNum, SeekInfo{Offset: offset}, now, nil}:
458		case <-tail.Dying():
459			return true
460		}
461	}
462
463	if tail.Config.RateLimiter != nil {
464		ok := tail.Config.RateLimiter.Pour(uint16(len(lines)))
465		if !ok {
466			tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.",
467				tail.Filename)
468			return false
469		}
470	}
471
472	return true
473}
474
475// Cleanup removes inotify watches added by the tail package. This function is
476// meant to be invoked from a process's exit handler. Linux kernel may not
477// automatically remove inotify watches after the process exits.
478// If you plan to re-read a file, don't call Cleanup in between.
479func (tail *Tail) Cleanup() {
480	watch.Cleanup(tail.Filename)
481}