Skip to content
Snippets Groups Projects
pusher.rs 10 KiB
Newer Older
  • Learn to ignore specific revisions
  • use crate::{Database, Error, PduEvent, Result};
    use log::{error, info, warn};
    
        api::{
            client::r0::push::{Pusher, PusherKind},
            push_gateway::send_event_notification::{
                self,
                v1::{Device, Notification, NotificationCounts, NotificationPriority},
    
    Jonas Platte's avatar
    Jonas Platte committed
        events::{room::power_levels::PowerLevelsEventContent, EventType},
        push::{Action, PushConditionRoomCtx, PushFormat, Ruleset, Tweak},
    
    use std::{convert::TryFrom, fmt::Debug};
    
    #[derive(Debug, Clone)]
    pub struct PushData {
        /// UserId + pushkey -> Pusher
        pub(super) senderkey_pusher: sled::Tree,
    }
    
    impl PushData {
        pub fn new(db: &sled::Db) -> Result<Self> {
            Ok(Self {
                senderkey_pusher: db.open_tree("senderkey_pusher")?,
            })
        }
    
        pub fn set_pusher(&self, sender: &UserId, pusher: Pusher) -> Result<()> {
            let mut key = sender.as_bytes().to_vec();
    
    Timo Kösters's avatar
    Timo Kösters committed
            key.push(0xff);
    
            key.extend_from_slice(pusher.pushkey.as_bytes());
    
    
            // There are 2 kinds of pushers but the spec says: null deletes the pusher.
            if pusher.kind.is_none() {
                return self
                    .senderkey_pusher
                    .remove(key)
                    .map(|_| ())
                    .map_err(Into::into);
            }
    
    
            self.senderkey_pusher.insert(
    
                &*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"),
            )?;
    
            Ok(())
        }
    
    
        pub fn get_pusher(&self, senderkey: &[u8]) -> Result<Option<Pusher>> {
            self.senderkey_pusher
                .get(senderkey)?
                .map(|push| {
                    Ok(serde_json::from_slice(&*push)
                        .map_err(|_| Error::bad_database("Invalid Pusher in db."))?)
                })
                .transpose()
        }
    
        pub fn get_pushers(&self, sender: &UserId) -> Result<Vec<Pusher>> {
    
    Timo Kösters's avatar
    Timo Kösters committed
            let mut prefix = sender.as_bytes().to_vec();
            prefix.push(0xff);
    
    
            self.senderkey_pusher
    
                .scan_prefix(prefix)
    
    Timo Kösters's avatar
    Timo Kösters committed
                .map(|push| {
    
                    let push = push.map_err(|_| Error::bad_database("Invalid push bytes in db."))?;
    
                    Ok(serde_json::from_slice(&*push)
                        .map_err(|_| Error::bad_database("Invalid Pusher in db."))?)
                })
    
    
        pub fn get_pusher_senderkeys(&self, sender: &UserId) -> impl Iterator<Item = Result<IVec>> {
            let mut prefix = sender.as_bytes().to_vec();
            prefix.push(0xff);
    
            self.senderkey_pusher
                .scan_prefix(prefix)
                .keys()
                .map(|r| Ok(r?))
        }
    
    }
    
    pub async fn send_request<T: OutgoingRequest>(
        globals: &crate::database::globals::Globals,
        destination: &str,
        request: T,
    ) -> Result<T::IncomingResponse>
    where
        T: Debug,
    {
        let destination = destination.replace("/_matrix/push/v1/notify", "");
    
        let http_request = request
            .try_into_http_request(&destination, Some(""))
            .map_err(|e| {
                warn!("Failed to find destination {}: {}", destination, e);
                Error::BadServerResponse("Invalid destination")
            })?;
    
    
    Timo Kösters's avatar
    Timo Kösters committed
        let reqwest_request = reqwest::Request::try_from(http_request)
    
            .expect("all http requests are valid reqwest requests");
    
        // TODO: we could keep this very short and let expo backoff do it's thing...
    
    Timo Kösters's avatar
    Timo Kösters committed
        //*reqwest_request.timeout_mut() = Some(Duration::from_secs(5));
    
    
        let url = reqwest_request.url().clone();
    
        let reqwest_response = globals.reqwest_client().execute(reqwest_request).await;
    
    
        // Because reqwest::Response -> http::Response is complicated:
        match reqwest_response {
            Ok(mut reqwest_response) => {
                let status = reqwest_response.status();
                let mut http_response = http::Response::builder().status(status);
                let headers = http_response.headers_mut().unwrap();
    
                for (k, v) in reqwest_response.headers_mut().drain() {
                    if let Some(key) = k {
                        headers.insert(key, v);
                    }
                }
    
                let status = reqwest_response.status();
    
                let body = reqwest_response
                    .bytes()
                    .await
                    .unwrap_or_else(|e| {
                        warn!("server error {}", e);
                        Vec::new().into()
                    }) // TODO: handle timeout
                    .into_iter()
                    .collect::<Vec<_>>();
    
                if status != 200 {
                    info!(
                        "Push gateway returned bad response {} {}\n{}\n{:?}",
                        destination,
                        status,
                        url,
                        crate::utils::string_from_bytes(&body)
                    );
                }
    
                let response = T::IncomingResponse::try_from(
                    http_response
                        .body(body)
                        .expect("reqwest body is valid http body"),
                );
                response.map_err(|_| {
                    info!(
                        "Push gateway returned invalid response bytes {}\n{}",
                        destination, url
                    );
                    Error::BadServerResponse("Push gateway returned bad response.")
                })
            }
            Err(e) => Err(e.into()),
    
        }
    }
    
    pub async fn send_push_notice(
        user: &UserId,
    
        ruleset: Ruleset,
        pdu: &PduEvent,
    
    Jonas Platte's avatar
    Jonas Platte committed
        let mut notify = None;
        let mut tweaks = Vec::new();
    
    
        for action in get_actions(user, &ruleset, pdu, db)? {
    
    Jonas Platte's avatar
    Jonas Platte committed
            let n = match action {
                Action::DontNotify => false,
                // TODO: Implement proper support for coalesce
                Action::Notify | Action::Coalesce => true,
                Action::SetTweak(tweak) => {
                    tweaks.push(tweak.clone());
                    continue;
    
    Jonas Platte's avatar
    Jonas Platte committed
            };
    
            if notify.is_some() {
                return Err(Error::bad_database(
                    r#"Malformed pushrule contains more than one of these actions: ["dont_notify", "notify", "coalesce"]"#,
                ));
    
    Jonas Platte's avatar
    Jonas Platte committed
    
            notify = Some(n);
        }
    
    
        if notify == Some(true) {
    
    Jonas Platte's avatar
    Jonas Platte committed
            send_notice(unread, pusher, tweaks, pdu, db).await?;
    
        // Else the event triggered no actions
    
    pub fn get_actions<'a>(
        user: &UserId,
        ruleset: &'a Ruleset,
        pdu: &PduEvent,
        db: &Database,
    ) -> Result<impl 'a + Iterator<Item = Action>> {
        let power_levels: PowerLevelsEventContent = db
            .rooms
            .room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")?
            .map(|ev| {
                serde_json::from_value(ev.content)
                    .map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
            })
            .transpose()?
            .unwrap_or_default();
    
        let ctx = PushConditionRoomCtx {
            room_id: pdu.room_id.clone(),
            member_count: (db.rooms.room_members(&pdu.room_id).count() as u32).into(),
            user_display_name: db
                .users
                .displayname(&user)?
                .unwrap_or(user.localpart().to_owned()),
            users_power_levels: power_levels.users,
            default_power_level: power_levels.users_default,
            notification_power_levels: power_levels.notifications,
        };
    
        Ok(ruleset
            .get_actions(&pdu.to_sync_room_event(), &ctx)
            .map(Clone::clone))
    }
    
    
    async fn send_notice(
        unread: UInt,
    
        tweaks: Vec<Tweak>,
        event: &PduEvent,
        db: &Database,
    ) -> Result<()> {
    
        if pusher.kind == Some(PusherKind::Email) {
    
    
        // TODO:
        // Two problems with this
        // 1. if "event_id_only" is the only format kind it seems we should never add more info
        // 2. can pusher/devices have conflicting formats
    
        let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly);
        let url = if let Some(url) = pusher.data.url.as_ref() {
            url
        } else {
            error!("Http Pusher must have URL specified.");
            return Ok(());
        };
    
        let mut device = Device::new(pusher.app_id.clone(), pusher.pushkey.clone());
        let mut data_minus_url = pusher.data.clone();
        // The url must be stripped off according to spec
        data_minus_url.url = None;
        device.data = Some(data_minus_url);
    
        // Tweaks are only added if the format is NOT event_id_only
        if !event_id_only {
            device.tweaks = tweaks.clone();
        }
    
        let d = &[device];
        let mut notifi = Notification::new(d);
    
        notifi.prio = NotificationPriority::Low;
        notifi.event_id = Some(&event.event_id);
        notifi.room_id = Some(&event.room_id);
        // TODO: missed calls
        notifi.counts = NotificationCounts::new(unread, uint!(0));
    
        if event.kind == EventType::RoomEncrypted
            || tweaks
                .iter()
                .any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_)))
        {
            notifi.prio = NotificationPriority::High
        }
    
        if event_id_only {
            send_request(
                &db.globals,
                &url,
                send_event_notification::v1::Request::new(notifi),
            )
            .await?;
        } else {
            notifi.sender = Some(&event.sender);
            notifi.event_type = Some(&event.kind);
            notifi.content = serde_json::value::to_raw_value(&event.content).ok();
    
            if event.kind == EventType::RoomMember {
                notifi.user_is_target = event.state_key.as_deref() == Some(event.sender.as_str());
    
            let user_name = db.users.displayname(&event.sender)?;
            notifi.sender_display_name = user_name.as_deref();
            let room_name = db
                .rooms
                .room_state_get(&event.room_id, &EventType::RoomName, "")?
                .map(|pdu| match pdu.content.get("name") {
                    Some(serde_json::Value::String(s)) => Some(s.to_string()),
                    _ => None,
                })
                .flatten();
            notifi.room_name = room_name.as_deref();
    
            send_request(
                &db.globals,
                &url,
                send_event_notification::v1::Request::new(notifi),
            )
            .await?;