Newer
Older
Timo Kösters
committed
#![allow(deprecated)]
// Conduit implements the older APIs
Timo Kösters
committed
use crate::{
api::client_server::{self, claim_keys_helper, get_keys_helper},
service::pdu::{gen_event_id_canonical_json, PduBuilder},
services, utils, Error, PduEvent, Result, Ruma,
directory::{get_public_rooms, get_public_rooms_filtered},
discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey},
event::{get_event, get_missing_events, get_room_state, get_room_state_ids},
keys::{claim_keys, get_keys},
membership::{create_invite, create_join_event, prepare_join_event},
query::{get_profile_information, get_room_information},
edu::{DeviceListUpdateContent, DirectDeviceContent, Edu, SigningKeyUpdateContent},
send_transaction_message,
},
EndpointError, IncomingResponse, MatrixVersion, OutgoingRequest, OutgoingResponse,
SendAccessToken,
join_rules::{JoinRule, RoomJoinRulesEventContent},
uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch,
OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId,
ServerName,
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
net::{IpAddr, SocketAddr},
time::{Duration, Instant, SystemTime},
use trust_dns_resolver::{error::ResolveError, lookup::SrvLookup};
/// Wraps either a literal IP address plus port, or a hostname plus complement
/// (colon-plus-port if it was specified).
///
/// Note: A `FedDest::Named` might contain an IP address in string form if there
/// was no port specified to construct a SocketAddr with.
///
/// # Examples:
/// # fn main() -> Result<(), std::net::AddrParseError> {
/// FedDest::Literal("198.51.100.3:8448".parse()?);
/// FedDest::Literal("[2001:db8::4:5]:443".parse()?);
/// FedDest::Named("matrix.example.org".to_owned(), "".to_owned());
/// FedDest::Named("matrix.example.org".to_owned(), ":8448".to_owned());
/// FedDest::Named("198.51.100.5".to_owned(), "".to_owned());
/// A literal IP address together with a port.
Literal(SocketAddr),
/// A hostname plus its complement: `":<port>"` if a port was specified, otherwise `""`.
Named(String, String),
}
/// Consumes the destination and renders it as an `https://` base URL.
///
/// A `Literal` contributes its socket address; a `Named` contributes the host
/// immediately followed by its port complement.
fn into_https_string(self) -> String {
    let authority = match self {
        Self::Literal(socket) => socket.to_string(),
        Self::Named(name, port_part) => format!("{name}{port_part}"),
    };
    format!("https://{authority}")
}
/// Consumes the destination and renders it as a string without any scheme prefix.
// NOTE(review): only the `Literal` arm is visible in this listing; the `Named`
// arm appears to be missing, so the match is not exhaustive as shown.
fn into_uri_string(self) -> String {
match self {
// The socket address is formatted via its `Display` impl.
Self::Literal(addr) => addr.to_string(),
}
}
/// Returns the host part of this destination as an owned string.
///
/// For a `Literal` this is the IP address without the port; for a `Named`
/// it is a copy of the stored hostname.
fn hostname(&self) -> String {
    match self {
        Self::Named(name, _port_part) => name.to_owned(),
        Self::Literal(socket) => socket.ip().to_string(),
    }
}
/// Returns the port of this destination, if one is known.
///
/// For a `Literal` the port always comes from the socket address. For a
/// `Named` the second field is either empty or of the form `":<port>"`;
/// `None` is returned when it is empty or when the text after the first
/// byte does not parse as a `u16`.
fn port(&self) -> Option<u16> {
    match self {
        Self::Literal(addr) => Some(addr.port()),
        // `get(1..)` rather than `[1..]`: an empty port string is a valid
        // `Named` value and must yield `None` instead of panicking on an
        // out-of-range slice.
        Self::Named(_, port) => port.get(1..).and_then(|digits| digits.parse().ok()),
    }
}
}
/// Sends a signed federation request to `destination` and parses the reply into
/// `T::IncomingResponse`.
// NOTE(review): many lines of this function (parameter list, `where` bounds,
// several closing braces and match arms) are missing from this listing; the
// comments below describe only what is visible.
pub(crate) async fn send_request<T>(
) -> Result<T::IncomingResponse>
where
// Refuse outright when federation is disabled in the configuration.
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
// Never send a federation request to our own server name.
if destination == services().globals.server_name() {
return Err(Error::bad_config(
"Won't send federation request to ourselves",
));
}
// Destinations that are IP literals are checked against the configured IP range denylist.
if destination.is_ip_literal() || IPAddress::is_valid(destination.host()) {
info!(
"Destination {} is an IP literal, checking against IP range denylist.",
destination
);
let ip = IPAddress::parse(destination.host()).map_err(|e| {
warn!("Failed to parse IP literal from string: {}", e);
Error::BadServerResponse("Invalid IP address")
})?;
// The denylist entries are parsed again here; per the `expect` message they were validated at startup.
let cidr_ranges_s = services().globals.ip_range_denylist().to_vec();
let mut cidr_ranges: Vec<IPAddress> = Vec::new();
for cidr in cidr_ranges_s {
cidr_ranges.push(IPAddress::parse(cidr).expect("we checked this at startup"));
}
debug!("List of pushed CIDR ranges: {:?}", cidr_ranges);
// NOTE(review): the condition that decides whether `ip` is denied is not visible in this listing.
return Err(Error::BadServerResponse(
"Not allowed to send requests to this IP",
));
}
}
info!("IP literal {} is allowed.", destination);
debug!("Preparing to send request to {destination}");
// Look for a previously resolved destination in the shared cache (read lock).
.actual_destination_cache
.read()
.unwrap()
// On a cache miss the destination is resolved via `find_actual_destination`.
let (actual_destination, host) = if let Some(result) = cached_result {
let result = find_actual_destination(destination).await;
let actual_destination_str = actual_destination.clone().into_https_string();
// Build the HTTP request against the resolved `https://` base URL.
let mut http_request = request
.try_into_http_request::<Vec<u8>>(
&actual_destination_str,
SendAccessToken::IfRequired(""),
warn!(
"Failed to find destination {}: {}",
actual_destination_str, e
);
Error::BadServerResponse("Invalid destination")
})?;
// Assemble the JSON object that gets signed: content (if any), method, uri, origin, destination.
let mut request_map = serde_json::Map::new();
if !http_request.body().is_empty() {
request_map.insert(
"content".to_owned(),
serde_json::from_slice(http_request.body())
.expect("body is valid json, we just created it"),
request_map.insert("method".to_owned(), T::METADATA.method.to_string().into());
request_map.insert(
"uri".to_owned(),
http_request
.uri()
.path_and_query()
.expect("all requests have a path")
.to_string()
.into(),
);
request_map.insert(
"origin".to_owned(),
services().globals.server_name().as_str().into(),
);
request_map.insert("destination".to_owned(), destination.as_str().into());
let mut request_json =
serde_json::from_value(request_map.into()).expect("valid JSON is valid BTreeMap");
// NOTE(review): the signing call these two arguments belong to is not visible in this listing.
services().globals.server_name().as_str(),
services().globals.keypair(),
.expect("our request json is what ruma expects");
// Round-trip through bytes to obtain a plain `serde_json` map from which the signatures are read.
let request_json: serde_json::Map<String, serde_json::Value> =
serde_json::from_slice(&serde_json::to_vec(&request_json).unwrap()).unwrap();
let signatures = request_json["signatures"]
.as_object()
.unwrap()
.values()
.map(|v| {
v.as_object()
.unwrap()
.iter()
.map(|(k, v)| (k, v.as_str().unwrap()))
});
// Write each (key id, signature) pair into the `AUTHORIZATION` header in `X-Matrix` form.
for signature_server in signatures {
for s in signature_server {
http_request.headers_mut().insert(
AUTHORIZATION,
HeaderValue::from_str(&format!(
"X-Matrix origin={},key=\"{}\",sig=\"{}\"",
s.0,
s.1
))
.unwrap(),
);
}
let reqwest_request = reqwest::Request::try_from(http_request)?;
// Keep a copy of the URL for log messages; the request itself is moved into `execute`.
let url = reqwest_request.url().clone();
debug!("Sending request to {destination} at {url}");
let response = services()
.globals
.federation_client()
.execute(reqwest_request)
.await;
debug!("Received response from {destination} at {url}");
match response {
Ok(mut response) => {
// reqwest::Response -> http::Response conversion
let status = response.status();
let mut http_response_builder = http::Response::builder()
.status(status)
.version(response.version());
// Swap the response headers into the builder instead of copying them.
mem::swap(
response.headers_mut(),
http_response_builder
.headers_mut()
.expect("http::response::Builder is usable"),
);
debug!("Getting response bytes from {destination}");
// A failure while reading the body is replaced by an empty body.
let body = response.bytes().await.unwrap_or_else(|e| {
Vec::new().into()
}); // TODO: handle timeout
debug!("Got response bytes from {destination}");
debug!(
"Response not successful\n{} {}: {}",
String::from_utf8_lossy(&body)
.lines()
.collect::<Vec<_>>()
.join(" ")
let http_response = http_response_builder
.body(body)
.expect("reqwest body is valid http body");
debug!("Parsing response bytes from {destination}");
let response = T::IncomingResponse::try_from_http_response(http_response);
// Only a response that parsed successfully causes the resolved destination to be cached.
if response.is_ok() && write_destination_to_cache {
services()
.globals
.actual_destination_cache
.write()
.unwrap()
.insert(
response.map_err(|e| {
warn!(
"Invalid 200 response from {} on: {} {}",
&destination, url, e
);
Error::BadServerResponse("Server returned bad 200 response.")
})
debug!("Returning error from {destination}");
// remove potentially dead destinations from our cache that may be from modified well-knowns
if !write_destination_to_cache {
info!("Evicting {destination} from our true destination cache due to failed request.");
services()
.globals
.actual_destination_cache
.write()
.unwrap()
.remove(destination);
}
Err(Error::FederationError(
destination.to_owned(),
// we do not need to log that servers in a room are dead, this is normal in public rooms and just spams the logs.
// Timeouts, connection failures and redirect loops are logged at info level; anything else at warn.
match e.is_timeout() {
true => info!(
"Timed out sending request to {} at {}: {}",
destination, actual_destination_str, e
),
false => match e.is_connect() {
true => info!(
"Failed to connect to {} at {}: {}",
destination, actual_destination_str, e
),
false => match e.is_redirect() {
true => info!(
"Redirect loop sending request to {} at {}: {}\nFinal URL: {:?}",
destination,
actual_destination_str,
e,
e.url()
),
false => warn!(
"Could not send request to {} at {}: {}",
destination, actual_destination_str, e
),
},
},
/// Interprets `destination_str` as an IP literal, if possible.
///
/// A full socket address (`ip:port`, or `[ipv6]:port`) is used as is; a bare
/// IP address gets the default port 8448. Anything else yields `None`.
fn get_ip_with_port(destination_str: &str) -> Option<FedDest> {
    destination_str
        .parse::<SocketAddr>()
        .ok()
        .or_else(|| {
            destination_str
                .parse::<IpAddr>()
                .ok()
                .map(|ip| SocketAddr::new(ip, 8448))
        })
        .map(FedDest::Literal)
}
/// Builds a `FedDest::Named` from `destination_str`.
///
/// The string is split at the first `:` into the host and a `":<port>"` part;
/// when there is no `:`, the whole string is the host and the port part is `":8448"`.
fn add_port_to_hostname(destination_str: &str) -> FedDest {
let (host, port) = match destination_str.find(':') {
None => (destination_str, ":8448"),
// `split_at` keeps the `:` at the start of the second half.
Some(pos) => destination_str.split_at(pos),
};
FedDest::Named(host.to_owned(), port.to_owned())
/// Returns: actual_destination, host header
/// Implemented according to the specification at <https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names>
/// Numbers in comments below refer to bullet points in linked section of specification
// NOTE(review): a number of lines (match arms, `else` branches, closing braces)
// are missing from this listing; comments describe only what is visible.
async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) {
debug!("Finding actual destination for {destination}");
let destination_str = destination.as_str().to_owned();
// `hostname` starts out as the destination itself and is overwritten further
// down when a .well-known file delegates to another host.
let mut hostname = destination_str.clone();
let actual_destination = match get_ip_with_port(&destination_str) {
debug!("1: IP literal with provided or default port");
host_port
}
None => {
// A hostname with an explicit port is split at the first ':' and used as is.
if let Some(pos) = destination_str.find(':') {
let (host, port) = destination_str.split_at(pos);
FedDest::Named(host.to_owned(), port.to_owned())
debug!("Requesting well known for {destination}");
match request_well_known(destination.as_str()).await {
debug!("3: A .well-known file is available");
hostname = add_port_to_hostname(&delegated_hostname).into_uri_string();
match get_ip_with_port(&delegated_hostname) {
Some(host_and_port) => host_and_port, // 3.1: IP literal in .well-known file
None => {
if let Some(pos) = delegated_hostname.find(':') {
debug!("3.2: Hostname with port in .well-known file");
let (host, port) = delegated_hostname.split_at(pos);
FedDest::Named(host.to_owned(), port.to_owned())
debug!("Delegated hostname has no port in this branch");
query_srv_record(&delegated_hostname).await
.dns_resolver()
.lookup_ip(hostname_override.hostname())
.await
{
// Record the resolved IPs and port (default 8448) as a TLS name override
// keyed by the delegated hostname.
services()
.globals
.tls_name_override
.write()
.unwrap()
.insert(
delegated_hostname.clone(),
(
override_ip.iter().collect(),
force_port.unwrap_or(8448),
),
);
debug!(
"Using SRV record {}, but could not resolve to IP",
hostname_override.hostname()
);
} else {
add_port_to_hostname(&delegated_hostname)
}
} else {
debug!("3.4: No SRV records, just use the hostname from .well-known");
debug!("4: No .well-known or an error occured");
match query_srv_record(&destination_str).await {
.dns_resolver()
.lookup_ip(hostname_override.hostname())
.await
{
// Same override bookkeeping as above, keyed by `hostname` this time.
services()
.globals
.tls_name_override
.write()
.unwrap()
.insert(
hostname.clone(),
(
override_ip.iter().collect(),
force_port.unwrap_or(8448),
),
);
debug!(
"Using SRV record {}, but could not resolve to IP",
hostname_override.hostname()
);
} else {
add_port_to_hostname(&hostname)
}
}
debug!("Actual destination: {actual_destination:?}");
// Can't use get_ip_with_port here because we don't want to add a port
// to an IP address if it wasn't specified
let hostname = if let Ok(addr) = hostname.parse::<SocketAddr>() {
FedDest::Literal(addr)
} else if let Ok(addr) = hostname.parse::<IpAddr>() {
// A bare IP is kept in `Named` form with ":8448" as its port part.
FedDest::Named(addr.to_string(), ":8448".to_owned())
} else if let Some(pos) = hostname.find(':') {
let (host, port) = hostname.split_at(pos);
FedDest::Named(host.to_owned(), port.to_owned())
FedDest::Named(hostname, ":8448".to_owned())
(actual_destination, hostname)
/// Looks up the `_matrix-fed._tcp` SRV record for `hostname`, falling back to the
/// deprecated `_matrix._tcp` record when the first lookup fails.
///
/// Returns `None` when both lookups fail or the successful lookup has no entries.
// NOTE(review): some closing braces of this function are missing from this listing.
async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> {
// Turns the first SRV entry into a named destination: the target without its
// trailing dot, plus ":<port>".
fn handle_successful_srv(srv: SrvLookup) -> Option<FedDest> {
srv.iter().next().map(|result| {
FedDest::Named(
result.target().to_string().trim_end_matches('.').to_owned(),
format!(":{}", result.port()),
)
// Performs one SRV lookup through the global DNS resolver.
async fn lookup_srv(hostname: &str) -> Result<SrvLookup, ResolveError> {
debug!("querying SRV for {:?}", hostname);
// Trailing dots are stripped before the name is handed to the resolver.
let hostname = hostname.trim_end_matches('.');
services()
.globals
.dns_resolver()
.srv_lookup(hostname.to_owned())
.await
}
let first_hostname = format!("_matrix-fed._tcp.{hostname}.");
let second_hostname = format!("_matrix._tcp.{hostname}.");
lookup_srv(&first_hostname)
.or_else(|_| {
info!(
"Querying deprecated _matrix SRV record for host {:?}",
hostname
);
lookup_srv(&second_hostname)
})
.and_then(|srv_lookup| async { Ok(handle_successful_srv(srv_lookup)) })
.await
.ok()
.flatten()
/// Fetches `https://{destination}/.well-known/matrix/server` and returns the string
/// stored under its `m.server` key.
///
/// Returns `None` when the body is too long, is not valid JSON, or has no string
/// `m.server` entry.
// NOTE(review): the lines that handle a failed request and read the response
// text are missing from this listing.
async fn request_well_known(destination: &str) -> Option<String> {
let response = services()
.globals
.default_client()
.get(&format!("https://{destination}/.well-known/matrix/server"))
.send()
.await;
debug!("Well known response: {:?}", response);
debug!("Well known error: {e:?}");
return None;
debug!("Well known response text: {:?}", text);
// Bodies whose length exceeds 10000 are treated as if there were no .well-known file.
if text.as_ref().ok()?.len() > 10000 {
info!("Well known response for destination '{destination}' exceeded past 10000 characters, assuming no well-known.");
return None;
}
let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?;
debug!("serde_json body of well known text: {}", body);
Some(body.get("m.server")?.as_str()?.to_owned())
}
/// # `GET /_matrix/federation/v1/version`
///
/// Get version information on this server.
///
/// Fails with a config error when federation is disabled.
pub async fn get_server_version_route(
_body: Ruma<get_server_version::v1::Request>,
) -> Result<get_server_version::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
Ok(get_server_version::v1::Response {
server: Some(get_server_version::v1::Server {
// NOTE(review): the next two lines are not Rust; they look like residue from
// a commit/blame view and should be removed from the file.
sininenkissa
committed
name: Some("Conduwuit".to_owned()),
// The version string is the crate version fixed at compile time.
version: Some(env!("CARGO_PKG_VERSION").to_owned()),
/// # `GET /_matrix/key/v2/server`
///
/// Gets the public signing keys of this server.
///
/// - Matrix does not support invalidating public keys, so the key returned by this will be valid
/// forever.
// Response type for this endpoint is Json because we need to calculate a signature for the response
// NOTE(review): several lines (the `verify_keys.insert(` call, the serialisation
// and signing calls, closing braces) are missing from this listing.
pub async fn get_server_keys_route() -> Result<impl IntoResponse> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
let mut verify_keys: BTreeMap<OwnedServerSigningKeyId, VerifyKey> = BTreeMap::new();
// The key id has the form `ed25519:<keypair version>`.
format!("ed25519:{}", services().globals.keypair().version())
.try_into()
.expect("found invalid server signing keys in DB"),
key: Base64::new(services().globals.keypair().public_key().to_vec()),
},
);
let mut response = serde_json::from_slice(
server_name: services().globals.server_name().to_owned(),
verify_keys,
old_verify_keys: BTreeMap::new(),
signatures: BTreeMap::new(),
// The response advertises itself as valid for seven days from now.
valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now() + Duration::from_secs(86400 * 7),
)
.expect("time is valid"),
services().globals.server_name().as_str(),
services().globals.keypair(),
Ok(Json(response))
/// # `GET /_matrix/key/v2/server/{keyId}`
///
/// Gets the public signing keys of this server.
///
/// - Matrix does not support invalidating public keys, so the key returned by this will be valid
/// forever.
pub async fn get_server_keys_deprecated_route() -> impl IntoResponse {
// Delegates to the non-deprecated key route and returns its result unchanged.
get_server_keys_route().await
/// # `POST /_matrix/federation/v1/publicRooms`
///
/// Lists the public rooms on this server.
///
/// Fails with a config error when federation is disabled or when the room
/// directory is not exposed over federation.
// NOTE(review): the parameter list, the `.await` on the helper call and some
// response fields / closing braces are missing from this listing.
pub async fn get_public_rooms_filtered_route(
) -> Result<get_public_rooms_filtered::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
if !services()
.globals
.allow_public_room_directory_over_federation()
{
return Err(Error::bad_config("Room directory is not public."));
}
// The listing itself is produced by the client-server helper; limit, pagination
// token, filter and room network come from the request body.
let response = client_server::get_public_rooms_filtered_helper(
None,
body.limit,
body.since.as_deref(),
&body.filter,
&body.room_network,
)
Ok(get_public_rooms_filtered::v1::Response {
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
/// # `GET /_matrix/federation/v1/publicRooms`
///
/// Lists the public rooms on this server.
///
/// Fails with a config error when federation is disabled or when the room
/// directory is not exposed over federation.
// NOTE(review): the parameter list, the helper's arguments and some response
// fields / closing braces are missing from this listing.
pub async fn get_public_rooms_route(
) -> Result<get_public_rooms::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
if !services()
.globals
.allow_public_room_directory_over_federation()
{
return Err(Error::bad_config("Room directory is not public."));
}
// Uses the same client-server helper as the filtered (POST) variant.
let response = client_server::get_public_rooms_filtered_helper(
Ok(get_public_rooms::v1::Response {
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
pub fn parse_incoming_pdu(
pdu: &RawJsonValue,
) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> {
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response")
})?;
let room_id: OwnedRoomId = value
.get("room_id")
.and_then(|id| RoomId::parse(id.as_str()?).ok())
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Invalid room id in pdu",
))?;
let room_version_id = services().rooms.state.get_room_version(&room_id)?;
let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) {
Ok(t) => t,
Err(_) => {
// Event could not be converted to canonical json
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not convert event to canonical json.",
));
}
};
Ok((event_id, value, room_id))
}
/// # `PUT /_matrix/federation/v1/send/{txnId}`
///
/// Push EDUs and PDUs to this server.
pub async fn send_transaction_message_route(
) -> Result<send_transaction_message::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
let mut resolved_map = BTreeMap::new();
let pub_key_map = RwLock::new(BTreeMap::new());
// This is all the auth_events that have been recursively fetched so they don't have to be
// deserialized over and over again.
// TODO: make this persist across requests but not in a DB Tree (in globals?)
// TODO: This could potentially also be some sort of trie (suffix tree) like structure so
// that once an auth event is known it would know (using indexes maybe) all of the auth
// events that it references.
// let mut auth_cache = EventMap::new();
for pdu in &body.pdus {
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response")
})?;
let room_id: OwnedRoomId = value
.get("room_id")
.and_then(|id| RoomId::parse(id.as_str()?).ok())
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Invalid room id in pdu",
))?;
if services().rooms.state.get_room_version(&room_id).is_err() {
debug!("Server is not in room {room_id}");
continue;
}
let r = parse_incoming_pdu(pdu);
let (event_id, value, room_id) = match r {
Ok(t) => t,
Err(e) => {
warn!("Could not parse PDU: {e}");
warn!("Full PDU: {:?}", &pdu);
parsed_pdus.push((event_id, value, room_id));
// We do not add the event_id field to the pdu here because of signature and hashes checks
}
// We go through all the signatures we see on the PDUs and fetch the corresponding
// signing keys
services()
.rooms
.event_handler
.fetch_required_signing_keys(
parsed_pdus.iter().map(|(_event_id, event, _room_id)| event),
&pub_key_map,
)
.await
.unwrap_or_else(|e| {
warn!(
"Could not fetch all signatures for PDUs from {}: {:?}",
sender_servername, e
for (event_id, value, room_id) in parsed_pdus {
let mutex = Arc::clone(
.write()
.unwrap()
.or_default(),
);
let mutex_lock = mutex.lock().await;
let start_time = Instant::now();
resolved_map.insert(
event_id.clone(),
services()
.rooms
.event_handler
.handle_incoming_pdu(
&event_id,
&room_id,
value,
true,
&pub_key_map,
)
.await
.map(|_| ()),
drop(mutex_lock);
let elapsed = start_time.elapsed();
"Handling transaction of event {} took {}m{}s",
event_id,
elapsed.as_secs() / 60,
elapsed.as_secs() % 60
);
for pdu in &resolved_map {
if let Err(e) = pdu.1 {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
warn!("Incoming PDU failed {:?}", pdu);
}
}
for edu in body
.edus
.iter()
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
Edu::Presence(presence) => {
if !services().globals.allow_incoming_presence() {
continue;
}
for update in presence.push {
for room_id in services().rooms.state_cache.rooms_joined(&update.user_id) {
services().rooms.edus.presence.set_presence(
&room_id?,
&update.user_id,
update.presence.clone(),
Some(update.currently_active),
Some(update.last_active_ago),
update.status_msg.clone(),
)?;
}
}
}
Edu::Receipt(receipt) => {
for (room_id, room_updates) in receipt.receipts {
for (user_id, user_updates) in room_updates.read {
if let Some((event_id, _)) = user_updates
.event_ids
.iter()
.filter_map(|id| {
services()
.rooms
.timeline
.get_pdu_count(id)
.ok()
.flatten()
.map(|r| (id, r))
})
.max_by_key(|(_, count)| *count)
{
let mut user_receipts = BTreeMap::new();
user_receipts.insert(user_id.clone(), user_updates.data);
let mut receipts = BTreeMap::new();
receipts.insert(ReceiptType::Read, user_receipts);
let mut receipt_content = BTreeMap::new();
receipt_content.insert(event_id.to_owned(), receipts);
let event = ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
services()
.rooms
.edus
.read_receipt
.readreceipt_update(&user_id, &room_id, event)?;
debug!("No known event ids in read receipt: {:?}", user_updates);
}
}
}
}
Edu::Typing(typing) => {
if services()
.rooms
.state_cache
.is_joined(&typing.user_id, &typing.room_id)?
{
if typing.typing {
&typing.user_id,
&typing.room_id,
3000 + utils::millis_since_unix_epoch(),
)?;
} else {