From 83b3fd9c608ca35c5b4c3ee230d1849ca80bd7bb Mon Sep 17 00:00:00 2001 From: Matt Van Horn Date: Tue, 26 May 2026 01:22:22 -0700 Subject: [PATCH] fix(fetcher): wait for IDLE goroutines (#1342) ## What? Track every IDLE watcher goroutine on the `IdleWatcher` via `sync.WaitGroup` so `StopAllAndWait` catches lingering goroutines whose map entry was already replaced. Add `StopAllAndWaitTimeout` for shutdown paths that need a bounded wait, and switch the daemon shutdown from the fire-and-forget `StopAll()` to the bounded variant so one stuck IMAP connection cannot block the daemon. ## Why? Closes #731 (filed by @andrinoff). The existing pattern in `fetcher/idle.go:48-60` and `:72-75` does `close(existing.stop); delete(w.watchers, account.ID)` and leaves the goroutine to tear down in the background. If the IMAP connection is stuck (server stops responding mid-IDLE, network hangs without RST), the goroutine never exits, the map no longer holds the `done` channel, and `StopAllAndWait()` has no way to see it. The original report's suggested fix was "use WaitGroup or ensure goroutines terminate within timeout" - this PR does both. The daemon side surfaces the problem: `daemon/daemon.go:129` previously called `idleWatcher.StopAll()` and immediately continued to `closeAllClients()`. If any underlying connection hung, the lingering IDLE goroutines outlived the daemon's shutdown path with no log entry. The new bounded-wait shutdown logs `idle watcher: stop timed out` when goroutines don't exit within 5s, so operators see leaks instead of silently losing them. Two new tests cover both behaviors: replaced-goroutine tracking via WaitGroup, and timeout-on-slow-exit returning `ErrStopTimeout`. Fixes #731 --- daemon/daemon.go | 4 ++- fetcher/idle.go | 35 ++++++++++++++++++++- fetcher/idle_test.go | 72 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 2 deletions(-) create mode 100644 fetcher/idle_test.go diff --git a/daemon/daemon.go b/daemon/daemon.go index 0fcbe5b46bb5db92ed76834abaafa343a06003e0..bd0b20c65ab92c3a9fff1eeb77c8272a31cd502a 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -132,7 +132,9 @@ func (d *Daemon) Run() error { // Cleanup. log.Println("daemon: shutting down") d.listener.Close() //nolint:errcheck,gosec - d.idleWatcher.StopAll() + if err := d.idleWatcher.StopAllAndWaitTimeout(5 * time.Second); err != nil { + log.Printf("daemon: %v", err) + } cancel() d.closeAllClients() d.closeProviders() diff --git a/fetcher/idle.go b/fetcher/idle.go index 7757b04e531d984ba1be08a3cfdf365253bf876a..6daf59e0d5d0517cda58809f0d72ead6f9e7c254 100644 --- a/fetcher/idle.go +++ b/fetcher/idle.go @@ -1,6 +1,7 @@ package fetcher import ( + "errors" "log" "strings" "sync" @@ -19,10 +20,14 @@ type IdleUpdate struct { // IdleWatcher manages IDLE connections for multiple accounts. type IdleWatcher struct { mu sync.Mutex + wg sync.WaitGroup watchers map[string]*accountIdle // key: account ID notify chan<- IdleUpdate } +// ErrStopTimeout is returned when IDLE watcher goroutines do not stop before the timeout. +var ErrStopTimeout = errors.New("idle watcher: stop timed out") + // accountIdle manages a single IDLE connection for one account. type accountIdle struct { account *config.Account @@ -60,7 +65,11 @@ func (w *IdleWatcher) Watch(account *config.Account, folder string) { done: make(chan struct{}), } w.watchers[account.ID] = a - go a.run() + w.wg.Add(1) + go func() { + defer w.wg.Done() + a.run() + }() } // Stop stops the IDLE watcher for a specific account. @@ -100,6 +109,30 @@ func (w *IdleWatcher) StopAllAndWait() { for _, done := range pending { <-done } + w.wg.Wait() +} + +// StopAllAndWaitTimeout stops all IDLE watchers and waits for them to finish up to d. +func (w *IdleWatcher) StopAllAndWaitTimeout(d time.Duration) error { + w.mu.Lock() + for id, a := range w.watchers { + close(a.stop) + delete(w.watchers, id) + } + w.mu.Unlock() + + done := make(chan struct{}) + go func() { + w.wg.Wait() + close(done) + }() + + select { + case <-done: + return nil + case <-time.After(d): + return ErrStopTimeout + } } func (a *accountIdle) run() { diff --git a/fetcher/idle_test.go b/fetcher/idle_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a7ac96a21cf66316af5b38bacdd6010fbe30b1f2 --- /dev/null +++ b/fetcher/idle_test.go @@ -0,0 +1,72 @@ +package fetcher + +import ( + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/floatpane/matcha/config" +) + +func TestIdleWatcher_StopAllAndWait_TracksReplacedGoroutines(t *testing.T) { + w := NewIdleWatcher(make(chan IdleUpdate)) + stopCh := make(chan struct{}) + doneCh := make(chan struct{}) + var exits atomic.Int64 + + w.wg.Add(1) + go func() { + defer w.wg.Done() + defer close(doneCh) + <-stopCh + exits.Add(1) + }() + + w.watchers["acct"] = &accountIdle{ + account: &config.Account{ID: "acct"}, + stop: stopCh, + done: doneCh, + } + + if err := w.StopAllAndWaitTimeout(time.Second); err != nil { + t.Fatalf("StopAllAndWaitTimeout returned error: %v", err) + } + if got := exits.Load(); got != 1 { + t.Fatalf("expected synthetic watcher to exit once, got %d", got) + } +} + +func TestIdleWatcher_StopAllAndWaitTimeout_ReturnsOnSlowExit(t *testing.T) { + w := NewIdleWatcher(make(chan IdleUpdate)) + stopCh := make(chan struct{}) + doneCh := make(chan struct{}) + release := make(chan struct{}) + exited := make(chan struct{}) + + w.wg.Add(1) + go func() { + defer w.wg.Done() + defer close(doneCh) + defer close(exited) + <-release + }() + + w.watchers["acct"] = &accountIdle{ + account: &config.Account{ID: "acct"}, + stop: stopCh, + done: doneCh, + } + + err := w.StopAllAndWaitTimeout(50 * time.Millisecond) + if !errors.Is(err, ErrStopTimeout) { + t.Fatalf("expected ErrStopTimeout, got %v", err) + } + + close(release) + select { + case <-exited: + case <-time.After(time.Second): + t.Fatal("synthetic watcher did not exit during cleanup") + } +}