Newer
Older
Timo Kösters
committed
#![allow(deprecated)]
// Conduit implements the older APIs
Timo Kösters
committed
use std::{
collections::BTreeMap,
fmt::Debug,
mem,
net::{IpAddr, SocketAddr},
sync::Arc,
use hickory_resolver::{error::ResolveError, lookup::SrvLookup};
api::{
client::error::{Error as RumaError, ErrorKind},
federation::{
authorization::get_event_authorization,
backfill::get_backfill,
device::get_devices::{self, v1::UserDevice},
directory::{get_public_rooms, get_public_rooms_filtered},
discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey},
event::{get_event, get_missing_events, get_room_state, get_room_state_ids},
keys::{claim_keys, get_keys},
membership::{create_invite, create_join_event, prepare_join_event},
query::{get_profile_information, get_room_information},
transactions::{
edu::{DeviceListUpdateContent, DirectDeviceContent, Edu, SigningKeyUpdateContent},
send_transaction_message,
},
},
EndpointError, IncomingResponse, MatrixVersion, OutgoingRequest, OutgoingResponse, SendAccessToken,
},
directory::{Filter, RoomNetwork},
events::{
receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
room::{
join_rules::{JoinRule, RoomJoinRulesEventContent},
member::{MembershipState, RoomMemberEventContent},
},
StateEventType, TimelineEventType,
},
serde::{Base64, JsonObject, Raw},
to_device::DeviceIdOrAllDevices,
uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName,
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use tokio::sync::RwLock;
use crate::{
api::client_server::{self, claim_keys_helper, get_keys_helper},
service::pdu::{gen_event_id_canonical_json, PduBuilder},
services, utils, Error, PduEvent, Result, Ruma,
};
/// Wraps either an literal IP address plus port, or a hostname plus complement
/// (colon-plus-port if it was specified).
///
/// Note: A `FedDest::Named` might contain an IP address in string form if there
/// was no port specified to construct a `SocketAddr` with.
/// # fn main() -> Result<(), std::net::AddrParseError> {
/// FedDest::Literal("198.51.100.3:8448".parse()?);
/// FedDest::Literal("[2001:db8::4:5]:443".parse()?);
/// FedDest::Named("matrix.example.org".to_owned(), String::new());
/// FedDest::Named("matrix.example.org".to_owned(), ":8448".to_owned());
/// FedDest::Named("198.51.100.5".to_owned(), String::new());
Literal(SocketAddr),
Named(String, String),
fn into_https_string(self) -> String {
match self {
Self::Literal(addr) => format!("https://{addr}"),
Self::Named(host, port) => format!("https://{host}{port}"),
}
}
fn into_uri_string(self) -> String {
match self {
Self::Literal(addr) => addr.to_string(),
Self::Named(host, port) => host + &port,
}
}
fn hostname(&self) -> String {
match &self {
Self::Literal(addr) => addr.ip().to_string(),
Self::Named(host, _) => host.clone(),
}
}
fn port(&self) -> Option<u16> {
match &self {
Self::Literal(addr) => Some(addr.port()),
Self::Named(_, port) => port[1..].parse().ok(),
}
}
}
pub(crate) async fn send_request<T>(destination: &ServerName, request: T) -> Result<T::IncomingResponse>
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
if destination == services().globals.server_name() {
return Err(Error::bad_config("Won't send federation request to ourselves"));
}
if destination.is_ip_literal() || IPAddress::is_valid(destination.host()) {
info!(
"Destination {} is an IP literal, checking against IP range denylist.",
destination
);
let ip = IPAddress::parse(destination.host()).map_err(|e| {
warn!("Failed to parse IP literal from string: {}", e);
Error::BadServerResponse("Invalid IP address")
})?;
let cidr_ranges_s = services().globals.ip_range_denylist().to_vec();
let mut cidr_ranges: Vec<IPAddress> = Vec::new();
for cidr in cidr_ranges_s {
cidr_ranges.push(IPAddress::parse(cidr).expect("we checked this at startup"));
}
debug!("List of pushed CIDR ranges: {:?}", cidr_ranges);
for cidr in cidr_ranges {
if cidr.includes(&ip) {
return Err(Error::BadServerResponse("Not allowed to send requests to this IP"));
}
}
info!("IP literal {} is allowed.", destination);
}
debug!("Preparing to send request to {destination}");
let mut write_destination_to_cache = false;
let cached_result = services().globals.actual_destination_cache.read().await.get(destination).cloned();
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
let (actual_destination, host) = if let Some(result) = cached_result {
result
} else {
write_destination_to_cache = true;
let result = find_actual_destination(destination).await;
(result.0, result.1.into_uri_string())
};
let actual_destination_str = actual_destination.clone().into_https_string();
let mut http_request = request
.try_into_http_request::<Vec<u8>>(
&actual_destination_str,
SendAccessToken::IfRequired(""),
&[MatrixVersion::V1_5],
)
.map_err(|e| {
warn!("Failed to find destination {}: {}", actual_destination_str, e);
Error::BadServerResponse("Invalid destination")
})?;
let mut request_map = serde_json::Map::new();
if !http_request.body().is_empty() {
request_map.insert(
"content".to_owned(),
serde_json::from_slice(http_request.body()).expect("body is valid json, we just created it"),
);
};
request_map.insert("method".to_owned(), T::METADATA.method.to_string().into());
request_map.insert(
"uri".to_owned(),
http_request.uri().path_and_query().expect("all requests have a path").to_string().into(),
);
request_map.insert("origin".to_owned(), services().globals.server_name().as_str().into());
request_map.insert("destination".to_owned(), destination.as_str().into());
let mut request_json = serde_json::from_value(request_map.into()).expect("valid JSON is valid BTreeMap");
ruma::signatures::sign_json(
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut request_json,
)
.expect("our request json is what ruma expects");
let request_json: serde_json::Map<String, serde_json::Value> =
serde_json::from_slice(&serde_json::to_vec(&request_json).unwrap()).unwrap();
let signatures = request_json["signatures"]
.as_object()
.unwrap()
.values()
.map(|v| v.as_object().unwrap().iter().map(|(k, v)| (k, v.as_str().unwrap())));
for signature_server in signatures {
for s in signature_server {
http_request.headers_mut().insert(
AUTHORIZATION,
HeaderValue::from_str(&format!(
"X-Matrix origin={},key=\"{}\",sig=\"{}\"",
services().globals.server_name(),
s.0,
s.1
))
.unwrap(),
);
}
}
let reqwest_request = reqwest::Request::try_from(http_request)?;
let url = reqwest_request.url().clone();
debug!("Sending request to {destination} at {url}");
let response = services().globals.client.federation.execute(reqwest_request).await;
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
debug!("Received response from {destination} at {url}");
match response {
Ok(mut response) => {
// reqwest::Response -> http::Response conversion
let status = response.status();
let mut http_response_builder = http::Response::builder().status(status).version(response.version());
mem::swap(
response.headers_mut(),
http_response_builder.headers_mut().expect("http::response::Builder is usable"),
);
debug!("Getting response bytes from {destination}");
let body = response.bytes().await.unwrap_or_else(|e| {
info!("server error {}", e);
Vec::new().into()
}); // TODO: handle timeout
debug!("Got response bytes from {destination}");
if !status.is_success() {
debug!(
"Response not successful\n{} {}: {}",
url,
status,
String::from_utf8_lossy(&body).lines().collect::<Vec<_>>().join(" ")
);
}
let http_response = http_response_builder.body(body).expect("reqwest body is valid http body");
if status.is_success() {
debug!("Parsing response bytes from {destination}");
let response = T::IncomingResponse::try_from_http_response(http_response);
if response.is_ok() && write_destination_to_cache {
services()
.globals
.actual_destination_cache
.write()
.await
.insert(OwnedServerName::from(destination), (actual_destination, host));
}
response.map_err(|e| {
warn!("Invalid 200 response from {} on: {} {}", &destination, url, e);
Error::BadServerResponse("Server returned bad 200 response.")
})
} else {
debug!("Returning error from {destination}");
// remove potentially dead destinations from our cache that may be from modified
// well-knowns
if !write_destination_to_cache {
info!("Evicting {destination} from our true destination cache due to failed request.");
services().globals.actual_destination_cache.write().await.remove(destination);
}
Err(Error::FederationError(
destination.to_owned(),
RumaError::from_http_response(http_response),
))
}
},
Err(e) => {
// we do not need to log that servers in a room are dead, this is normal in
// public rooms and just spams the logs.
"Timed out sending request to {} at {}: {}",
destination, actual_destination_str, e
);
} else if e.is_connect() {
debug!("Failed to connect to {} at {}: {}", destination, actual_destination_str, e);
} else if e.is_redirect() {
debug!(
"Redirect loop sending request to {} at {}: {}\nFinal URL: {:?}",
destination,
actual_destination_str,
e,
e.url()
);
info!("Could not send request to {} at {}: {}", destination, actual_destination_str, e);
fn get_ip_with_port(destination_str: &str) -> Option<FedDest> {
if let Ok(destination) = destination_str.parse::<SocketAddr>() {
Some(FedDest::Literal(destination))
} else if let Ok(ip_addr) = destination_str.parse::<IpAddr>() {
Some(FedDest::Literal(SocketAddr::new(ip_addr, 8448)))
} else {
None
}
fn add_port_to_hostname(destination_str: &str) -> FedDest {
let (host, port) = match destination_str.find(':') {
None => (destination_str, ":8448"),
Some(pos) => destination_str.split_at(pos),
};
FedDest::Named(host.to_owned(), port.to_owned())
/// Returns: `actual_destination`, host header
/// Implemented according to the specification at <https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names>
/// Numbers in comments below refer to bullet points in linked section of
/// specification
async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) {
debug!("Finding actual destination for {destination}");
let destination_str = destination.as_str().to_owned();
let mut hostname = destination_str.clone();
let actual_destination = match get_ip_with_port(&destination_str) {
Some(host_port) => {
debug!("1: IP literal with provided or default port");
host_port
},
None => {
if let Some(pos) = destination_str.find(':') {
debug!("2: Hostname with included port");
let (host, port) = destination_str.split_at(pos);
query_and_cache_override(host, host, port.parse::<u16>().unwrap_or(8448)).await;
FedDest::Named(host.to_owned(), port.to_owned())
} else {
debug!("Requesting well known for {destination}");
match request_well_known(destination.as_str()).await {
Some(delegated_hostname) => {
debug!("3: A .well-known file is available");
hostname = add_port_to_hostname(&delegated_hostname).into_uri_string();
match get_ip_with_port(&delegated_hostname) {
Some(host_and_port) => host_and_port, // 3.1: IP literal in .well-known file
None => {
if let Some(pos) = delegated_hostname.find(':') {
debug!("3.2: Hostname with port in .well-known file");
let (host, port) = delegated_hostname.split_at(pos);
query_and_cache_override(host, host, port.parse::<u16>().unwrap_or(8448)).await;
FedDest::Named(host.to_owned(), port.to_owned())
} else {
debug!("Delegated hostname has no port in this branch");
if let Some(hostname_override) = query_srv_record(&delegated_hostname).await {
debug!("3.3: SRV lookup successful");
let force_port = hostname_override.port();
query_and_cache_override(
&delegated_hostname,
&hostname_override.hostname(),
force_port.unwrap_or(8448),
)
.await;
if let Some(port) = force_port {
FedDest::Named(delegated_hostname, format!(":{port}"))
} else {
add_port_to_hostname(&delegated_hostname)
}
} else {
debug!("3.4: No SRV records, just use the hostname from .well-known");
query_and_cache_override(&delegated_hostname, &delegated_hostname, 8448).await;
add_port_to_hostname(&delegated_hostname)
}
}
},
}
},
None => {
debug!("4: No .well-known or an error occured");
match query_srv_record(&destination_str).await {
Some(hostname_override) => {
debug!("4: SRV record found");
let force_port = hostname_override.port();
query_and_cache_override(
&hostname,
&hostname_override.hostname(),
force_port.unwrap_or(8448),
)
.await;
if let Some(port) = force_port {
FedDest::Named(hostname.clone(), format!(":{port}"))
} else {
add_port_to_hostname(&hostname)
}
},
None => {
debug!("5: No SRV record found");
query_and_cache_override(&destination_str, &destination_str, 8448).await;
add_port_to_hostname(&destination_str)
},
}
},
}
}
},
};
// Can't use get_ip_with_port here because we don't want to add a port
// to an IP address if it wasn't specified
let hostname = if let Ok(addr) = hostname.parse::<SocketAddr>() {
FedDest::Literal(addr)
} else if let Ok(addr) = hostname.parse::<IpAddr>() {
FedDest::Named(addr.to_string(), ":8448".to_owned())
} else if let Some(pos) = hostname.find(':') {
let (host, port) = hostname.split_at(pos);
FedDest::Named(host.to_owned(), port.to_owned())
} else {
FedDest::Named(hostname, ":8448".to_owned())
};
debug!("Actual destination: {actual_destination:?} hostname: {hostname:?}");
async fn query_and_cache_override(overname: &'_ str, hostname: &'_ str, port: u16) {
match services().globals.dns_resolver().lookup_ip(hostname.to_owned()).await {
Ok(override_ip) => {
debug!("Caching result of {:?} overriding {:?}", hostname, overname);
services()
.globals
.tls_name_override
.write()
.unwrap()
.insert(overname.to_owned(), (override_ip.iter().collect(), port));
},
Err(e) => {
debug!("Got {:?} for {:?} to override {:?}", e.kind(), hostname, overname);
},
}
}
async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> {
fn handle_successful_srv(srv: &SrvLookup) -> Option<FedDest> {
srv.iter().next().map(|result| {
FedDest::Named(
result.target().to_string().trim_end_matches('.').to_owned(),
format!(":{}", result.port()),
)
})
}
async fn lookup_srv(hostname: &str) -> Result<SrvLookup, ResolveError> {
debug!("querying SRV for {:?}", hostname);
let hostname = hostname.trim_end_matches('.');
services().globals.dns_resolver().srv_lookup(hostname.to_owned()).await
}
let first_hostname = format!("_matrix-fed._tcp.{hostname}.");
let second_hostname = format!("_matrix._tcp.{hostname}.");
lookup_srv(&first_hostname)
.or_else(|_| {
debug!("Querying deprecated _matrix SRV record for host {:?}", hostname);
.and_then(|srv_lookup| async move { Ok(handle_successful_srv(&srv_lookup)) })
async fn request_well_known(destination: &str) -> Option<String> {
if !services().globals.tls_name_override.read().unwrap().contains_key(destination) {
query_and_cache_override(destination, destination, 8448).await;
}
.get(&format!("https://{destination}/.well-known/matrix/server"))
.send()
.await;
debug!("Got well known response");
debug!("Well known response: {:?}", response);
if let Err(e) = &response {
debug!("Well known error: {e:?}");
return None;
}
let text = response.ok()?.text().await;
debug!("Got well known response text");
debug!("Well known response text: {:?}", text);
if text.as_ref().ok()?.len() > 10000 {
"Well known response for destination '{destination}' exceeded past 10000 characters, assuming no \
well-known."
);
return None;
}
let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?;
debug!("serde_json body of well known text: {}", body);
Some(body.get("m.server")?.as_str()?.to_owned())
/// # `GET /_matrix/federation/v1/version`
///
/// Get version information on this server.
_body: Ruma<get_server_version::v1::Request>,
) -> Result<get_server_version::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
Ok(get_server_version::v1::Response {
server: Some(get_server_version::v1::Server {
name: Some("Conduwuit".to_owned()),
version: Some(env!("CARGO_PKG_VERSION").to_owned()),
}),
})
/// # `GET /_matrix/key/v2/server`
///
/// Gets the public signing keys of this server.
///
/// - Matrix does not support invalidating public keys, so the key returned by
/// this will be valid
// Response type for this endpoint is Json because we need to calculate a
// signature for the response
pub async fn get_server_keys_route() -> Result<impl IntoResponse> {
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let mut verify_keys: BTreeMap<OwnedServerSigningKeyId, VerifyKey> = BTreeMap::new();
verify_keys.insert(
format!("ed25519:{}", services().globals.keypair().version())
.try_into()
.expect("found invalid server signing keys in DB"),
VerifyKey {
key: Base64::new(services().globals.keypair().public_key().to_vec()),
},
);
let mut response = serde_json::from_slice(
get_server_keys::v2::Response {
server_key: Raw::new(&ServerSigningKeys {
server_name: services().globals.server_name().to_owned(),
verify_keys,
old_verify_keys: BTreeMap::new(),
signatures: BTreeMap::new(),
valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now() + Duration::from_secs(86400 * 7),
)
.expect("time is valid"),
})
.expect("static conversion, no errors"),
}
.try_into_http_response::<Vec<u8>>()
.unwrap()
.body(),
)
.unwrap();
ruma::signatures::sign_json(
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut response,
)
.unwrap();
Ok(Json(response))
/// # `GET /_matrix/key/v2/server/{keyId}`
///
/// Gets the public signing keys of this server.
///
/// - Matrix does not support invalidating public keys, so the key returned by
/// this will be valid
pub async fn get_server_keys_deprecated_route() -> impl IntoResponse { get_server_keys_route().await }
/// # `POST /_matrix/federation/v1/publicRooms`
///
/// Lists the public rooms on this server.
pub async fn get_public_rooms_filtered_route(
body: Ruma<get_public_rooms_filtered::v1::Request>,
) -> Result<get_public_rooms_filtered::v1::Response> {
if !services().globals.allow_public_room_directory_over_federation() {
return Err(Error::bad_config("Room directory is not public."));
}
let response = client_server::get_public_rooms_filtered_helper(
None,
body.limit,
body.since.as_deref(),
&body.filter,
&body.room_network,
)
.await?;
Ok(get_public_rooms_filtered::v1::Response {
chunk: response.chunk,
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
})
/// # `GET /_matrix/federation/v1/publicRooms`
///
/// Lists the public rooms on this server.
pub async fn get_public_rooms_route(
) -> Result<get_public_rooms::v1::Response> {
if !services().globals.allow_public_room_directory_over_federation() {
return Err(Error::bad_config("Room directory is not public."));
}
let response = client_server::get_public_rooms_filtered_helper(
None,
body.limit,
body.since.as_deref(),
&Filter::default(),
&RoomNetwork::Matrix,
)
.await?;
Ok(get_public_rooms::v1::Response {
chunk: response.chunk,
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
})
pub fn parse_incoming_pdu(pdu: &RawJsonValue) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> {
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response")
})?;
let room_id: OwnedRoomId = value
.get("room_id")
.and_then(|id| RoomId::parse(id.as_str()?).ok())
.ok_or(Error::BadRequest(ErrorKind::InvalidParam, "Invalid room id in pdu"))?;
let room_version_id = services().rooms.state.get_room_version(&room_id)?;
let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) {
Ok(t) => t,
Err(_) => {
// Event could not be converted to canonical json
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not convert event to canonical json.",
));
},
};
Ok((event_id, value, room_id))
/// # `PUT /_matrix/federation/v1/send/{txnId}`
///
/// Push EDUs and PDUs to this server.
pub async fn send_transaction_message_route(
body: Ruma<send_transaction_message::v1::Request>,
) -> Result<send_transaction_message::v1::Response> {
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
let sender_servername = body.sender_servername.as_ref().expect("server is authenticated");
let mut resolved_map = BTreeMap::new();
let pub_key_map = RwLock::new(BTreeMap::new());
// This is all the auth_events that have been recursively fetched so they don't
// have to be deserialized over and over again.
// TODO: make this persist across requests but not in a DB Tree (in globals?)
// TODO: This could potentially also be some sort of trie (suffix tree) like
// structure so that once an auth event is known it would know (using indexes
// maybe) all of the auth events that it references.
// let mut auth_cache = EventMap::new();
let mut parsed_pdus = vec![];
for pdu in &body.pdus {
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response")
})?;
let room_id: OwnedRoomId = value
.get("room_id")
.and_then(|id| RoomId::parse(id.as_str()?).ok())
.ok_or(Error::BadRequest(ErrorKind::InvalidParam, "Invalid room id in pdu"))?;
if services().rooms.state.get_room_version(&room_id).is_err() {
debug!("Server is not in room {room_id}");
continue;
}
let r = parse_incoming_pdu(pdu);
let (event_id, value, room_id) = match r {
Ok(t) => t,
Err(e) => {
warn!("Could not parse PDU: {e}");
info!("Full PDU: {:?}", &pdu);
continue;
},
};
parsed_pdus.push((event_id, value, room_id));
// We do not add the event_id field to the pdu here because of signature
// and hashes checks
}
// We go through all the signatures we see on the PDUs and fetch the
// corresponding signing keys
services()
.rooms
.event_handler
.fetch_required_signing_keys(parsed_pdus.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map)
.await
.unwrap_or_else(|e| {
warn!("Could not fetch all signatures for PDUs from {}: {:?}", sender_servername, e);
});
for (event_id, value, room_id) in parsed_pdus {
let mutex =
Arc::clone(services().globals.roomid_mutex_federation.write().await.entry(room_id.clone()).or_default());
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
let mutex_lock = mutex.lock().await;
let start_time = Instant::now();
resolved_map.insert(
event_id.clone(),
services()
.rooms
.event_handler
.handle_incoming_pdu(sender_servername, &event_id, &room_id, value, true, &pub_key_map)
.await
.map(|_| ()),
);
drop(mutex_lock);
let elapsed = start_time.elapsed();
debug!(
"Handling transaction of event {} took {}m{}s",
event_id,
elapsed.as_secs() / 60,
elapsed.as_secs() % 60
);
}
for pdu in &resolved_map {
if let Err(e) = pdu.1 {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
warn!("Incoming PDU failed {:?}", pdu);
}
}
}
for edu in body.edus.iter().filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok()) {
match edu {
Edu::Presence(presence) => {
if !services().globals.allow_incoming_presence() {
continue;
}
for update in presence.push {
for room_id in services().rooms.state_cache.rooms_joined(&update.user_id) {
services().rooms.edus.presence.set_presence(
&room_id?,
&update.user_id,
update.presence.clone(),
Some(update.currently_active),
Some(update.last_active_ago),
update.status_msg.clone(),
)?;
}
}
},
Edu::Receipt(receipt) => {
if !services().globals.allow_incoming_read_receipts() {
continue;
}
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
for (room_id, room_updates) in receipt.receipts {
for (user_id, user_updates) in room_updates.read {
if let Some((event_id, _)) = user_updates
.event_ids
.iter()
.filter_map(|id| {
services().rooms.timeline.get_pdu_count(id).ok().flatten().map(|r| (id, r))
})
.max_by_key(|(_, count)| *count)
{
let mut user_receipts = BTreeMap::new();
user_receipts.insert(user_id.clone(), user_updates.data);
let mut receipts = BTreeMap::new();
receipts.insert(ReceiptType::Read, user_receipts);
let mut receipt_content = BTreeMap::new();
receipt_content.insert(event_id.to_owned(), receipts);
let event = ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
services().rooms.edus.read_receipt.readreceipt_update(&user_id, &room_id, event)?;
} else {
// TODO fetch missing events
debug!("No known event ids in read receipt: {:?}", user_updates);
}
}
}
},
Edu::Typing(typing) => {
if services().rooms.state_cache.is_joined(&typing.user_id, &typing.room_id)? {
if typing.typing {
services()
.rooms
.edus
.typing
.typing_add(&typing.user_id, &typing.room_id, 3000 + utils::millis_since_unix_epoch())
.await?;
services().rooms.edus.typing.typing_remove(&typing.user_id, &typing.room_id).await?;
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
}
}
},
Edu::DeviceListUpdate(DeviceListUpdateContent {
user_id,
..
}) => {
services().users.mark_device_key_update(&user_id)?;
},
Edu::DirectToDevice(DirectDeviceContent {
sender,
ev_type,
message_id,
messages,
}) => {
// Check if this is a new transaction id
if services().transaction_ids.existing_txnid(&sender, None, &message_id)?.is_some() {
continue;
}
for (target_user_id, map) in &messages {
for (target_device_id_maybe, event) in map {
match target_device_id_maybe {
DeviceIdOrAllDevices::DeviceId(target_device_id) => {
services().users.add_to_device_event(
&sender,
target_user_id,
target_device_id,
&ev_type.to_string(),
event.deserialize_as().map_err(|e| {
warn!("To-Device event is invalid: {event:?} {e}");
Error::BadRequest(ErrorKind::InvalidParam, "Event is invalid")
})?,
)?;
},
DeviceIdOrAllDevices::AllDevices => {
for target_device_id in services().users.all_device_ids(target_user_id) {
services().users.add_to_device_event(
&sender,
target_user_id,
&target_device_id?,
&ev_type.to_string(),
event.deserialize_as().map_err(|_| {
Error::BadRequest(ErrorKind::InvalidParam, "Event is invalid")
})?,
)?;
}
},
}
}
}
// Save transaction id with empty data
services().transaction_ids.add_txnid(&sender, None, &message_id, &[])?;
},
Edu::SigningKeyUpdate(SigningKeyUpdateContent {
user_id,
master_key,
self_signing_key,
}) => {
if user_id.server_name() != sender_servername {
continue;
}
if let Some(master_key) = master_key {
services().users.add_cross_signing_keys(&user_id, &master_key, &self_signing_key, &None, true)?;
}
},
Edu::_Custom(_) => {},
}
}
Ok(send_transaction_message::v1::Response {
pdus: resolved_map.into_iter().map(|(e, r)| (e, r.map_err(|e| e.sanitized_error()))).collect(),
})
/// # `GET /_matrix/federation/v1/event/{eventId}`
///
/// Retrieves a single event from the server.
///
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
/// - Only works if a user of this server is currently invited or joined the
/// room
pub async fn get_event_route(body: Ruma<get_event::v1::Request>) -> Result<get_event::v1::Response> {
let sender_servername = body.sender_servername.as_ref().expect("server is authenticated");
let event = services().rooms.timeline.get_pdu_json(&body.event_id)?.ok_or_else(|| {
warn!("Event not found, event ID: {:?}", &body.event_id);
Error::BadRequest(ErrorKind::NotFound, "Event not found.")
})?;
let room_id_str = event
.get("room_id")
.and_then(|val| val.as_str())
.ok_or_else(|| Error::bad_database("Invalid event in database"))?;
let room_id = <&RoomId>::try_from(room_id_str)
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
if !services().rooms.state_cache.server_in_room(sender_servername, room_id)? {
return Err(Error::BadRequest(ErrorKind::Forbidden, "Server is not in room"));
}
if !services().rooms.state_accessor.server_can_see_event(sender_servername, room_id, &body.event_id)? {
return Err(Error::BadRequest(ErrorKind::Forbidden, "Server is not allowed to see event."));
}
Ok(get_event::v1::Response {
origin: services().globals.server_name().to_owned(),
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
pdu: PduEvent::convert_to_outgoing_federation_event(event),
})
/// # `GET /_matrix/federation/v1/backfill/<room_id>`
///
/// Retrieves events from before the sender joined the room, if the room's
/// history visibility allows.
pub async fn get_backfill_route(body: Ruma<get_backfill::v1::Request>) -> Result<get_backfill::v1::Response> {
let sender_servername = body.sender_servername.as_ref().expect("server is authenticated");
debug!("Got backfill request from: {}", sender_servername);
if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? {
return Err(Error::BadRequest(ErrorKind::Forbidden, "Server is not in room."));
}
services().rooms.event_handler.acl_check(sender_servername, &body.room_id)?;
let until = body