From 2d69e816997d9bf4f51b6e35c6a9c408fb1c144a Mon Sep 17 00:00:00 2001
From: Devin Ragotzy <devin.ragotzy@gmail.com>
Date: Tue, 26 Jan 2021 21:54:35 -0500
Subject: [PATCH] WIP: send out push notification, impl pusher routes

It seems in order to test this I may also have to impl the email 3pid
route? I need to call the set_pusher route somehow.
---
 src/client_server/account.rs |  14 ++
 src/client_server/push.rs    |  34 ++-
 src/database.rs              |  11 +-
 src/database/pusher.rs       | 148 +++++++++++
 src/database/rooms.rs        |   3 +
 src/database/sending.rs      | 468 +++++++++++++++++++++++------------
 6 files changed, 513 insertions(+), 165 deletions(-)
 create mode 100644 src/database/pusher.rs

diff --git a/src/client_server/account.rs b/src/client_server/account.rs
index 76354b651..9f6c576c4 100644
--- a/src/client_server/account.rs
+++ b/src/client_server/account.rs
@@ -659,3 +659,17 @@ pub async fn deactivate_route(
     }
     .into())
 }
+
+/*/
+#[cfg_attr(
+    feature = "conduit_bin",
+    get("/_matrix/client/r0/account/3pid", data = "<body>")
+)]
+pub async fn third_party_route(
+    body: Ruma<account::add_3pid::Request<'_>>,
+) -> ConduitResult<account::add_3pid::Response> {
+    let sender_user = body.sender_user.as_ref().expect("user is authenticated");
+
+    Ok(account::add_3pid::Response::default().into())
+}
+*/
diff --git a/src/client_server/push.rs b/src/client_server/push.rs
index fd938c1cb..3a8167973 100644
--- a/src/client_server/push.rs
+++ b/src/client_server/push.rs
@@ -666,20 +666,36 @@ pub async fn delete_pushrule_route(
     Ok(delete_pushrule::Response.into())
 }
 
-#[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/pushers"))]
-pub async fn get_pushers_route() -> ConduitResult<get_pushers::Response> {
+#[cfg_attr(
+    feature = "conduit_bin",
+    get("/_matrix/client/r0/pushers", data = "<body>")
+)]
+pub async fn get_pushers_route(
+    db: State<'_, Database>,
+    body: Ruma<get_pushers::Request>,
+) -> ConduitResult<get_pushers::Response> {
+    let sender = body.sender_user.as_ref().expect("authenticated endpoint");
+
     Ok(get_pushers::Response {
-        pushers: Vec::new(),
+        pushers: db.pusher.get_pusher(sender)?,
     }
     .into())
 }
 
-#[cfg_attr(feature = "conduit_bin", post("/_matrix/client/r0/pushers/set"))]
-pub async fn set_pushers_route(db: State<'_, Database>) -> ConduitResult<get_pushers::Response> {
+#[cfg_attr(
+    feature = "conduit_bin",
+    post("/_matrix/client/r0/pushers/set", data = "<body>")
+)]
+pub async fn set_pushers_route(
+    db: State<'_, Database>,
+    body: Ruma<set_pusher::Request>,
+) -> ConduitResult<set_pusher::Response> {
+    let sender = body.sender_user.as_ref().expect("authenticated endpoint");
+    let pusher = body.pusher.clone();
+
+    db.pusher.set_pusher(sender, pusher)?;
+
     db.flush().await?;
 
-    Ok(get_pushers::Response {
-        pushers: Vec::new(),
-    }
-    .into())
+    Ok(set_pusher::Response::default().into())
 }
diff --git a/src/database.rs b/src/database.rs
index 7ad18cb0e..b8dc52415 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -4,6 +4,7 @@
 pub mod globals;
 pub mod key_backups;
 pub mod media;
+pub mod pusher;
 pub mod rooms;
 pub mod sending;
 pub mod transaction_ids;
@@ -17,9 +18,11 @@
 use rocket::futures::{self, channel::mpsc};
 use ruma::{DeviceId, ServerName, UserId};
 use serde::Deserialize;
