1// Copyright (c) 2019 FOSS contributors of https://github.com/nxadm/tail
2// Copyright (c) 2015 HPE Software Inc. All rights reserved.
3// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
4
5//nxadm/tail provides a Go library that emulates the features of the BSD `tail`
6//program. The library comes with full support for truncation/move detection as
7//it is designed to work with log rotation tools. The library works on all
8//operating systems supported by Go, including POSIX systems like Linux and
9//*BSD, and MS Windows. Go 1.9 is the oldest compiler release supported.
10package tail
11
12import (
13 "bufio"
14 "errors"
15 "fmt"
16 "io"
17 "io/ioutil"
18 "log"
19 "os"
20 "strings"
21 "sync"
22 "time"
23
24 "github.com/nxadm/tail/ratelimiter"
25 "github.com/nxadm/tail/util"
26 "github.com/nxadm/tail/watch"
27 "gopkg.in/tomb.v1"
28)
29
30var (
31 // ErrStop is returned when the tail of a file has been marked to be stopped.
32 ErrStop = errors.New("tail should now stop")
33)
34
35type Line struct {
36 Text string // The contents of the file
37 Num int // The line number
38 SeekInfo SeekInfo // SeekInfo
39 Time time.Time // Present time
40 Err error // Error from tail
41}
42
43// Deprecated: this function is no longer used internally and it has little of no
44// use in the API. As such, it will be removed from the API in a future major
45// release.
46//
47// NewLine returns a * pointer to a Line struct.
48func NewLine(text string, lineNum int) *Line {
49 return &Line{text, lineNum, SeekInfo{}, time.Now(), nil}
50}
51
52// SeekInfo represents arguments to io.Seek. See: https://golang.org/pkg/io/#SectionReader.Seek
53type SeekInfo struct {
54 Offset int64
55 Whence int
56}
57
58type logger interface {
59 Fatal(v ...interface{})
60 Fatalf(format string, v ...interface{})
61 Fatalln(v ...interface{})
62 Panic(v ...interface{})
63 Panicf(format string, v ...interface{})
64 Panicln(v ...interface{})
65 Print(v ...interface{})
66 Printf(format string, v ...interface{})
67 Println(v ...interface{})
68}
69
70// Config is used to specify how a file must be tailed.
71type Config struct {
72 // File-specifc
73 Location *SeekInfo // Tail from this location. If nil, start at the beginning of the file
74 ReOpen bool // Reopen recreated files (tail -F)
75 MustExist bool // Fail early if the file does not exist
76 Poll bool // Poll for file changes instead of using the default inotify
77 Pipe bool // The file is a named pipe (mkfifo)
78
79 // Generic IO
80 Follow bool // Continue looking for new lines (tail -f)
81 MaxLineSize int // If non-zero, split longer lines into multiple lines
82 CompleteLines bool // Only return complete lines (that end with "\n" or EOF when Follow is false)
83
84 // Optionally, use a ratelimiter (e.g. created by the ratelimiter/NewLeakyBucket function)
85 RateLimiter *ratelimiter.LeakyBucket
86
87 // Optionally use a Logger. When nil, the Logger is set to tail.DefaultLogger.
88 // To disable logging, set it to tail.DiscardingLogger
89 Logger logger
90}
91
92type Tail struct {
93 Filename string // The filename
94 Lines chan *Line // A consumable channel of *Line
95 Config // Tail.Configuration
96
97 file *os.File
98 reader *bufio.Reader
99 lineNum int
100
101 lineBuf *strings.Builder
102
103 watcher watch.FileWatcher
104 changes *watch.FileChanges
105
106 tomb.Tomb // provides: Done, Kill, Dying
107
108 lk sync.Mutex
109}
110
111var (
112 // DefaultLogger logs to os.Stderr and it is used when Config.Logger == nil
113 DefaultLogger = log.New(os.Stderr, "", log.LstdFlags)
114 // DiscardingLogger can be used to disable logging output
115 DiscardingLogger = log.New(ioutil.Discard, "", 0)
116)
117
118// TailFile begins tailing the file. And returns a pointer to a Tail struct
119// and an error. An output stream is made available via the Tail.Lines
120// channel (e.g. to be looped and printed). To handle errors during tailing,
121// after finishing reading from the Lines channel, invoke the `Wait` or `Err`
122// method on the returned *Tail.
123func TailFile(filename string, config Config) (*Tail, error) {
124 if config.ReOpen && !config.Follow {
125 util.Fatal("cannot set ReOpen without Follow.")
126 }
127
128 t := &Tail{
129 Filename: filename,
130 Lines: make(chan *Line),
131 Config: config,
132 }
133
134 if config.CompleteLines {
135 t.lineBuf = new(strings.Builder)
136 }
137
138 // when Logger was not specified in config, use default logger
139 if t.Logger == nil {
140 t.Logger = DefaultLogger
141 }
142
143 if t.Poll {
144 t.watcher = watch.NewPollingFileWatcher(filename)
145 } else {
146 t.watcher = watch.NewInotifyFileWatcher(filename)
147 }
148
149 if t.MustExist {
150 var err error
151 t.file, err = OpenFile(t.Filename)
152 if err != nil {
153 return nil, err
154 }
155 }
156
157 go t.tailFileSync()
158
159 return t, nil
160}
161
162// Tell returns the file's current position, like stdio's ftell() and an error.
163// Beware that this value may not be completely accurate because one line from
164// the chan(tail.Lines) may have been read already.
165func (tail *Tail) Tell() (offset int64, err error) {
166 if tail.file == nil {
167 return
168 }
169 offset, err = tail.file.Seek(0, io.SeekCurrent)
170 if err != nil {
171 return
172 }
173
174 tail.lk.Lock()
175 defer tail.lk.Unlock()
176 if tail.reader == nil {
177 return
178 }
179
180 offset -= int64(tail.reader.Buffered())
181 return
182}
183
184// Stop stops the tailing activity.
185func (tail *Tail) Stop() error {
186 tail.Kill(nil)
187 return tail.Wait()
188}
189
190// StopAtEOF stops tailing as soon as the end of the file is reached. The function
191// returns an error,
192func (tail *Tail) StopAtEOF() error {
193 tail.Kill(errStopAtEOF)
194 return tail.Wait()
195}
196
197var errStopAtEOF = errors.New("tail: stop at eof")
198
199func (tail *Tail) close() {
200 close(tail.Lines)
201 tail.closeFile()
202}
203
204func (tail *Tail) closeFile() {
205 if tail.file != nil {
206 tail.file.Close()
207 tail.file = nil
208 }
209}
210
211func (tail *Tail) reopen() error {
212 if tail.lineBuf != nil {
213 tail.lineBuf.Reset()
214 }
215 tail.closeFile()
216 tail.lineNum = 0
217 for {
218 var err error
219 tail.file, err = OpenFile(tail.Filename)
220 if err != nil {
221 if os.IsNotExist(err) {
222 tail.Logger.Printf("Waiting for %s to appear...", tail.Filename)
223 if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil {
224 if err == tomb.ErrDying {
225 return err
226 }
227 return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
228 }
229 continue
230 }
231 return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err)
232 }
233 break
234 }
235 return nil
236}
237
238func (tail *Tail) readLine() (string, error) {
239 tail.lk.Lock()
240 line, err := tail.reader.ReadString('\n')
241 tail.lk.Unlock()
242
243 newlineEnding := strings.HasSuffix(line, "\n")
244 line = strings.TrimRight(line, "\n")
245
246 // if we don't have to handle incomplete lines, we can return the line as-is
247 if !tail.Config.CompleteLines {
248 // Note ReadString "returns the data read before the error" in
249 // case of an error, including EOF, so we return it as is. The
250 // caller is expected to process it if err is EOF.
251 return line, err
252 }
253
254 if _, err := tail.lineBuf.WriteString(line); err != nil {
255 return line, err
256 }
257
258 if newlineEnding {
259 line = tail.lineBuf.String()
260 tail.lineBuf.Reset()
261 return line, nil
262 } else {
263 if tail.Config.Follow {
264 line = ""
265 }
266 return line, io.EOF
267 }
268}
269
270func (tail *Tail) tailFileSync() {
271 defer tail.Done()
272 defer tail.close()
273
274 if !tail.MustExist {
275 // deferred first open.
276 err := tail.reopen()
277 if err != nil {
278 if err != tomb.ErrDying {
279 tail.Kill(err)
280 }
281 return
282 }
283 }
284
285 // Seek to requested location on first open of the file.
286 if tail.Location != nil {
287 _, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence)
288 if err != nil {
289 tail.Killf("Seek error on %s: %s", tail.Filename, err)
290 return
291 }
292 }
293
294 tail.openReader()
295
296 // Read line by line.
297 for {
298 // do not seek in named pipes
299 if !tail.Pipe {
300 // grab the position in case we need to back up in the event of a half-line
301 if _, err := tail.Tell(); err != nil {
302 tail.Kill(err)
303 return
304 }
305 }
306
307 line, err := tail.readLine()
308
309 // Process `line` even if err is EOF.
310 if err == nil {
311 cooloff := !tail.sendLine(line)
312 if cooloff {
313 // Wait a second before seeking till the end of
314 // file when rate limit is reached.
315 msg := ("Too much log activity; waiting a second before resuming tailing")
316 offset, _ := tail.Tell()
317 tail.Lines <- &Line{msg, tail.lineNum, SeekInfo{Offset: offset}, time.Now(), errors.New(msg)}
318 select {
319 case <-time.After(time.Second):
320 case <-tail.Dying():
321 return
322 }
323 if err := tail.seekEnd(); err != nil {
324 tail.Kill(err)
325 return
326 }
327 }
328 } else if err == io.EOF {
329 if !tail.Follow {
330 if line != "" {
331 tail.sendLine(line)
332 }
333 return
334 }
335
336 if tail.Follow && line != "" {
337 tail.sendLine(line)
338 if err := tail.seekEnd(); err != nil {
339 tail.Kill(err)
340 return
341 }
342 }
343
344 // When EOF is reached, wait for more data to become
345 // available. Wait strategy is based on the `tail.watcher`
346 // implementation (inotify or polling).
347 err := tail.waitForChanges()
348 if err != nil {
349 if err != ErrStop {
350 tail.Kill(err)
351 }
352 return
353 }
354 } else {
355 // non-EOF error
356 tail.Killf("Error reading %s: %s", tail.Filename, err)
357 return
358 }
359
360 select {
361 case <-tail.Dying():
362 if tail.Err() == errStopAtEOF {
363 continue
364 }
365 return
366 default:
367 }
368 }
369}
370
371// waitForChanges waits until the file has been appended, deleted,
372// moved or truncated. When moved or deleted - the file will be
373// reopened if ReOpen is true. Truncated files are always reopened.
374func (tail *Tail) waitForChanges() error {
375 if tail.changes == nil {
376 pos, err := tail.file.Seek(0, io.SeekCurrent)
377 if err != nil {
378 return err
379 }
380 tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
381 if err != nil {
382 return err
383 }
384 }
385
386 select {
387 case <-tail.changes.Modified:
388 return nil
389 case <-tail.changes.Deleted:
390 tail.changes = nil
391 if tail.ReOpen {
392 // XXX: we must not log from a library.
393 tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
394 if err := tail.reopen(); err != nil {
395 return err
396 }
397 tail.Logger.Printf("Successfully reopened %s", tail.Filename)
398 tail.openReader()
399 return nil
400 }
401 tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
402 return ErrStop
403 case <-tail.changes.Truncated:
404 // Always reopen truncated files (Follow is true)
405 tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename)
406 if err := tail.reopen(); err != nil {
407 return err
408 }
409 tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename)
410 tail.openReader()
411 return nil
412 case <-tail.Dying():
413 return ErrStop
414 }
415}
416
417func (tail *Tail) openReader() {
418 tail.lk.Lock()
419 if tail.MaxLineSize > 0 {
420 // add 2 to account for newline characters
421 tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2)
422 } else {
423 tail.reader = bufio.NewReader(tail.file)
424 }
425 tail.lk.Unlock()
426}
427
428func (tail *Tail) seekEnd() error {
429 return tail.seekTo(SeekInfo{Offset: 0, Whence: io.SeekEnd})
430}
431
432func (tail *Tail) seekTo(pos SeekInfo) error {
433 _, err := tail.file.Seek(pos.Offset, pos.Whence)
434 if err != nil {
435 return fmt.Errorf("Seek error on %s: %s", tail.Filename, err)
436 }
437 // Reset the read buffer whenever the file is re-seek'ed
438 tail.reader.Reset(tail.file)
439 return nil
440}
441
442// sendLine sends the line(s) to Lines channel, splitting longer lines
443// if necessary. Return false if rate limit is reached.
444func (tail *Tail) sendLine(line string) bool {
445 now := time.Now()
446 lines := []string{line}
447
448 // Split longer lines
449 if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
450 lines = util.PartitionString(line, tail.MaxLineSize)
451 }
452
453 for _, line := range lines {
454 tail.lineNum++
455 offset, _ := tail.Tell()
456 select {
457 case tail.Lines <- &Line{line, tail.lineNum, SeekInfo{Offset: offset}, now, nil}:
458 case <-tail.Dying():
459 return true
460 }
461 }
462
463 if tail.Config.RateLimiter != nil {
464 ok := tail.Config.RateLimiter.Pour(uint16(len(lines)))
465 if !ok {
466 tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.",
467 tail.Filename)
468 return false
469 }
470 }
471
472 return true
473}
474
475// Cleanup removes inotify watches added by the tail package. This function is
476// meant to be invoked from a process's exit handler. Linux kernel may not
477// automatically remove inotify watches after the process exits.
478// If you plan to re-read a file, don't call Cleanup in between.
479func (tail *Tail) Cleanup() {
480 watch.Cleanup(tail.Filename)
481}