From dc2a2c2dfd6dc327fe40fbf2da922ef6c3d520be Mon Sep 17 00:00:00 2001 From: y0ast Date: Fri, 12 Nov 2021 18:12:02 +0100 Subject: messages: allow displaying email threads Display threads in the message list. For now, only supported by the notmuch backend and on IMAP when the server supports the THREAD extension. Setting threading-enable=true is global and will cause the message list to be empty with maildir:// accounts. Co-authored-by: Kevin Kuehler Co-authored-by: Reto Brunner Signed-off-by: Robin Jarry --- worker/imap/open.go | 63 +++++++++++++++++++++ worker/imap/worker.go | 7 ++- worker/notmuch/lib/database.go | 121 ++++++++++++++++++++++++++++++++++++++--- worker/notmuch/lib/thread.go | 14 +++++ worker/notmuch/worker.go | 30 +++++++++- worker/types/messages.go | 10 ++++ worker/types/thread.go | 99 +++++++++++++++++++++++++++++++++ worker/types/thread_test.go | 108 ++++++++++++++++++++++++++++++++++++ 8 files changed, 439 insertions(+), 13 deletions(-) create mode 100644 worker/notmuch/lib/thread.go create mode 100644 worker/types/thread.go create mode 100644 worker/types/thread_test.go (limited to 'worker') diff --git a/worker/imap/open.go b/worker/imap/open.go index 4b4e943..2849573 100644 --- a/worker/imap/open.go +++ b/worker/imap/open.go @@ -1,6 +1,8 @@ package imap import ( + "sort" + "github.com/emersion/go-imap" sortthread "github.com/emersion/go-imap-sortthread" @@ -92,3 +94,64 @@ func translateSortCriterions( } return result } + +func (imapw *IMAPWorker) handleDirectoryThreaded( + msg *types.FetchDirectoryThreaded) { + imapw.worker.Logger.Printf("Fetching threaded UID list") + + seqSet := &imap.SeqSet{} + seqSet.AddRange(1, imapw.selected.Messages) + threads, err := imapw.client.thread.UidThread(sortthread.References, + &imap.SearchCriteria{SeqNum: seqSet}) + if err != nil { + imapw.worker.PostMessage(&types.Error{ + Message: types.RespondTo(msg), + Error: err, + }, nil) + } else { + aercThreads, count := convertThreads(threads, nil) + sort.Sort(types.ByUID(aercThreads)) + imapw.worker.Logger.Printf("Found %d threaded messages", count) + imapw.seqMap = make([]uint32, count) + imapw.worker.PostMessage(&types.DirectoryThreaded{ + Message: types.RespondTo(msg), + Threads: aercThreads, + }, nil) + imapw.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil) + } +} + +func convertThreads(threads []*sortthread.Thread, parent *types.Thread) ([]*types.Thread, int) { + if threads == nil { + return nil, 0 + } + conv := make([]*types.Thread, len(threads)) + count := 0 + + for i := 0; i < len(threads); i++ { + t := threads[i] + conv[i] = &types.Thread{ + Uid: t.Id, + } + + // Set the first child node + children, childCount := convertThreads(t.Children, conv[i]) + if len(children) > 0 { + conv[i].FirstChild = children[0] + } + + // Set the parent node + if parent != nil { + conv[i].Parent = parent + + // elements of threads are siblings + if i > 0 { + conv[i].PrevSibling = conv[i-1] + conv[i-1].NextSibling = conv[i] + } + } + + count += childCount + 1 + } + return conv, count +} diff --git a/worker/imap/worker.go b/worker/imap/worker.go index cd52536..2c0e6a6 100644 --- a/worker/imap/worker.go +++ b/worker/imap/worker.go @@ -26,7 +26,8 @@ var errUnsupported = fmt.Errorf("unsupported command") type imapClient struct { *client.Client - sort *sortthread.SortClient + thread *sortthread.ThreadClient + sort *sortthread.SortClient } type IMAPWorker struct { @@ -158,7 +159,7 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error { } c.Updates = w.updates - w.client = &imapClient{c, sortthread.NewSortClient(c)} + w.client = &imapClient{c, sortthread.NewThreadClient(c), sortthread.NewSortClient(c)} w.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil) case *types.Disconnect: if w.client == nil { @@ -175,6 +176,8 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error { w.handleOpenDirectory(msg) case *types.FetchDirectoryContents: w.handleFetchDirectoryContents(msg) + case *types.FetchDirectoryThreaded: + w.handleDirectoryThreaded(msg) case *types.CreateDirectory: w.handleCreateDirectory(msg) case *types.RemoveDirectory: diff --git a/worker/notmuch/lib/database.go b/worker/notmuch/lib/database.go index 683ace5..ad670c5 100644 --- a/worker/notmuch/lib/database.go +++ b/worker/notmuch/lib/database.go @@ -7,6 +7,8 @@ import ( "log" "time" + "git.sr.ht/~rjarry/aerc/lib/uidstore" + "git.sr.ht/~rjarry/aerc/worker/types" notmuch "github.com/zenhack/go.notmuch" ) @@ -18,6 +20,7 @@ type DB struct { logger *log.Logger lastOpenTime time.Time db *notmuch.DB + uidStore *uidstore.Store } func NewDB(path string, excludedTags []string, @@ -26,6 +29,7 @@ func NewDB(path string, excludedTags []string, path: path, excludedTags: excludedTags, logger: logger, + uidStore: uidstore.NewStore(), } return db } @@ -106,7 +110,7 @@ func (db *DB) ListTags() ([]string, error) { //It also configures the query as specified on the worker func (db *DB) newQuery(ndb *notmuch.DB, query string) (*notmuch.Query, error) { q := ndb.NewQuery(query) - q.SetExcludeScheme(notmuch.EXCLUDE_TRUE) + q.SetExcludeScheme(notmuch.EXCLUDE_ALL) q.SetSortScheme(notmuch.SORT_OLDEST_FIRST) for _, t := range db.excludedTags { err := q.AddTagExclude(t) @@ -125,18 +129,37 @@ func (db *DB) MsgIDsFromQuery(q string) ([]string, error) { return err } defer query.Close() - msgs, err := query.Messages() + msgIDs, err = msgIdsFromQuery(query) + return err + }) + return msgIDs, err +} + +func (db *DB) ThreadsFromQuery(q string) ([]*types.Thread, error) { + var res []*types.Thread + err := db.withConnection(false, func(ndb *notmuch.DB) error { + query, err := db.newQuery(ndb, q) if err != nil { return err } - defer msgs.Close() - var msg *notmuch.Message - for msgs.Next(&msg) { - msgIDs = append(msgIDs, msg.ID()) + defer query.Close() + qMsgIDs, err := msgIdsFromQuery(query) + if err != nil { + return err } - return nil + valid := make(map[string]struct{}) + for _, id := range qMsgIDs { + valid[id] = struct{}{} + } + threads, err := query.Threads() + if err != nil { + return err + } + defer threads.Close() + res, err = db.enumerateThread(threads, valid) + return err }) - return msgIDs, err + return res, err } type MessageCount struct { @@ -236,3 +259,85 @@ func (db *DB) MsgModifyTags(key string, add, remove []string) error { }) return err } + +func msgIdsFromQuery(query *notmuch.Query) ([]string, error) { + var msgIDs []string + msgs, err := query.Messages() + if err != nil { + return nil, err + } + defer msgs.Close() + var msg *notmuch.Message + for msgs.Next(&msg) { + msgIDs = append(msgIDs, msg.ID()) + } + return msgIDs, nil +} + +func (db *DB) UidFromKey(key string) uint32 { + return db.uidStore.GetOrInsert(key) +} + +func (db *DB) KeyFromUid(uid uint32) (string, bool) { + return db.uidStore.GetKey(uid) +} + +func (db *DB) enumerateThread(nt *notmuch.Threads, + valid map[string]struct{}) ([]*types.Thread, error) { + var res []*types.Thread + var thread *notmuch.Thread + for nt.Next(&thread) { + root := db.makeThread(nil, thread.TopLevelMessages(), valid) + res = append(res, root) + } + return res, nil +} + +func (db *DB) makeThread(parent *types.Thread, msgs *notmuch.Messages, + valid map[string]struct{}) *types.Thread { + var lastSibling *types.Thread + var msg *notmuch.Message + for msgs.Next(&msg) { + msgID := msg.ID() + _, inQuery := valid[msgID] + node := &types.Thread{ + Uid: db.uidStore.GetOrInsert(msgID), + Parent: parent, + Hidden: !inQuery, + } + if parent != nil && parent.FirstChild == nil { + parent.FirstChild = node + } + if lastSibling != nil { + if lastSibling.NextSibling != nil { + panic(fmt.Sprintf( + "%v already had a NextSibling, tried setting it", + lastSibling)) + } + lastSibling.NextSibling = node + } + lastSibling = node + replies, err := msg.Replies() + if err != nil { + // if there are no replies it will return an error + continue + } + defer replies.Close() + db.makeThread(node, replies, valid) + } + + // We want to return the root node + var root *types.Thread + if parent != nil { + root = parent + } else if lastSibling != nil { + root = lastSibling // first iteration has no parent + } else { + return nil // we don't have any messages at all + } + + for ; root.Parent != nil; root = root.Parent { + // move to the root + } + return root +} diff --git a/worker/notmuch/lib/thread.go b/worker/notmuch/lib/thread.go new file mode 100644 index 0000000..297260d --- /dev/null +++ b/worker/notmuch/lib/thread.go @@ -0,0 +1,14 @@ +//+build notmuch + +package lib + +type ThreadNode struct { + Uid string + From string + Subject string + InQuery bool // is the msg included in the query + + Parent *ThreadNode + NextSibling *ThreadNode + FirstChild *ThreadNode +} diff --git a/worker/notmuch/worker.go b/worker/notmuch/worker.go index f8f8b11..dc362af 100644 --- a/worker/notmuch/worker.go +++ b/worker/notmuch/worker.go @@ -107,6 +107,8 @@ func (w *worker) handleMessage(msg types.WorkerMessage) error { return w.handleOpenDirectory(msg) case *types.FetchDirectoryContents: return w.handleFetchDirectoryContents(msg) + case *types.FetchDirectoryThreaded: + return w.handleFetchDirectoryThreaded(msg) case *types.FetchMessageHeaders: return w.handleFetchMessageHeaders(msg) case *types.FetchMessageBodyPart: @@ -157,7 +159,6 @@ func (w *worker) handleConfigure(msg *types.Configure) error { return fmt.Errorf("could not resolve home directory: %v", err) } pathToDB := filepath.Join(home, u.Path) - w.uidStore = uidstore.NewStore() err = w.loadQueryMap(msg.Config) if err != nil { return fmt.Errorf("could not load query map configuration: %v", err) @@ -267,6 +268,17 @@ func (w *worker) handleFetchDirectoryContents( return nil } +func (w *worker) handleFetchDirectoryThreaded( + msg *types.FetchDirectoryThreaded) error { + // w.currentSortCriteria = msg.SortCriteria + err := w.emitDirectoryThreaded(msg) + if err != nil { + return err + } + w.done(msg) + return nil +} + func (w *worker) handleFetchMessageHeaders( msg *types.FetchMessageHeaders) error { for _, uid := range msg.Uids { @@ -294,7 +306,7 @@ func (w *worker) uidsFromQuery(query string) ([]uint32, error) { } var uids []uint32 for _, id := range msgIDs { - uid := w.uidStore.GetOrInsert(id) + uid := w.db.UidFromKey(id) uids = append(uids, uid) } @@ -302,7 +314,7 @@ func (w *worker) uidsFromQuery(query string) ([]uint32, error) { } func (w *worker) msgFromUid(uid uint32) (*Message, error) { - key, ok := w.uidStore.GetKey(uid) + key, ok := w.db.KeyFromUid(uid) if !ok { return nil, fmt.Errorf("Invalid uid: %v", uid) } @@ -528,6 +540,18 @@ func (w *worker) emitDirectoryContents(parent types.WorkerMessage) error { return nil } +func (w *worker) emitDirectoryThreaded(parent types.WorkerMessage) error { + threads, err := w.db.ThreadsFromQuery(w.query) + if err != nil { + return err + } + w.w.PostMessage(&types.DirectoryThreaded{ + Message: types.RespondTo(parent), + Threads: threads, + }, nil) + return nil +} + func (w *worker) emitMessageInfo(m *Message, parent types.WorkerMessage) error { info, err := m.MessageInfo() diff --git a/worker/types/messages.go b/worker/types/messages.go index 599e870..fb701bd 100644 --- a/worker/types/messages.go +++ b/worker/types/messages.go @@ -81,11 +81,21 @@ type FetchDirectoryContents struct { SortCriteria []*SortCriterion } +type FetchDirectoryThreaded struct { + Message + SortCriteria []*SortCriterion +} + type SearchDirectory struct { Message Argv []string } +type DirectoryThreaded struct { + Message + Threads []*Thread +} + type CreateDirectory struct { Message Directory string diff --git a/worker/types/thread.go b/worker/types/thread.go new file mode 100644 index 0000000..09f9dbb --- /dev/null +++ b/worker/types/thread.go @@ -0,0 +1,99 @@ +package types + +import ( + "errors" + "fmt" +) + +type Thread struct { + Uid uint32 + Parent *Thread + PrevSibling *Thread + NextSibling *Thread + FirstChild *Thread + + Hidden bool // if this flag is set the message isn't rendered in the UI + Deleted bool // if this flag is set the message was deleted +} + +func (t *Thread) Walk(walkFn NewThreadWalkFn) error { + err := newWalk(t, walkFn, 0, nil) + if err == ErrSkipThread { + return nil + } + return err +} + +func (t *Thread) String() string { + if t == nil { + return "" + } + parent := -1 + if t.Parent != nil { + parent = int(t.Parent.Uid) + } + next := -1 + if t.NextSibling != nil { + next = int(t.NextSibling.Uid) + } + child := -1 + if t.FirstChild != nil { + child = int(t.FirstChild.Uid) + } + return fmt.Sprintf( + "[%d] (parent:%v, next:%v, child:%v)", + t.Uid, parent, next, child, + ) +} + +func newWalk(node *Thread, walkFn NewThreadWalkFn, lvl int, ce error) error { + if node == nil { + return nil + } + err := walkFn(node, lvl, ce) + if err != nil { + return err + } + for child := node.FirstChild; child != nil; child = child.NextSibling { + err = newWalk(child, walkFn, lvl+1, err) + if err == ErrSkipThread { + err = nil + continue + } else if err != nil { + return err + } + } + return nil +} + +var ErrSkipThread = errors.New("skip this Thread") + +type NewThreadWalkFn func(t *Thread, level int, currentErr error) error + +//Implement interface to be able to sort threads by newest (max UID) +type ByUID []*Thread + +func getMaxUID(thread *Thread) uint32 { + // TODO: should we make this part of the Thread type to avoid recomputation? + var Uid uint32 + + thread.Walk(func(t *Thread, _ int, currentErr error) error { + if t.Uid > Uid { + Uid = t.Uid + } + return nil + }) + return Uid +} + +func (s ByUID) Len() int { + return len(s) +} +func (s ByUID) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} +func (s ByUID) Less(i, j int) bool { + maxUID_i := getMaxUID(s[i]) + maxUID_j := getMaxUID(s[j]) + return maxUID_i < maxUID_j +} diff --git a/worker/types/thread_test.go b/worker/types/thread_test.go new file mode 100644 index 0000000..e79dddd --- /dev/null +++ b/worker/types/thread_test.go @@ -0,0 +1,108 @@ +package types + +import ( + "fmt" + "strings" + "testing" +) + +func genFakeTree() *Thread { + tree := &Thread{ + Uid: 0, + } + var prevChild *Thread + for i := 1; i < 3; i++ { + child := &Thread{ + Uid: uint32(i * 10), + Parent: tree, + PrevSibling: prevChild, + } + if prevChild != nil { + prevChild.NextSibling = child + } else if tree.FirstChild == nil { + tree.FirstChild = child + } else { + panic("unreachable") + } + prevChild = child + var prevSecond *Thread + for j := 1; j < 3; j++ { + second := &Thread{ + Uid: child.Uid + uint32(j), + Parent: child, + PrevSibling: prevSecond, + } + if prevSecond != nil { + prevSecond.NextSibling = second + } else if child.FirstChild == nil { + child.FirstChild = second + } else { + panic("unreachable") + } + prevSecond = second + var prevThird *Thread + limit := 3 + if j == 2 { + limit = 8 + } + for k := 1; k < limit; k++ { + third := &Thread{ + Uid: second.Uid*10 + uint32(k), + Parent: second, + PrevSibling: prevThird, + } + if prevThird != nil { + prevThird.NextSibling = third + } else if second.FirstChild == nil { + second.FirstChild = third + } else { + panic("unreachable") + } + prevThird = third + } + } + } + return tree +} + +func TestNewWalk(t *testing.T) { + tree := genFakeTree() + var prefix []string + lastLevel := 0 + tree.Walk(func(t *Thread, lvl int, e error) error { + // if t.Uid%2 != 0 { + // return ErrSkipThread + // } + if e != nil { + fmt.Printf("ERROR: %v\n", e) + } + if lvl > lastLevel && lvl > 1 { + // we actually just descended... so figure out what connector we need + // level 1 is flush to the root, so we avoid the indentation there + if t.Parent.NextSibling != nil { + prefix = append(prefix, "│ ") + } else { + prefix = append(prefix, " ") + } + } else if lvl < lastLevel { + //ascended, need to trim the prefix layers + diff := lastLevel - lvl + prefix = prefix[:len(prefix)-diff] + } + + var arrow string + if t.Parent != nil { + if t.NextSibling != nil { + arrow = "├─>" + } else { + arrow = "└─>" + } + } + + // format + fmt.Printf("%s%s%s\n", strings.Join(prefix, ""), arrow, t) + + lastLevel = lvl + return nil + }) +} -- cgit v1.2.3