Skip to content
Snippets Groups Projects
server_server.rs 73.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • // Conduit implements the older APIs
    
        api::client_server::{self, claim_keys_helper, get_keys_helper},
    
    Timo Kösters's avatar
    Timo Kösters committed
        service::pdu::{gen_event_id_canonical_json, PduBuilder},
        services, utils, Error, PduEvent, Result, Ruma,
    
    Jonas Platte's avatar
    Jonas Platte committed
    use axum::{response::IntoResponse, Json};
    
    use futures_util::future::TryFutureExt;
    
    Timo Kösters's avatar
    Timo Kösters committed
    use get_profile_information::v1::ProfileField;
    
    Timo Kösters's avatar
    Timo Kösters committed
    use http::header::{HeaderValue, AUTHORIZATION};
    
    Timo Kösters's avatar
    Timo Kösters committed
    
    
    use ipaddress::IPAddress;
    
    Timo Kösters's avatar
    Timo Kösters committed
    use ruma::{
        api::{
    
    Jonas Platte's avatar
    Jonas Platte committed
            client::error::{Error as RumaError, ErrorKind},
    
    Timo Kösters's avatar
    Timo Kösters committed
            federation::{
    
    Timo Kösters's avatar
    Timo Kösters committed
                authorization::get_event_authorization,
    
                backfill::get_backfill,
    
    Timo Kösters's avatar
    Timo Kösters committed
                device::get_devices::{self, v1::UserDevice},
    
                directory::{get_public_rooms, get_public_rooms_filtered},
    
    Timo Kösters's avatar
    Timo Kösters committed
                discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey},
    
    Timo Kösters's avatar
    Timo Kösters committed
                event::{get_event, get_missing_events, get_room_state, get_room_state_ids},
    
                keys::{claim_keys, get_keys},
    
    Kévin Commaille's avatar
    Kévin Commaille committed
                membership::{create_invite, create_join_event, prepare_join_event},
    
                query::{get_profile_information, get_room_information},
    
    chenyuqide's avatar
    chenyuqide committed
                    edu::{DeviceListUpdateContent, DirectDeviceContent, Edu, SigningKeyUpdateContent},
    
                    send_transaction_message,
                },
    
    Jonathan de Jong's avatar
    Jonathan de Jong committed
            EndpointError, IncomingResponse, MatrixVersion, OutgoingRequest, OutgoingResponse,
            SendAccessToken,
    
    Jonas Platte's avatar
    Jonas Platte committed
        directory::{Filter, RoomNetwork},
    
    Timo Kösters's avatar
    Timo Kösters committed
            receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
    
                join_rules::{JoinRule, RoomJoinRulesEventContent},
    
    Jonas Platte's avatar
    Jonas Platte committed
                member::{MembershipState, RoomMemberEventContent},
    
    Kévin Commaille's avatar
    Kévin Commaille committed
            StateEventType, TimelineEventType,
    
    Jonathan de Jong's avatar
    Jonathan de Jong committed
        serde::{Base64, JsonObject, Raw},
    
    Timo Kösters's avatar
    Timo Kösters committed
        to_device::DeviceIdOrAllDevices,
    
        uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch,
        OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId,
        ServerName,
    
    timokoesters's avatar
    timokoesters committed
    };
    
    Jonas Platte's avatar
    Jonas Platte committed
    use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
    
    timokoesters's avatar
    timokoesters committed
    use std::{
    
    Timo Kösters's avatar
    Timo Kösters committed
        collections::BTreeMap,
    
        net::{IpAddr, SocketAddr},
    
    Timo Kösters's avatar
    Timo Kösters committed
        sync::{Arc, RwLock},
    
        time::{Duration, Instant, SystemTime},
    
    timokoesters's avatar
    timokoesters committed
    };
    
    use trust_dns_resolver::{error::ResolveError, lookup::SrvLookup};
    
    Timo Kösters's avatar
    Timo Kösters committed
    
    
    use tracing::{debug, error, info, warn};
    
    timokoesters's avatar
    timokoesters committed
    
    
    /// Wraps either a literal IP address plus port, or a hostname plus complement
    /// (colon-plus-port if it was specified).
    ///
    /// Note: A `FedDest::Named` might contain an IP address in string form if there
    /// was no port specified to construct a SocketAddr with.
    ///
    /// # Examples:
    
    /// ```rust
    
    Charles Hall's avatar
    Charles Hall committed
    /// # use conduit::api::server_server::FedDest;
    
    /// # fn main() -> Result<(), std::net::AddrParseError> {
    
    /// FedDest::Literal("198.51.100.3:8448".parse()?);
    /// FedDest::Literal("[2001:db8::4:5]:443".parse()?);
    /// FedDest::Named("matrix.example.org".to_owned(), "".to_owned());
    /// FedDest::Named("matrix.example.org".to_owned(), ":8448".to_owned());
    /// FedDest::Named("198.51.100.5".to_owned(), "".to_owned());
    
    /// # Ok(())
    /// # }
    
    Nyaaori's avatar
    Nyaaori committed
    #[derive(Clone, Debug, PartialEq, Eq)]
    
    Timo Kösters's avatar
    Timo Kösters committed
    pub enum FedDest {
    
        /// A literal IP address together with a port.
        Literal(SocketAddr),
        /// A hostname plus its complement: the colon-plus-port suffix if a port
        /// was specified (e.g. `":8448"`), otherwise an empty string. The host
        /// part may itself be an IP address in string form when no port was
        /// given to build a `SocketAddr` from.
        Named(String, String),
    }
    
    
    impl FedDest {
    
        fn into_https_string(self) -> String {
    
    Nyaaori's avatar
    Nyaaori committed
                Self::Literal(addr) => format!("https://{addr}"),
                Self::Named(host, port) => format!("https://{host}{port}"),
    
        fn into_uri_string(self) -> String {
    
            match self {
                Self::Literal(addr) => addr.to_string(),
    
    🥺's avatar
    🥺 committed
                Self::Named(host, port) => host + &port,
    
        fn hostname(&self) -> String {
    
            match &self {
                Self::Literal(addr) => addr.ip().to_string(),
    
                Self::Named(host, _) => host.clone(),
    
    Timo Kösters's avatar
    Timo Kösters committed
    
        /// Returns the port of this destination, if one is known.
        ///
        /// For `Literal` this is the port of the socket address. For `Named`,
        /// the stored complement is expected to look like `":8448"`: the first
        /// byte is skipped and the remainder parsed as a `u16`. An empty
        /// complement (no port specified) or one that does not parse yields
        /// `None`.
        fn port(&self) -> Option<u16> {
            match self {
                Self::Literal(addr) => Some(addr.port()),
                // `get(1..)` instead of `[1..]`: indexing an empty complement
                // (a `Named` destination without a port) would panic.
                Self::Named(_, port) => port.get(1..).and_then(|digits| digits.parse().ok()),
            }
        }
    
    pub(crate) async fn send_request<T>(
    
        destination: &ServerName,
    
    timokoesters's avatar
    timokoesters committed
        request: T,
    
    ) -> Result<T::IncomingResponse>
    where
    
        T: OutgoingRequest + Debug,
    
        if !services().globals.allow_federation() {
    
            return Err(Error::bad_config("Federation is disabled."));
    
        if destination == services().globals.server_name() {
            return Err(Error::bad_config(
                "Won't send federation request to ourselves",
            ));
        }
    
    
        if destination.is_ip_literal() || IPAddress::is_valid(destination.host()) {
    
            info!(
                "Destination {} is an IP literal, checking against IP range denylist.",
                destination
            );
    
            let ip = IPAddress::parse(destination.host()).map_err(|e| {
                warn!("Failed to parse IP literal from string: {}", e);
                Error::BadServerResponse("Invalid IP address")
            })?;
    
            let cidr_ranges_s = services().globals.ip_range_denylist().to_vec();
            let mut cidr_ranges: Vec<IPAddress> = Vec::new();
    
            for cidr in cidr_ranges_s {
                cidr_ranges.push(IPAddress::parse(cidr).expect("we checked this at startup"));
            }
    
    
            debug!("List of pushed CIDR ranges: {:?}", cidr_ranges);
    
    
            for cidr in cidr_ranges {
    
                if cidr.includes(&ip) {
    
                    return Err(Error::BadServerResponse(
                        "Not allowed to send requests to this IP",
                    ));
                }
            }
    
    
            info!("IP literal {} is allowed.", destination);
    
        debug!("Preparing to send request to {destination}");
    
    Timo Kösters's avatar
    Timo Kösters committed
    
    
    Timo Kösters's avatar
    Timo Kösters committed
        let mut write_destination_to_cache = false;
    
    
    Timo Kösters's avatar
    Timo Kösters committed
        let cached_result = services()
            .globals
    
            .actual_destination_cache
            .read()
            .unwrap()
    
            .get(destination)
    
    Timo Kösters's avatar
    Timo Kösters committed
        let (actual_destination, host) = if let Some(result) = cached_result {
    
    Timo Kösters's avatar
    Timo Kösters committed
            write_destination_to_cache = true;
    
    
            let result = find_actual_destination(destination).await;
    
            (result.0, result.1.into_uri_string())
    
    Timo Kösters's avatar
    Timo Kösters committed
        let actual_destination_str = actual_destination.clone().into_https_string();
    
    
        let mut http_request = request
    
    Jonathan de Jong's avatar
    Jonathan de Jong committed
            .try_into_http_request::<Vec<u8>>(
                &actual_destination_str,
                SendAccessToken::IfRequired(""),
    
                &[MatrixVersion::V1_5],
    
    Jonathan de Jong's avatar
    Jonathan de Jong committed
            )
    
            .map_err(|e| {
    
    Timo Kösters's avatar
    Timo Kösters committed
                warn!(
                    "Failed to find destination {}: {}",
                    actual_destination_str, e
                );
    
                Error::BadServerResponse("Invalid destination")
            })?;
    
        let mut request_map = serde_json::Map::new();
    
    timokoesters's avatar
    timokoesters committed
    
    
        if !http_request.body().is_empty() {
    
            request_map.insert(
                "content".to_owned(),
    
                serde_json::from_slice(http_request.body())
                    .expect("body is valid json, we just created it"),
    
    timokoesters's avatar
    timokoesters committed
    
    
        request_map.insert("method".to_owned(), T::METADATA.method.to_string().into());
    
        request_map.insert(
            "uri".to_owned(),
            http_request
                .uri()
                .path_and_query()
                .expect("all requests have a path")
                .to_string()
                .into(),
        );
    
    Timo Kösters's avatar
    Timo Kösters committed
        request_map.insert(
            "origin".to_owned(),
            services().globals.server_name().as_str().into(),
        );
    
        request_map.insert("destination".to_owned(), destination.as_str().into());
    
        let mut request_json =
            serde_json::from_value(request_map.into()).expect("valid JSON is valid BTreeMap");
    
    
        ruma::signatures::sign_json(
    
            services().globals.server_name().as_str(),
            services().globals.keypair(),
    
            &mut request_json,
    
        .expect("our request json is what ruma expects");
    
    timokoesters's avatar
    timokoesters committed
    
    
        let request_json: serde_json::Map<String, serde_json::Value> =
            serde_json::from_slice(&serde_json::to_vec(&request_json).unwrap()).unwrap();
    
    
        let signatures = request_json["signatures"]
            .as_object()
            .unwrap()
            .values()
    
            .map(|v| {
                v.as_object()
                    .unwrap()
                    .iter()
                    .map(|(k, v)| (k, v.as_str().unwrap()))
            });
    
        for signature_server in signatures {
            for s in signature_server {
                http_request.headers_mut().insert(
                    AUTHORIZATION,
                    HeaderValue::from_str(&format!(
                        "X-Matrix origin={},key=\"{}\",sig=\"{}\"",
    
                        services().globals.server_name(),
    
    🥺's avatar
    🥺 committed
        let reqwest_request = reqwest::Request::try_from(http_request)?;
    
        let url = reqwest_request.url().clone();
    
        debug!("Sending request to {destination} at {url}");
    
    Timo Kösters's avatar
    Timo Kösters committed
        let response = services()
            .globals
            .federation_client()
            .execute(reqwest_request)
            .await;
    
        debug!("Received response from {destination} at {url}");
    
        match response {
            Ok(mut response) => {
                // reqwest::Response -> http::Response conversion
                let status = response.status();
                let mut http_response_builder = http::Response::builder()
                    .status(status)
                    .version(response.version());
                mem::swap(
                    response.headers_mut(),
                    http_response_builder
                        .headers_mut()
                        .expect("http::response::Builder is usable"),
                );
    
                debug!("Getting response bytes from {destination}");
    
                let body = response.bytes().await.unwrap_or_else(|e| {
    
                    info!("server error {}", e);
    
                    Vec::new().into()
                }); // TODO: handle timeout
    
                debug!("Got response bytes from {destination}");
    
                if !status.is_success() {
    
                    debug!(
                        "Response not successful\n{} {}: {}",
    
                        url,
                        status,
    
    Timo Kösters's avatar
    Timo Kösters committed
                        String::from_utf8_lossy(&body)
                            .lines()
                            .collect::<Vec<_>>()
                            .join(" ")
    
                let http_response = http_response_builder
                    .body(body)
                    .expect("reqwest body is valid http body");
    
    
                if status.is_success() {
    
                    debug!("Parsing response bytes from {destination}");
    
                    let response = T::IncomingResponse::try_from_http_response(http_response);
    
    Timo Kösters's avatar
    Timo Kösters committed
                    if response.is_ok() && write_destination_to_cache {
    
    Timo Kösters's avatar
    Timo Kösters committed
                        services()
                            .globals
                            .actual_destination_cache
                            .write()
                            .unwrap()
                            .insert(
    
    Timo Kösters's avatar
    Timo Kösters committed
                                OwnedServerName::from(destination),
    
    Timo Kösters's avatar
    Timo Kösters committed
                                (actual_destination, host),
                            );
    
                    response.map_err(|e| {
    
                        warn!(
                            "Invalid 200 response from {} on: {} {}",
                            &destination, url, e
                        );
    
                        Error::BadServerResponse("Server returned bad 200 response.")
                    })
    
                    debug!("Returning error from {destination}");
    
    
                    // remove potentially dead destinations from our cache that may be from modified well-knowns
                    if !write_destination_to_cache {
                        info!("Evicting {destination} from our true destination cache due to failed request.");
                        services()
                            .globals
                            .actual_destination_cache
                            .write()
                            .unwrap()
                            .remove(destination);
                    }
    
    
                    Err(Error::FederationError(
                        destination.to_owned(),
    
    Jonas Platte's avatar
    Jonas Platte committed
                        RumaError::from_http_response(http_response),
    
            Err(e) => {
    
                // we do not need to log that servers in a room are dead, this is normal in public rooms and just spams the logs.
                match e.is_timeout() {
                    true => info!(
    
                        "Timed out sending request to {} at {}: {}",
                        destination, actual_destination_str, e
    
                    ),
                    false => match e.is_connect() {
                        true => info!(
                            "Failed to connect to {} at {}: {}",
                            destination, actual_destination_str, e
                        ),
                        false => match e.is_redirect() {
                            true => info!(
                                "Redirect loop sending request to {} at {}: {}\nFinal URL: {:?}",
                                destination,
                                actual_destination_str,
                                e,
                                e.url()
                            ),
                            false => warn!(
                                "Could not send request to {} at {}: {}",
                                destination, actual_destination_str, e
                            ),
                        },
                    },
    
                Err(e.into())
    
    timokoesters's avatar
    timokoesters committed
    }
    
    fn get_ip_with_port(destination_str: &str) -> Option<FedDest> {
    
        if let Ok(destination) = destination_str.parse::<SocketAddr>() {
    
            Some(FedDest::Literal(destination))
    
        } else if let Ok(ip_addr) = destination_str.parse::<IpAddr>() {
    
            Some(FedDest::Literal(SocketAddr::new(ip_addr, 8448)))
    
    fn add_port_to_hostname(destination_str: &str) -> FedDest {
    
        let (host, port) = match destination_str.find(':') {
            None => (destination_str, ":8448"),
            Some(pos) => destination_str.split_at(pos),
        };
    
        FedDest::Named(host.to_owned(), port.to_owned())
    
    /// Returns: actual_destination, host header
    
    Charles Hall's avatar
    Charles Hall committed
    /// Implemented according to the specification at <https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names>
    
    /// Numbers in comments below refer to bullet points in linked section of specification
    
    Timo Kösters's avatar
    Timo Kösters committed
    async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) {
    
        debug!("Finding actual destination for {destination}");
    
        let destination_str = destination.as_str().to_owned();
    
        let mut hostname = destination_str.clone();
        let actual_destination = match get_ip_with_port(&destination_str) {
    
            Some(host_port) => {
    
                debug!("1: IP literal with provided or default port");
    
                host_port
            }
            None => {
                if let Some(pos) = destination_str.find(':') {
    
                    debug!("2: Hostname with included port");
    
                    let (host, port) = destination_str.split_at(pos);
    
                    FedDest::Named(host.to_owned(), port.to_owned())
    
                    debug!("Requesting well known for {destination}");
    
                    match request_well_known(destination.as_str()).await {
    
                        Some(delegated_hostname) => {
    
                            debug!("3: A .well-known file is available");
    
    Timo Kösters's avatar
    Timo Kösters committed
                            hostname = add_port_to_hostname(&delegated_hostname).into_uri_string();
    
                            match get_ip_with_port(&delegated_hostname) {
                                Some(host_and_port) => host_and_port, // 3.1: IP literal in .well-known file
                                None => {
    
                                    if let Some(pos) = delegated_hostname.find(':') {
    
                                        debug!("3.2: Hostname with port in .well-known file");
    
                                        let (host, port) = delegated_hostname.split_at(pos);
    
                                        FedDest::Named(host.to_owned(), port.to_owned())
    
                                        debug!("Delegated hostname has no port in this branch");
    
    Timo Kösters's avatar
    Timo Kösters committed
                                        if let Some(hostname_override) =
    
                                            query_srv_record(&delegated_hostname).await
    
                                            debug!("3.3: SRV lookup successful");
    
    Timo Kösters's avatar
    Timo Kösters committed
                                            let force_port = hostname_override.port();
    
    
    Timo Kösters's avatar
    Timo Kösters committed
                                            if let Ok(override_ip) = services()
                                                .globals
    
    Timo Kösters's avatar
    Timo Kösters committed
                                                .dns_resolver()
                                                .lookup_ip(hostname_override.hostname())
                                                .await
                                            {
    
    Timo Kösters's avatar
    Timo Kösters committed
                                                services()
                                                    .globals
                                                    .tls_name_override
                                                    .write()
                                                    .unwrap()
                                                    .insert(
                                                        delegated_hostname.clone(),
                                                        (
                                                            override_ip.iter().collect(),
                                                            force_port.unwrap_or(8448),
                                                        ),
                                                    );
    
    Timo Kösters's avatar
    Timo Kösters committed
                                            } else {
    
                                                debug!(
                                                    "Using SRV record {}, but could not resolve to IP",
                                                    hostname_override.hostname()
                                                );
    
    Timo Kösters's avatar
    Timo Kösters committed
                                            }
    
                                            if let Some(port) = force_port {
    
    Nyaaori's avatar
    Nyaaori committed
                                                FedDest::Named(delegated_hostname, format!(":{port}"))
    
    Timo Kösters's avatar
    Timo Kösters committed
                                            } else {
                                                add_port_to_hostname(&delegated_hostname)
                                            }
                                        } else {
    
                                            debug!("3.4: No SRV records, just use the hostname from .well-known");
    
    Timo Kösters's avatar
    Timo Kösters committed
                                            add_port_to_hostname(&delegated_hostname)
    
                            debug!("4: No .well-known or an error occured");
    
                            match query_srv_record(&destination_str).await {
    
    Timo Kösters's avatar
    Timo Kösters committed
                                Some(hostname_override) => {
    
                                    debug!("4: SRV record found");
    
    Timo Kösters's avatar
    Timo Kösters committed
                                    let force_port = hostname_override.port();
    
    
    Timo Kösters's avatar
    Timo Kösters committed
                                    if let Ok(override_ip) = services()
                                        .globals
    
    Timo Kösters's avatar
    Timo Kösters committed
                                        .dns_resolver()
                                        .lookup_ip(hostname_override.hostname())
                                        .await
                                    {
    
    Timo Kösters's avatar
    Timo Kösters committed
                                        services()
                                            .globals
                                            .tls_name_override
                                            .write()
                                            .unwrap()
                                            .insert(
                                                hostname.clone(),
                                                (
                                                    override_ip.iter().collect(),
                                                    force_port.unwrap_or(8448),
                                                ),
                                            );
    
    Timo Kösters's avatar
    Timo Kösters committed
                                    } else {
    
                                        debug!(
                                            "Using SRV record {}, but could not resolve to IP",
                                            hostname_override.hostname()
                                        );
    
    Timo Kösters's avatar
    Timo Kösters committed
                                    }
    
                                    if let Some(port) = force_port {
    
    Nyaaori's avatar
    Nyaaori committed
                                        FedDest::Named(hostname.clone(), format!(":{port}"))
    
    Timo Kösters's avatar
    Timo Kösters committed
                                    } else {
                                        add_port_to_hostname(&hostname)
                                    }
                                }
    
                                None => {
    
                                    debug!("5: No SRV record found");
    
                                    add_port_to_hostname(&destination_str)
                                }
    
        debug!("Actual destination: {actual_destination:?}");
    
        // Can't use get_ip_with_port here because we don't want to add a port
        // to an IP address if it wasn't specified
    
        let hostname = if let Ok(addr) = hostname.parse::<SocketAddr>() {
            FedDest::Literal(addr)
        } else if let Ok(addr) = hostname.parse::<IpAddr>() {
    
            FedDest::Named(addr.to_string(), ":8448".to_owned())
    
        } else if let Some(pos) = hostname.find(':') {
            let (host, port) = hostname.split_at(pos);
    
            FedDest::Named(host.to_owned(), port.to_owned())
    
            FedDest::Named(hostname, ":8448".to_owned())
    
    Timo Kösters's avatar
    Timo Kösters committed
    async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> {
    
        fn handle_successful_srv(srv: SrvLookup) -> Option<FedDest> {
            srv.iter().next().map(|result| {
                FedDest::Named(
                    result.target().to_string().trim_end_matches('.').to_owned(),
                    format!(":{}", result.port()),
                )
    
    
        /// Performs an SRV lookup for `hostname` through the globally
        /// configured DNS resolver. Trailing dots are stripped from the name
        /// before it is handed to the resolver; the debug log shows the name
        /// as it was passed in.
        async fn lookup_srv(hostname: &str) -> Result<SrvLookup, ResolveError> {
            debug!("querying SRV for {:?}", hostname);

            let resolver = services().globals.dns_resolver();
            let query_name = hostname.trim_end_matches('.').to_owned();

            resolver.srv_lookup(query_name).await
        }
    
        let first_hostname = format!("_matrix-fed._tcp.{hostname}.");
        let second_hostname = format!("_matrix._tcp.{hostname}.");
    
        lookup_srv(&first_hostname)
            .or_else(|_| {
                info!(
                    "Querying deprecated _matrix SRV record for host {:?}",
                    hostname
                );
                lookup_srv(&second_hostname)
            })
            .and_then(|srv_lookup| async { Ok(handle_successful_srv(srv_lookup)) })
            .await
            .ok()
            .flatten()
    
    Timo Kösters's avatar
    Timo Kösters committed
    async fn request_well_known(destination: &str) -> Option<String> {
    
        let response = services()
            .globals
            .default_client()
            .get(&format!("https://{destination}/.well-known/matrix/server"))
            .send()
            .await;
    
        debug!("Got well known response");
    
        debug!("Well known response: {:?}", response);
    
        if let Err(e) = &response {
    
            debug!("Well known error: {e:?}");
            return None;
    
        let text = response.ok()?.text().await;
    
        debug!("Got well known response text");
    
        debug!("Well known response text: {:?}", text);
    
    
        if text.as_ref().ok()?.len() > 10000 {
            info!("Well known response for destination '{destination}' exceeded past 10000 characters, assuming no well-known.");
            return None;
        }
    
    
        let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?;
    
        debug!("serde_json body of well known text: {}", body);
    
    
        Some(body.get("m.server")?.as_str()?.to_owned())
    }
    
    
    /// # `GET /_matrix/federation/v1/version`
    ///
    /// Get version information on this server.
    
    Jonas Platte's avatar
    Jonas Platte committed
    pub async fn get_server_version_route(
        _body: Ruma<get_server_version::v1::Request>,
    
    ) -> Result<get_server_version::v1::Response> {
    
        if !services().globals.allow_federation() {
    
            return Err(Error::bad_config("Federation is disabled."));
    
        Ok(get_server_version::v1::Response {
            server: Some(get_server_version::v1::Server {
    
    timokoesters's avatar
    timokoesters committed
                version: Some(env!("CARGO_PKG_VERSION").to_owned()),
    
    timokoesters's avatar
    timokoesters committed
            }),
    
    /// # `GET /_matrix/key/v2/server`
    ///
    /// Gets the public signing keys of this server.
    ///
    /// - Matrix does not support invalidating public keys, so the key returned by this will be valid
    /// forever.
    
    // Response type for this endpoint is Json because we need to calculate a signature for the response
    
    pub async fn get_server_keys_route() -> Result<impl IntoResponse> {
        if !services().globals.allow_federation() {
    
            return Err(Error::bad_config("Federation is disabled."));
    
    Timo Kösters's avatar
    Timo Kösters committed
        let mut verify_keys: BTreeMap<OwnedServerSigningKeyId, VerifyKey> = BTreeMap::new();
    
    timokoesters's avatar
    timokoesters committed
        verify_keys.insert(
    
            format!("ed25519:{}", services().globals.keypair().version())
    
                .try_into()
                .expect("found invalid server signing keys in DB"),
    
                key: Base64::new(services().globals.keypair().public_key().to_vec()),
    
    timokoesters's avatar
    timokoesters committed
            },
        );
        let mut response = serde_json::from_slice(
    
            get_server_keys::v2::Response {
    
    Jonathan de Jong's avatar
    Jonathan de Jong committed
                server_key: Raw::new(&ServerSigningKeys {
    
                    server_name: services().globals.server_name().to_owned(),
    
                    verify_keys,
                    old_verify_keys: BTreeMap::new(),
                    signatures: BTreeMap::new(),
    
                    valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time(
    
                        SystemTime::now() + Duration::from_secs(86400 * 7),
    
    Jonathan de Jong's avatar
    Jonathan de Jong committed
                })
                .expect("static conversion, no errors"),
    
    Jonas Platte's avatar
    Jonas Platte committed
            .try_into_http_response::<Vec<u8>>()
    
    timokoesters's avatar
    timokoesters committed
            .unwrap()
            .body(),
        )
        .unwrap();
    
        ruma::signatures::sign_json(
    
            services().globals.server_name().as_str(),
            services().globals.keypair(),
    
            &mut response,
        )
        .unwrap();
    
    /// # `GET /_matrix/key/v2/server/{keyId}`
    ///
    /// Gets the public signing keys of this server.
    ///
    /// - Matrix does not support invalidating public keys, so the key returned by this will be valid
    /// forever.
    
    pub async fn get_server_keys_deprecated_route() -> impl IntoResponse {
        get_server_keys_route().await
    
    /// # `POST /_matrix/federation/v1/publicRooms`
    ///
    /// Lists the public rooms on this server.
    
    pub async fn get_public_rooms_filtered_route(
    
    Jonas Platte's avatar
    Jonas Platte committed
        body: Ruma<get_public_rooms_filtered::v1::Request>,
    
    ) -> Result<get_public_rooms_filtered::v1::Response> {
    
        if !services().globals.allow_federation() {
    
            return Err(Error::bad_config("Federation is disabled."));
    
        if !services()
            .globals
            .allow_public_room_directory_over_federation()
        {
            return Err(Error::bad_config("Room directory is not public."));
        }
    
    
        let response = client_server::get_public_rooms_filtered_helper(
            None,
            body.limit,
            body.since.as_deref(),
            &body.filter,
            &body.room_network,
        )
    
    
        Ok(get_public_rooms_filtered::v1::Response {
    
            chunk: response.chunk,
    
            prev_batch: response.prev_batch,
            next_batch: response.next_batch,
            total_room_count_estimate: response.total_room_count_estimate,
    
    /// # `GET /_matrix/federation/v1/publicRooms`
    ///
    /// Lists the public rooms on this server.
    
    pub async fn get_public_rooms_route(
    
    Jonas Platte's avatar
    Jonas Platte committed
        body: Ruma<get_public_rooms::v1::Request>,
    
    ) -> Result<get_public_rooms::v1::Response> {
    
        if !services().globals.allow_federation() {
    
            return Err(Error::bad_config("Federation is disabled."));
    
        if !services()
            .globals
            .allow_public_room_directory_over_federation()
        {
            return Err(Error::bad_config("Room directory is not public."));
        }
    
    
        let response = client_server::get_public_rooms_filtered_helper(
    
            body.limit,
            body.since.as_deref(),
    
    Jonas Platte's avatar
    Jonas Platte committed
            &Filter::default(),
            &RoomNetwork::Matrix,
    
    
        Ok(get_public_rooms::v1::Response {
    
            chunk: response.chunk,
    
            prev_batch: response.prev_batch,
            next_batch: response.next_batch,
            total_room_count_estimate: response.total_room_count_estimate,
    
    Timo Kösters's avatar
    Timo Kösters committed
    pub fn parse_incoming_pdu(
        pdu: &RawJsonValue,
    ) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> {
        let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
            warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
            Error::BadServerResponse("Invalid PDU in server response")
        })?;
    
        let room_id: OwnedRoomId = value
            .get("room_id")
            .and_then(|id| RoomId::parse(id.as_str()?).ok())
            .ok_or(Error::BadRequest(
                ErrorKind::InvalidParam,
                "Invalid room id in pdu",
            ))?;
    
        let room_version_id = services().rooms.state.get_room_version(&room_id)?;
    
    
        let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) {
    
    Timo Kösters's avatar
    Timo Kösters committed
            Ok(t) => t,
            Err(_) => {
                // Event could not be converted to canonical json
                return Err(Error::BadRequest(
                    ErrorKind::InvalidParam,
                    "Could not convert event to canonical json.",
                ));
            }
        };
        Ok((event_id, value, room_id))
    }
    
    
    /// # `PUT /_matrix/federation/v1/send/{txnId}`
    ///
    /// Push EDUs and PDUs to this server.
    
    pub async fn send_transaction_message_route(
    
    Jonas Platte's avatar
    Jonas Platte committed
        body: Ruma<send_transaction_message::v1::Request>,
    
    ) -> Result<send_transaction_message::v1::Response> {
    
        if !services().globals.allow_federation() {
    
            return Err(Error::bad_config("Federation is disabled."));
    
        let sender_servername = body
            .sender_servername
            .as_ref()
            .expect("server is authenticated");
    
    
        let mut resolved_map = BTreeMap::new();
    
    
        let pub_key_map = RwLock::new(BTreeMap::new());
    
    
        // This is all the auth_events that have been recursively fetched so they don't have to be
        // deserialized over and over again.
        // TODO: make this persist across requests but not in a DB Tree (in globals?)
        // TODO: This could potentially also be some sort of trie (suffix tree) like structure so
        // that once an auth event is known it would know (using indexes maybe) all of the auth
        // events that it references.
    
        // let mut auth_cache = EventMap::new();
    
        let mut parsed_pdus = vec![];
    
        for pdu in &body.pdus {
    
            let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
                warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
                Error::BadServerResponse("Invalid PDU in server response")
            })?;
            let room_id: OwnedRoomId = value
                .get("room_id")
                .and_then(|id| RoomId::parse(id.as_str()?).ok())
                .ok_or(Error::BadRequest(
                    ErrorKind::InvalidParam,
                    "Invalid room id in pdu",
                ))?;
    
            if services().rooms.state.get_room_version(&room_id).is_err() {
                debug!("Server is not in room {room_id}");
                continue;
            }
    
    
            let r = parse_incoming_pdu(pdu);
    
            let (event_id, value, room_id) = match r {
                Ok(t) => t,
                Err(e) => {
    
                    warn!("Could not parse PDU: {e}");
                    warn!("Full PDU: {:?}", &pdu);
    
            parsed_pdus.push((event_id, value, room_id));
    
            // We do not add the event_id field to the pdu here because of signature and hashes checks
    
        }
    
        // We go through all the signatures we see on the PDUs and fetch the corresponding
        // signing keys
        services()
            .rooms
            .event_handler
            .fetch_required_signing_keys(
                parsed_pdus.iter().map(|(_event_id, event, _room_id)| event),
                &pub_key_map,
            )
            .await
            .unwrap_or_else(|e| {
                warn!(
                    "Could not fetch all signatures for PDUs from {}: {:?}",
                    sender_servername, e
    
        for (event_id, value, room_id) in parsed_pdus {
    
    Timo Kösters's avatar
    Timo Kösters committed
                services()
                    .globals
    
    Timo Kösters's avatar
    Timo Kösters committed
                    .roomid_mutex_federation
    
                    .entry(room_id.clone())
    
                    .or_default(),
            );
            let mutex_lock = mutex.lock().await;
    
            let start_time = Instant::now();
    
            resolved_map.insert(
                event_id.clone(),
    
    Timo Kösters's avatar
    Timo Kösters committed
                services()
                    .rooms
                    .event_handler
                    .handle_incoming_pdu(
    
    Nyaaori's avatar
    Nyaaori committed
                        sender_servername,
    
    Timo Kösters's avatar
    Timo Kösters committed
                        &event_id,
                        &room_id,
                        value,
                        true,
                        &pub_key_map,
                    )
                    .await
                    .map(|_| ()),
    
    
            let elapsed = start_time.elapsed();
    
                "Handling transaction of event {} took {}m{}s",
    
                event_id,
                elapsed.as_secs() / 60,
                elapsed.as_secs() % 60
            );
    
        for pdu in &resolved_map {
            if let Err(e) = pdu.1 {
    
                if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
    
                    warn!("Incoming PDU failed {:?}", pdu);
                }
            }
    
        for edu in body
            .edus
            .iter()
    
            .filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
    
                Edu::Presence(presence) => {
    
                    if !services().globals.allow_incoming_presence() {
                        continue;
                    }
    
    
                    for update in presence.push {
                        for room_id in services().rooms.state_cache.rooms_joined(&update.user_id) {
                            services().rooms.edus.presence.set_presence(
                                &room_id?,
                                &update.user_id,
                                update.presence.clone(),
                                Some(update.currently_active),
                                Some(update.last_active_ago),
                                update.status_msg.clone(),
                            )?;
                        }
                    }
                }
    
                Edu::Receipt(receipt) => {
                    for (room_id, room_updates) in receipt.receipts {
                        for (user_id, user_updates) in room_updates.read {
                            if let Some((event_id, _)) = user_updates
                                .event_ids
                                .iter()
                                .filter_map(|id| {
    
    Timo Kösters's avatar
    Timo Kösters committed
                                    services()
                                        .rooms
                                        .timeline
                                        .get_pdu_count(id)
                                        .ok()
                                        .flatten()
                                        .map(|r| (id, r))
    
                                })
                                .max_by_key(|(_, count)| *count)
                            {
                                let mut user_receipts = BTreeMap::new();
                                user_receipts.insert(user_id.clone(), user_updates.data);
    
    
                                let mut receipts = BTreeMap::new();
                                receipts.insert(ReceiptType::Read, user_receipts);
    
    
                                let mut receipt_content = BTreeMap::new();
    
                                receipt_content.insert(event_id.to_owned(), receipts);
    
                                let event = ReceiptEvent {
    
                                    content: ReceiptEventContent(receipt_content),
                                    room_id: room_id.clone(),
    
    Timo Kösters's avatar
    Timo Kösters committed
                                services()
                                    .rooms
                                    .edus
                                    .read_receipt
                                    .readreceipt_update(&user_id, &room_id, event)?;
    
    Timo Kösters's avatar
    Timo Kösters committed
                                // TODO fetch missing events
    
                                debug!("No known event ids in read receipt: {:?}", user_updates);
    
                            }
                        }
                    }
                }
                Edu::Typing(typing) => {
    
    Timo Kösters's avatar
    Timo Kösters committed
                    if services()
                        .rooms
                        .state_cache
                        .is_joined(&typing.user_id, &typing.room_id)?
                    {
    
    Timo Kösters's avatar
    Timo Kösters committed
                            services().rooms.edus.typing.typing_add(
    
                                &typing.user_id,
                                &typing.room_id,
                                3000 + utils::millis_since_unix_epoch(),
                            )?;
                        } else {
    
    Timo Kösters's avatar
    Timo Kösters committed
                            services()
                                .rooms
                                .edus