Newer
Older
Timo Kösters
committed
#![allow(deprecated)]
// Conduit implements the older APIs
Timo Kösters
committed
use std::{
collections::BTreeMap,
fmt::Debug,
mem,
net::{IpAddr, SocketAddr},
sync::Arc,
use hickory_resolver::{error::ResolveError, lookup::SrvLookup};
api::{
client::error::{Error as RumaError, ErrorKind},
federation::{
authorization::get_event_authorization,
backfill::get_backfill,
device::get_devices::{self, v1::UserDevice},
directory::{get_public_rooms, get_public_rooms_filtered},
discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey},
event::{get_event, get_missing_events, get_room_state, get_room_state_ids},
keys::{claim_keys, get_keys},
membership::{create_invite, create_join_event, prepare_join_event},
query::{get_profile_information, get_room_information},
transactions::{
edu::{DeviceListUpdateContent, DirectDeviceContent, Edu, SigningKeyUpdateContent},
send_transaction_message,
},
},
EndpointError, IncomingResponse, MatrixVersion, OutgoingRequest, OutgoingResponse, SendAccessToken,
},
directory::{Filter, RoomNetwork},
events::{
receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
room::{
join_rules::{JoinRule, RoomJoinRulesEventContent},
member::{MembershipState, RoomMemberEventContent},
},
StateEventType, TimelineEventType,
},
serde::{Base64, JsonObject, Raw},
to_device::DeviceIdOrAllDevices,
uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, RoomVersionId, ServerName,
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use tokio::sync::RwLock;
use crate::{
api::client_server::{self, claim_keys_helper, get_keys_helper},
service::pdu::{gen_event_id_canonical_json, PduBuilder},
services, utils, Error, PduEvent, Result, Ruma,
};
/// Wraps either an literal IP address plus port, or a hostname plus complement
/// (colon-plus-port if it was specified).
///
/// Note: A `FedDest::Named` might contain an IP address in string form if there
/// was no port specified to construct a `SocketAddr` with.
/// # fn main() -> Result<(), std::net::AddrParseError> {
/// FedDest::Literal("198.51.100.3:8448".parse()?);
/// FedDest::Literal("[2001:db8::4:5]:443".parse()?);
/// FedDest::Named("matrix.example.org".to_owned(), String::new());
/// FedDest::Named("matrix.example.org".to_owned(), ":8448".to_owned());
/// FedDest::Named("198.51.100.5".to_owned(), String::new());
Literal(SocketAddr),
Named(String, String),
fn into_https_string(self) -> String {
match self {
Self::Literal(addr) => format!("https://{addr}"),
Self::Named(host, port) => format!("https://{host}{port}"),
}
}
fn into_uri_string(self) -> String {
match self {
Self::Literal(addr) => addr.to_string(),
Self::Named(host, port) => host + &port,
}
}
fn hostname(&self) -> String {
match &self {
Self::Literal(addr) => addr.ip().to_string(),
Self::Named(host, _) => host.clone(),
}
}
fn port(&self) -> Option<u16> {
match &self {
Self::Literal(addr) => Some(addr.port()),
Self::Named(_, port) => port[1..].parse().ok(),
}
}
}
pub(crate) async fn send_request<T>(destination: &ServerName, request: T) -> Result<T::IncomingResponse>
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
if destination == services().globals.server_name() {
return Err(Error::bad_config("Won't send federation request to ourselves"));
}
if destination.is_ip_literal() || IPAddress::is_valid(destination.host()) {
info!(
"Destination {} is an IP literal, checking against IP range denylist.",
destination
);
let ip = IPAddress::parse(destination.host()).map_err(|e| {
warn!("Failed to parse IP literal from string: {}", e);
Error::BadServerResponse("Invalid IP address")
})?;
let cidr_ranges_s = services().globals.ip_range_denylist().to_vec();
let mut cidr_ranges: Vec<IPAddress> = Vec::new();
for cidr in cidr_ranges_s {
cidr_ranges.push(IPAddress::parse(cidr).expect("we checked this at startup"));
}
debug!("List of pushed CIDR ranges: {:?}", cidr_ranges);
for cidr in cidr_ranges {
if cidr.includes(&ip) {
return Err(Error::BadServerResponse("Not allowed to send requests to this IP"));
}
}
info!("IP literal {} is allowed.", destination);
}
debug!("Preparing to send request to {destination}");
let mut write_destination_to_cache = false;
let cached_result = services()
.globals
.actual_destinations()
.read()
.await
.get(destination)
.cloned();
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
let (actual_destination, host) = if let Some(result) = cached_result {
result
} else {
write_destination_to_cache = true;
let result = find_actual_destination(destination).await;
(result.0, result.1.into_uri_string())
};
let actual_destination_str = actual_destination.clone().into_https_string();
let mut http_request = request
.try_into_http_request::<Vec<u8>>(
&actual_destination_str,
SendAccessToken::IfRequired(""),
&[MatrixVersion::V1_5],
)
.map_err(|e| {
warn!("Failed to find destination {}: {}", actual_destination_str, e);
Error::BadServerResponse("Invalid destination")
})?;
let mut request_map = serde_json::Map::new();
if !http_request.body().is_empty() {
request_map.insert(
"content".to_owned(),
serde_json::from_slice(http_request.body()).expect("body is valid json, we just created it"),
);
};
request_map.insert("method".to_owned(), T::METADATA.method.to_string().into());
request_map.insert(
"uri".to_owned(),
http_request
.uri()
.path_and_query()
.expect("all requests have a path")
.to_string()
.into(),
);
request_map.insert("origin".to_owned(), services().globals.server_name().as_str().into());
request_map.insert("destination".to_owned(), destination.as_str().into());
let mut request_json = serde_json::from_value(request_map.into()).expect("valid JSON is valid BTreeMap");
ruma::signatures::sign_json(
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut request_json,
)
.expect("our request json is what ruma expects");
let request_json: serde_json::Map<String, serde_json::Value> =
serde_json::from_slice(&serde_json::to_vec(&request_json).unwrap()).unwrap();
let signatures = request_json["signatures"]
.as_object()
.unwrap()
.values()
.map(|v| {
v.as_object()
.unwrap()
.iter()
.map(|(k, v)| (k, v.as_str().unwrap()))
});
for signature_server in signatures {
for s in signature_server {
http_request.headers_mut().insert(
AUTHORIZATION,
HeaderValue::from_str(&format!(
"X-Matrix origin={},key=\"{}\",sig=\"{}\"",
services().globals.server_name(),
s.0,
s.1
))
.unwrap(),
);
}
}
let reqwest_request = reqwest::Request::try_from(http_request)?;
let url = reqwest_request.url().clone();
if let Some(url_host) = url.host_str() {
debug!("Checking request URL for IP");
if let Ok(ip) = IPAddress::parse(url_host) {
let cidr_ranges_s = services().globals.ip_range_denylist().to_vec();
let mut cidr_ranges: Vec<IPAddress> = Vec::new();
for cidr in cidr_ranges_s {
cidr_ranges.push(IPAddress::parse(cidr).expect("we checked this at startup"));
}
for cidr in cidr_ranges {
if cidr.includes(&ip) {
return Err(Error::BadServerResponse("Not allowed to send requests to this IP"));
}
}
}
}
debug!("Sending request to {destination} at {url}");
let response = services()
.globals
.client
.federation
.execute(reqwest_request)
.await;
debug!("Received response from {destination} at {url}");
match response {
Ok(mut response) => {
// reqwest::Response -> http::Response conversion
debug!("Checking response destination's IP");
if let Some(remote_addr) = response.remote_addr() {
if let Ok(ip) = IPAddress::parse(remote_addr.ip().to_string()) {
let cidr_ranges_s = services().globals.ip_range_denylist().to_vec();
let mut cidr_ranges: Vec<IPAddress> = Vec::new();
for cidr in cidr_ranges_s {
cidr_ranges.push(IPAddress::parse(cidr).expect("we checked this at startup"));
}
for cidr in cidr_ranges {
if cidr.includes(&ip) {
return Err(Error::BadServerResponse("Not allowed to send requests to this IP"));
}
}
}
}
let mut http_response_builder = http::Response::builder()
.status(status)
.version(response.version());
http_response_builder
.headers_mut()
.expect("http::response::Builder is usable"),
);
debug!("Getting response bytes from {destination}");
let body = response.bytes().await.unwrap_or_else(|e| {
info!("server error {}", e);
Vec::new().into()
}); // TODO: handle timeout
debug!("Got response bytes from {destination}");
if !status.is_success() {
debug!(
"Response not successful\n{} {}: {}",
url,
status,
String::from_utf8_lossy(&body)
.lines()
.collect::<Vec<_>>()
.join(" ")
let http_response = http_response_builder
.body(body)
.expect("reqwest body is valid http body");
if status.is_success() {
debug!("Parsing response bytes from {destination}");
let response = T::IncomingResponse::try_from_http_response(http_response);
if response.is_ok() && write_destination_to_cache {
services()
.globals
.actual_destinations()
.await
.insert(OwnedServerName::from(destination), (actual_destination, host));
}
response.map_err(|e| {
warn!("Invalid 200 response from {} on: {} {}", &destination, url, e);
Error::BadServerResponse("Server returned bad 200 response.")
})
} else {
debug!("Returning error from {destination}");
Err(Error::FederationError(
destination.to_owned(),
RumaError::from_http_response(http_response),
))
}
},
Err(e) => {
// we do not need to log that servers in a room are dead, this is normal in
// public rooms and just spams the logs.
"Timed out sending request to {} at {}: {}",
destination, actual_destination_str, e
);
} else if e.is_connect() {
debug!("Failed to connect to {} at {}: {}", destination, actual_destination_str, e);
} else if e.is_redirect() {
debug!(
"Redirect loop sending request to {} at {}: {}\nFinal URL: {:?}",
destination,
actual_destination_str,
e,
e.url()
);
info!("Could not send request to {} at {}: {}", destination, actual_destination_str, e);
fn get_ip_with_port(destination_str: &str) -> Option<FedDest> {
if let Ok(destination) = destination_str.parse::<SocketAddr>() {
Some(FedDest::Literal(destination))
} else if let Ok(ip_addr) = destination_str.parse::<IpAddr>() {
Some(FedDest::Literal(SocketAddr::new(ip_addr, 8448)))
} else {
None
}
fn add_port_to_hostname(destination_str: &str) -> FedDest {
let (host, port) = match destination_str.find(':') {
None => (destination_str, ":8448"),
Some(pos) => destination_str.split_at(pos),
};
FedDest::Named(host.to_owned(), port.to_owned())
/// Returns: `actual_destination`, host header
/// Implemented according to the specification at <https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names>
/// Numbers in comments below refer to bullet points in linked section of
/// specification
async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) {
debug!("Finding actual destination for {destination}");
let destination_str = destination.as_str().to_owned();
let mut hostname = destination_str.clone();
let actual_destination = match get_ip_with_port(&destination_str) {
Some(host_port) => {
debug!("1: IP literal with provided or default port");
host_port
},
None => {
if let Some(pos) = destination_str.find(':') {
debug!("2: Hostname with included port");
let (host, port) = destination_str.split_at(pos);
query_and_cache_override(host, host, port.parse::<u16>().unwrap_or(8448)).await;
FedDest::Named(host.to_owned(), port.to_owned())
} else {
debug!("Requesting well known for {destination}");
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
if let Some(delegated_hostname) = request_well_known(destination.as_str()).await {
debug!("3: A .well-known file is available");
hostname = add_port_to_hostname(&delegated_hostname).into_uri_string();
match get_ip_with_port(&delegated_hostname) {
Some(host_and_port) => host_and_port, // 3.1: IP literal in .well-known file
None => {
if let Some(pos) = delegated_hostname.find(':') {
debug!("3.2: Hostname with port in .well-known file");
let (host, port) = delegated_hostname.split_at(pos);
query_and_cache_override(host, host, port.parse::<u16>().unwrap_or(8448)).await;
FedDest::Named(host.to_owned(), port.to_owned())
} else {
debug!("Delegated hostname has no port in this branch");
if let Some(hostname_override) = query_srv_record(&delegated_hostname).await {
debug!("3.3: SRV lookup successful");
let force_port = hostname_override.port();
query_and_cache_override(
&delegated_hostname,
&hostname_override.hostname(),
force_port.unwrap_or(8448),
)
.await;
if let Some(port) = force_port {
FedDest::Named(delegated_hostname, format!(":{port}"))
} else {
add_port_to_hostname(&delegated_hostname)
}
} else {
debug!("3.4: No SRV records, just use the hostname from .well-known");
query_and_cache_override(&delegated_hostname, &delegated_hostname, 8448).await;
add_port_to_hostname(&delegated_hostname)
}
},
}
} else {
debug!("4: No .well-known or an error occured");
if let Some(hostname_override) = query_srv_record(&destination_str).await {
debug!("4: SRV record found");
let force_port = hostname_override.port();
query_and_cache_override(&hostname, &hostname_override.hostname(), force_port.unwrap_or(8448))
.await;
if let Some(port) = force_port {
FedDest::Named(hostname.clone(), format!(":{port}"))
} else {
add_port_to_hostname(&hostname)
} else {
debug!("5: No SRV record found");
query_and_cache_override(&destination_str, &destination_str, 8448).await;
add_port_to_hostname(&destination_str)
}
}
}
},
};
// Can't use get_ip_with_port here because we don't want to add a port
// to an IP address if it wasn't specified
let hostname = if let Ok(addr) = hostname.parse::<SocketAddr>() {
FedDest::Literal(addr)
} else if let Ok(addr) = hostname.parse::<IpAddr>() {
FedDest::Named(addr.to_string(), ":8448".to_owned())
} else if let Some(pos) = hostname.find(':') {
let (host, port) = hostname.split_at(pos);
FedDest::Named(host.to_owned(), port.to_owned())
} else {
FedDest::Named(hostname, ":8448".to_owned())
};
debug!("Actual destination: {actual_destination:?} hostname: {hostname:?}");
async fn query_and_cache_override(overname: &'_ str, hostname: &'_ str, port: u16) {
match services()
.globals
.dns_resolver()
.lookup_ip(hostname.to_owned())
.await
{
Ok(override_ip) => {
debug!("Caching result of {:?} overriding {:?}", hostname, overname);
services()
.globals
.resolver
.overrides
.write()
.unwrap()
.insert(overname.to_owned(), (override_ip.iter().collect(), port));
},
Err(e) => {
debug!("Got {:?} for {:?} to override {:?}", e.kind(), hostname, overname);
},
}
}
async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> {
fn handle_successful_srv(srv: &SrvLookup) -> Option<FedDest> {
srv.iter().next().map(|result| {
FedDest::Named(
result.target().to_string().trim_end_matches('.').to_owned(),
format!(":{}", result.port()),
)
})
}
async fn lookup_srv(hostname: &str) -> Result<SrvLookup, ResolveError> {
debug!("querying SRV for {:?}", hostname);
let hostname = hostname.trim_end_matches('.');
services()
.globals
.dns_resolver()
.srv_lookup(hostname.to_owned())
.await
}
let first_hostname = format!("_matrix-fed._tcp.{hostname}.");
let second_hostname = format!("_matrix._tcp.{hostname}.");
lookup_srv(&first_hostname)
.or_else(|_| {
debug!("Querying deprecated _matrix SRV record for host {:?}", hostname);
.and_then(|srv_lookup| async move { Ok(handle_successful_srv(&srv_lookup)) })
async fn request_well_known(destination: &str) -> Option<String> {
if !services()
.globals
.resolver
.overrides
.read()
.unwrap()
.contains_key(destination)
{
query_and_cache_override(destination, destination, 8448).await;
}
.get(&format!("https://{destination}/.well-known/matrix/server"))
.send()
.await;
debug!("Got well known response");
debug!("Well known response: {:?}", response);
if let Err(e) = &response {
debug!("Well known error: {e:?}");
return None;
}
let text = response.ok()?.text().await;
debug!("Got well known response text");
debug!("Well known response text: {:?}", text);
if text.as_ref().ok()?.len() > 10000 {
"Well known response for destination '{destination}' exceeded past 10000 characters, assuming no \
well-known."
);
return None;
}
let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?;
debug!("serde_json body of well known text: {}", body);
Some(body.get("m.server")?.as_str()?.to_owned())
/// # `GET /_matrix/federation/v1/version`
///
/// Get version information on this server.
_body: Ruma<get_server_version::v1::Request>,
) -> Result<get_server_version::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let version = match option_env!("CONDUIT_VERSION_EXTRA") {
Some(extra) => format!("{} ({})", env!("CARGO_PKG_VERSION"), extra),
None => env!("CARGO_PKG_VERSION").to_owned(),
};
Ok(get_server_version::v1::Response {
server: Some(get_server_version::v1::Server {
name: Some("Conduwuit".to_owned()),
/// # `GET /_matrix/key/v2/server`
///
/// Gets the public signing keys of this server.
///
/// - Matrix does not support invalidating public keys, so the key returned by
/// this will be valid
// Response type for this endpoint is Json because we need to calculate a
// signature for the response
pub async fn get_server_keys_route() -> Result<impl IntoResponse> {
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let mut verify_keys: BTreeMap<OwnedServerSigningKeyId, VerifyKey> = BTreeMap::new();
verify_keys.insert(
format!("ed25519:{}", services().globals.keypair().version())
.try_into()
.expect("found invalid server signing keys in DB"),
VerifyKey {
key: Base64::new(services().globals.keypair().public_key().to_vec()),
},
);
let mut response = serde_json::from_slice(
get_server_keys::v2::Response {
server_key: Raw::new(&ServerSigningKeys {
server_name: services().globals.server_name().to_owned(),
verify_keys,
old_verify_keys: BTreeMap::new(),
signatures: BTreeMap::new(),
valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now() + Duration::from_secs(86400 * 7),
)
.expect("time is valid"),
})
.expect("static conversion, no errors"),
}
.try_into_http_response::<Vec<u8>>()
.unwrap()
.body(),
)
.unwrap();
ruma::signatures::sign_json(
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut response,
)
.unwrap();
Ok(Json(response))
/// # `GET /_matrix/key/v2/server/{keyId}`
///
/// Gets the public signing keys of this server.
///
/// - Matrix does not support invalidating public keys, so the key returned by
/// this will be valid
pub async fn get_server_keys_deprecated_route() -> impl IntoResponse { get_server_keys_route().await }
/// # `POST /_matrix/federation/v1/publicRooms`
///
/// Lists the public rooms on this server.
pub async fn get_public_rooms_filtered_route(
body: Ruma<get_public_rooms_filtered::v1::Request>,
) -> Result<get_public_rooms_filtered::v1::Response> {
if !services()
.globals
.allow_public_room_directory_over_federation()
{
return Err(Error::BadRequest(ErrorKind::Forbidden, "Room directory is not public"));
}
let response = client_server::get_public_rooms_filtered_helper(
None,
body.limit,
body.since.as_deref(),
&body.filter,
&body.room_network,
)
.await?;
Ok(get_public_rooms_filtered::v1::Response {
chunk: response.chunk,
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
})
/// # `GET /_matrix/federation/v1/publicRooms`
///
/// Lists the public rooms on this server.
pub async fn get_public_rooms_route(
) -> Result<get_public_rooms::v1::Response> {
if !services()
.globals
.allow_public_room_directory_over_federation()
{
return Err(Error::BadRequest(ErrorKind::Forbidden, "Room directory is not public"));
}
let response = client_server::get_public_rooms_filtered_helper(
None,
body.limit,
body.since.as_deref(),
&Filter::default(),
&RoomNetwork::Matrix,
)
.await?;
Ok(get_public_rooms::v1::Response {
chunk: response.chunk,
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
})
pub fn parse_incoming_pdu(pdu: &RawJsonValue) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> {
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response")
})?;
let room_id: OwnedRoomId = value
.get("room_id")
.and_then(|id| RoomId::parse(id.as_str()?).ok())
.ok_or(Error::BadRequest(ErrorKind::InvalidParam, "Invalid room id in pdu"))?;
let room_version_id = services().rooms.state.get_room_version(&room_id)?;
let Ok((event_id, value)) = gen_event_id_canonical_json(pdu, &room_version_id) else {
// Event could not be converted to canonical json
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not convert event to canonical json.",
));
/// # `PUT /_matrix/federation/v1/send/{txnId}`
///
/// Push EDUs and PDUs to this server.
pub async fn send_transaction_message_route(
body: Ruma<send_transaction_message::v1::Request>,
) -> Result<send_transaction_message::v1::Response> {
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
let mut resolved_map = BTreeMap::new();
let pub_key_map = RwLock::new(BTreeMap::new());
// This is all the auth_events that have been recursively fetched so they don't
// have to be deserialized over and over again.
// TODO: make this persist across requests but not in a DB Tree (in globals?)
// TODO: This could potentially also be some sort of trie (suffix tree) like
// structure so that once an auth event is known it would know (using indexes
// maybe) all of the auth events that it references.
// let mut auth_cache = EventMap::new();
let mut parsed_pdus = vec![];
for pdu in &body.pdus {
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response")
})?;
let room_id: OwnedRoomId = value
.get("room_id")
.and_then(|id| RoomId::parse(id.as_str()?).ok())
.ok_or(Error::BadRequest(ErrorKind::InvalidParam, "Invalid room id in pdu"))?;
if services().rooms.state.get_room_version(&room_id).is_err() {
debug!("Server is not in room {room_id}");
continue;
}
let r = parse_incoming_pdu(pdu);
let (event_id, value, room_id) = match r {
Ok(t) => t,
Err(e) => {
warn!("Could not parse PDU: {e}");
info!("Full PDU: {:?}", &pdu);
continue;
},
};
parsed_pdus.push((event_id, value, room_id));
// We do not add the event_id field to the pdu here because of signature
// and hashes checks
}
// We go through all the signatures we see on the PDUs and fetch the
// corresponding signing keys
services()
.rooms
.event_handler
.fetch_required_signing_keys(parsed_pdus.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map)
.await
.unwrap_or_else(|e| {
warn!("Could not fetch all signatures for PDUs from {}: {:?}", sender_servername, e);
});
for (event_id, value, room_id) in parsed_pdus {
let mutex = Arc::clone(
services()
.globals
.roomid_mutex_federation
.write()
.await
.entry(room_id.clone())
.or_default(),
);
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
let mutex_lock = mutex.lock().await;
let start_time = Instant::now();
resolved_map.insert(
event_id.clone(),
services()
.rooms
.event_handler
.handle_incoming_pdu(sender_servername, &event_id, &room_id, value, true, &pub_key_map)
.await
.map(|_| ()),
);
drop(mutex_lock);
let elapsed = start_time.elapsed();
debug!(
"Handling transaction of event {} took {}m{}s",
event_id,
elapsed.as_secs() / 60,
elapsed.as_secs() % 60
);
}
for pdu in &resolved_map {
if let Err(e) = pdu.1 {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
warn!("Incoming PDU failed {:?}", pdu);
}
}
}
for edu in body
.edus
.iter()
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
{
match edu {
Edu::Presence(presence) => {
if !services().globals.allow_incoming_presence() {
continue;
}
for update in presence.push {
for room_id in services().rooms.state_cache.rooms_joined(&update.user_id) {
services().rooms.edus.presence.set_presence(
&room_id?,
&update.user_id,
update.presence.clone(),
Some(update.currently_active),
Some(update.last_active_ago),
update.status_msg.clone(),
)?;
}
}
},
Edu::Receipt(receipt) => {
if !services().globals.allow_incoming_read_receipts() {
continue;
}
for (room_id, room_updates) in receipt.receipts {
for (user_id, user_updates) in room_updates.read {
if let Some((event_id, _)) = user_updates
.event_ids
.iter()
.filter_map(|id| {
services()
.rooms
.timeline
.get_pdu_count(id)
.ok()
.flatten()
.map(|r| (id, r))
})
.max_by_key(|(_, count)| *count)
{
let mut user_receipts = BTreeMap::new();
user_receipts.insert(user_id.clone(), user_updates.data);
let mut receipts = BTreeMap::new();
receipts.insert(ReceiptType::Read, user_receipts);
let mut receipt_content = BTreeMap::new();
receipt_content.insert(event_id.to_owned(), receipts);
let event = ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
services()
.rooms
.edus
.read_receipt
.readreceipt_update(&user_id, &room_id, event)?;
} else {
// TODO fetch missing events
debug!("No known event ids in read receipt: {:?}", user_updates);
}
}
}
},
Edu::Typing(typing) => {
if services()
.rooms
.state_cache
.is_joined(&typing.user_id, &typing.room_id)?
{
services()
.rooms
.edus
.typing
.typing_add(&typing.user_id, &typing.room_id, 3000 + utils::millis_since_unix_epoch())
.await?;
services()
.rooms
.edus
.typing
.typing_remove(&typing.user_id, &typing.room_id)
.await?;
}
}
},
Edu::DeviceListUpdate(DeviceListUpdateContent {
user_id,
..
}) => {
services().users.mark_device_key_update(&user_id)?;
},
Edu::DirectToDevice(DirectDeviceContent {
sender,
ev_type,
message_id,
messages,
}) => {
// Check if this is a new transaction id
if services()
.transaction_ids
.existing_txnid(&sender, None, &message_id)?
.is_some()
{