summaryrefslogtreecommitdiff
path: root/src/service/rooms/threads/mod.rs
blob: fb703839d4672d3d3a290a03f8f7b2c7416d4405 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
mod data;

pub use data::Data;
use ruma::{
    api::client::{error::ErrorKind, threads::get_threads::v1::IncludeThreads},
    events::relation::BundledThread,
    uint, CanonicalJsonValue, EventId, RoomId, UserId,
};

use serde_json::json;

use crate::{services, Error, PduEvent, Result};

pub struct Service {
    pub db: &'static dyn Data,
}

impl Service {
    pub fn threads_until<'a>(
        &'a self,
        user_id: &'a UserId,
        room_id: &'a RoomId,
        until: u64,
        include: &'a IncludeThreads,
    ) -> Result<impl Iterator<Item = Result<(u64, PduEvent)>> + 'a> {
        self.db.threads_until(user_id, room_id, until, include)
    }

    pub fn add_to_thread<'a>(&'a self, root_event_id: &EventId, pdu: &PduEvent) -> Result<()> {
        let root_id = &services()
            .rooms
            .timeline
            .get_pdu_id(root_event_id)?
            .ok_or_else(|| {
                Error::BadRequest(
                    ErrorKind::InvalidParam,
                    "Invalid event id in thread message",
                )
            })?;

        let root_pdu = services()
            .rooms
            .timeline
            .get_pdu_from_id(root_id)?
            .ok_or_else(|| {
                Error::BadRequest(ErrorKind::InvalidParam, "Thread root pdu not found")
            })?;

        let mut root_pdu_json = services()
            .rooms
            .timeline
            .get_pdu_json_from_id(root_id)?
            .ok_or_else(|| {
                Error::BadRequest(ErrorKind::InvalidParam, "Thread root pdu not found")
            })?;

        if let CanonicalJsonValue::Object(unsigned) = root_pdu_json
            .entry("unsigned".to_owned())
            .or_insert_with(|| CanonicalJsonValue::Object(Default::default()))
        {
            if let Some(mut relations) = unsigned
                .get("m.relations")
                .and_then(|r| r.as_object())
                .and_then(|r| r.get("m.thread"))
                .and_then(|relations| {
                    serde_json::from_value::<BundledThread>(relations.clone().into()).ok()
                })
            {
                // Thread already existed
                relations.count += uint!(1);
                relations.latest_event = pdu.to_message_like_event();

                let content = serde_json::to_value(relations).expect("to_value always works");

                unsigned.insert(
                    "m.relations".to_owned(),
                    json!({ "m.thread": content })
                        .try_into()
                        .expect("thread is valid json"),
                );
            } else {
                // New thread
                let relations = BundledThread {
                    latest_event: pdu.to_message_like_event(),
                    count: uint!(1),
                    current_user_participated: true,
                };

                let content = serde_json::to_value(relations).expect("to_value always works");

                unsigned.insert(
                    "m.relations".to_owned(),
                    json!({ "m.thread": content })
                        .try_into()
                        .expect("thread is valid json"),
                );
            }

            services()
                .rooms
                .timeline
                .replace_pdu(root_id, &root_pdu_json, &root_pdu)?;
        }

        let mut users = Vec::new();
        if let Some(userids) = self.db.get_participants(&root_id)? {
            users.extend_from_slice(&userids);
            users.push(pdu.sender.clone());
        } else {
            users.push(root_pdu.sender);
            users.push(pdu.sender.clone());
        }

        self.db.update_participants(root_id, &users)
    }
}