Newer
Older
use crate::{Database, Error, PduEvent, Result};
use log::{error, info, warn};
api::{
client::r0::push::{Pusher, PusherKind},
push_gateway::send_event_notification::{
self,
v1::{Device, Notification, NotificationCounts, NotificationPriority},
events::{room::power_levels::PowerLevelsEventContent, EventType},
push::{Action, PushConditionRoomCtx, PushFormat, Ruleset, Tweak},
uint, UInt, UserId,
#[derive(Debug, Clone)]
pub struct PushData {
/// UserId + pushkey -> Pusher
pub(super) senderkey_pusher: sled::Tree,
}
impl PushData {
pub fn new(db: &sled::Db) -> Result<Self> {
Ok(Self {
senderkey_pusher: db.open_tree("senderkey_pusher")?,
})
}
pub fn set_pusher(&self, sender: &UserId, pusher: Pusher) -> Result<()> {
let mut key = sender.as_bytes().to_vec();
key.extend_from_slice(pusher.pushkey.as_bytes());
// There are 2 kinds of pushers but the spec says: null deletes the pusher.
if pusher.kind.is_none() {
return self
.senderkey_pusher
.remove(key)
.map(|_| ())
.map_err(Into::into);
}
self.senderkey_pusher.insert(
&*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"),
)?;
Ok(())
}
pub fn get_pusher(&self, senderkey: &[u8]) -> Result<Option<Pusher>> {
self.senderkey_pusher
.get(senderkey)?
.map(|push| {
Ok(serde_json::from_slice(&*push)
.map_err(|_| Error::bad_database("Invalid Pusher in db."))?)
})
.transpose()
}
pub fn get_pushers(&self, sender: &UserId) -> Result<Vec<Pusher>> {
let mut prefix = sender.as_bytes().to_vec();
prefix.push(0xff);
self.senderkey_pusher
let push = push.map_err(|_| Error::bad_database("Invalid push bytes in db."))?;
Ok(serde_json::from_slice(&*push)
.map_err(|_| Error::bad_database("Invalid Pusher in db."))?)
})
pub fn get_pusher_senderkeys(&self, sender: &UserId) -> impl Iterator<Item = Result<IVec>> {
let mut prefix = sender.as_bytes().to_vec();
prefix.push(0xff);
self.senderkey_pusher
.scan_prefix(prefix)
.keys()
.map(|r| Ok(r?))
}
}
pub async fn send_request<T: OutgoingRequest>(
globals: &crate::database::globals::Globals,
destination: &str,
request: T,
) -> Result<T::IncomingResponse>
where
T: Debug,
{
let destination = destination.replace("/_matrix/push/v1/notify", "");
let http_request = request
.try_into_http_request(&destination, Some(""))
.map_err(|e| {
warn!("Failed to find destination {}: {}", destination, e);
Error::BadServerResponse("Invalid destination")
})?;
let reqwest_request = reqwest::Request::try_from(http_request)
.expect("all http requests are valid reqwest requests");
// TODO: we could keep this very short and let expo backoff do it's thing...
//*reqwest_request.timeout_mut() = Some(Duration::from_secs(5));
let url = reqwest_request.url().clone();
let reqwest_response = globals.reqwest_client().execute(reqwest_request).await;
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Because reqwest::Response -> http::Response is complicated:
match reqwest_response {
Ok(mut reqwest_response) => {
let status = reqwest_response.status();
let mut http_response = http::Response::builder().status(status);
let headers = http_response.headers_mut().unwrap();
for (k, v) in reqwest_response.headers_mut().drain() {
if let Some(key) = k {
headers.insert(key, v);
}
}
let status = reqwest_response.status();
let body = reqwest_response
.bytes()
.await
.unwrap_or_else(|e| {
warn!("server error {}", e);
Vec::new().into()
}) // TODO: handle timeout
.into_iter()
.collect::<Vec<_>>();
if status != 200 {
info!(
"Push gateway returned bad response {} {}\n{}\n{:?}",
destination,
status,
url,
crate::utils::string_from_bytes(&body)
);
}
let response = T::IncomingResponse::try_from(
http_response
.body(body)
.expect("reqwest body is valid http body"),
);
response.map_err(|_| {
info!(
"Push gateway returned invalid response bytes {}\n{}",
destination, url
);
Error::BadServerResponse("Push gateway returned bad response.")
})
}
Err(e) => Err(e.into()),
}
}
pub async fn send_push_notice(
user: &UserId,
ruleset: Ruleset,
pdu: &PduEvent,
for action in get_actions(user, &ruleset, pdu, db)? {
let n = match action {
Action::DontNotify => false,
// TODO: Implement proper support for coalesce
Action::Notify | Action::Coalesce => true,
Action::SetTweak(tweak) => {
tweaks.push(tweak.clone());
continue;
};
if notify.is_some() {
return Err(Error::bad_database(
r#"Malformed pushrule contains more than one of these actions: ["dont_notify", "notify", "coalesce"]"#,
));
// Else the event triggered no actions
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
pub fn get_actions<'a>(
user: &UserId,
ruleset: &'a Ruleset,
pdu: &PduEvent,
db: &Database,
) -> Result<impl 'a + Iterator<Item = Action>> {
let power_levels: PowerLevelsEventContent = db
.rooms
.room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")?
.map(|ev| {
serde_json::from_value(ev.content)
.map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
})
.transpose()?
.unwrap_or_default();
let ctx = PushConditionRoomCtx {
room_id: pdu.room_id.clone(),
member_count: (db.rooms.room_members(&pdu.room_id).count() as u32).into(),
user_display_name: db
.users
.displayname(&user)?
.unwrap_or(user.localpart().to_owned()),
users_power_levels: power_levels.users,
default_power_level: power_levels.users_default,
notification_power_levels: power_levels.notifications,
};
Ok(ruleset
.get_actions(&pdu.to_sync_room_event(), &ctx)
.map(Clone::clone))
}
async fn send_notice(
unread: UInt,
tweaks: Vec<Tweak>,
event: &PduEvent,
db: &Database,
) -> Result<()> {
if pusher.kind == Some(PusherKind::Email) {
// TODO:
// Two problems with this
// 1. if "event_id_only" is the only format kind it seems we should never add more info
// 2. can pusher/devices have conflicting formats
let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly);
let url = if let Some(url) = pusher.data.url.as_ref() {
url
} else {
error!("Http Pusher must have URL specified.");
return Ok(());
};
let mut device = Device::new(pusher.app_id.clone(), pusher.pushkey.clone());
let mut data_minus_url = pusher.data.clone();
// The url must be stripped off according to spec
data_minus_url.url = None;
device.data = Some(data_minus_url);
// Tweaks are only added if the format is NOT event_id_only
if !event_id_only {
device.tweaks = tweaks.clone();
}
let d = &[device];
let mut notifi = Notification::new(d);
notifi.prio = NotificationPriority::Low;
notifi.event_id = Some(&event.event_id);
notifi.room_id = Some(&event.room_id);
// TODO: missed calls
notifi.counts = NotificationCounts::new(unread, uint!(0));
if event.kind == EventType::RoomEncrypted
|| tweaks
.iter()
.any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_)))
{
notifi.prio = NotificationPriority::High
}
if event_id_only {
send_request(
&db.globals,
&url,
send_event_notification::v1::Request::new(notifi),
)
.await?;
} else {
notifi.sender = Some(&event.sender);
notifi.event_type = Some(&event.kind);
notifi.content = serde_json::value::to_raw_value(&event.content).ok();
if event.kind == EventType::RoomMember {
notifi.user_is_target = event.state_key.as_deref() == Some(event.sender.as_str());
let user_name = db.users.displayname(&event.sender)?;
notifi.sender_display_name = user_name.as_deref();
let room_name = db
.rooms
.room_state_get(&event.room_id, &EventType::RoomName, "")?
.map(|pdu| match pdu.content.get("name") {
Some(serde_json::Value::String(s)) => Some(s.to_string()),
_ => None,
})
.flatten();
notifi.room_name = room_name.as_deref();
send_request(
&db.globals,
&url,
send_event_notification::v1::Request::new(notifi),
)
.await?;
// TODO: email