idle.go

  1package fetcher
  2
import (
	"errors"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/emersion/go-imap/v2/imapclient"
	"github.com/floatpane/matcha/config"
)
 12
 13// IdleUpdate is sent when IDLE detects a mailbox change.
 14type IdleUpdate struct {
 15	AccountID  string
 16	FolderName string
 17}
 18
 19// IdleWatcher manages IDLE connections for multiple accounts.
 20type IdleWatcher struct {
 21	mu       sync.Mutex
 22	watchers map[string]*accountIdle // key: account ID
 23	notify   chan<- IdleUpdate
 24}
 25
 26// accountIdle manages a single IDLE connection for one account.
 27type accountIdle struct {
 28	account *config.Account
 29	folder  string
 30	notify  chan<- IdleUpdate
 31	stop    chan struct{}
 32	done    chan struct{}
 33}
 34
 35// NewIdleWatcher creates a new IDLE watcher. Updates are sent to the notify channel.
 36func NewIdleWatcher(notify chan<- IdleUpdate) *IdleWatcher {
 37	return &IdleWatcher{
 38		watchers: make(map[string]*accountIdle),
 39		notify:   notify,
 40	}
 41}
 42
 43// Watch starts (or restarts) an IDLE connection for the given account and folder.
 44func (w *IdleWatcher) Watch(account *config.Account, folder string) {
 45	w.mu.Lock()
 46	defer w.mu.Unlock()
 47
 48	// Stop existing watcher for this account if any
 49	if existing, ok := w.watchers[account.ID]; ok {
 50		close(existing.stop)
 51		delete(w.watchers, account.ID)
 52		// Let old connection tear down in the background
 53	}
 54
 55	a := &accountIdle{
 56		account: account,
 57		folder:  folder,
 58		notify:  w.notify,
 59		stop:    make(chan struct{}),
 60		done:    make(chan struct{}),
 61	}
 62	w.watchers[account.ID] = a
 63	go a.run()
 64}
 65
 66// Stop stops the IDLE watcher for a specific account.
 67func (w *IdleWatcher) Stop(accountID string) {
 68	w.mu.Lock()
 69	defer w.mu.Unlock()
 70
 71	if a, ok := w.watchers[accountID]; ok {
 72		close(a.stop)
 73		delete(w.watchers, accountID)
 74		// Let old connection tear down in the background
 75	}
 76}
 77
 78// StopAll stops all IDLE watchers.
 79func (w *IdleWatcher) StopAll() {
 80	w.mu.Lock()
 81	defer w.mu.Unlock()
 82
 83	for id, a := range w.watchers {
 84		close(a.stop)
 85		delete(w.watchers, id)
 86	}
 87}
 88
 89// StopAllAndWait stops all IDLE watchers and waits for them to finish.
 90func (w *IdleWatcher) StopAllAndWait() {
 91	w.mu.Lock()
 92	var pending []chan struct{}
 93	for id, a := range w.watchers {
 94		close(a.stop)
 95		pending = append(pending, a.done)
 96		delete(w.watchers, id)
 97	}
 98	w.mu.Unlock()
 99
100	for _, done := range pending {
101		<-done
102	}
103}
104
105func (a *accountIdle) run() {
106	defer close(a.done)
107
108	initialBackoff := 5 * time.Second
109	maxBackoff := 2 * time.Minute
110	backoff := initialBackoff
111
112	for {
113		start := time.Now()
114		err := a.idleOnce()
115		if err == nil {
116			// Clean exit (stop was closed)
117			return
118		}
119
120		// Reset backoff if we had a successful IDLE session (ran for
121		// longer than the current backoff period without error).
122		if time.Since(start) > backoff {
123			backoff = initialBackoff
124		}
125
126		// Check if we were told to stop
127		select {
128		case <-a.stop:
129			return
130		default:
131		}
132
133		// Don't retry on authentication errors — they won't resolve by retrying
134		if strings.Contains(err.Error(), "authentication error") || strings.Contains(err.Error(), "XOAUTH2 authentication failed") {
135			log.Printf("IDLE stopped for account %s: %v", a.account.ID, err)
136			return
137		}
138
139		log.Printf("IDLE error for account %s: %v (reconnecting in %v)", a.account.ID, err, backoff)
140
141		// Wait with backoff before reconnecting
142		select {
143		case <-a.stop:
144			return
145		case <-time.After(backoff):
146		}
147
148		backoff *= 2
149		if backoff > maxBackoff {
150			backoff = maxBackoff
151		}
152	}
153}
154
155// idleOnce connects, selects the mailbox, and runs IDLE until an error or stop.
156// Returns nil if stopped cleanly.
157func (a *accountIdle) idleOnce() error {
158	mailboxUpdates := make(chan uint32, 32)
159	c, err := connectWithHandler(a.account, &imapclient.UnilateralDataHandler{
160		Mailbox: func(data *imapclient.UnilateralDataMailbox) {
161			if data.NumMessages == nil {
162				return
163			}
164			// Non-blocking send: the callback runs on the IMAP socket-reader
165			// goroutine, so a synchronous send would stall the socket if the
166			// channel is full. The consumer only acts on the latest count
167			// (see prevExists tracking below), so dropping a stale update is
168			// safe — the next update will deliver the current count.
169			select {
170			case mailboxUpdates <- *data.NumMessages:
171			default:
172			}
173		},
174	})
175	if err != nil {
176		return err
177	}
178	defer c.Close()
179
180	// Select the mailbox in read-only mode
181	selectData, err := c.Select(a.folder, nil).Wait()
182	if err != nil {
183		return err
184	}
185	prevExists := selectData.NumMessages
186
187	// Start IDLE
188	idleCmd, err := c.Idle()
189	if err != nil {
190		return err
191	}
192
193	for {
194		select {
195		case <-a.stop:
196			idleCmd.Close()
197			idleCmd.Wait()
198			return nil
199
200		case newExists := <-mailboxUpdates:
201			if newExists > prevExists {
202				select {
203				case a.notify <- IdleUpdate{
204					AccountID:  a.account.ID,
205					FolderName: a.folder,
206				}:
207				case <-a.stop:
208					idleCmd.Close()
209					idleCmd.Wait()
210					return nil
211				}
212			}
213			prevExists = newExists
214
215		case <-c.Closed():
216			if err := idleCmd.Close(); err != nil {
217				return err
218			}
219			return idleCmd.Wait()
220		}
221	}
222}