idle.go

  1package fetcher
  2
  3import (
  4	"errors"
  5	"log"
  6	"strings"
  7	"sync"
  8	"time"
  9
 10	"github.com/emersion/go-imap/v2/imapclient"
 11	"github.com/floatpane/matcha/config"
 12)
 13
 14// IdleUpdate is sent when IDLE detects a mailbox change.
 15type IdleUpdate struct {
 16	AccountID  string
 17	FolderName string
 18}
 19
// IdleWatcher manages IDLE connections for multiple accounts.
// At most one watcher is registered per account ID; registering a new one
// for the same account replaces the previous one.
type IdleWatcher struct {
	mu       sync.Mutex              // held while reading or modifying watchers
	wg       sync.WaitGroup          // one count per running watcher goroutine
	watchers map[string]*accountIdle // key: account ID
	notify   chan<- IdleUpdate       // passed to every accountIdle as its update sink
}
 27
 28// ErrStopTimeout is returned when IDLE watcher goroutines do not stop before the timeout.
 29var ErrStopTimeout = errors.New("idle watcher: stop timed out")
 30
 31// accountIdle manages a single IDLE connection for one account.
 32type accountIdle struct {
 33	account *config.Account
 34	folder  string
 35	notify  chan<- IdleUpdate
 36	stop    chan struct{}
 37	done    chan struct{}
 38}
 39
 40// NewIdleWatcher creates a new IDLE watcher. Updates are sent to the notify channel.
 41func NewIdleWatcher(notify chan<- IdleUpdate) *IdleWatcher {
 42	return &IdleWatcher{
 43		watchers: make(map[string]*accountIdle),
 44		notify:   notify,
 45	}
 46}
 47
 48// Watch starts (or restarts) an IDLE connection for the given account and folder.
 49func (w *IdleWatcher) Watch(account *config.Account, folder string) {
 50	// IDLE is an IMAP-only concept; non-IMAP backends (maildir, etc.) have
 51	// no remote socket to keep open. Skip silently rather than spinning the
 52	// reconnect loop forever.
 53	if account != nil && account.Protocol != "" && account.Protocol != "imap" {
 54		return
 55	}
 56
 57	w.mu.Lock()
 58	defer w.mu.Unlock()
 59
 60	// Stop existing watcher for this account if any
 61	if existing, ok := w.watchers[account.ID]; ok {
 62		close(existing.stop)
 63		delete(w.watchers, account.ID)
 64		// Let old connection tear down in the background
 65	}
 66
 67	a := &accountIdle{
 68		account: account,
 69		folder:  folder,
 70		notify:  w.notify,
 71		stop:    make(chan struct{}),
 72		done:    make(chan struct{}),
 73	}
 74	w.watchers[account.ID] = a
 75	w.wg.Add(1)
 76	go func() {
 77		defer w.wg.Done()
 78		a.run()
 79	}()
 80}
 81
 82// Stop stops the IDLE watcher for a specific account.
 83func (w *IdleWatcher) Stop(accountID string) {
 84	w.mu.Lock()
 85	defer w.mu.Unlock()
 86
 87	if a, ok := w.watchers[accountID]; ok {
 88		close(a.stop)
 89		delete(w.watchers, accountID)
 90		// Let old connection tear down in the background
 91	}
 92}
 93
 94// StopAll stops all IDLE watchers.
 95func (w *IdleWatcher) StopAll() {
 96	w.mu.Lock()
 97	defer w.mu.Unlock()
 98
 99	for id, a := range w.watchers {
100		close(a.stop)
101		delete(w.watchers, id)
102	}
103}
104
105// StopAllAndWait stops all IDLE watchers and waits for them to finish.
106func (w *IdleWatcher) StopAllAndWait() {
107	w.mu.Lock()
108	pending := make([]chan struct{}, 0, len(w.watchers))
109	for id, a := range w.watchers {
110		close(a.stop)
111		pending = append(pending, a.done)
112		delete(w.watchers, id)
113	}
114	w.mu.Unlock()
115
116	for _, done := range pending {
117		<-done
118	}
119	w.wg.Wait()
120}
121
122// StopAllAndWaitTimeout stops all IDLE watchers and waits for them to finish up to d.
123func (w *IdleWatcher) StopAllAndWaitTimeout(d time.Duration) error {
124	w.mu.Lock()
125	for id, a := range w.watchers {
126		close(a.stop)
127		delete(w.watchers, id)
128	}
129	w.mu.Unlock()
130
131	done := make(chan struct{})
132	go func() {
133		w.wg.Wait()
134		close(done)
135	}()
136
137	select {
138	case <-done:
139		return nil
140	case <-time.After(d):
141		return ErrStopTimeout
142	}
143}
144
145func (a *accountIdle) run() {
146	defer close(a.done)
147
148	initialBackoff := 5 * time.Second
149	maxBackoff := 2 * time.Minute
150	backoff := initialBackoff
151
152	for {
153		start := time.Now()
154		err := a.idleOnce()
155		if err == nil {
156			// Clean exit (stop was closed)
157			return
158		}
159
160		// Reset backoff if we had a successful IDLE session (ran for
161		// longer than the current backoff period without error).
162		if time.Since(start) > backoff {
163			backoff = initialBackoff
164		}
165
166		// Check if we were told to stop
167		select {
168		case <-a.stop:
169			return
170		default:
171		}
172
173		// Don't retry on authentication errors — they won't resolve by retrying
174		if strings.Contains(err.Error(), "authentication error") || strings.Contains(err.Error(), "XOAUTH2 authentication failed") {
175			log.Printf("IDLE stopped for account %s: %v", a.account.ID, err)
176			return
177		}
178
179		log.Printf("IDLE error for account %s: %v (reconnecting in %v)", a.account.ID, err, backoff)
180
181		// Wait with backoff before reconnecting
182		select {
183		case <-a.stop:
184			return
185		case <-time.After(backoff):
186		}
187
188		backoff *= 2
189		if backoff > maxBackoff {
190			backoff = maxBackoff
191		}
192	}
193}
194
195// idleOnce connects, selects the mailbox, and runs IDLE until an error or stop.
196// Returns nil if stopped cleanly.
197func (a *accountIdle) idleOnce() error {
198	mailboxUpdates := make(chan uint32, 32)
199	c, err := connectWithHandler(a.account, &imapclient.UnilateralDataHandler{
200		Mailbox: func(data *imapclient.UnilateralDataMailbox) {
201			if data.NumMessages == nil {
202				return
203			}
204			// Non-blocking send: the callback runs on the IMAP socket-reader
205			// goroutine, so a synchronous send would stall the socket if the
206			// channel is full. The consumer only acts on the latest count
207			// (see prevExists tracking below), so dropping a stale update is
208			// safe — the next update will deliver the current count.
209			select {
210			case mailboxUpdates <- *data.NumMessages:
211			default:
212			}
213		},
214	})
215	if err != nil {
216		return err
217	}
218	defer c.Close() //nolint:errcheck
219
220	// Select the mailbox in read-only mode
221	selectData, err := c.Select(a.folder, nil).Wait()
222	if err != nil {
223		return err
224	}
225	prevExists := selectData.NumMessages
226
227	// Start IDLE
228	idleCmd, err := c.Idle()
229	if err != nil {
230		return err
231	}
232
233	for {
234		select {
235		case <-a.stop:
236			idleCmd.Close() //nolint:errcheck,gosec
237			idleCmd.Wait()  //nolint:errcheck,gosec
238			return nil
239
240		case newExists := <-mailboxUpdates:
241			if newExists > prevExists {
242				select {
243				case a.notify <- IdleUpdate{
244					AccountID:  a.account.ID,
245					FolderName: a.folder,
246				}:
247				case <-a.stop:
248					idleCmd.Close() //nolint:errcheck,gosec
249					idleCmd.Wait()  //nolint:errcheck,gosec
250					return nil
251				}
252			}
253			prevExists = newExists
254
255		case <-c.Closed():
256			if err := idleCmd.Close(); err != nil {
257				return err
258			}
259			return idleCmd.Wait()
260		}
261	}
262}