idle.go

  1package fetcher
  2
  3import (
  4	"errors"
  5	"log"
  6	"strings"
  7	"sync"
  8	"time"
  9
 10	"github.com/emersion/go-imap/v2/imapclient"
 11	"github.com/floatpane/matcha/config"
 12)
 13
 14// IdleUpdate is sent when IDLE detects a mailbox change.
 15type IdleUpdate struct {
 16	AccountID  string
 17	FolderName string
 18}
 19
 20// IdleWatcher manages IDLE connections for multiple accounts.
 21type IdleWatcher struct {
 22	mu       sync.Mutex
 23	wg       sync.WaitGroup
 24	watchers map[string]*accountIdle // key: account ID
 25	notify   chan<- IdleUpdate
 26}
 27
 28// ErrStopTimeout is returned when IDLE watcher goroutines do not stop before the timeout.
 29var ErrStopTimeout = errors.New("idle watcher: stop timed out")
 30
 31// accountIdle manages a single IDLE connection for one account.
 32type accountIdle struct {
 33	account *config.Account
 34	folder  string
 35	notify  chan<- IdleUpdate
 36	stop    chan struct{}
 37	done    chan struct{}
 38}
 39
 40// NewIdleWatcher creates a new IDLE watcher. Updates are sent to the notify channel.
 41func NewIdleWatcher(notify chan<- IdleUpdate) *IdleWatcher {
 42	return &IdleWatcher{
 43		watchers: make(map[string]*accountIdle),
 44		notify:   notify,
 45	}
 46}
 47
 48// Watch starts (or restarts) an IDLE connection for the given account and folder.
 49func (w *IdleWatcher) Watch(account *config.Account, folder string) {
 50	w.mu.Lock()
 51	defer w.mu.Unlock()
 52
 53	// Stop existing watcher for this account if any
 54	if existing, ok := w.watchers[account.ID]; ok {
 55		close(existing.stop)
 56		delete(w.watchers, account.ID)
 57		// Let old connection tear down in the background
 58	}
 59
 60	a := &accountIdle{
 61		account: account,
 62		folder:  folder,
 63		notify:  w.notify,
 64		stop:    make(chan struct{}),
 65		done:    make(chan struct{}),
 66	}
 67	w.watchers[account.ID] = a
 68	w.wg.Add(1)
 69	go func() {
 70		defer w.wg.Done()
 71		a.run()
 72	}()
 73}
 74
 75// Stop stops the IDLE watcher for a specific account.
 76func (w *IdleWatcher) Stop(accountID string) {
 77	w.mu.Lock()
 78	defer w.mu.Unlock()
 79
 80	if a, ok := w.watchers[accountID]; ok {
 81		close(a.stop)
 82		delete(w.watchers, accountID)
 83		// Let old connection tear down in the background
 84	}
 85}
 86
 87// StopAll stops all IDLE watchers.
 88func (w *IdleWatcher) StopAll() {
 89	w.mu.Lock()
 90	defer w.mu.Unlock()
 91
 92	for id, a := range w.watchers {
 93		close(a.stop)
 94		delete(w.watchers, id)
 95	}
 96}
 97
 98// StopAllAndWait stops all IDLE watchers and waits for them to finish.
 99func (w *IdleWatcher) StopAllAndWait() {
100	w.mu.Lock()
101	pending := make([]chan struct{}, 0, len(w.watchers))
102	for id, a := range w.watchers {
103		close(a.stop)
104		pending = append(pending, a.done)
105		delete(w.watchers, id)
106	}
107	w.mu.Unlock()
108
109	for _, done := range pending {
110		<-done
111	}
112	w.wg.Wait()
113}
114
115// StopAllAndWaitTimeout stops all IDLE watchers and waits for them to finish up to d.
116func (w *IdleWatcher) StopAllAndWaitTimeout(d time.Duration) error {
117	w.mu.Lock()
118	for id, a := range w.watchers {
119		close(a.stop)
120		delete(w.watchers, id)
121	}
122	w.mu.Unlock()
123
124	done := make(chan struct{})
125	go func() {
126		w.wg.Wait()
127		close(done)
128	}()
129
130	select {
131	case <-done:
132		return nil
133	case <-time.After(d):
134		return ErrStopTimeout
135	}
136}
137
138func (a *accountIdle) run() {
139	defer close(a.done)
140
141	initialBackoff := 5 * time.Second
142	maxBackoff := 2 * time.Minute
143	backoff := initialBackoff
144
145	for {
146		start := time.Now()
147		err := a.idleOnce()
148		if err == nil {
149			// Clean exit (stop was closed)
150			return
151		}
152
153		// Reset backoff if we had a successful IDLE session (ran for
154		// longer than the current backoff period without error).
155		if time.Since(start) > backoff {
156			backoff = initialBackoff
157		}
158
159		// Check if we were told to stop
160		select {
161		case <-a.stop:
162			return
163		default:
164		}
165
166		// Don't retry on authentication errors — they won't resolve by retrying
167		if strings.Contains(err.Error(), "authentication error") || strings.Contains(err.Error(), "XOAUTH2 authentication failed") {
168			log.Printf("IDLE stopped for account %s: %v", a.account.ID, err)
169			return
170		}
171
172		log.Printf("IDLE error for account %s: %v (reconnecting in %v)", a.account.ID, err, backoff)
173
174		// Wait with backoff before reconnecting
175		select {
176		case <-a.stop:
177			return
178		case <-time.After(backoff):
179		}
180
181		backoff *= 2
182		if backoff > maxBackoff {
183			backoff = maxBackoff
184		}
185	}
186}
187
188// idleOnce connects, selects the mailbox, and runs IDLE until an error or stop.
189// Returns nil if stopped cleanly.
190func (a *accountIdle) idleOnce() error {
191	mailboxUpdates := make(chan uint32, 32)
192	c, err := connectWithHandler(a.account, &imapclient.UnilateralDataHandler{
193		Mailbox: func(data *imapclient.UnilateralDataMailbox) {
194			if data.NumMessages == nil {
195				return
196			}
197			// Non-blocking send: the callback runs on the IMAP socket-reader
198			// goroutine, so a synchronous send would stall the socket if the
199			// channel is full. The consumer only acts on the latest count
200			// (see prevExists tracking below), so dropping a stale update is
201			// safe — the next update will deliver the current count.
202			select {
203			case mailboxUpdates <- *data.NumMessages:
204			default:
205			}
206		},
207	})
208	if err != nil {
209		return err
210	}
211	defer c.Close() //nolint:errcheck
212
213	// Select the mailbox in read-only mode
214	selectData, err := c.Select(a.folder, nil).Wait()
215	if err != nil {
216		return err
217	}
218	prevExists := selectData.NumMessages
219
220	// Start IDLE
221	idleCmd, err := c.Idle()
222	if err != nil {
223		return err
224	}
225
226	for {
227		select {
228		case <-a.stop:
229			idleCmd.Close() //nolint:errcheck,gosec
230			idleCmd.Wait()  //nolint:errcheck,gosec
231			return nil
232
233		case newExists := <-mailboxUpdates:
234			if newExists > prevExists {
235				select {
236				case a.notify <- IdleUpdate{
237					AccountID:  a.account.ID,
238					FolderName: a.folder,
239				}:
240				case <-a.stop:
241					idleCmd.Close() //nolint:errcheck,gosec
242					idleCmd.Wait()  //nolint:errcheck,gosec
243					return nil
244				}
245			}
246			prevExists = newExists
247
248		case <-c.Closed():
249			if err := idleCmd.Close(); err != nil {
250				return err
251			}
252			return idleCmd.Wait()
253		}
254	}
255}