Skip to content
Snippets Groups Projects
server_server.rs 13.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • use crate::{client_server, ConduitResult, Database, Error, PduEvent, Result, Ruma};
    
    use http::header::{HeaderValue, AUTHORIZATION, HOST};
    
    use rocket::{get, post, put, response::content::Json, State};
    
    Timo Kösters's avatar
    Timo Kösters committed
    use ruma::{
        api::{
            federation::{
    
                directory::{get_public_rooms, get_public_rooms_filtered},
    
    Timo Kösters's avatar
    Timo Kösters committed
                discovery::{
                    get_server_keys, get_server_version::v1 as get_server_version, ServerKey, VerifyKey,
                },
    
                event::get_missing_events,
    
    Timo Kösters's avatar
    Timo Kösters committed
                transactions::send_transaction_message,
    
    Timo Kösters's avatar
    Timo Kösters committed
            OutgoingRequest,
    
        directory::{IncomingFilter, IncomingRoomNetwork},
    
    timokoesters's avatar
    timokoesters committed
    };
    
    timokoesters's avatar
    timokoesters committed
    use std::{
    
        collections::BTreeMap,
        convert::TryFrom,
    
    timokoesters's avatar
    timokoesters committed
        time::{Duration, SystemTime},
    };
    
    use trust_dns_resolver::AsyncResolver;
    
    timokoesters's avatar
    timokoesters committed
    
    
    pub async fn request_well_known(
        globals: &crate::database::globals::Globals,
        destination: &str,
    ) -> Option<String> {
    
        let body: serde_json::Value = serde_json::from_str(
    
                .reqwest_client()
                .get(&format!(
                    "https://{}/.well-known/matrix/server",
                    destination
                ))
                .send()
                .await
                .ok()?
                .text()
                .await
                .ok()?,
        )
        .ok()?;
    
    timokoesters's avatar
    timokoesters committed
        Some(body.get("m.server")?.as_str()?.to_owned())
    }
    
    
    pub async fn send_request<T: OutgoingRequest>(
    
        globals: &crate::database::globals::Globals,
    
        destination: Box<ServerName>,
    
    timokoesters's avatar
    timokoesters committed
        request: T,
    
    ) -> Result<T::IncomingResponse>
    where
        T: Debug,
    {
    
        let resolver = AsyncResolver::tokio_from_system_conf()
            .await
            .map_err(|_| Error::BadConfig("Failed to set up trust dns resolver with system config."))?;
    
        let mut host = None;
    
    
        let actual_destination = "https://".to_owned()
    
            + &if let Some(mut delegated_hostname) =
                request_well_known(globals, &destination.as_str()).await
            {
                if let Ok(Some(srv)) = resolver
                    .srv_lookup(format!("_matrix._tcp.{}", delegated_hostname))
                    .await
                    .map(|srv| srv.iter().next().map(|result| result.target().to_string()))
                {
                    host = Some(delegated_hostname);
                    srv.trim_end_matches('.').to_owned()
                } else {
                    if delegated_hostname.find(':').is_none() {
                        delegated_hostname += ":8448";
    
                    delegated_hostname
                }
            } else {
                let mut destination = destination.as_str().to_owned();
                if destination.find(':').is_none() {
                    destination += ":8448";
                }
                destination
            };
    
    
        let mut http_request = request
            .try_into_http_request(&actual_destination, Some(""))
    
            .map_err(|e| {
                warn!("{}: {}", actual_destination, e);
                Error::BadServerResponse("Invalid destination")
            })?;
    
        let mut request_map = serde_json::Map::new();
    
    timokoesters's avatar
    timokoesters committed
    
    
        if !http_request.body().is_empty() {
    
            request_map.insert(
                "content".to_owned(),
    
                serde_json::from_slice(http_request.body())
                    .expect("body is valid json, we just created it"),
    
    timokoesters's avatar
    timokoesters committed
    
    
        request_map.insert("method".to_owned(), T::METADATA.method.to_string().into());
    
        request_map.insert(
            "uri".to_owned(),
            http_request
                .uri()
                .path_and_query()
                .expect("all requests have a path")
                .to_string()
                .into(),
        );
    
        request_map.insert("origin".to_owned(), globals.server_name().as_str().into());
    
        request_map.insert("destination".to_owned(), destination.as_str().into());
    
    
        let mut request_json = request_map.into();
    
        ruma::signatures::sign_json(
    
            globals.server_name().as_str(),
            globals.keypair(),
    
    timokoesters's avatar
    timokoesters committed
            &mut request_json,
        )
    
        .expect("our request json is what ruma expects");
    
    timokoesters's avatar
    timokoesters committed
    
    
        let signatures = request_json["signatures"]
            .as_object()
            .unwrap()
            .values()
    
            .map(|v| {
                v.as_object()
                    .unwrap()
                    .iter()
                    .map(|(k, v)| (k, v.as_str().unwrap()))
            });
    
        for signature_server in signatures {
            for s in signature_server {
                http_request.headers_mut().insert(
                    AUTHORIZATION,
                    HeaderValue::from_str(&format!(
                        "X-Matrix origin={},key=\"{}\",sig=\"{}\"",
    
                        globals.server_name(),
    
        if let Some(host) = host {
            http_request
                .headers_mut()
                .insert(HOST, HeaderValue::from_str(&host).unwrap());
        }
    
        let mut reqwest_request = reqwest::Request::try_from(http_request)
    
            .expect("all http requests are valid reqwest requests");
    
    
        *reqwest_request.timeout_mut() = Some(Duration::from_secs(30));
    
    
        let url = reqwest_request.url().clone();
    
        let reqwest_response = globals.reqwest_client().execute(reqwest_request).await;
    
    
        // Because reqwest::Response -> http::Response is complicated:
        match reqwest_response {
            Ok(mut reqwest_response) => {
                let status = reqwest_response.status();
                let mut http_response = http::Response::builder().status(status);
                let headers = http_response.headers_mut().unwrap();
    
                for (k, v) in reqwest_response.headers_mut().drain() {
                    if let Some(key) = k {
                        headers.insert(key, v);
                    }
                }
    
                let body = reqwest_response
                    .bytes()
                    .await
                    .unwrap()
                    .into_iter()
                    .collect();
    
                let response = T::IncomingResponse::try_from(
                    http_response
                        .body(body)
                        .expect("reqwest body is valid http body"),
                );
    
                response.map_err(|e| {
    
                    warn!(
                        "Server returned bad response {} ({}): {:?}",
                        destination, url, e
                    );
    
                    Error::BadServerResponse("Server returned bad response.")
                })
    
            Err(e) => Err(e.into()),
    
    timokoesters's avatar
    timokoesters committed
    }
    
    #[cfg_attr(feature = "conduit_bin", get("/_matrix/federation/v1/version"))]
    pub fn get_server_version() -> ConduitResult<get_server_version::Response> {
        Ok(get_server_version::Response {
    
    timokoesters's avatar
    timokoesters committed
            server: Some(get_server_version::Server {
    
    timokoesters's avatar
    timokoesters committed
                name: Some("Conduit".to_owned()),
                version: Some(env!("CARGO_PKG_VERSION").to_owned()),
    
    timokoesters's avatar
    timokoesters committed
            }),
    
    #[cfg_attr(feature = "conduit_bin", get("/_matrix/key/v2/server"))]
    
    pub fn get_server_keys(db: State<'_, Database>) -> Json<String> {
    
    timokoesters's avatar
    timokoesters committed
        let mut verify_keys = BTreeMap::new();
        verify_keys.insert(
    
            format!("ed25519:{}", db.globals.keypair().version()),
    
                key: base64::encode_config(db.globals.keypair().public_key(), base64::STANDARD_NO_PAD),
    
    timokoesters's avatar
    timokoesters committed
            },
        );
        let mut response = serde_json::from_slice(
    
            http::Response::try_from(get_server_keys::v2::Response {
                server_key: ServerKey {
                    server_name: db.globals.server_name().to_owned(),
                    verify_keys,
                    old_verify_keys: BTreeMap::new(),
                    signatures: BTreeMap::new(),
                    valid_until_ts: SystemTime::now() + Duration::from_secs(60 * 2),
                },
    
    timokoesters's avatar
    timokoesters committed
            })
            .unwrap()
            .body(),
        )
        .unwrap();
    
        ruma::signatures::sign_json(
    
            db.globals.server_name().as_str(),
    
            db.globals.keypair(),
            &mut response,
        )
        .unwrap();
    
        Json(response.to_string())
    
    #[cfg_attr(feature = "conduit_bin", get("/_matrix/key/v2/server/<_>"))]
    pub fn get_server_keys_deprecated(db: State<'_, Database>) -> Json<String> {
    
    
    /// Federation endpoint `POST /_matrix/federation/v1/publicRooms`.
    ///
    /// Forwards the request's `limit`, `since`, `filter` and `room_network` to
    /// `client_server::get_public_rooms_filtered_helper` and repackages the result as a
    /// federation response, passing the pagination tokens and the room count estimate through
    /// unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the helper.
    ///
    /// # Panics
    ///
    /// Panics if a chunk cannot be round-tripped through JSON into the chunk type of the
    /// federation response.
    #[cfg_attr(
        feature = "conduit_bin",
        post("/_matrix/federation/v1/publicRooms", data = "<body>")
    )]
    pub async fn get_public_rooms_filtered_route(
        db: State<'_, Database>,
        body: Ruma<get_public_rooms_filtered::v1::Request<'_>>,
    ) -> ConduitResult<get_public_rooms_filtered::v1::Response> {
        let response = client_server::get_public_rooms_filtered_helper(
            &db,
            None,
            body.limit,
            body.since.as_deref(),
            &body.filter,
            &body.room_network,
        )
        .await?
        .0;

        Ok(get_public_rooms_filtered::v1::Response {
            chunk: response
                .chunk
                .into_iter()
                .map(|c| {
                    // The helper lives in `client_server`, so its chunks are presumably the
                    // client-server `PublicRoomsChunk` (TODO confirm); re-encode each one
                    // through JSON to obtain the chunk type this federation response expects.
                    // The conversion either succeeds or panics, so there is nothing to filter.
                    serde_json::from_str(
                        &serde_json::to_string(&c)
                            .expect("PublicRoomsChunk::to_string always works"),
                    )
                    .expect("federation and client-server PublicRoomsChunk are the same type")
                })
                .collect(),
            prev_batch: response.prev_batch,
            next_batch: response.next_batch,
            total_room_count_estimate: response.total_room_count_estimate,
        }
        .into())
    }
    
    #[cfg_attr(
        feature = "conduit_bin",
        get("/_matrix/federation/v1/publicRooms", data = "<body>")
    )]
    
    pub async fn get_public_rooms_route(
        db: State<'_, Database>,
    
    Timo Kösters's avatar
    Timo Kösters committed
        body: Ruma<get_public_rooms::v1::Request<'_>>,
    
    ) -> ConduitResult<get_public_rooms::v1::Response> {
    
        let response = client_server::get_public_rooms_filtered_helper(
    
            body.limit,
            body.since.as_deref(),
            &IncomingFilter::default(),
            &IncomingRoomNetwork::Matrix,
    
        )
        .await?
        .0;
    
        Ok(get_public_rooms::v1::Response {
    
            chunk: response
                .chunk
    
                .into_iter()
                .map(|c| {
                    // Convert ruma::api::federation::directory::get_public_rooms::v1::PublicRoomsChunk
                    // to ruma::api::client::r0::directory::PublicRoomsChunk
                    Ok::<_, Error>(
                        serde_json::from_str(
                            &serde_json::to_string(&c)
                                .expect("PublicRoomsChunk::to_string always works"),
                        )
                        .expect("federation and client-server PublicRoomsChunk are the same type"),
                    )
                })
                .filter_map(|r| r.ok())
                .collect(),
    
            prev_batch: response.prev_batch,
            next_batch: response.next_batch,
            total_room_count_estimate: response.total_room_count_estimate,
    
        }
        .into())
    }
    
    #[cfg_attr(
        feature = "conduit_bin",
        put("/_matrix/federation/v1/send/<_>", data = "<body>")
    )]
    
    pub fn send_transaction_message_route<'a>(
    
    Timo Kösters's avatar
    Timo Kösters committed
        db: State<'a, Database>,
    
    Timo Kösters's avatar
    Timo Kösters committed
        body: Ruma<send_transaction_message::v1::Request<'_>>,
    
    ) -> ConduitResult<send_transaction_message::v1::Response> {
    
        //dbg!(&*body);
    
    Timo Kösters's avatar
    Timo Kösters committed
        for pdu in &body.pdus {
    
            let mut value = serde_json::from_str(pdu.json().get())
                .expect("converting raw jsons to values always works");
    
    
    Timo Kösters's avatar
    Timo Kösters committed
            let event_id = EventId::try_from(&*format!(
                "${}",
                ruma::signatures::reference_hash(&value).expect("ruma can calculate reference hashes")
            ))
            .expect("ruma's reference hashes are valid event ids");
    
            value
                .as_object_mut()
                .expect("ruma pdus are json objects")
                .insert("event_id".to_owned(), event_id.to_string().into());
    
    
            let pdu = serde_json::from_value::<PduEvent>(value.clone())
                .expect("all ruma pdus are conduit pdus");
    
            if db.rooms.exists(&pdu.room_id)? {
    
                let pdu_id = db
                    .rooms
    
                    .append_pdu(&pdu, &value, &db.globals, &db.account_data)?;
    
                db.rooms.append_to_state(&pdu_id, &pdu)?;
    
        Ok(send_transaction_message::v1::Response {
            pdus: BTreeMap::new(),
        }
        .into())
    }
    
    
    #[cfg_attr(
        feature = "conduit_bin",
        post("/_matrix/federation/v1/get_missing_events/<_>", data = "<body>")
    )]
    pub fn get_missing_events_route<'a>(
        db: State<'a, Database>,
        body: Ruma<get_missing_events::v1::Request<'_>>,
    ) -> ConduitResult<get_missing_events::v1::Response> {
        let mut queued_events = body.latest_events.clone();
        let mut events = Vec::new();
    
        let mut i = 0;
        while i < queued_events.len() && events.len() < u64::from(body.limit) as usize {
            if let Some(pdu) = db.rooms.get_pdu_json(&queued_events[i])? {
                if body.earliest_events.contains(
                    &serde_json::from_value(
                        pdu.get("event_id")
                            .cloned()
                            .ok_or_else(|| Error::bad_database("Event in db has no event_id field."))?,
                    )
                    .map_err(|_| Error::bad_database("Invalid event_id field in pdu in db."))?,
                ) {
                    i += 1;
                    continue;
                }
                queued_events.extend_from_slice(
                    &serde_json::from_value::<Vec<EventId>>(
                        pdu.get("prev_events").cloned().ok_or_else(|| {
                            Error::bad_database("Invalid prev_events field of pdu in db.")
                        })?,
                    )
                    .map_err(|_| Error::bad_database("Invalid prev_events content in pdu in db."))?,
                );
                events.push(PduEvent::to_outgoing_federation_event(pdu));
            }
            i += 1;
        }
    
        dbg!(&events);
    
        Ok(get_missing_events::v1::Response { events }.into())
    }