diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index 2906d35bf3c143c707c6d93ec044f4d81ae9f51e..97aa1c691331334c65e25e43ab57505024fbf714 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -878,46 +878,59 @@ async fn join_room_by_id_helper_remote( info!("Going through send_join response room_state"); let cork = services.db.cork_and_flush(); - let mut state = HashMap::new(); - for result in send_join_response.room_state.state.iter().map(|pdu| { - services - .server_keys - .validate_and_add_event_id(pdu, &room_version_id) - }) { - let Ok((event_id, value)) = result.await else { - continue; - }; + let state = send_join_response + .room_state + .state + .iter() + .stream() + .then(|pdu| { + services + .server_keys + .validate_and_add_event_id_no_fetch(pdu, &room_version_id) + }) + .ready_filter_map(Result::ok) + .fold(HashMap::new(), |mut state, (event_id, value)| async move { + let pdu = match PduEvent::from_id_val(&event_id, value.clone()) { + Ok(pdu) => pdu, + Err(e) => { + debug_warn!("Invalid PDU in send_join response: {e:?}: {value:#?}"); + return state; + }, + }; - let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| { - debug_warn!("Invalid PDU in send_join response: {value:#?}"); - err!(BadServerResponse("Invalid PDU in send_join response: {e:?}")) - })?; + services.rooms.outlier.add_pdu_outlier(&event_id, &value); + if let Some(state_key) = &pdu.state_key { + let shortstatekey = services + .rooms + .short + .get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key) + .await; + + state.insert(shortstatekey, pdu.event_id.clone()); + } + + state + }) + .await; - services.rooms.outlier.add_pdu_outlier(&event_id, &value); - if let Some(state_key) = &pdu.state_key { - let shortstatekey = services - .rooms - .short - .get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key) - .await; - state.insert(shortstatekey, pdu.event_id.clone()); - } - } drop(cork); info!("Going through send_join response auth_chain"); let cork = services.db.cork_and_flush(); - for result in send_join_response.room_state.auth_chain.iter().map(|pdu| { - services - .server_keys - .validate_and_add_event_id(pdu, &room_version_id) - }) { - let Ok((event_id, value)) = result.await else { - continue; - }; + send_join_response + .room_state + .auth_chain + .iter() + .stream() + .then(|pdu| { + services + .server_keys + .validate_and_add_event_id_no_fetch(pdu, &room_version_id) + }) + .ready_filter_map(Result::ok) + .ready_for_each(|(event_id, value)| services.rooms.outlier.add_pdu_outlier(&event_id, &value)) + .await; - services.rooms.outlier.add_pdu_outlier(&event_id, &value); - } drop(cork); debug!("Running send_join auth check"); diff --git a/src/service/server_keys/mod.rs b/src/service/server_keys/mod.rs index 333970df32f9c2e3c11722a5d9e2e29a20b997c0..08bcefb630af43694beb57d4205ac67a8eab3f16 100644 --- a/src/service/server_keys/mod.rs +++ b/src/service/server_keys/mod.rs @@ -7,13 +7,19 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; -use conduit::{implement, utils::timepoint_from_now, Result, Server}; +use conduit::{ + implement, + utils::{timepoint_from_now, IterStream}, + Result, Server, +}; use database::{Deserialized, Json, Map}; +use futures::StreamExt; use ruma::{ api::federation::discovery::{ServerSigningKeys, VerifyKey}, serde::Raw, signatures::{Ed25519KeyPair, PublicKeyMap, PublicKeySet}, - MilliSecondsSinceUnixEpoch, OwnedServerSigningKeyId, ServerName, ServerSigningKeyId, + CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedServerSigningKeyId, RoomVersionId, ServerName, + ServerSigningKeyId, }; use serde_json::value::RawValue as RawJsonValue; @@ -107,7 +113,23 @@ async fn add_signing_keys(&self, new_keys: ServerSigningKeys) { } #[implement(Service)] -async fn verify_key_exists(&self, origin: &ServerName, key_id: &ServerSigningKeyId) -> bool { +pub async fn required_keys_exist(&self, object: &CanonicalJsonObject, version: &RoomVersionId) -> bool { + use ruma::signatures::required_keys; + + let Ok(required_keys) = required_keys(object, version) else { + return false; + }; + + required_keys + .iter() + .flat_map(|(server, key_ids)| key_ids.iter().map(move |key_id| (server, key_id))) + .stream() + .all(|(server, key_id)| self.verify_key_exists(server, key_id)) + .await +} + +#[implement(Service)] +pub async fn verify_key_exists(&self, origin: &ServerName, key_id: &ServerSigningKeyId) -> bool { type KeysMap<'a> = BTreeMap<&'a ServerSigningKeyId, &'a RawJsonValue>; let Ok(keys) = self diff --git a/src/service/server_keys/verify.rs b/src/service/server_keys/verify.rs index ad20fec7f52056e95091cad6d0cdd0990b8d88d0..c836e324a52160eec1d8a279373968bf4c02cccf 100644 --- a/src/service/server_keys/verify.rs +++ b/src/service/server_keys/verify.rs @@ -16,6 +16,26 @@ pub async fn validate_and_add_event_id( Ok((event_id, value)) } +#[implement(super::Service)] +pub async fn validate_and_add_event_id_no_fetch( + &self, pdu: &RawJsonValue, room_version: &RoomVersionId, +) -> Result<(OwnedEventId, CanonicalJsonObject)> { + let (event_id, mut value) = gen_event_id_canonical_json(pdu, room_version)?; + if !self.required_keys_exist(&value, room_version).await { + return Err!(BadServerResponse(debug_warn!( + "Event {event_id} cannot be verified: missing keys." + ))); + } + + if let Err(e) = self.verify_event(&value, Some(room_version)).await { + return Err!(BadServerResponse(debug_error!("Event {event_id} failed verification: {e:?}"))); + } + + value.insert("event_id".into(), CanonicalJsonValue::String(event_id.as_str().into())); + + Ok((event_id, value)) +} + #[implement(super::Service)] pub async fn verify_event( &self, event: &CanonicalJsonObject, room_version: Option<&RoomVersionId>,