diff options
author | Koni Marti <koni.marti@gmail.com> | 2022-04-30 01:08:56 +0200 |
---|---|---|
committer | Robin Jarry <robin@jarry.cc> | 2022-05-04 14:07:15 +0200 |
commit | e5b339702a56fa02dedec770a79b64313fb30108 (patch) | |
tree | e9c345f5043c5cf748bc6e25c6be6c1bb173d33f | |
parent | 397a6f267f41c501f28d3adb9d641a9283af474f (diff) | |
download | aerc-e5b339702a56fa02dedec770a79b64313fb30108.zip |
imap: monitor the logout channel with an observer
Untangle the observer functionality from the message handling routine.
Observe the imap client's logout channel and trigger a connection error
when necessary to start the reconnect cycle.
Signed-off-by: Koni Marti <koni.marti@gmail.com>
Acked-by: Robin Jarry <robin@jarry.cc>
-rw-r--r-- | worker/imap/configure.go | 12 | ||||
-rw-r--r-- | worker/imap/observer.go | 152 | ||||
-rw-r--r-- | worker/imap/worker.go | 108 |
3 files changed, 186 insertions, 86 deletions
diff --git a/worker/imap/configure.go b/worker/imap/configure.go index 0bccbae..c25600d 100644 --- a/worker/imap/configure.go +++ b/worker/imap/configure.go @@ -50,6 +50,9 @@ func (w *IMAPWorker) handleConfigure(msg *types.Configure) error { w.config.keepalive_period = 0 * time.Second w.config.keepalive_probes = 3 w.config.keepalive_interval = 3 + + w.config.reconnect_maxwait = 30 * time.Second + for key, value := range msg.Config.Params { switch key { case "idle-timeout": @@ -60,6 +63,14 @@ func (w *IMAPWorker) handleConfigure(msg *types.Configure) error { value, err) } w.config.idle_timeout = val + case "reconnect-maxwait": + val, err := time.ParseDuration(value) + if err != nil || val < 0 { + return fmt.Errorf( + "invalid reconnect-maxwait value %v: %v", + value, err) + } + w.config.reconnect_maxwait = val case "connection-timeout": val, err := time.ParseDuration(value) if err != nil || val < 0 { @@ -96,6 +107,7 @@ func (w *IMAPWorker) handleConfigure(msg *types.Configure) error { } w.idler = newIdler(w.config, w.worker) + w.observer = newObserver(w.config, w.worker) return nil } diff --git a/worker/imap/observer.go b/worker/imap/observer.go new file mode 100644 index 0000000..e49744c --- /dev/null +++ b/worker/imap/observer.go @@ -0,0 +1,152 @@ +package imap + +import ( + "fmt" + "math" + "sync" + "time" + + "git.sr.ht/~rjarry/aerc/worker/types" + "github.com/emersion/go-imap" +) + +// observer monitors the loggedOut channel of the imap client. If the logout +// signal is received, the observer will emit a connection error to the ui in +// order to start the reconnect cycle. +type observer struct { + sync.Mutex + config imapConfig + client *imapClient + worker *types.Worker + done chan struct{} + autoReconnect bool + retries int + running bool +} + +func newObserver(cfg imapConfig, w *types.Worker) *observer { + return &observer{config: cfg, worker: w, done: make(chan struct{})} +} + +func (o *observer) SetClient(c *imapClient) { + o.Stop() + o.Lock() + o.client = c + o.Unlock() + o.Start() + o.retries = 0 +} + +func (o *observer) SetAutoReconnect(auto bool) { + o.autoReconnect = auto +} + +func (o *observer) AutoReconnect() bool { + return o.autoReconnect +} + +func (o *observer) isClientConnected() bool { + o.Lock() + defer o.Unlock() + return o.client != nil && o.client.State() == imap.SelectedState +} + +func (o *observer) EmitIfNotConnected() bool { + if !o.isClientConnected() { + o.emit("imap client not connected: attempt reconnect") + return true + } + return false +} + +func (o *observer) IsRunning() bool { + return o.running +} + +func (o *observer) Start() { + if o.running { + o.log("runs already") + return + } + if o.client == nil { + return + } + if o.EmitIfNotConnected() { + return + } + go func() { + select { + case <-o.client.LoggedOut(): + o.log("<-logout") + if o.autoReconnect { + o.emit("logged out") + } else { + o.log("ignore logout (auto-reconnect off)") + } + case <-o.done: + o.log("<-done") + } + o.running = false + o.log("stopped") + }() + o.running = true + o.log("started") +} + +func (o *observer) Stop() { + if o.client == nil { + return + } + if o.done != nil { + close(o.done) + } + o.done = make(chan struct{}) + o.running = false +} + +func (o *observer) DelayedReconnect() error { + if o.client == nil { + return nil + } + var wait time.Duration + var reterr error + + if o.retries > 0 { + backoff := int(math.Pow(1.8, float64(o.retries))) + var err error + wait, err = time.ParseDuration(fmt.Sprintf("%ds", backoff)) + if err != nil { + return err + } + if wait > o.config.reconnect_maxwait { + wait = o.config.reconnect_maxwait + } + + reterr = fmt.Errorf("reconnect in %v", wait) + } else { + reterr = fmt.Errorf("reconnect") + } + + go func() { + <-time.After(wait) + o.emit(reterr.Error()) + }() + + o.retries++ + return reterr +} + +func (o *observer) emit(errMsg string) { + o.log("disconnect done->") + o.worker.PostMessage(&types.Done{ + Message: types.RespondTo(&types.Disconnect{})}, nil) + o.log("connection error->") + o.worker.PostMessage(&types.ConnError{ + Error: fmt.Errorf(errMsg), + }, nil) +} + +func (o *observer) log(args ...interface{}) { + header := fmt.Sprintf("observer (%p) [running:%t]", o, o.running) + o.worker.Logger.Println(append([]interface{}{header}, args...)...) +} diff --git a/worker/imap/worker.go b/worker/imap/worker.go index d0f8482..6e47530 100644 --- a/worker/imap/worker.go +++ b/worker/imap/worker.go @@ -3,7 +3,6 @@ package imap import ( "crypto/tls" "fmt" - "math" "net" "net/url" "time" @@ -37,13 +36,14 @@ type imapClient struct { } type imapConfig struct { - scheme string - insecure bool - addr string - user *url.Userinfo - folders []string - oauthBearer lib.OAuthBearer - idle_timeout time.Duration + scheme string + insecure bool + addr string + user *url.Userinfo + folders []string + oauthBearer lib.OAuthBearer + idle_timeout time.Duration + reconnect_maxwait time.Duration // tcp connection parameters connection_timeout time.Duration keepalive_period time.Duration @@ -61,11 +61,8 @@ type IMAPWorker struct { // Map of sequence numbers to UIDs, index 0 is seq number 1 seqMap []uint32 - done chan struct{} - autoReconnect bool - retries int - - idler *idler + idler *idler + observer *observer } func NewIMAPWorker(worker *types.Worker) (types.Backend, error) { @@ -74,6 +71,7 @@ func NewIMAPWorker(worker *types.Worker) (types.Backend, error) { worker: worker, selected: &imap.MailboxStatus{}, idler: newIdler(imapConfig{}, worker), + observer: newObserver(imapConfig{}, worker), }, nil } @@ -81,6 +79,7 @@ func (w *IMAPWorker) newClient(c *client.Client) { c.Updates = w.updates w.client = &imapClient{c, sortthread.NewThreadClient(c), sortthread.NewSortClient(c)} w.idler.SetClient(w.client) + w.observer.SetClient(w.client) } func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error { @@ -93,12 +92,6 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error { var reterr error // will be returned at the end, needed to support idle - checkConn := func(wait time.Duration) { - time.Sleep(wait) - w.stopConnectionObserver() - w.startConnectionObserver() - } - // set connection timeout for calls to imap server if w.client != nil { w.client.Timeout = w.config.connection_timeout @@ -111,53 +104,43 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error { reterr = w.handleConfigure(msg) case *types.Connect: if w.client != nil && w.client.State() == imap.SelectedState { - if !w.autoReconnect { - w.autoReconnect = true - checkConn(0) + if !w.observer.AutoReconnect() { + w.observer.SetAutoReconnect(true) + w.observer.EmitIfNotConnected() } reterr = errAlreadyConnected break } - w.autoReconnect = true + w.observer.SetAutoReconnect(true) c, err := w.connect() if err != nil { - checkConn(0) + w.observer.EmitIfNotConnected() reterr = err break } - w.stopConnectionObserver() - w.newClient(c) - w.startConnectionObserver() - w.worker.PostMessage(&types.Done{Message: types.RespondTo(msg)}, nil) case *types.Reconnect: - if !w.autoReconnect { + if !w.observer.AutoReconnect() { reterr = fmt.Errorf("auto-reconnect is disabled; run connect to enable it") break } c, err := w.connect() if err != nil { - wait, msg := w.exponentialBackoff() - go checkConn(wait) - w.retries++ - reterr = errors.Wrap(err, msg) + errReconnect := w.observer.DelayedReconnect() + reterr = errors.Wrap(errReconnect, err.Error()) break } - w.stopConnectionObserver() - w.newClient(c) - w.startConnectionObserver() - w.worker.PostMessage(&types.Done{Message: types.RespondTo(msg)}, nil) case *types.Disconnect: - w.autoReconnect = false - w.stopConnectionObserver() + w.observer.SetAutoReconnect(false) + w.observer.Stop() if w.client == nil || w.client.State() != imap.SelectedState { reterr = errNotConnected break @@ -267,51 +250,6 @@ func (w *IMAPWorker) handleImapUpdate(update client.Update) { } } -func (w *IMAPWorker) exponentialBackoff() (time.Duration, string) { - maxWait := 16 - if w.retries > 0 { - backoff := int(math.Pow(2.0, float64(w.retries))) - if backoff > maxWait { - backoff = maxWait - } - waitStr := fmt.Sprintf("%ds", backoff) - wait, err := time.ParseDuration(waitStr) - if err == nil { - return wait, fmt.Sprintf("wait %s before reconnect", waitStr) - } - } - return 0 * time.Second, "" -} - -func (w *IMAPWorker) startConnectionObserver() { - emitConnErr := func(errMsg string) { - w.worker.PostMessage(&types.ConnError{ - Error: fmt.Errorf(errMsg), - }, nil) - } - if w.client == nil { - emitConnErr("imap client not connected") - return - } - go func() { - select { - case <-w.client.LoggedOut(): - if w.autoReconnect { - emitConnErr("imap: logged out") - } - case <-w.done: - return - } - }() -} - -func (w *IMAPWorker) stopConnectionObserver() { - if w.done != nil { - close(w.done) - } - w.done = make(chan struct{}) -} - func (w *IMAPWorker) connect() (*client.Client, error) { var ( conn *net.TCPConn @@ -391,8 +329,6 @@ func (w *IMAPWorker) connect() (*client.Client, error) { return nil, err } - w.retries = 0 - return c, nil } |