Skip to content
Snippets Groups Projects
server_server.rs 56 KiB
Newer Older
  • Learn to ignore specific revisions
  • // Conduit implements the older APIs
    
    use std::{
    	collections::BTreeMap,
    	fmt::Debug,
    	mem,
    	net::{IpAddr, SocketAddr},
    
    	time::{Duration, Instant, SystemTime},
    
    Jonas Platte's avatar
    Jonas Platte committed
    use axum::{response::IntoResponse, Json};
    
    use futures_util::future::TryFutureExt;
    
    Timo Kösters's avatar
    Timo Kösters committed
    use get_profile_information::v1::ProfileField;
    
    use hickory_resolver::{error::ResolveError, lookup::SrvLookup};
    
    Timo Kösters's avatar
    Timo Kösters committed
    use http::header::{HeaderValue, AUTHORIZATION};
    
    use ipaddress::IPAddress;
    
    Timo Kösters's avatar
    Timo Kösters committed
    use ruma::{
    
    	api::{
    		client::error::{Error as RumaError, ErrorKind},
    		federation::{
    			authorization::get_event_authorization,
    			backfill::get_backfill,
    			device::get_devices::{self, v1::UserDevice},
    			directory::{get_public_rooms, get_public_rooms_filtered},
    			discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey},
    			event::{get_event, get_missing_events, get_room_state, get_room_state_ids},
    			keys::{claim_keys, get_keys},
    			membership::{create_invite, create_join_event, prepare_join_event},
    			query::{get_profile_information, get_room_information},
    
    			space::get_hierarchy,
    
    			transactions::{
    				edu::{DeviceListUpdateContent, DirectDeviceContent, Edu, SigningKeyUpdateContent},
    				send_transaction_message,
    			},
    		},
    		EndpointError, IncomingResponse, MatrixVersion, OutgoingRequest, OutgoingResponse, SendAccessToken,
    	},
    	directory::{Filter, RoomNetwork},
    	events::{
    		receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
    		room::{
    			join_rules::{JoinRule, RoomJoinRulesEventContent},
    			member::{MembershipState, RoomMemberEventContent},
    		},
    		StateEventType, TimelineEventType,
    	},
    	serde::{Base64, JsonObject, Raw},
    	to_device::DeviceIdOrAllDevices,
    	uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
    	OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName,
    
    timokoesters's avatar
    timokoesters committed
    };
    
    Jonas Platte's avatar
    Jonas Platte committed
    use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
    
    use tracing::{debug, error, info, warn};
    
    Timo Kösters's avatar
    Timo Kösters committed
    
    
    use crate::{
    	api::client_server::{self, claim_keys_helper, get_keys_helper},
    	service::pdu::{gen_event_id_canonical_json, PduBuilder},
    	services, utils, Error, PduEvent, Result, Ruma,
    };
    
    timokoesters's avatar
    timokoesters committed
    
    
    /// Wraps either a literal IP address plus port, or a hostname plus complement
    /// (colon-plus-port if it was specified).
    ///
    /// Note: A `FedDest::Named` might contain an IP address in string form if there
    /// was no port specified to construct a `SocketAddr` with.
    ///
    /// # Examples:
    /// ```rust
    /// # use conduit::api::server_server::FedDest;
    /// # fn main() -> Result<(), std::net::AddrParseError> {
    /// FedDest::Literal("198.51.100.3:8448".parse()?);
    /// FedDest::Literal("[2001:db8::4:5]:443".parse()?);
    /// FedDest::Named("matrix.example.org".to_owned(), String::new());
    /// FedDest::Named("matrix.example.org".to_owned(), ":8448".to_owned());
    /// FedDest::Named("198.51.100.5".to_owned(), String::new());
    /// # Ok(())
    /// # }
    /// ```
    #[derive(Clone, Debug, PartialEq, Eq)]
    pub enum FedDest {
    	/// An IP address together with a port.
    	Literal(SocketAddr),
    	/// A host string and its port complement: either empty or `":<port>"`.
    	Named(String, String),
    }

    impl FedDest {
    	/// Consumes the destination and renders it as an `https://` base URL.
    	fn into_https_string(self) -> String {
    		match self {
    			Self::Literal(addr) => format!("https://{addr}"),
    			Self::Named(host, port) => format!("https://{host}{port}"),
    		}
    	}

    	/// Consumes the destination and renders it as `host` followed by the port
    	/// complement, without a scheme.
    	fn into_uri_string(self) -> String {
    		match self {
    			Self::Literal(addr) => addr.to_string(),
    			Self::Named(host, port) => host + &port,
    		}
    	}

    	/// Returns the host part only: the IP address of a `Literal`, or the host
    	/// string of a `Named`.
    	fn hostname(&self) -> String {
    		match self {
    			Self::Literal(addr) => addr.ip().to_string(),
    			Self::Named(host, _) => host.clone(),
    		}
    	}

    	/// Returns the numeric port, or `None` when a `Named` destination has an
    	/// empty or unparsable port complement.
    	fn port(&self) -> Option<u16> {
    		match self {
    			Self::Literal(addr) => Some(addr.port()),
    			// The complement may be empty (see the type-level examples), so use the
    			// checked `get(1..)` rather than `[1..]`, which would panic on "".
    			Self::Named(_, port) => port.get(1..).and_then(|digits| digits.parse().ok()),
    		}
    	}
    }
    
    /// Sends the federation request `request` to `destination` and returns the
    /// parsed `T::IncomingResponse`.
    ///
    /// Visible steps, in order:
    /// 1. Refuse when federation is disabled or `destination` is this server.
    /// 2. If `destination` is an IP literal, reject it when it falls inside any
    ///    CIDR range of the configured IP range denylist.
    /// 3. Take the actual destination and host from `actual_destination_cache`,
    ///    or resolve them with `find_actual_destination`.
    /// 4. Build the HTTP request, sign a JSON description of it and attach the
    ///    signature as an `X-Matrix` `Authorization` header.
    /// 5. Execute the request with the federation client and convert the reply
    ///    back into an `http::Response` for ruma to parse.
    ///
    /// Errors are returned when a guard above fails, when the request cannot be
    /// built or sent, when a successful response does not parse, or (as
    /// `Error::FederationError`) when the remote answers with a non-success
    /// status.
    ///
    /// NOTE(review): this listing appears to have lost a few tokens (a `where`
    /// keyword, the opening `{` of the body, an `.await` and one closing `}`);
    /// confirm against the repository before relying on it.
    pub(crate) async fn send_request<T>(destination: &ServerName, request: T) -> Result<T::IncomingResponse>
    // NOTE(review): a `where` keyword presumably belongs before the bound below.

    	T: OutgoingRequest + Debug,
    // NOTE(review): the opening `{` of the function body presumably belongs here.

    	if !services().globals.allow_federation() {
    		return Err(Error::bad_config("Federation is disabled."));
    	}

    	if destination == services().globals.server_name() {
    		return Err(Error::bad_config("Won't send federation request to ourselves"));
    	}

    	// IP-literal destinations are checked against the configured denylist
    	// before anything is sent.
    	if destination.is_ip_literal() || IPAddress::is_valid(destination.host()) {
    		info!(
    			"Destination {} is an IP literal, checking against IP range denylist.",
    			destination
    		);
    		let ip = IPAddress::parse(destination.host()).map_err(|e| {
    			warn!("Failed to parse IP literal from string: {}", e);
    			Error::BadServerResponse("Invalid IP address")
    		})?;

    		let cidr_ranges_s = services().globals.ip_range_denylist().to_vec();
    		let mut cidr_ranges: Vec<IPAddress> = Vec::new();

    		// The denylist is stored as strings and re-parsed on every call.
    		for cidr in cidr_ranges_s {
    			cidr_ranges.push(IPAddress::parse(cidr).expect("we checked this at startup"));
    		}

    		debug!("List of pushed CIDR ranges: {:?}", cidr_ranges);

    		for cidr in cidr_ranges {
    			if cidr.includes(&ip) {
    				return Err(Error::BadServerResponse("Not allowed to send requests to this IP"));
    			}
    		}

    		info!("IP literal {} is allowed.", destination);
    	}

    	debug!("Preparing to send request to {destination}");

    	// Set when the destination had to be resolved freshly; the result is only
    	// written to the cache after a successful, parsable response.
    	let mut write_destination_to_cache = false;


    	let cached_result = services().globals.actual_destination_cache.read().await.get(destination).cloned();


    	let (actual_destination, host) = if let Some(result) = cached_result {
    		result
    	} else {
    		write_destination_to_cache = true;

    		let result = find_actual_destination(destination).await;

    		(result.0, result.1.into_uri_string())
    	};

    	let actual_destination_str = actual_destination.clone().into_https_string();

    	let mut http_request = request
    		.try_into_http_request::<Vec<u8>>(
    			&actual_destination_str,
    			SendAccessToken::IfRequired(""),
    			&[MatrixVersion::V1_5],
    		)
    		.map_err(|e| {
    			warn!("Failed to find destination {}: {}", actual_destination_str, e);
    			Error::BadServerResponse("Invalid destination")
    		})?;

    	// JSON object describing the request (content, method, uri, origin,
    	// destination); this object is what gets signed below.
    	let mut request_map = serde_json::Map::new();

    	if !http_request.body().is_empty() {
    		request_map.insert(
    			"content".to_owned(),
    			serde_json::from_slice(http_request.body()).expect("body is valid json, we just created it"),
    		);
    	};

    	request_map.insert("method".to_owned(), T::METADATA.method.to_string().into());
    	request_map.insert(
    		"uri".to_owned(),
    		http_request.uri().path_and_query().expect("all requests have a path").to_string().into(),
    	);
    	request_map.insert("origin".to_owned(), services().globals.server_name().as_str().into());
    	request_map.insert("destination".to_owned(), destination.as_str().into());

    	let mut request_json = serde_json::from_value(request_map.into()).expect("valid JSON is valid BTreeMap");

    	ruma::signatures::sign_json(
    		services().globals.server_name().as_str(),
    		services().globals.keypair(),
    		&mut request_json,
    	)
    	.expect("our request json is what ruma expects");

    	// Round-trip through bytes to get a plain `serde_json::Map` whose
    	// "signatures" entry can be indexed.
    	let request_json: serde_json::Map<String, serde_json::Value> =
    		serde_json::from_slice(&serde_json::to_vec(&request_json).unwrap()).unwrap();

    	// One iterator of (key id, signature) pairs per signing server.
    	let signatures = request_json["signatures"]
    		.as_object()
    		.unwrap()
    		.values()
    		.map(|v| v.as_object().unwrap().iter().map(|(k, v)| (k, v.as_str().unwrap())));

    	// `insert` replaces any existing AUTHORIZATION value, so the last
    	// signature visited is the one that ends up on the request.
    	for signature_server in signatures {
    		for s in signature_server {
    			http_request.headers_mut().insert(
    				AUTHORIZATION,
    				HeaderValue::from_str(&format!(
    					"X-Matrix origin={},key=\"{}\",sig=\"{}\"",
    					services().globals.server_name(),
    					s.0,
    					s.1
    				))
    				.unwrap(),
    			);
    		}
    	}

    	let reqwest_request = reqwest::Request::try_from(http_request)?;

    	let url = reqwest_request.url().clone();

    	debug!("Sending request to {destination} at {url}");

    	let response = services().globals.client.federation.execute(reqwest_request).await;

    	debug!("Received response from {destination} at {url}");

    	match response {
    		Ok(mut response) => {
    			// reqwest::Response -> http::Response conversion
    			let status = response.status();
    			let mut http_response_builder = http::Response::builder().status(status).version(response.version());
    			// Move the headers across instead of copying them.
    			mem::swap(
    				response.headers_mut(),
    				http_response_builder.headers_mut().expect("http::response::Builder is usable"),
    			);

    			debug!("Getting response bytes from {destination}");
    			// A failure while reading the body is logged and treated as an empty body.
    			let body = response.bytes().await.unwrap_or_else(|e| {
    				info!("server error {}", e);
    				Vec::new().into()
    			}); // TODO: handle timeout
    			debug!("Got response bytes from {destination}");

    			if !status.is_success() {
    				debug!(
    					"Response not successful\n{} {}: {}",
    					url,
    					status,
    					String::from_utf8_lossy(&body).lines().collect::<Vec<_>>().join(" ")
    				);
    			}

    			let http_response = http_response_builder.body(body).expect("reqwest body is valid http body");

    			if status.is_success() {
    				debug!("Parsing response bytes from {destination}");
    				let response = T::IncomingResponse::try_from_http_response(http_response);
    				// Only remember a freshly resolved destination once it has produced a
    				// response that parses.
    				if response.is_ok() && write_destination_to_cache {
    					services()
    						.globals
    						.actual_destination_cache
    						.write()
    						// NOTE(review): an `.await` presumably belongs here, as on the
    						// other `actual_destination_cache` accesses in this function.

    						.insert(OwnedServerName::from(destination), (actual_destination, host));
    				}

    				response.map_err(|e| {
    					warn!("Invalid 200 response from {} on: {} {}", &destination, url, e);
    					Error::BadServerResponse("Server returned bad 200 response.")
    				})
    			} else {
    				debug!("Returning error from {destination}");

    				// remove potentially dead destinations from our cache that may be from modified
    				// well-knowns
    				if !write_destination_to_cache {
    					info!("Evicting {destination} from our true destination cache due to failed request.");

    					services().globals.actual_destination_cache.write().await.remove(destination);

    				}

    				Err(Error::FederationError(
    					destination.to_owned(),
    					RumaError::from_http_response(http_response),
    				))
    			}
    		},
    		Err(e) => {
    			// we do not need to log that servers in a room are dead, this is normal in
    			// public rooms and just spams the logs.

    🥺's avatar
    🥺 committed
    			if e.is_timeout() {
    				debug!(

    					"Timed out sending request to {} at {}: {}",
    					destination, actual_destination_str, e

    				);
    			} else if e.is_connect() {
    				debug!("Failed to connect to {} at {}: {}", destination, actual_destination_str, e);
    			} else if e.is_redirect() {
    				debug!(
    					"Redirect loop sending request to {} at {}: {}\nFinal URL: {:?}",
    					destination,
    					actual_destination_str,
    					e,
    					e.url()
    				);

    🥺's avatar
    🥺 committed
    			} else {

    				info!("Could not send request to {} at {}: {}", destination, actual_destination_str, e);
    			// NOTE(review): the `}` closing this `else` branch presumably belongs here.

    			Err(e.into())
    		},
    	}

    timokoesters's avatar
    timokoesters committed
    }
    
    fn get_ip_with_port(destination_str: &str) -> Option<FedDest> {
    
    	if let Ok(destination) = destination_str.parse::<SocketAddr>() {
    		Some(FedDest::Literal(destination))
    	} else if let Ok(ip_addr) = destination_str.parse::<IpAddr>() {
    		Some(FedDest::Literal(SocketAddr::new(ip_addr, 8448)))
    	} else {
    		None
    	}
    
    fn add_port_to_hostname(destination_str: &str) -> FedDest {
    
    	let (host, port) = match destination_str.find(':') {
    		None => (destination_str, ":8448"),
    		Some(pos) => destination_str.split_at(pos),
    	};
    	FedDest::Named(host.to_owned(), port.to_owned())
    
    /// Returns: `actual_destination`, host header
    
    Charles Hall's avatar
    Charles Hall committed
    /// Implemented according to the specification at <https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names>
    
    /// Numbers in comments below refer to bullet points in linked section of
    /// specification
    
    Timo Kösters's avatar
    Timo Kösters committed
    async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) {
    
    	debug!("Finding actual destination for {destination}");
    	let destination_str = destination.as_str().to_owned();
    	let mut hostname = destination_str.clone();
    	let actual_destination = match get_ip_with_port(&destination_str) {
    		Some(host_port) => {
    			debug!("1: IP literal with provided or default port");
    			host_port
    		},
    		None => {
    			if let Some(pos) = destination_str.find(':') {
    				debug!("2: Hostname with included port");
    
    				let (host, port) = destination_str.split_at(pos);
    
    				query_and_cache_override(host, host, port.parse::<u16>().unwrap_or(8448)).await;
    
    
    				FedDest::Named(host.to_owned(), port.to_owned())
    			} else {
    				debug!("Requesting well known for {destination}");
    				match request_well_known(destination.as_str()).await {
    					Some(delegated_hostname) => {
    						debug!("3: A .well-known file is available");
    						hostname = add_port_to_hostname(&delegated_hostname).into_uri_string();
    						match get_ip_with_port(&delegated_hostname) {
    							Some(host_and_port) => host_and_port, // 3.1: IP literal in .well-known file
    							None => {
    								if let Some(pos) = delegated_hostname.find(':') {
    									debug!("3.2: Hostname with port in .well-known file");
    
    									let (host, port) = delegated_hostname.split_at(pos);
    
    									query_and_cache_override(host, host, port.parse::<u16>().unwrap_or(8448)).await;
    
    
    									FedDest::Named(host.to_owned(), port.to_owned())
    								} else {
    									debug!("Delegated hostname has no port in this branch");
    									if let Some(hostname_override) = query_srv_record(&delegated_hostname).await {
    										debug!("3.3: SRV lookup successful");
    
    
    										let force_port = hostname_override.port();
    										query_and_cache_override(
    											&delegated_hostname,
    											&hostname_override.hostname(),
    											force_port.unwrap_or(8448),
    										)
    										.await;
    
    
    										if let Some(port) = force_port {
    											FedDest::Named(delegated_hostname, format!(":{port}"))
    										} else {
    											add_port_to_hostname(&delegated_hostname)
    										}
    									} else {
    										debug!("3.4: No SRV records, just use the hostname from .well-known");
    
    										query_and_cache_override(&delegated_hostname, &delegated_hostname, 8448).await;
    
    										add_port_to_hostname(&delegated_hostname)
    									}
    								}
    							},
    						}
    					},
    					None => {
    						debug!("4: No .well-known or an error occured");
    						match query_srv_record(&destination_str).await {
    							Some(hostname_override) => {
    								debug!("4: SRV record found");
    
    
    								let force_port = hostname_override.port();
    								query_and_cache_override(
    									&hostname,
    									&hostname_override.hostname(),
    									force_port.unwrap_or(8448),
    								)
    								.await;
    
    
    								if let Some(port) = force_port {
    									FedDest::Named(hostname.clone(), format!(":{port}"))
    								} else {
    									add_port_to_hostname(&hostname)
    								}
    							},
    							None => {
    								debug!("5: No SRV record found");
    
    								query_and_cache_override(&destination_str, &destination_str, 8448).await;
    
    								add_port_to_hostname(&destination_str)
    							},
    						}
    					},
    				}
    			}
    		},
    	};
    
    	// Can't use get_ip_with_port here because we don't want to add a port
    	// to an IP address if it wasn't specified
    	let hostname = if let Ok(addr) = hostname.parse::<SocketAddr>() {
    		FedDest::Literal(addr)
    	} else if let Ok(addr) = hostname.parse::<IpAddr>() {
    		FedDest::Named(addr.to_string(), ":8448".to_owned())
    	} else if let Some(pos) = hostname.find(':') {
    		let (host, port) = hostname.split_at(pos);
    		FedDest::Named(host.to_owned(), port.to_owned())
    	} else {
    		FedDest::Named(hostname, ":8448".to_owned())
    	};
    
    
    	debug!("Actual destination: {actual_destination:?} hostname: {hostname:?}");
    
    	(actual_destination, hostname)
    
    /// Resolves `hostname` through the global DNS resolver and, on success,
    /// records the resulting addresses together with `port` in
    /// `tls_name_override` under the key `overname`.
    ///
    /// A failed lookup is only logged at debug level; nothing is stored.
    async fn query_and_cache_override(overname: &'_ str, hostname: &'_ str, port: u16) {
    	let lookup = services().globals.dns_resolver().lookup_ip(hostname.to_owned()).await;

    	let override_ip = match lookup {
    		Ok(resolved) => resolved,
    		Err(e) => {
    			debug!("Got {:?} for {:?} to override {:?}", e.kind(), hostname, overname);
    			return;
    		},
    	};

    	debug!("Caching result of {:?} overriding {:?}", hostname, overname);

    	let entry = (override_ip.iter().collect(), port);
    	services().globals.tls_name_override.write().unwrap().insert(overname.to_owned(), entry);
    }
    
    
    Timo Kösters's avatar
    Timo Kösters committed
    async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> {
    
    	fn handle_successful_srv(srv: &SrvLookup) -> Option<FedDest> {
    
    		srv.iter().next().map(|result| {
    			FedDest::Named(
    				result.target().to_string().trim_end_matches('.').to_owned(),
    				format!(":{}", result.port()),
    			)
    		})
    	}
    
    	async fn lookup_srv(hostname: &str) -> Result<SrvLookup, ResolveError> {
    		debug!("querying SRV for {:?}", hostname);
    		let hostname = hostname.trim_end_matches('.');
    		services().globals.dns_resolver().srv_lookup(hostname.to_owned()).await
    	}
    
    	let first_hostname = format!("_matrix-fed._tcp.{hostname}.");
    	let second_hostname = format!("_matrix._tcp.{hostname}.");
    
    	lookup_srv(&first_hostname)
    		.or_else(|_| {
    
    			debug!("Querying deprecated _matrix SRV record for host {:?}", hostname);
    
    			lookup_srv(&second_hostname)
    		})
    
    		.and_then(|srv_lookup| async move { Ok(handle_successful_srv(&srv_lookup)) })
    
    		.await
    		.ok()
    		.flatten()
    
    Timo Kösters's avatar
    Timo Kösters committed
    /// Fetches `https://{destination}/.well-known/matrix/server` and returns the
    /// string stored under its `m.server` key.
    ///
    /// Returns `None` when the request fails, the body cannot be read, the body
    /// is longer than 10000 bytes, the body is not valid JSON, or `m.server` is
    /// missing or not a string.
    ///
    /// NOTE(review): this listing appears to have lost the HTTP client accessor
    /// between `.globals` and `.get(...)` as well as the closing `}` of the
    /// function; confirm against the repository.
    async fn request_well_known(destination: &str) -> Option<String> {

    	// Make sure a DNS override entry exists for the destination before the
    	// request is made.
    	if !services().globals.tls_name_override.read().unwrap().contains_key(destination) {
    		query_and_cache_override(destination, destination, 8448).await;
    	}


    	let response = services()
    		.globals

    		.get(&format!("https://{destination}/.well-known/matrix/server"))
    		.send()
    		.await;
    	debug!("Got well known response");
    	debug!("Well known response: {:?}", response);

    	if let Err(e) = &response {
    		debug!("Well known error: {e:?}");
    		return None;
    	}

    	let text = response.ok()?.text().await;

    	debug!("Got well known response text");
    	debug!("Well known response text: {:?}", text);

    	// `len()` counts bytes of the decoded text; oversized bodies are treated
    	// as if no well-known file existed.
    	if text.as_ref().ok()?.len() > 10000 {

    		debug!(

    			"Well known response for destination '{destination}' exceeded past 10000 characters, assuming no \
    			 well-known."
    		);
    		return None;
    	}

    	let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?;
    	debug!("serde_json body of well known text: {}", body);

    	Some(body.get("m.server")?.as_str()?.to_owned())
    
    /// # `GET /_matrix/federation/v1/version`
    ///
    /// Get version information on this server.
    
    Jonas Platte's avatar
    Jonas Platte committed
    pub async fn get_server_version_route(
    
    	_body: Ruma<get_server_version::v1::Request>,
    
    ) -> Result<get_server_version::v1::Response> {
    
    	if !services().globals.allow_federation() {
    		return Err(Error::bad_config("Federation is disabled."));
    	}
    
    	Ok(get_server_version::v1::Response {
    		server: Some(get_server_version::v1::Server {
    			name: Some("Conduwuit".to_owned()),
    			version: Some(env!("CARGO_PKG_VERSION").to_owned()),
    		}),
    	})
    
    /// # `GET /_matrix/key/v2/server`
    ///
    /// Gets the public signing keys of this server.
    ///
    
    /// - Matrix does not support invalidating public keys, so the key returned by
    ///   this will be valid
    
    // Response type for this endpoint is Json because we need to calculate a
    // signature for the response
    
    pub async fn get_server_keys_route() -> Result<impl IntoResponse> {
    
    	if !services().globals.allow_federation() {
    		return Err(Error::bad_config("Federation is disabled."));
    	}
    
    	let mut verify_keys: BTreeMap<OwnedServerSigningKeyId, VerifyKey> = BTreeMap::new();
    	verify_keys.insert(
    		format!("ed25519:{}", services().globals.keypair().version())
    			.try_into()
    			.expect("found invalid server signing keys in DB"),
    		VerifyKey {
    			key: Base64::new(services().globals.keypair().public_key().to_vec()),
    		},
    	);
    	let mut response = serde_json::from_slice(
    		get_server_keys::v2::Response {
    			server_key: Raw::new(&ServerSigningKeys {
    				server_name: services().globals.server_name().to_owned(),
    				verify_keys,
    				old_verify_keys: BTreeMap::new(),
    				signatures: BTreeMap::new(),
    				valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time(
    					SystemTime::now() + Duration::from_secs(86400 * 7),
    				)
    				.expect("time is valid"),
    			})
    			.expect("static conversion, no errors"),
    		}
    		.try_into_http_response::<Vec<u8>>()
    		.unwrap()
    		.body(),
    	)
    	.unwrap();
    
    	ruma::signatures::sign_json(
    		services().globals.server_name().as_str(),
    		services().globals.keypair(),
    		&mut response,
    	)
    	.unwrap();
    
    	Ok(Json(response))
    
    /// # `GET /_matrix/key/v2/server/{keyId}`
    ///
    /// Gets the public signing keys of this server.
    ///
    /// - Simply forwards to `get_server_keys_route`; this handler takes no
    ///   arguments, so the `{keyId}` path segment does not influence the result
    pub async fn get_server_keys_deprecated_route() -> impl IntoResponse {
    	get_server_keys_route().await
    }
    
    /// # `POST /_matrix/federation/v1/publicRooms`
    ///
    /// Lists the public rooms on this server.
    
    pub async fn get_public_rooms_filtered_route(
    
    	body: Ruma<get_public_rooms_filtered::v1::Request>,
    
    ) -> Result<get_public_rooms_filtered::v1::Response> {
    
    	if !services().globals.allow_public_room_directory_over_federation() {
    		return Err(Error::bad_config("Room directory is not public."));
    	}
    
    	let response = client_server::get_public_rooms_filtered_helper(
    		None,
    		body.limit,
    		body.since.as_deref(),
    		&body.filter,
    		&body.room_network,
    	)
    	.await?;
    
    	Ok(get_public_rooms_filtered::v1::Response {
    		chunk: response.chunk,
    		prev_batch: response.prev_batch,
    		next_batch: response.next_batch,
    		total_room_count_estimate: response.total_room_count_estimate,
    	})
    
    /// # `GET /_matrix/federation/v1/publicRooms`
    ///
    /// Lists the public rooms on this server.
    
    pub async fn get_public_rooms_route(
    
    	body: Ruma<get_public_rooms::v1::Request>,
    
    ) -> Result<get_public_rooms::v1::Response> {
    
    	if !services().globals.allow_public_room_directory_over_federation() {
    		return Err(Error::bad_config("Room directory is not public."));
    	}
    
    	let response = client_server::get_public_rooms_filtered_helper(
    		None,
    		body.limit,
    		body.since.as_deref(),
    		&Filter::default(),
    		&RoomNetwork::Matrix,
    	)
    	.await?;
    
    	Ok(get_public_rooms::v1::Response {
    		chunk: response.chunk,
    		prev_batch: response.prev_batch,
    		next_batch: response.next_batch,
    		total_room_count_estimate: response.total_room_count_estimate,
    	})
    
    pub fn parse_incoming_pdu(pdu: &RawJsonValue) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> {
    	let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
    		warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
    		Error::BadServerResponse("Invalid PDU in server response")
    	})?;
    
    	let room_id: OwnedRoomId = value
    		.get("room_id")
    		.and_then(|id| RoomId::parse(id.as_str()?).ok())
    		.ok_or(Error::BadRequest(ErrorKind::InvalidParam, "Invalid room id in pdu"))?;
    
    	let room_version_id = services().rooms.state.get_room_version(&room_id)?;
    
    	let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) {
    		Ok(t) => t,
    		Err(_) => {
    			// Event could not be converted to canonical json
    			return Err(Error::BadRequest(
    				ErrorKind::InvalidParam,
    				"Could not convert event to canonical json.",
    			));
    		},
    	};
    	Ok((event_id, value, room_id))
    
    /// # `PUT /_matrix/federation/v1/send/{txnId}`
    ///
    /// Push EDUs and PDUs to this server.
    
    pub async fn send_transaction_message_route(
    
    	body: Ruma<send_transaction_message::v1::Request>,
    
    ) -> Result<send_transaction_message::v1::Response> {
    
    	let sender_servername = body.sender_servername.as_ref().expect("server is authenticated");
    
    	let mut resolved_map = BTreeMap::new();
    
    	let pub_key_map = RwLock::new(BTreeMap::new());
    
    	// This is all the auth_events that have been recursively fetched so they don't
    	// have to be deserialized over and over again.
    	// TODO: make this persist across requests but not in a DB Tree (in globals?)
    	// TODO: This could potentially also be some sort of trie (suffix tree) like
    	// structure so that once an auth event is known it would know (using indexes
    	// maybe) all of the auth events that it references.
    	// let mut auth_cache = EventMap::new();
    
    	let mut parsed_pdus = vec![];
    	for pdu in &body.pdus {
    		let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
    			warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
    			Error::BadServerResponse("Invalid PDU in server response")
    		})?;
    		let room_id: OwnedRoomId = value
    			.get("room_id")
    			.and_then(|id| RoomId::parse(id.as_str()?).ok())
    			.ok_or(Error::BadRequest(ErrorKind::InvalidParam, "Invalid room id in pdu"))?;
    
    		if services().rooms.state.get_room_version(&room_id).is_err() {
    			debug!("Server is not in room {room_id}");
    			continue;
    		}
    
    		let r = parse_incoming_pdu(pdu);
    		let (event_id, value, room_id) = match r {
    			Ok(t) => t,
    			Err(e) => {
    				warn!("Could not parse PDU: {e}");
    
    				info!("Full PDU: {:?}", &pdu);
    
    				continue;
    			},
    		};
    		parsed_pdus.push((event_id, value, room_id));
    		// We do not add the event_id field to the pdu here because of signature
    		// and hashes checks
    	}
    
    	// We go through all the signatures we see on the PDUs and fetch the
    	// corresponding signing keys
    	services()
    		.rooms
    		.event_handler
    		.fetch_required_signing_keys(parsed_pdus.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map)
    		.await
    		.unwrap_or_else(|e| {
    			warn!("Could not fetch all signatures for PDUs from {}: {:?}", sender_servername, e);
    		});
    
    	for (event_id, value, room_id) in parsed_pdus {
    		let mutex =
    
    			Arc::clone(services().globals.roomid_mutex_federation.write().await.entry(room_id.clone()).or_default());
    
    		let mutex_lock = mutex.lock().await;
    		let start_time = Instant::now();
    		resolved_map.insert(
    			event_id.clone(),
    			services()
    				.rooms
    				.event_handler
    				.handle_incoming_pdu(sender_servername, &event_id, &room_id, value, true, &pub_key_map)
    				.await
    				.map(|_| ()),
    		);
    		drop(mutex_lock);
    
    		let elapsed = start_time.elapsed();
    		debug!(
    			"Handling transaction of event {} took {}m{}s",
    			event_id,
    			elapsed.as_secs() / 60,
    			elapsed.as_secs() % 60
    		);
    	}
    
    	for pdu in &resolved_map {
    		if let Err(e) = pdu.1 {
    			if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
    				warn!("Incoming PDU failed {:?}", pdu);
    			}
    		}
    	}
    
    	for edu in body.edus.iter().filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok()) {
    		match edu {
    			Edu::Presence(presence) => {
    				if !services().globals.allow_incoming_presence() {
    					continue;
    				}
    
    				for update in presence.push {
    					for room_id in services().rooms.state_cache.rooms_joined(&update.user_id) {
    						services().rooms.edus.presence.set_presence(
    							&room_id?,
    							&update.user_id,
    							update.presence.clone(),
    							Some(update.currently_active),
    							Some(update.last_active_ago),
    							update.status_msg.clone(),
    						)?;
    					}
    				}
    			},
    			Edu::Receipt(receipt) => {
    
    				if !services().globals.allow_incoming_read_receipts() {
    					continue;
    				}
    
    
    				for (room_id, room_updates) in receipt.receipts {
    					for (user_id, user_updates) in room_updates.read {
    						if let Some((event_id, _)) = user_updates
    							.event_ids
    							.iter()
    							.filter_map(|id| {
    								services().rooms.timeline.get_pdu_count(id).ok().flatten().map(|r| (id, r))
    							})
    							.max_by_key(|(_, count)| *count)
    						{
    							let mut user_receipts = BTreeMap::new();
    							user_receipts.insert(user_id.clone(), user_updates.data);
    
    							let mut receipts = BTreeMap::new();
    							receipts.insert(ReceiptType::Read, user_receipts);
    
    							let mut receipt_content = BTreeMap::new();
    							receipt_content.insert(event_id.to_owned(), receipts);
    
    							let event = ReceiptEvent {
    								content: ReceiptEventContent(receipt_content),
    								room_id: room_id.clone(),
    							};
    							services().rooms.edus.read_receipt.readreceipt_update(&user_id, &room_id, event)?;
    						} else {
    							// TODO fetch missing events
    							debug!("No known event ids in read receipt: {:?}", user_updates);
    						}
    					}
    				}
    			},
    			Edu::Typing(typing) => {
    				if services().rooms.state_cache.is_joined(&typing.user_id, &typing.room_id)? {
    					if typing.typing {
    
    						services()
    							.rooms
    							.edus
    							.typing
    							.typing_add(&typing.user_id, &typing.room_id, 3000 + utils::millis_since_unix_epoch())
    							.await?;
    
    						services().rooms.edus.typing.typing_remove(&typing.user_id, &typing.room_id).await?;
    
    					}
    				}
    			},
    			Edu::DeviceListUpdate(DeviceListUpdateContent {
    				user_id,
    				..
    			}) => {
    				services().users.mark_device_key_update(&user_id)?;
    			},
    			Edu::DirectToDevice(DirectDeviceContent {
    				sender,
    				ev_type,
    				message_id,
    				messages,
    			}) => {
    				// Check if this is a new transaction id
    				if services().transaction_ids.existing_txnid(&sender, None, &message_id)?.is_some() {
    					continue;
    				}
    
    				for (target_user_id, map) in &messages {
    					for (target_device_id_maybe, event) in map {
    						match target_device_id_maybe {
    							DeviceIdOrAllDevices::DeviceId(target_device_id) => {
    								services().users.add_to_device_event(
    									&sender,
    									target_user_id,
    									target_device_id,
    									&ev_type.to_string(),
    									event.deserialize_as().map_err(|e| {
    										warn!("To-Device event is invalid: {event:?} {e}");
    										Error::BadRequest(ErrorKind::InvalidParam, "Event is invalid")
    									})?,
    								)?;
    							},
    
    							DeviceIdOrAllDevices::AllDevices => {
    								for target_device_id in services().users.all_device_ids(target_user_id) {
    									services().users.add_to_device_event(
    										&sender,
    										target_user_id,
    										&target_device_id?,
    										&ev_type.to_string(),
    										event.deserialize_as().map_err(|_| {
    											Error::BadRequest(ErrorKind::InvalidParam, "Event is invalid")
    										})?,
    									)?;
    								}
    							},
    						}
    					}
    				}
    
    				// Save transaction id with empty data
    				services().transaction_ids.add_txnid(&sender, None, &message_id, &[])?;
    			},
    			Edu::SigningKeyUpdate(SigningKeyUpdateContent {
    				user_id,
    				master_key,
    				self_signing_key,
    			}) => {
    				if user_id.server_name() != sender_servername {
    					continue;
    				}
    				if let Some(master_key) = master_key {
    					services().users.add_cross_signing_keys(&user_id, &master_key, &self_signing_key, &None, true)?;
    				}
    			},
    			Edu::_Custom(_) => {},
    		}
    	}
    
    	Ok(send_transaction_message::v1::Response {
    		pdus: resolved_map.into_iter().map(|(e, r)| (e, r.map_err(|e| e.sanitized_error()))).collect(),
    	})
    
    /// # `GET /_matrix/federation/v1/event/{eventId}`
    ///
    /// Retrieves a single event from the server.
    ///
    
    /// - Only works if a user of this server is currently invited or joined the
    ///   room
    pub async fn get_event_route(body: Ruma<get_event::v1::Request>) -> Result<get_event::v1::Response> {
    	let sender_servername = body.sender_servername.as_ref().expect("server is authenticated");
    
    	let event = services().rooms.timeline.get_pdu_json(&body.event_id)?.ok_or_else(|| {
    		warn!("Event not found, event ID: {:?}", &body.event_id);
    		Error::BadRequest(ErrorKind::NotFound, "Event not found.")
    	})?;
    
    	let room_id_str = event
    		.get("room_id")
    		.and_then(|val| val.as_str())
    		.ok_or_else(|| Error::bad_database("Invalid event in database"))?;
    
    	let room_id = <&RoomId>::try_from(room_id_str)
    		.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
    
    	if !services().rooms.state_cache.server_in_room(sender_servername, room_id)? {
    		return Err(Error::BadRequest(ErrorKind::Forbidden, "Server is not in room"));
    	}
    
    	if !services().rooms.state_accessor.server_can_see_event(sender_servername, room_id, &body.event_id)? {
    		return Err(Error::BadRequest(ErrorKind::Forbidden, "Server is not allowed to see event."));
    	}
    
    	Ok(get_event::v1::Response {
    		origin: services().globals.server_name().to_owned(),
    		origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
    		pdu: PduEvent::convert_to_outgoing_federation_event(event),
    	})
    
    /// # `GET /_matrix/federation/v1/backfill/<room_id>`
    ///
    /// Retrieves events from before the sender joined the room, if the room's
    /// history visibility allows.
    
    pub async fn get_backfill_route(body: Ruma<get_backfill::v1::Request>) -> Result<get_backfill::v1::Response> {
    	let sender_servername = body.sender_servername.as_ref().expect("server is authenticated");
    
    	debug!("Got backfill request from: {}", sender_servername);
    
    	if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? {
    		return Err(Error::BadRequest(ErrorKind::Forbidden, "Server is not in room."));
    	}
    
    	services().rooms.event_handler.acl_check(sender_servername, &body.room_id)?;
    
    	let until = body