1package fetcher
2
3import (
4 "errors"
5 "log"
6 "strings"
7 "sync"
8 "time"
9
10 "github.com/emersion/go-imap/v2/imapclient"
11 "github.com/floatpane/matcha/config"
12)
13
14// IdleUpdate is sent when IDLE detects a mailbox change.
15type IdleUpdate struct {
16 AccountID string
17 FolderName string
18}
19
20// IdleWatcher manages IDLE connections for multiple accounts.
21type IdleWatcher struct {
22 mu sync.Mutex
23 wg sync.WaitGroup
24 watchers map[string]*accountIdle // key: account ID
25 notify chan<- IdleUpdate
26}
27
28// ErrStopTimeout is returned when IDLE watcher goroutines do not stop before the timeout.
29var ErrStopTimeout = errors.New("idle watcher: stop timed out")
30
31// accountIdle manages a single IDLE connection for one account.
32type accountIdle struct {
33 account *config.Account
34 folder string
35 notify chan<- IdleUpdate
36 stop chan struct{}
37 done chan struct{}
38}
39
40// NewIdleWatcher creates a new IDLE watcher. Updates are sent to the notify channel.
41func NewIdleWatcher(notify chan<- IdleUpdate) *IdleWatcher {
42 return &IdleWatcher{
43 watchers: make(map[string]*accountIdle),
44 notify: notify,
45 }
46}
47
48// Watch starts (or restarts) an IDLE connection for the given account and folder.
49func (w *IdleWatcher) Watch(account *config.Account, folder string) {
50 w.mu.Lock()
51 defer w.mu.Unlock()
52
53 // Stop existing watcher for this account if any
54 if existing, ok := w.watchers[account.ID]; ok {
55 close(existing.stop)
56 delete(w.watchers, account.ID)
57 // Let old connection tear down in the background
58 }
59
60 a := &accountIdle{
61 account: account,
62 folder: folder,
63 notify: w.notify,
64 stop: make(chan struct{}),
65 done: make(chan struct{}),
66 }
67 w.watchers[account.ID] = a
68 w.wg.Add(1)
69 go func() {
70 defer w.wg.Done()
71 a.run()
72 }()
73}
74
75// Stop stops the IDLE watcher for a specific account.
76func (w *IdleWatcher) Stop(accountID string) {
77 w.mu.Lock()
78 defer w.mu.Unlock()
79
80 if a, ok := w.watchers[accountID]; ok {
81 close(a.stop)
82 delete(w.watchers, accountID)
83 // Let old connection tear down in the background
84 }
85}
86
87// StopAll stops all IDLE watchers.
88func (w *IdleWatcher) StopAll() {
89 w.mu.Lock()
90 defer w.mu.Unlock()
91
92 for id, a := range w.watchers {
93 close(a.stop)
94 delete(w.watchers, id)
95 }
96}
97
98// StopAllAndWait stops all IDLE watchers and waits for them to finish.
99func (w *IdleWatcher) StopAllAndWait() {
100 w.mu.Lock()
101 pending := make([]chan struct{}, 0, len(w.watchers))
102 for id, a := range w.watchers {
103 close(a.stop)
104 pending = append(pending, a.done)
105 delete(w.watchers, id)
106 }
107 w.mu.Unlock()
108
109 for _, done := range pending {
110 <-done
111 }
112 w.wg.Wait()
113}
114
115// StopAllAndWaitTimeout stops all IDLE watchers and waits for them to finish up to d.
116func (w *IdleWatcher) StopAllAndWaitTimeout(d time.Duration) error {
117 w.mu.Lock()
118 for id, a := range w.watchers {
119 close(a.stop)
120 delete(w.watchers, id)
121 }
122 w.mu.Unlock()
123
124 done := make(chan struct{})
125 go func() {
126 w.wg.Wait()
127 close(done)
128 }()
129
130 select {
131 case <-done:
132 return nil
133 case <-time.After(d):
134 return ErrStopTimeout
135 }
136}
137
138func (a *accountIdle) run() {
139 defer close(a.done)
140
141 initialBackoff := 5 * time.Second
142 maxBackoff := 2 * time.Minute
143 backoff := initialBackoff
144
145 for {
146 start := time.Now()
147 err := a.idleOnce()
148 if err == nil {
149 // Clean exit (stop was closed)
150 return
151 }
152
153 // Reset backoff if we had a successful IDLE session (ran for
154 // longer than the current backoff period without error).
155 if time.Since(start) > backoff {
156 backoff = initialBackoff
157 }
158
159 // Check if we were told to stop
160 select {
161 case <-a.stop:
162 return
163 default:
164 }
165
166 // Don't retry on authentication errors — they won't resolve by retrying
167 if strings.Contains(err.Error(), "authentication error") || strings.Contains(err.Error(), "XOAUTH2 authentication failed") {
168 log.Printf("IDLE stopped for account %s: %v", a.account.ID, err)
169 return
170 }
171
172 log.Printf("IDLE error for account %s: %v (reconnecting in %v)", a.account.ID, err, backoff)
173
174 // Wait with backoff before reconnecting
175 select {
176 case <-a.stop:
177 return
178 case <-time.After(backoff):
179 }
180
181 backoff *= 2
182 if backoff > maxBackoff {
183 backoff = maxBackoff
184 }
185 }
186}
187
188// idleOnce connects, selects the mailbox, and runs IDLE until an error or stop.
189// Returns nil if stopped cleanly.
190func (a *accountIdle) idleOnce() error {
191 mailboxUpdates := make(chan uint32, 32)
192 c, err := connectWithHandler(a.account, &imapclient.UnilateralDataHandler{
193 Mailbox: func(data *imapclient.UnilateralDataMailbox) {
194 if data.NumMessages == nil {
195 return
196 }
197 // Non-blocking send: the callback runs on the IMAP socket-reader
198 // goroutine, so a synchronous send would stall the socket if the
199 // channel is full. The consumer only acts on the latest count
200 // (see prevExists tracking below), so dropping a stale update is
201 // safe — the next update will deliver the current count.
202 select {
203 case mailboxUpdates <- *data.NumMessages:
204 default:
205 }
206 },
207 })
208 if err != nil {
209 return err
210 }
211 defer c.Close() //nolint:errcheck
212
213 // Select the mailbox in read-only mode
214 selectData, err := c.Select(a.folder, nil).Wait()
215 if err != nil {
216 return err
217 }
218 prevExists := selectData.NumMessages
219
220 // Start IDLE
221 idleCmd, err := c.Idle()
222 if err != nil {
223 return err
224 }
225
226 for {
227 select {
228 case <-a.stop:
229 idleCmd.Close() //nolint:errcheck,gosec
230 idleCmd.Wait() //nolint:errcheck,gosec
231 return nil
232
233 case newExists := <-mailboxUpdates:
234 if newExists > prevExists {
235 select {
236 case a.notify <- IdleUpdate{
237 AccountID: a.account.ID,
238 FolderName: a.folder,
239 }:
240 case <-a.stop:
241 idleCmd.Close() //nolint:errcheck,gosec
242 idleCmd.Wait() //nolint:errcheck,gosec
243 return nil
244 }
245 }
246 prevExists = newExists
247
248 case <-c.Closed():
249 if err := idleCmd.Close(); err != nil {
250 return err
251 }
252 return idleCmd.Wait()
253 }
254 }
255}