inotify_tracker.go

  1// Copyright (c) 2019 FOSS contributors of https://github.com/nxadm/tail
  2// Copyright (c) 2015 HPE Software Inc. All rights reserved.
  3// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
  4
  5package watch
  6
  7import (
  8	"log"
  9	"os"
 10	"path/filepath"
 11	"sync"
 12	"syscall"
 13
 14	"github.com/nxadm/tail/util"
 15
 16    "github.com/fsnotify/fsnotify"
 17)
 18
// InotifyTracker multiplexes a single fsnotify.Watcher between all watched
// files. Requests to add or remove a watch are handed to the run goroutine
// through the watch and remove channels, and the outcome of each request is
// returned on the error channel.
type InotifyTracker struct {
	// mux guards the chans, done and watchNums maps.
	mux       sync.Mutex
	// watcher is the underlying fsnotify watcher; it is assigned by run.
	watcher   *fsnotify.Watcher
	// chans maps a cleaned file name to the channel on which its events are delivered.
	chans     map[string]chan fsnotify.Event
	// done maps a cleaned file name to a channel that is closed when the watch
	// is being removed, so a pending event delivery can be abandoned.
	done      map[string]chan bool
	// watchNums counts the registrations per watched path (the file itself, or
	// its parent directory for create watches).
	watchNums map[string]int
	// watch carries add-watch requests to the run goroutine.
	watch     chan *watchInfo
	// remove carries remove-watch requests to the run goroutine.
	remove    chan *watchInfo
	// error carries the result of each add/remove request back to the requester.
	error     chan error
}
 29
// watchInfo describes a single add-watch or remove-watch request.
type watchInfo struct {
	// op is fsnotify.Create for requests made through WatchCreate and
	// RemoveWatchCreate, and the zero value otherwise.
	op    fsnotify.Op
	// fname is the file the request refers to; watch and remove clean it
	// with filepath.Clean before handing the request to the run goroutine.
	fname string
}
 34
 35func (this *watchInfo) isCreate() bool {
 36	return this.op == fsnotify.Create
 37}
 38
 39var (
 40	// globally shared InotifyTracker; ensures only one fsnotify.Watcher is used
 41	shared *InotifyTracker
 42
 43	// these are used to ensure the shared InotifyTracker is run exactly once
 44	once  = sync.Once{}
 45	goRun = func() {
 46		shared = &InotifyTracker{
 47			mux:       sync.Mutex{},
 48			chans:     make(map[string]chan fsnotify.Event),
 49			done:      make(map[string]chan bool),
 50			watchNums: make(map[string]int),
 51			watch:     make(chan *watchInfo),
 52			remove:    make(chan *watchInfo),
 53			error:     make(chan error),
 54		}
 55		go shared.run()
 56	}
 57
 58	logger = log.New(os.Stderr, "", log.LstdFlags)
 59)
 60
 61// Watch signals the run goroutine to begin watching the input filename
 62func Watch(fname string) error {
 63	return watch(&watchInfo{
 64		fname: fname,
 65	})
 66}
 67
 68// Watch create signals the run goroutine to begin watching the input filename
 69// if call the WatchCreate function, don't call the Cleanup, call the RemoveWatchCreate
 70func WatchCreate(fname string) error {
 71	return watch(&watchInfo{
 72		op:    fsnotify.Create,
 73		fname: fname,
 74	})
 75}
 76
 77func watch(winfo *watchInfo) error {
 78	// start running the shared InotifyTracker if not already running
 79	once.Do(goRun)
 80
 81	winfo.fname = filepath.Clean(winfo.fname)
 82	shared.watch <- winfo
 83	return <-shared.error
 84}
 85
 86// RemoveWatch signals the run goroutine to remove the watch for the input filename
 87func RemoveWatch(fname string) error {
 88	return remove(&watchInfo{
 89		fname: fname,
 90	})
 91}
 92
 93// RemoveWatch create signals the run goroutine to remove the watch for the input filename
 94func RemoveWatchCreate(fname string) error {
 95	return remove(&watchInfo{
 96		op:    fsnotify.Create,
 97		fname: fname,
 98	})
 99}