-use std::collections::HashMap;
-use std::fs::remove_dir_all;
-use std::sync::{Arc, RwLock};
+use std::{
+    collections::HashMap,
+    fs::remove_dir_all,
+    sync::{Arc, RwLock},
+};
 use tokio::sync::Semaphore;
 
 #[derive(Clone, Debug, Deserialize)]
@@ -73,6 +76,7 @@ pub struct Database {
     pub sending: sending::Sending,
     pub admin: admin::Admin,
     pub appservice: appservice::Appservice,
+    pub pusher: pusher::PushData,
     pub _db: sled::Db,
 }
 
@@ -187,6 +191,7 @@ pub async fn load_or_create(config: Config) -> Result<Self> {
                 cached_registrations: Arc::new(RwLock::new(HashMap::new())),
                 id_appserviceregistrations: db.open_tree("id_appserviceregistrations")?,
             },
+            pusher: pusher::PushData::new(&db)?,
             _db: db,
         };
 
diff --git a/src/database/pusher.rs b/src/database/pusher.rs
new file mode 100644
index 000000000..041085d33
--- /dev/null
+++ b/src/database/pusher.rs
@@ -0,0 +1,148 @@
+use crate::{Error, PduEvent, Result};
+use ruma::{
+    api::client::r0::push::{Pusher, PusherKind},
+    events::{
+        room::{
+            member::MemberEventContent,
+            message::{MessageEventContent, TextMessageEventContent},
+        },
+        EventType,
+    },
+    push::{PushCondition, Ruleset},
+    UserId,
+};
+
+#[derive(Debug, Clone)]
+pub struct PushData {
+    /// UserId + pushkey -> Pusher
+    pub(super) senderkey_pusher: sled::Tree,
+}
+
+impl PushData {
+    pub fn new(db: &sled::Db) -> Result<Self> {
+        Ok(Self {
+            senderkey_pusher: db.open_tree("senderkey_pusher")?,
+        })
+    }
+
+    pub fn set_pusher(&self, sender: &UserId, pusher: Pusher) -> Result<()> {
+        let mut key = sender.as_bytes().to_vec();
+        key.extend_from_slice(pusher.pushkey.as_bytes());
+
+        self.senderkey_pusher.insert(
+            key,
+            &*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"),
+        )?;
+
+        Ok(())
+    }
+
+    pub fn get_pusher(&self, sender: &UserId) -> Result<Vec<Pusher>> {
+        self.senderkey_pusher
+            .scan_prefix(sender.as_bytes())
+            .values()
+            .map(|push: std::result::Result<sled::IVec, _>| {
+                let push = push.map_err(|_| Error::bad_database("Invalid push bytes in db."))?;
+                Ok(serde_json::from_slice(&*push)
+                    .map_err(|_| Error::bad_database("Invalid Pusher in db."))?)
+            })
+            .collect::<Result<Vec<_>>>()
+    }
+}
+
+pub async fn send_push_notice(
+    user: &UserId,
+    pusher: &Pusher,
+    ruleset: Ruleset,
+    pdu: &PduEvent,
+) -> Result<()> {
+    for rule in ruleset.into_iter() {
+        // TODO: can actions contain contradictory Actions
+        if rule
+            .actions
+            .iter()
+            .any(|act| matches!(act, ruma::push::Action::DontNotify))
+            || !rule.enabled
+        {
+            continue;
+        }
+
+        match rule.rule_id.as_str() {
+            ".m.rule.master" => {}
+            ".m.rule.suppress_notices" => {}
+            ".m.rule.invite_for_me" => {}
+            ".m.rule.member_event" => {
+                if let EventType::RoomMember = &pdu.kind {
+                    // TODO use this?
+                    let _member = serde_json::from_value::<MemberEventContent>(pdu.content.clone())
+                        .map_err(|_| Error::bad_database("PDU contained bad message content"))?;
+                    if let Some(conditions) = rule.conditions {
+                        if conditions.iter().any(|cond| match cond {
+                            PushCondition::EventMatch { key, pattern } => {
+                                let mut json =
+                                    serde_json::to_value(pdu).expect("PDU is valid JSON");
+                                for key in key.split('.') {
+                                    json = json[key].clone();
+                                }
+                                // TODO: this is baddddd
+                                json.to_string().contains(pattern)
+                            }
+                            _ => false,
+                        }) {}
+                    }
+                }
+            }
+            ".m.rule.contains_display_name" => {
+                if let EventType::RoomMessage = &pdu.kind {
+                    let msg_content =
+                        serde_json::from_value::<MessageEventContent>(pdu.content.clone())
+                            .map_err(|_| {
+                                Error::bad_database("PDU contained bad message content")
+                            })?;
+                    if let MessageEventContent::Text(TextMessageEventContent { body, .. }) =
+                        &msg_content
+                    {
+                        if body.contains(user.localpart()) {
+                            send_notice(user, &pusher, &pdu).await?;
+                        }
+                    }
+                }
+            }
+            ".m.rule.tombstone" => {}
+            ".m.rule.roomnotif" => {}
+            ".m.rule.contains_user_name" => {
+                if let EventType::RoomMessage = &pdu.kind {
+                    let msg_content =
+                        serde_json::from_value::<MessageEventContent>(pdu.content.clone())
+                            .map_err(|_| {
+                                Error::bad_database("PDU contained bad message content")
+                            })?;
+                    if let MessageEventContent::Text(TextMessageEventContent { body, .. }) =
+                        &msg_content
+                    {
+                        if body.contains(user.localpart()) {
+                            send_notice(user, &pusher, &pdu).await?;
+                        }
+                    }
+                }
+            }
+            ".m.rule.call" => {}
+            ".m.rule.encrypted_room_one_to_one" => {}
+            ".m.rule.room_one_to_one" => {}
+            ".m.rule.message" => {}
+            ".m.rule.encrypted" => {}
+            _ => {}
+        }
+    }
+    Ok(())
+}
+
+async fn send_notice(_sender: &UserId, pusher: &Pusher, _event: &PduEvent) -> Result<()> {
+    if let Some(PusherKind::Http) = pusher.kind {
+        log::error!("YAHOOO");
+    } else {
+        // EMAIL
+        todo!("send an email")
+    }
+    Ok(())
+}
diff --git a/src/database/rooms.rs b/src/database/rooms.rs
index d459aeec5..19554f6d7 100644
--- a/src/database/rooms.rs
+++ b/src/database/rooms.rs
@@ -531,6 +531,9 @@ pub fn append_pdu(
         self.eventid_pduid
             .insert(pdu.event_id.as_bytes(), &*pdu_id)?;
 
+        // See if the event matches any known pushers
+        db.sending.send_push_pdu(&*pdu_id)?;
+
         match pdu.kind {
             EventType::RoomRedaction => {
                 if let Some(redact_id) = &pdu.redacts {
diff --git a/src/database/sending.rs b/src/database/sending.rs
index 4b0309f74..a47850183 100644
--- a/src/database/sending.rs
+++ b/src/database/sending.rs
@@ -1,43 +1,62 @@
 use std::{
     collections::HashMap,
     convert::TryFrom,
-    fmt::Debug,
+    fmt::{Debug, Display, Formatter},
     sync::Arc,
     time::{Duration, Instant, SystemTime},
 };
 
-use crate::{appservice_server, server_server, utils, Error, PduEvent, Result};
+use crate::{appservice_server, server_server, utils, Database, Error, PduEvent, Result};
 use federation::transactions::send_transaction_message;
 use log::info;
 use rocket::futures::stream::{FuturesUnordered, StreamExt};
 use ruma::{
     api::{appservice, federation, OutgoingRequest},
+    events::{push_rules, EventType},
     ServerName,
 };
 use sled::IVec;
-use tokio::select;
-use tokio::sync::Semaphore;
+use tokio::{select, sync::Semaphore};
+
+use super::{
+    account_data::AccountData, appservice::Appservice, globals::Globals, pusher::PushData,
+    rooms::Rooms,
+};
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub enum OutgoingKind {
+    Appservice(Box<ServerName>),
+    Push(Vec<u8>),
+    Normal(Box<ServerName>),
+}
+
+impl Display for OutgoingKind {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            OutgoingKind::Appservice(name) => f.write_str(name.as_str()),
+            OutgoingKind::Normal(name) => f.write_str(name.as_str()),
+            OutgoingKind::Push(_) => f.write_str("Push notification TODO"),
+        }
+    }
+}
 
 #[derive(Clone)]
 pub struct Sending {
     /// The state for a given state hash.
-    pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+)ServerName + PduId
-    pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+)ServerName + PduId (pduid can be empty for reservation)
+    pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)ServerName / UserId + PduId
+    pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId (pduid can be empty for reservation)
     pub(super) maximum_requests: Arc<Semaphore>,
 }
 
 impl Sending {
-    pub fn start_handler(
-        &self,
-        globals: &super::globals::Globals,
-        rooms: &super::rooms::Rooms,
-        appservice: &super::appservice::Appservice,
-    ) {
+    pub fn start_handler(&self, db: &Database) {
         let servernamepduids = self.servernamepduids.clone();
         let servercurrentpdus = self.servercurrentpdus.clone();
-        let rooms = rooms.clone();
-        let globals = globals.clone();
-        let appservice = appservice.clone();
+        let rooms = db.rooms.clone();
+        let globals = db.globals.clone();
+        let appservice = db.appservice.clone();
+        let pusher = db.pusher.clone();
+        let account_data = db.account_data.clone();
 
         tokio::spawn(async move {
             let mut futures = FuturesUnordered::new();
@@ -45,45 +64,57 @@ pub fn start_handler(
             // Retry requests we could not finish yet
             let mut current_transactions = HashMap::new();
 
-            for (server, pdu, is_appservice) in servercurrentpdus
+            for (outgoing_kind, pdu) in servercurrentpdus
                 .iter()
                 .filter_map(|r| r.ok())
                 .filter_map(|(key, _)| Self::parse_servercurrentpdus(key).ok())
-                .filter(|(_, pdu, _)| !pdu.is_empty()) // Skip reservation key
+                .filter(|(_, pdu)| !pdu.is_empty()) // Skip reservation key
                 .take(50)
             // This should not contain more than 50 anyway
             {
                 current_transactions
-                    .entry((server, is_appservice))
+                    .entry(outgoing_kind)
                     .or_insert_with(Vec::new)
                     .push(pdu);
             }
 
-            for ((server, is_appservice), pdus) in current_transactions {
+            for (outgoing_kind, pdus) in current_transactions {
                 futures.push(Self::handle_event(
-                    server,
-                    is_appservice,
+                    outgoing_kind,
                     pdus,
-                    &globals,
                     &rooms,
+                    &globals,
                     &appservice,
+                    &pusher,
+                    &account_data,
                 ));
             }
 
-            let mut last_failed_try: HashMap<Box<ServerName>, (u32, Instant)> = HashMap::new();
+            let mut last_failed_try: HashMap<OutgoingKind, (u32, Instant)> = HashMap::new();
 
             let mut subscriber = servernamepduids.watch_prefix(b"");
             loop {
                 select! {
                     Some(response) = futures.next() => {
                         match response {
-                            Ok((server, is_appservice)) => {
-                                let mut prefix = if is_appservice {
-                                    b"+".to_vec()
-                                } else {
-                                    Vec::new()
+                            Ok(outgoing_kind) => {
+                                let mut prefix = match &outgoing_kind {
+                                    OutgoingKind::Appservice(server) => {
+                                        let mut p = b"+".to_vec();
+                                        p.extend_from_slice(server.as_bytes());
+                                        p
+                                    }
+                                    OutgoingKind::Push(id) => {
+                                        let mut p = b"$".to_vec();
+                                        p.extend_from_slice(&id);
+                                        p
+                                    },
+                                    OutgoingKind::Normal(server) => {
+                                        let mut p = vec![];
+                                        p.extend_from_slice(server.as_bytes());
+                                        p
+                                    },
                                 };
-                                prefix.extend_from_slice(server.as_bytes());
                                 prefix.push(0xff);
 
                                 for key in servercurrentpdus
@@ -116,22 +147,45 @@ pub fn start_handler(
                                         servernamepduids.remove(&current_key).unwrap();
                                     }
 
-                                    futures.push(Self::handle_event(server, is_appservice, new_pdus, &globals, &rooms, &appservice));
+                                    futures.push(
+                                        Self::handle_event(
+                                            outgoing_kind.clone(),
+                                            new_pdus,
+                                            &rooms,
+                                            &globals,
+                                            &appservice,
+                                            &pusher,
+                                            &account_data
+                                        )
+                                    );
                                 } else {
                                     servercurrentpdus.remove(&prefix).unwrap();
                                     // servercurrentpdus with the prefix should be empty now
                                 }
                             }
-                            Err((server, is_appservice, e)) => {
-                                info!("Couldn't send transaction to {}\n{}", server, e);
-                                let mut prefix = if is_appservice {
-                                    b"+".to_vec()
-                                } else {
-                                    Vec::new()
+                            Err((outgoing_kind, e)) => {
+                                info!("Couldn't send transaction to {}\n{}", outgoing_kind, e);
+                                let mut prefix = match &outgoing_kind {
+                                    OutgoingKind::Appservice(serv) => {
+                                        let mut p = b"+".to_vec();
+                                        p.extend_from_slice(serv.as_bytes());
+                                        p
+                                    },
+                                    OutgoingKind::Push(id) => {
+                                        let mut p = b"$".to_vec();
+                                        p.extend_from_slice(&id);
+                                        p
+                                    },
+                                    OutgoingKind::Normal(serv) => {
+                                        let mut p = vec![];
+                                        p.extend_from_slice(serv.as_bytes());
+                                        p
+                                    },
                                 };
-                                prefix.extend_from_slice(server.as_bytes());
+
                                 prefix.push(0xff);
-                                last_failed_try.insert(server.clone(), match last_failed_try.get(&server) {
+
+                                last_failed_try.insert(outgoing_kind.clone(), match last_failed_try.get(&outgoing_kind) {
                                     Some(last_failed) => {
                                         (last_failed.0+1, Instant::now())
                                     },
@@ -157,40 +211,56 @@ pub fn start_handler(
 
                                 instant.elapsed() < min_elapsed_duration
                             };
-                            if let Some((server, is_appservice, pdu_id)) = utils::string_from_bytes(
+                            if let Some((outgoing_kind, pdu_id)) = utils::string_from_bytes(
                                     parts
                                         .next()
                                         .expect("splitn will always return 1 or more elements"),
                                 )
-                                .map_err(|_| Error::bad_database("ServerName in servernamepduid bytes are invalid."))
-                                .map(|server_str| {
+                                .map_err(|_| Error::bad_database("[Utf8] ServerName in servernamepduid bytes are invalid."))
+                                .and_then(|ident_str| {
                                     // Appservices start with a plus
-                                    if server_str.starts_with('+') {
-                                        (server_str[1..].to_owned(), true)
+                                    Ok(if ident_str.starts_with('+') {
+                                        OutgoingKind::Appservice(
+                                            Box::<ServerName>::try_from(&ident_str[1..])
+                                                .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid."))?
+                                        )
+                                    } else if ident_str.starts_with('$') {
+                                        OutgoingKind::Push(ident_str[1..].as_bytes().to_vec())
                                     } else {
-                                        (server_str, false)
-                                    }
+                                        OutgoingKind::Normal(
+                                            Box::<ServerName>::try_from(ident_str)
+                                                .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid."))?
+                                        )
+                                    })
                                 })
-                                .and_then(|(server_str, is_appservice)| Box::<ServerName>::try_from(server_str)
-                                    .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid.")).map(|s| (s, is_appservice)))
-                                .ok()
-                                .and_then(|(server, is_appservice)| parts
+                                .and_then(|outgoing_kind| parts
                                     .next()
                                     .ok_or_else(|| Error::bad_database("Invalid servernamepduid in db."))
-                                    .ok()
-                                    .map(|pdu_id| (server, is_appservice, pdu_id))
+                                    .map(|pdu_id| (outgoing_kind, pdu_id))
                                 )
-                                .filter(|(server, is_appservice, _)| {
-                                    if last_failed_try.get(server).map_or(false, exponential_backoff) {
+                                .ok()
+                                .filter(|(outgoing_kind, _)| {
+                                    if last_failed_try.get(outgoing_kind).map_or(false, exponential_backoff) {
                                         return false;
                                     }
 
-                                    let mut prefix = if *is_appservice {
-                                        b"+".to_vec()
-                                    } else {
-                                        Vec::new()
+                                    let mut prefix = match outgoing_kind {
+                                        OutgoingKind::Appservice(serv) => {
+                                            let mut p = b"+".to_vec();
+                                            p.extend_from_slice(serv.as_bytes());
+                                            p
+                                    },
+                                        OutgoingKind::Push(id) => {
+                                            let mut p = b"$".to_vec();
+                                            p.extend_from_slice(&id);
+                                            p
+                                        },
+                                        OutgoingKind::Normal(serv) => {
+                                            let mut p = vec![];
+                                            p.extend_from_slice(serv.as_bytes());
+                                            p
+                                        },
                                     };
-                                    prefix.extend_from_slice(server.as_bytes());
                                     prefix.push(0xff);
 
                                     servercurrentpdus
@@ -201,7 +271,17 @@ pub fn start_handler(
                                 servercurrentpdus.insert(&key, &[]).unwrap();
                                 servernamepduids.remove(&key).unwrap();
 
-                                futures.push(Self::handle_event(server, is_appservice, vec![pdu_id.into()], &globals, &rooms, &appservice));
+                                futures.push(
+                                    Self::handle_event(
+                                        outgoing_kind,
+                                        vec![pdu_id.into()],
+                                        &rooms,
+                                        &globals,
+                                        &appservice,
+                                        &pusher,
+                                        &account_data
+                                    )
+                                );
                             }
                         }
                     }
@@ -210,6 +290,22 @@ pub fn start_handler(
         });
     }
 
+    pub fn send_push_pdu(&self, pdu_id: &[u8]) -> Result<()> {
+        // Make sure we don't cause utf8 errors when parsing to a String...
+        let pduid = String::from_utf8_lossy(pdu_id).as_bytes().to_vec();
+
+        // these are valid ServerName chars
+        // (byte.is_ascii_alphanumeric() || byte == b'-' || byte == b'.')
+        let mut key = b"$".to_vec();
+        // keep each pdu push unique
+        key.extend_from_slice(pduid.as_slice());
+        key.push(0xff);
+        key.extend_from_slice(pdu_id);
+        self.servernamepduids.insert(key, b"")?;
+
+        Ok(())
+    }
+
     pub fn send_pdu(&self, server: &ServerName, pdu_id: &[u8]) -> Result<()> {
         let mut key = server.as_bytes().to_vec();
         key.push(0xff);
@@ -230,95 +326,154 @@ pub fn send_pdu_appservice(&self, appservice_id: &str, pdu_id: &[u8]) -> Result<
     }
 
     async fn handle_event(
-        server: Box<ServerName>,
-        is_appservice: bool,
+        kind: OutgoingKind,
         pdu_ids: Vec<IVec>,
-        globals: &super::globals::Globals,
-        rooms: &super::rooms::Rooms,
-        appservice: &super::appservice::Appservice,
-    ) -> std::result::Result<(Box<ServerName>, bool), (Box<ServerName>, bool, Error)> {
-        if is_appservice {
-            let pdu_jsons = pdu_ids
-                .iter()
-                .map(|pdu_id| {
-                    Ok::<_, (Box<ServerName>, Error)>(
-                        rooms
-                            .get_pdu_from_id(pdu_id)
-                            .map_err(|e| (server.clone(), e))?
-                            .ok_or_else(|| {
-                                (
-                                    server.clone(),
-                                    Error::bad_database(
-                                        "Event in servernamepduids not found in db.",
-                                    ),
+        rooms: &Rooms,
+        globals: &Globals,
+        appservice: &Appservice,
+        pusher: &PushData,
+        account_data: &AccountData,
+    ) -> std::result::Result<OutgoingKind, (OutgoingKind, Error)> {
+        match kind {
+            OutgoingKind::Appservice(server) => {
+                let pdu_jsons = pdu_ids
+                    .iter()
+                    .map(|pdu_id| {
+                        Ok::<_, (Box<ServerName>, Error)>(
+                            rooms
+                                .get_pdu_from_id(pdu_id)
+                                .map_err(|e| (server.clone(), e))?
+                                .ok_or_else(|| {
+                                    (
+                                        server.clone(),
+                                        Error::bad_database(
+                                            "[Appservice] Event in servernamepduids not found in ",
+                                        ),
+                                    )
+                                })?
+                                .to_any_event(),
+                        )
+                    })
+                    .filter_map(|r| r.ok())
+                    .collect::<Vec<_>>();
+                appservice_server::send_request(
+                    &globals,
+                    appservice
+                        .get_registration(server.as_str())
+                        .unwrap()
+                        .unwrap(), // TODO: handle error
+                    appservice::event::push_events::v1::Request {
+                        events: &pdu_jsons,
+                        txn_id: &utils::random_string(16),
+                    },
+                )
+                .await
+                .map(|_response| OutgoingKind::Appservice(server.clone()))
+                .map_err(|e| (OutgoingKind::Appservice(server.clone()), e))
+            }
+            OutgoingKind::Push(id) => {
+                let pdus = pdu_ids
+                    .iter()
+                    .map(|pdu_id| {
+                        Ok::<_, (Vec<u8>, Error)>(
+                            rooms
+                                .get_pdu_from_id(pdu_id)
+                                .map_err(|e| (id.clone(), e))?
+                                .ok_or_else(|| {
+                                    (
+                                        id.clone(),
+                                        Error::bad_database(
+                                            "[Push] Event in servernamepduids not found in db.",
+                                        ),
+                                    )
+                                })?,
+                        )
+                    })
+                    .filter_map(|r| r.ok())
+                    .collect::<Vec<_>>();
+                dbg!(&pdus);
+                for pdu in &pdus {
+                    for user in rooms.room_members(&pdu.room_id) {
+                        dbg!(&user);
+                        let user = user.map_err(|e| (OutgoingKind::Push(id.clone()), e))?;
+                        for pusher in pusher
+                            .get_pusher(&user)
+                            .map_err(|e| (OutgoingKind::Push(id.clone()), e))?
+                        {
+                            let rules_for_user = account_data
+                                .get::<push_rules::PushRulesEvent>(
+                                    None,
+                                    &user,
+                                    EventType::PushRules,
                                 )
-                            })?
-                            .to_any_event(),
-                    )
-                })
-                .filter_map(|r| r.ok())
-                .collect::<Vec<_>>();
-            appservice_server::send_request(
-                &globals,
-                appservice
-                    .get_registration(server.as_str())
-                    .unwrap()
-                    .unwrap(), // TODO: handle error
-                appservice::event::push_events::v1::Request {
-                    events: &pdu_jsons,
-                    txn_id: &utils::random_string(16),
-                },
-            )
-            .await
-            .map(|_response| (server.clone(), is_appservice))
-            .map_err(|e| (server, is_appservice, e))
-        } else {
-            let pdu_jsons = pdu_ids
-                .iter()
-                .map(|pdu_id| {
-                    Ok::<_, (Box<ServerName>, Error)>(
-                        // TODO: check room version and remove event_id if needed
-                        serde_json::from_str(
-                            PduEvent::convert_to_outgoing_federation_event(
-                                rooms
-                                    .get_pdu_json_from_id(pdu_id)
-                                    .map_err(|e| (server.clone(), e))?
-                                    .ok_or_else(|| {
-                                        (
-                                            server.clone(),
-                                            Error::bad_database(
-                                                "Event in servernamepduids not found in db.",
-                                            ),
-                                        )
-                                    })?,
+                                .map_err(|e| (OutgoingKind::Push(id.clone()), e))?
+                                .map(|ev| ev.content.global)
+                                .unwrap_or_else(|| crate::push_rules::default_pushrules(&user));
+                            dbg!(&pusher);
+                            dbg!(&rules_for_user);
+
+                            crate::database::pusher::send_push_notice(
+                                &user,
+                                &pusher,
+                                rules_for_user,
+                                pdu,
                             )
-                            .json()
-                            .get(),
+                            .await
+                            .map_err(|e| (OutgoingKind::Push(id.clone()), e))?;
+                        }
+                    }
+                }
+
+                Ok(OutgoingKind::Push(id))
+            }
+            OutgoingKind::Normal(server) => {
+                let pdu_jsons = pdu_ids
+                    .iter()
+                    .map(|pdu_id| {
+                        Ok::<_, (OutgoingKind, Error)>(
+                            // TODO: check room version and remove event_id if needed
+                            serde_json::from_str(
+                                PduEvent::convert_to_outgoing_federation_event(
+                                    rooms
+                                        .get_pdu_json_from_id(pdu_id)
+                                        .map_err(|e| (OutgoingKind::Normal(server.clone()), e))?
+                                        .ok_or_else(|| {
+                                            (
+                                                OutgoingKind::Normal(server.clone()),
+                                                Error::bad_database(
+                                                    "[Normal] Event in servernamepduids not found in db.",
+                                                ),
+                                            )
+                                        })?,
+                                )
+                                .json()
+                                .get(),
+                            )
+                            .expect("Raw<..> is always valid"),
                         )
-                        .expect("Raw<..> is always valid"),
-                    )
-                })
-                .filter_map(|r| r.ok())
-                .collect::<Vec<_>>();
-
-            server_server::send_request(
-                &globals,
-                &*server,
-                send_transaction_message::v1::Request {
-                    origin: globals.server_name(),
-                    pdus: &pdu_jsons,
-                    edus: &[],
-                    origin_server_ts: SystemTime::now(),
-                    transaction_id: &utils::random_string(16),
-                },
-            )
-            .await
-            .map(|_response| (server.clone(), is_appservice))
-            .map_err(|e| (server, is_appservice, e))
+                    })
+                    .filter_map(|r| r.ok())
+                    .collect::<Vec<_>>();
+
+                server_server::send_request(
+                    &globals,
+                    &*server,
+                    send_transaction_message::v1::Request {
+                        origin: globals.server_name(),
+                        pdus: &pdu_jsons,
+                        edus: &[],
+                        origin_server_ts: SystemTime::now(),
+                        transaction_id: &utils::random_string(16),
+                    },
+                )
+                .await
+                .map(|_response| OutgoingKind::Normal(server.clone()))
+                .map_err(|e| (OutgoingKind::Normal(server.clone()), e))
+            }
         }
     }
 
-    fn parse_servercurrentpdus(key: IVec) -> Result<(Box<ServerName>, IVec, bool)> {
+    fn parse_servercurrentpdus(key: IVec) -> Result<(OutgoingKind, IVec)> {
         let mut parts = key.splitn(2, |&b| b == 0xff);
         let server = parts.next().expect("splitn always returns one element");
         let pdu = parts
@@ -330,19 +485,26 @@ fn parse_servercurrentpdus(key: IVec) -> Result<(Box<ServerName>, IVec, bool)> {
         })?;
 
         // Appservices start with a plus
-        let (server, is_appservice) = if server.starts_with('+') {
-            (&server[1..], true)
+        Ok::<_, Error>(if server.starts_with('+') {
+            (
+                OutgoingKind::Appservice(Box::<ServerName>::try_from(server).map_err(|_| {
+                    Error::bad_database("Invalid server string in server_currenttransaction")
+                })?),
+                IVec::from(pdu),
+            )
+        } else if server.starts_with('$') {
+            (
+                OutgoingKind::Push(server.as_bytes().to_vec()),
+                IVec::from(pdu),
+            )
         } else {
-            (&*server, false)
-        };
-
-        Ok::<_, Error>((
-            Box::<ServerName>::try_from(server).map_err(|_| {
-                Error::bad_database("Invalid server string in server_currenttransaction")
-            })?,
-            IVec::from(pdu),
-            is_appservice,
-        ))
+            (
+                OutgoingKind::Normal(Box::<ServerName>::try_from(server).map_err(|_| {
+                    Error::bad_database("Invalid server string in server_currenttransaction")
+                })?),
+                IVec::from(pdu),
+            )
+        })
     }
 
     pub async fn send_federation_request<T: OutgoingRequest>(
-- 
GitLab