1// Copyright (c) 2019 FOSS contributors of https://github.com/nxadm/tail
2// Copyright (c) 2015 HPE Software Inc. All rights reserved.
3// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
4
5package watch
6
7import (
8 "log"
9 "os"
10 "path/filepath"
11 "sync"
12 "syscall"
13
14 "github.com/nxadm/tail/util"
15
16 "github.com/fsnotify/fsnotify"
17)
18
19type InotifyTracker struct {
20 mux sync.Mutex
21 watcher *fsnotify.Watcher
22 chans map[string]chan fsnotify.Event
23 done map[string]chan bool
24 watchNums map[string]int
25 watch chan *watchInfo
26 remove chan *watchInfo
27 error chan error
28}
29
30type watchInfo struct {
31 op fsnotify.Op
32 fname string
33}
34
35func (this *watchInfo) isCreate() bool {
36 return this.op == fsnotify.Create
37}
38
39var (
40 // globally shared InotifyTracker; ensures only one fsnotify.Watcher is used
41 shared *InotifyTracker
42
43 // these are used to ensure the shared InotifyTracker is run exactly once
44 once = sync.Once{}
45 goRun = func() {
46 shared = &InotifyTracker{
47 mux: sync.Mutex{},
48 chans: make(map[string]chan fsnotify.Event),
49 done: make(map[string]chan bool),
50 watchNums: make(map[string]int),
51 watch: make(chan *watchInfo),
52 remove: make(chan *watchInfo),
53 error: make(chan error),
54 }
55 go shared.run()
56 }
57
58 logger = log.New(os.Stderr, "", log.LstdFlags)
59)
60
61// Watch signals the run goroutine to begin watching the input filename
62func Watch(fname string) error {
63 return watch(&watchInfo{
64 fname: fname,
65 })
66}
67
68// Watch create signals the run goroutine to begin watching the input filename
69// if call the WatchCreate function, don't call the Cleanup, call the RemoveWatchCreate
70func WatchCreate(fname string) error {
71 return watch(&watchInfo{
72 op: fsnotify.Create,
73 fname: fname,
74 })
75}
76
77func watch(winfo *watchInfo) error {
78 // start running the shared InotifyTracker if not already running
79 once.Do(goRun)
80
81 winfo.fname = filepath.Clean(winfo.fname)
82 shared.watch <- winfo
83 return <-shared.error
84}
85
86// RemoveWatch signals the run goroutine to remove the watch for the input filename
87func RemoveWatch(fname string) error {
88 return remove(&watchInfo{
89 fname: fname,
90 })
91}
92
93// RemoveWatch create signals the run goroutine to remove the watch for the input filename
94func RemoveWatchCreate(fname string) error {
95 return remove(&watchInfo{
96 op: fsnotify.Create,
97 fname: fname,
98 })
99}
100
101func remove(winfo *watchInfo) error {
102 // start running the shared InotifyTracker if not already running
103 once.Do(goRun)
104
105 winfo.fname = filepath.Clean(winfo.fname)
106 shared.mux.Lock()
107 done := shared.done[winfo.fname]
108 if done != nil {
109 delete(shared.done, winfo.fname)
110 close(done)
111 }
112 shared.mux.Unlock()
113
114 shared.remove <- winfo
115 return <-shared.error
116}
117
118// Events returns a channel to which FileEvents corresponding to the input filename
119// will be sent. This channel will be closed when removeWatch is called on this
120// filename.
121func Events(fname string) <-chan fsnotify.Event {
122 shared.mux.Lock()
123 defer shared.mux.Unlock()
124
125 return shared.chans[fname]
126}
127
128// Cleanup removes the watch for the input filename if necessary.
129func Cleanup(fname string) error {
130 return RemoveWatch(fname)
131}
132
133// watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating
134// a new Watcher if the previous Watcher was closed.
135func (shared *InotifyTracker) addWatch(winfo *watchInfo) error {
136 shared.mux.Lock()
137 defer shared.mux.Unlock()
138
139 if shared.chans[winfo.fname] == nil {
140 shared.chans[winfo.fname] = make(chan fsnotify.Event)
141 }
142 if shared.done[winfo.fname] == nil {
143 shared.done[winfo.fname] = make(chan bool)
144 }
145
146 fname := winfo.fname
147 if winfo.isCreate() {
148 // Watch for new files to be created in the parent directory.
149 fname = filepath.Dir(fname)
150 }
151
152 var err error
153 // already in inotify watch
154 if shared.watchNums[fname] == 0 {
155 err = shared.watcher.Add(fname)
156 }
157 if err == nil {
158 shared.watchNums[fname]++
159 }
160 return err
161}
162
163// removeWatch calls fsnotify.RemoveWatch for the input filename and closes the
164// corresponding events channel.
165func (shared *InotifyTracker) removeWatch(winfo *watchInfo) error {
166 shared.mux.Lock()
167
168 ch := shared.chans[winfo.fname]
169 if ch != nil {
170 delete(shared.chans, winfo.fname)
171 close(ch)
172 }
173
174 fname := winfo.fname
175 if winfo.isCreate() {
176 // Watch for new files to be created in the parent directory.
177 fname = filepath.Dir(fname)
178 }
179 shared.watchNums[fname]--
180 watchNum := shared.watchNums[fname]
181 if watchNum == 0 {
182 delete(shared.watchNums, fname)
183 }
184 shared.mux.Unlock()
185
186 var err error
187 // If we were the last ones to watch this file, unsubscribe from inotify.
188 // This needs to happen after releasing the lock because fsnotify waits
189 // synchronously for the kernel to acknowledge the removal of the watch
190 // for this file, which causes us to deadlock if we still held the lock.
191 if watchNum == 0 {
192 err = shared.watcher.Remove(fname)
193 }
194
195 return err
196}
197
198// sendEvent sends the input event to the appropriate Tail.
199func (shared *InotifyTracker) sendEvent(event fsnotify.Event) {
200 name := filepath.Clean(event.Name)
201
202 shared.mux.Lock()
203 ch := shared.chans[name]
204 done := shared.done[name]
205 shared.mux.Unlock()
206
207 if ch != nil && done != nil {
208 select {
209 case ch <- event:
210 case <-done:
211 }
212 }
213}
214
215// run starts the goroutine in which the shared struct reads events from its
216// Watcher's Event channel and sends the events to the appropriate Tail.
217func (shared *InotifyTracker) run() {
218 watcher, err := fsnotify.NewWatcher()
219 if err != nil {
220 util.Fatal("failed to create Watcher")
221 }
222 shared.watcher = watcher
223
224 for {
225 select {
226 case winfo := <-shared.watch:
227 shared.error <- shared.addWatch(winfo)
228
229 case winfo := <-shared.remove:
230 shared.error <- shared.removeWatch(winfo)
231
232 case event, open := <-shared.watcher.Events:
233 if !open {
234 return
235 }
236 shared.sendEvent(event)
237
238 case err, open := <-shared.watcher.Errors:
239 if !open {
240 return
241 } else if err != nil {
242 sysErr, ok := err.(*os.SyscallError)
243 if !ok || sysErr.Err != syscall.EINTR {
244 logger.Printf("Error in Watcher Error channel: %s", err)
245 }
246 }
247 }
248 }
249}