1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
mod data;
pub use data::Data;
use ruma::{
api::client::{error::ErrorKind, threads::get_threads::v1::IncludeThreads},
events::relation::BundledThread,
uint, CanonicalJsonValue, EventId, RoomId, UserId,
};
use serde_json::json;
use crate::{services, Error, PduEvent, Result};
/// Thread service: a thin layer over a [`Data`] storage backend.
///
/// The methods on this type read and update thread participants and list
/// threads through `db`.
pub struct Service {
    /// Storage backend that the thread queries and participant updates are
    /// forwarded to.
    pub db: &'static dyn Data,
}
impl Service {
    /// Lists threads by forwarding every argument unchanged to the storage
    /// backend's `threads_until` and returning its result.
    ///
    /// Each successful item is a `(u64, PduEvent)` pair.
    /// NOTE(review): the `u64` is presumably the position/count of the thread
    /// root and `until` an upper bound on it -- confirm against the `Data`
    /// implementation.
    ///
    /// # Errors
    ///
    /// Returns whatever error the backend reports, both for the call itself
    /// and for individual items of the iterator.
    pub fn threads_until<'a>(
        &'a self,
        user_id: &'a UserId,
        room_id: &'a RoomId,
        until: u64,
        include: &'a IncludeThreads,
    ) -> Result<impl Iterator<Item = Result<(u64, PduEvent)>> + 'a> {
        self.db.threads_until(user_id, room_id, until, include)
    }

    /// Records `pdu` as a new reply in the thread rooted at `root_event_id`.
    ///
    /// Two things are updated:
    ///
    /// 1. The stored root PDU's `unsigned."m.relations"."m.thread"` bundle:
    ///    the count is incremented (or initialised to 1 for a new thread) and
    ///    `latest_event` is set to `pdu`. The root PDU is then written back
    ///    via `replace_pdu`.
    /// 2. The thread's participant list in the storage backend: the root
    ///    author is added when no list exists yet, and the sender of `pdu` is
    ///    added if not already present.
    ///
    /// # Errors
    ///
    /// Returns `Error::BadRequest(ErrorKind::InvalidParam, ..)` when the root
    /// event ID cannot be resolved to a PDU id, or when the root PDU or its
    /// JSON cannot be loaded. Errors from the timeline service and the storage
    /// backend are propagated.
    ///
    /// # Panics
    ///
    /// Panics if the thread bundle cannot be serialised to JSON or converted
    /// to canonical JSON; both are treated as invariant violations.
    pub fn add_to_thread(&self, root_event_id: &EventId, pdu: &PduEvent) -> Result<()> {
        let timeline = &services().rooms.timeline;

        let root_id = timeline.get_pdu_id(root_event_id)?.ok_or_else(|| {
            Error::BadRequest(
                ErrorKind::InvalidParam,
                "Invalid event id in thread message",
            )
        })?;

        let root_pdu = timeline.get_pdu_from_id(&root_id)?.ok_or_else(|| {
            Error::BadRequest(ErrorKind::InvalidParam, "Thread root pdu not found")
        })?;

        let mut root_pdu_json = timeline.get_pdu_json_from_id(&root_id)?.ok_or_else(|| {
            Error::BadRequest(ErrorKind::InvalidParam, "Thread root pdu not found")
        })?;

        // If `unsigned` exists but is not a JSON object, the bundle update is
        // skipped and only the participant list is maintained.
        if let CanonicalJsonValue::Object(unsigned) = root_pdu_json
            .entry("unsigned".to_owned())
            .or_insert_with(|| CanonicalJsonValue::Object(Default::default()))
        {
            // A bundle that is missing or fails to deserialise is treated as
            // "no thread yet".
            let existing = unsigned
                .get("m.relations")
                .and_then(|r| r.as_object())
                .and_then(|r| r.get("m.thread"))
                .and_then(|thread| {
                    serde_json::from_value::<BundledThread>(thread.clone().into()).ok()
                });

            let bundle = match existing {
                // Thread already existed
                Some(mut thread) => {
                    thread.count += uint!(1);
                    thread.latest_event = pdu.to_message_like_event();
                    thread
                }
                // New thread
                None => BundledThread {
                    latest_event: pdu.to_message_like_event(),
                    count: uint!(1),
                    current_user_participated: true,
                },
            };

            let content = serde_json::to_value(bundle).expect("to_value always works");
            // NOTE(review): this replaces the whole `m.relations` object, so
            // any sibling relation keys stored next to `m.thread` are dropped
            // -- confirm that is intended.
            unsigned.insert(
                "m.relations".to_owned(),
                json!({ "m.thread": content })
                    .try_into()
                    .expect("thread is valid json"),
            );

            timeline.replace_pdu(&root_id, &root_pdu_json, &root_pdu)?;
        }

        let mut users = Vec::new();
        match self.db.get_participants(&root_id)? {
            Some(existing) => users.extend_from_slice(&existing),
            None => users.push(root_pdu.sender),
        }
        // Add the sender only once; otherwise the stored list would gain a
        // duplicate entry for every message posted in the thread.
        if !users.contains(&pdu.sender) {
            users.push(pdu.sender.clone());
        }

        self.db.update_participants(&root_id, &users)
    }
}
|