Skip to content
Snippets Groups Projects
message.rs 5.65 KiB
Newer Older
  • Learn to ignore specific revisions
  • Jonathan de Jong's avatar
    Jonathan de Jong committed
    use crate::{database::DatabaseGuard, pdu::PduBuilder, utils, ConduitResult, Error, Ruma};
    
    use ruma::{
        api::client::{
            error::ErrorKind,
            r0::message::{get_message_events, send_message_event},
        },
    
    Timo Kösters's avatar
    Timo Kösters committed
        events::EventType,
    
    Timo's avatar
    Timo committed
        EventId,
    
    use std::{
        collections::BTreeMap,
        convert::{TryFrom, TryInto},
    };
    
    
    #[cfg(feature = "conduit_bin")]
    use rocket::{get, put};
    
    #[cfg_attr(
        feature = "conduit_bin",
        put("/_matrix/client/r0/rooms/<_>/send/<_>/<_>", data = "<body>")
    )]
    
    #[tracing::instrument(skip(db, body))]
    
    pub async fn send_message_event_route(
    
    Jonathan de Jong's avatar
    Jonathan de Jong committed
        db: DatabaseGuard,
    
    Timo Kösters's avatar
    Timo Kösters committed
        body: Ruma<send_message_event::Request<'_>>,
    
    Jonas Platte's avatar
    Jonas Platte committed
    ) -> ConduitResult<send_message_event::Response> {
    
        let sender_user = body.sender_user.as_ref().expect("user is authenticated");
    
        let sender_device = body.sender_device.as_deref();
    
    Timo's avatar
    Timo committed
    
        // Check if this is a new transaction id
    
        if let Some(response) =
            db.transaction_ids
                .existing_txnid(sender_user, sender_device, &body.txn_id)?
    
    Timo's avatar
    Timo committed
        {
            // The client might have sent a txnid of the /sendToDevice endpoint
            // This txnid has no response associated with it
            if response.is_empty() {
                return Err(Error::BadRequest(
                    ErrorKind::InvalidParam,
                    "Tried to use txn id already used for an incompatible endpoint.",
                ));
            }
    
            let event_id = EventId::try_from(
                utils::string_from_bytes(&response)
                    .map_err(|_| Error::bad_database("Invalid txnid bytes in database."))?,
            )
            .map_err(|_| Error::bad_database("Invalid event id in txnid data."))?;
            return Ok(send_message_event::Response { event_id }.into());
        }
    
        let mut unsigned = BTreeMap::new();
    
        unsigned.insert("transaction_id".to_owned(), body.txn_id.clone().into());
    
    
    Timo Kösters's avatar
    Timo Kösters committed
        let event_id = db.rooms.build_and_append_pdu(
            PduBuilder {
    
    Timo Kösters's avatar
    Timo Kösters committed
                event_type: EventType::from(&body.event_type),
    
                content: serde_json::from_str(body.body.body.json().get())
                    .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?,
    
    Timo Kösters's avatar
    Timo Kösters committed
                unsigned: Some(unsigned),
                state_key: None,
                redacts: None,
            },
    
            &sender_user,
    
    Timo Kösters's avatar
    Timo Kösters committed
            &body.room_id,
    
    Devin Ragotzy's avatar
    Devin Ragotzy committed
            &db,
    
    Timo Kösters's avatar
    Timo Kösters committed
        )?;
    
        db.transaction_ids.add_txnid(
            sender_user,
            sender_device,
            &body.txn_id,
            event_id.as_bytes(),
        )?;
    
        Ok(send_message_event::Response::new(event_id).into())
    
    }
    
    #[cfg_attr(
        feature = "conduit_bin",
        get("/_matrix/client/r0/rooms/<_>/messages", data = "<body>")
    )]
    
    #[tracing::instrument(skip(db, body))]
    
    pub async fn get_message_events_route(
    
    Jonathan de Jong's avatar
    Jonathan de Jong committed
        db: DatabaseGuard,
    
    Timo Kösters's avatar
    Timo Kösters committed
        body: Ruma<get_message_events::Request<'_>>,
    
    ) -> ConduitResult<get_message_events::Response> {
    
        let sender_user = body.sender_user.as_ref().expect("user is authenticated");
    
        if !db.rooms.is_joined(sender_user, &body.room_id)? {
    
            return Err(Error::BadRequest(
                ErrorKind::Forbidden,
                "You don't have permission to view this room.",
            ));
        }
    
        let from = body
            .from
            .clone()
            .parse()
            .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid `from` value."))?;
    
        let to = body.to.as_ref().map(|t| t.parse());
    
        // Use limit or else 10
        let limit = body
            .limit
            .try_into()
            .map_or(Ok::<_, Error>(10_usize), |l: u32| Ok(l as usize))?;
    
        match body.dir {
            get_message_events::Direction::Forward => {
                let events_after = db
                    .rooms
    
                    .pdus_after(&sender_user, &body.room_id, from)
    
                    .take(limit)
                    .filter_map(|r| r.ok()) // Filter out buggy events
    
                    .filter_map(|(pdu_id, pdu)| {
                        db.rooms
                            .pdu_count(&pdu_id)
                            .map(|pdu_count| (pdu_count, pdu))
                            .ok()
                    })
    
                    .take_while(|&(k, _)| Some(Ok(k)) != to) // Stop at `to`
                    .collect::<Vec<_>>();
    
                let end_token = events_after.last().map(|(count, _)| count.to_string());
    
                let events_after = events_after
                    .into_iter()
                    .map(|(_, pdu)| pdu.to_room_event())
                    .collect::<Vec<_>>();
    
    
                let mut resp = get_message_events::Response::new();
    
    Timo Kösters's avatar
    Timo Kösters committed
                resp.start = Some(body.from.to_owned());
    
                resp.end = end_token;
                resp.chunk = events_after;
                resp.state = Vec::new();
    
                Ok(resp.into())
    
            }
            get_message_events::Direction::Backward => {
                let events_before = db
                    .rooms
    
                    .pdus_until(&sender_user, &body.room_id, from)
    
                    .take(limit)
                    .filter_map(|r| r.ok()) // Filter out buggy events
    
                    .filter_map(|(pdu_id, pdu)| {
                        db.rooms
                            .pdu_count(&pdu_id)
                            .map(|pdu_count| (pdu_count, pdu))
                            .ok()
                    })
    
                    .take_while(|&(k, _)| Some(Ok(k)) != to) // Stop at `to`
                    .collect::<Vec<_>>();
    
                let start_token = events_before.last().map(|(count, _)| count.to_string());
    
                let events_before = events_before
                    .into_iter()
                    .map(|(_, pdu)| pdu.to_room_event())
                    .collect::<Vec<_>>();
    
    
                let mut resp = get_message_events::Response::new();
    
    Timo Kösters's avatar
    Timo Kösters committed
                resp.start = Some(body.from.to_owned());
    
                resp.end = start_token;
                resp.chunk = events_before;
                resp.state = Vec::new();
    
                Ok(resp.into())