100
101func remove(winfo *watchInfo) error {
102	// start running the shared InotifyTracker if not already running
103	once.Do(goRun)
104
105	winfo.fname = filepath.Clean(winfo.fname)
106	shared.mux.Lock()
107	done := shared.done[winfo.fname]
108	if done != nil {
109		delete(shared.done, winfo.fname)
110		close(done)
111	}
112	shared.mux.Unlock()
113
114	shared.remove <- winfo
115	return <-shared.error
116}
117
118// Events returns a channel to which FileEvents corresponding to the input filename
119// will be sent. This channel will be closed when removeWatch is called on this
120// filename.
121func Events(fname string) <-chan fsnotify.Event {
122	shared.mux.Lock()
123	defer shared.mux.Unlock()
124
125	return shared.chans[fname]
126}
127
128// Cleanup removes the watch for the input filename if necessary.
129func Cleanup(fname string) error {
130	return RemoveWatch(fname)
131}
132
133// watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating
134// a new Watcher if the previous Watcher was closed.
135func (shared *InotifyTracker) addWatch(winfo *watchInfo) error {
136	shared.mux.Lock()
137	defer shared.mux.Unlock()
138
139	if shared.chans[winfo.fname] == nil {
140		shared.chans[winfo.fname] = make(chan fsnotify.Event)
141	}
142	if shared.done[winfo.fname] == nil {
143		shared.done[winfo.fname] = make(chan bool)
144	}
145
146	fname := winfo.fname
147	if winfo.isCreate() {
148		// Watch for new files to be created in the parent directory.
149		fname = filepath.Dir(fname)
150	}
151
152	var err error
153	// already in inotify watch
154	if shared.watchNums[fname] == 0 {
155		err = shared.watcher.Add(fname)
156	}
157	if err == nil {
158		shared.watchNums[fname]++
159	}
160	return err
161}
162
163// removeWatch calls fsnotify.RemoveWatch for the input filename and closes the
164// corresponding events channel.
165func (shared *InotifyTracker) removeWatch(winfo *watchInfo) error {
166	shared.mux.Lock()
167
168	ch := shared.chans[winfo.fname]
169	if ch != nil {
170		delete(shared.chans, winfo.fname)
171		close(ch)
172	}
173
174	fname := winfo.fname
175	if winfo.isCreate() {
176		// Watch for new files to be created in the parent directory.
177		fname = filepath.Dir(fname)
178	}
179	shared.watchNums[fname]--
180	watchNum := shared.watchNums[fname]
181	if watchNum == 0 {
182		delete(shared.watchNums, fname)
183	}
184	shared.mux.Unlock()
185
186	var err error
187	// If we were the last ones to watch this file, unsubscribe from inotify.
188	// This needs to happen after releasing the lock because fsnotify waits
189	// synchronously for the kernel to acknowledge the removal of the watch
190	// for this file, which causes us to deadlock if we still held the lock.
191	if watchNum == 0 {
192		err = shared.watcher.Remove(fname)
193	}
194
195	return err
196}
197
198// sendEvent sends the input event to the appropriate Tail.
199func (shared *InotifyTracker) sendEvent(event fsnotify.Event) {
200	name := filepath.Clean(event.Name)
201
202	shared.mux.Lock()
203	ch := shared.chans[name]
204	done := shared.done[name]
205	shared.mux.Unlock()
206
207	if ch != nil && done != nil {
208		select {
209		case ch <- event:
210		case <-done:
211		}
212	}
213}
214
215// run starts the goroutine in which the shared struct reads events from its
216// Watcher's Event channel and sends the events to the appropriate Tail.
217func (shared *InotifyTracker) run() {
218	watcher, err := fsnotify.NewWatcher()
219	if err != nil {
220		util.Fatal("failed to create Watcher")
221	}
222	shared.watcher = watcher
223
224	for {
225		select {
226		case winfo := <-shared.watch:
227			shared.error <- shared.addWatch(winfo)
228
229		case winfo := <-shared.remove:
230			shared.error <- shared.removeWatch(winfo)
231
232		case event, open := <-shared.watcher.Events:
233			if !open {
234				return
235			}
236			shared.sendEvent(event)
237
238		case err, open := <-shared.watcher.Errors:
239			if !open {
240				return
241			} else if err != nil {
242				sysErr, ok := err.(*os.SyscallError)
243				if !ok || sysErr.Err != syscall.EINTR {
244					logger.Printf("Error in Watcher Error channel: %s", err)
245				}
246			}
247		}
248	}
249}