Skip to content
Snippets Groups Projects
Commit b1ae2bbe authored by Devin Ragotzy's avatar Devin Ragotzy
Browse files

Fixing the incoming events algorithm (review with time)

parent 96dc6be1
No related branches found
No related tags found
No related merge requests found
...@@ -150,7 +150,7 @@ pub fn state_get( ...@@ -150,7 +150,7 @@ pub fn state_get(
} }
} }
/// Returns the last state hash key added to the db. /// Returns the state hash for this pdu.
pub fn pdu_state_hash(&self, pdu_id: &[u8]) -> Result<Option<StateHashId>> { pub fn pdu_state_hash(&self, pdu_id: &[u8]) -> Result<Option<StateHashId>> {
Ok(self.pduid_statehash.get(pdu_id)?) Ok(self.pduid_statehash.get(pdu_id)?)
} }
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
use rocket::{get, post, put, response::content::Json, State}; use rocket::{get, post, put, response::content::Json, State};
use ruma::{ use ruma::{
api::{ api::{
client::r0::state,
federation::{ federation::{
directory::{get_public_rooms, get_public_rooms_filtered}, directory::{get_public_rooms, get_public_rooms_filtered},
discovery::{ discovery::{
...@@ -590,6 +591,8 @@ pub async fn send_transaction_message_route<'a>( ...@@ -590,6 +591,8 @@ pub async fn send_transaction_message_route<'a>(
continue; continue;
} }
}; };
// 1. check the server is in the room (optional)
if !db.rooms.exists(&room_id)? { if !db.rooms.exists(&room_id)? {
resolved_map.insert(event_id, Err("Room is unknown to this server".to_string())); resolved_map.insert(event_id, Err("Room is unknown to this server".to_string()));
continue; continue;
...@@ -634,14 +637,13 @@ pub async fn send_transaction_message_route<'a>( ...@@ -634,14 +637,13 @@ pub async fn send_transaction_message_route<'a>(
// the auth events that it references. // the auth events that it references.
let mut auth_cache = EventMap::new(); let mut auth_cache = EventMap::new();
// 1. check the server is in the room (optional)
// 2. check content hash, redact if doesn't match // 2. check content hash, redact if doesn't match
// 3. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events // 3. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events
// 4. reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" // 4. reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
// 5. reject "due to auth events" if the event doesn't pass auth based on the auth events // 5. reject "due to auth events" if the event doesn't pass auth based on the auth events
// 6. persist this event as an outlier
// 7. if not timeline event: stop // 7. if not timeline event: stop
let pdu = match validate_event( // 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
let (pdu, previous) = match validate_event(
&db, &db,
value, value,
event_id.clone(), event_id.clone(),
...@@ -659,59 +661,16 @@ pub async fn send_transaction_message_route<'a>( ...@@ -659,59 +661,16 @@ pub async fn send_transaction_message_route<'a>(
} }
}; };
let pdu = Arc::new(pdu.clone()); let single_prev = if previous.len() == 1 {
// Fetch any unknown prev_events or retrieve them from the DB previous.first().cloned()
let previous = match fetch_events( } else {
&db, None
server_name,
&pub_key_map,
&pdu.prev_events,
&mut auth_cache,
)
.await
{
Ok(mut evs) if evs.len() == 1 => Some(evs.remove(0)),
_ => None,
}; };
// [auth_cache] At this point we have the auth chain of the incoming event. // 6. persist the event as an outlier.
let mut event_map: state_res::EventMap<Arc<PduEvent>> = auth_cache db.rooms.append_pdu_outlier(pdu.event_id(), &pdu)?;
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
// Check that the event passes auth based on the auth_events // Step 10. check the auth of the event passes based on the calculated state of the event
let is_authed = state_res::event_auth::auth_check(
&RoomVersionId::Version6,
&pdu,
previous.clone(),
&pdu.auth_events
.iter()
.map(|id| {
auth_cache
.get(id)
.map(|pdu| ((pdu.kind(), pdu.state_key()), pdu.clone()))
.ok_or_else(|| {
Error::Conflict(
"Auth event not found, event failed recursive auth checks.",
)
})
})
.collect::<Result<BTreeMap<_, _>>>()?,
None, // TODO: third party invite
)
.map_err(|_e| Error::Conflict("Auth check failed"))?;
if !is_authed {
resolved_map.insert(
pdu.event_id().clone(),
Err("Event has failed auth check with auth events".into()),
);
continue;
}
// End of step 4.
// Step 5. event passes auth based on state at the event
let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) = let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) =
match db match db
.sending .sending
...@@ -757,9 +716,7 @@ pub async fn send_transaction_message_route<'a>( ...@@ -757,9 +716,7 @@ pub async fn send_transaction_message_route<'a>(
&res.auth_chain_ids, &res.auth_chain_ids,
&mut auth_cache, &mut auth_cache,
) )
.await? .await?,
.into_iter()
.collect(),
) )
} }
Err(_) => { Err(_) => {
...@@ -771,10 +728,11 @@ pub async fn send_transaction_message_route<'a>( ...@@ -771,10 +728,11 @@ pub async fn send_transaction_message_route<'a>(
} }
}; };
// 10. This is the actual auth check for state at the event
if !state_res::event_auth::auth_check( if !state_res::event_auth::auth_check(
&RoomVersionId::Version6, &RoomVersionId::Version6,
&pdu, &pdu,
previous.clone(), single_prev.clone(),
&state_at_event, &state_at_event,
None, // TODO: third party invite None, // TODO: third party invite
) )
...@@ -787,10 +745,34 @@ pub async fn send_transaction_message_route<'a>( ...@@ -787,10 +745,34 @@ pub async fn send_transaction_message_route<'a>(
); );
continue; continue;
} }
// End of step 5. // End of step 10.
// 12. check if the event passes auth based on the "current state" of the room, if not "soft fail" it
let current_state = db
.rooms
.room_state_full(pdu.room_id())?
.into_iter()
.map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
.collect();
if !state_res::event_auth::auth_check(
&RoomVersionId::Version6,
&pdu,
single_prev.clone(),
&current_state,
None,
)
.map_err(|_e| Error::Conflict("Auth check failed"))?
{
// Soft fail, we add the event as an outlier.
resolved_map.insert(
pdu.event_id().clone(),
Err("Event has been soft failed".into()),
);
};
// Gather the forward extremities and resolve // Gather the forward extremities and resolve
let fork_states = match forward_extremity_ids( let fork_states = match forward_extremities(
&db, &db,
&pdu, &pdu,
server_name, server_name,
...@@ -806,7 +788,9 @@ pub async fn send_transaction_message_route<'a>( ...@@ -806,7 +788,9 @@ pub async fn send_transaction_message_route<'a>(
} }
}; };
// Step 6. event passes auth based on state of all forks and current room state // 13. start state-res with all previous forward extremities minus the ones that are in
// the prev_events of this event plus the new one created by this event and use
// the result as the new room state
let state_at_forks = if fork_states.is_empty() { let state_at_forks = if fork_states.is_empty() {
// State is empty // State is empty
Default::default() Default::default()
...@@ -852,6 +836,7 @@ pub async fn send_transaction_message_route<'a>( ...@@ -852,6 +836,7 @@ pub async fn send_transaction_message_route<'a>(
} }
info!("{} event's were not in the auth_cache", number_fetches); info!("{} event's were not in the auth_cache", number_fetches);
let mut event_map = EventMap::new();
// Add everything we will need to event_map // Add everything we will need to event_map
event_map.extend( event_map.extend(
auth_events auth_events
...@@ -904,7 +889,7 @@ pub async fn send_transaction_message_route<'a>( ...@@ -904,7 +889,7 @@ pub async fn send_transaction_message_route<'a>(
if !state_res::event_auth::auth_check( if !state_res::event_auth::auth_check(
&RoomVersionId::Version6, &RoomVersionId::Version6,
&pdu, &pdu,
previous, single_prev,
&state_at_forks, &state_at_forks,
None, None,
) )
...@@ -925,14 +910,19 @@ pub async fn send_transaction_message_route<'a>( ...@@ -925,14 +910,19 @@ pub async fn send_transaction_message_route<'a>(
Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into()) Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into())
} }
/// An async function that can recursively calls itself.
type AsyncRecursiveResult<'a, T> = Pin<Box<dyn Future<Output = StdResult<T, String>> + 'a + Send>>;
/// TODO: don't add as outlier if event is fetched as a result of gathering auth_events /// TODO: don't add as outlier if event is fetched as a result of gathering auth_events
/// Validate any event that is given to us by another server. /// Validate any event that is given to us by another server.
/// ///
/// 1. Is a valid event, otherwise it is dropped (PduEvent deserialization satisfies this). /// 1. Is a valid event, otherwise it is dropped (PduEvent deserialization satisfies this).
/// 2. Passes signature checks, otherwise event is dropped. /// 2. check content hash, redact if doesn't match
/// 3. Passes hash checks, otherwise it is redacted before being processed further. /// 3. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events
/// 4. Passes auth_chain collection (we can gather the events that auth this event recursively). /// 4. reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
/// 5. Once the event has passed all checks it can be added as an outlier to the DB. /// 5. reject "due to auth events" if the event doesn't pass auth based on the auth events
/// 7. if not timeline event: stop
/// 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
fn validate_event<'a>( fn validate_event<'a>(
db: &'a Database, db: &'a Database,
value: CanonicalJsonObject, value: CanonicalJsonObject,
...@@ -940,9 +930,24 @@ fn validate_event<'a>( ...@@ -940,9 +930,24 @@ fn validate_event<'a>(
pub_key_map: &'a PublicKeyMap, pub_key_map: &'a PublicKeyMap,
origin: &'a ServerName, origin: &'a ServerName,
auth_cache: &'a mut EventMap<Arc<PduEvent>>, auth_cache: &'a mut EventMap<Arc<PduEvent>>,
) -> Pin<Box<dyn Future<Output = StdResult<PduEvent, String>> + 'a + Send>> { ) -> AsyncRecursiveResult<'a, (Arc<PduEvent>, Vec<Arc<PduEvent>>)> {
Box::pin(async move { Box::pin(async move {
let mut val = signature_and_hash_check(&pub_key_map, value)?; let mut val =
match ruma::signatures::verify_event(pub_key_map, &value, &RoomVersionId::Version6) {
Ok(ver) => {
if let ruma::signatures::Verified::Signatures = ver {
match ruma::signatures::redact(&value, &RoomVersionId::Version6) {
Ok(obj) => obj,
Err(_) => return Err("Redaction failed".to_string()),
}
} else {
value
}
}
Err(_e) => {
return Err("Signature verification failed".to_string());
}
};
// Now that we have checked the signature and hashes we can add the eventID and convert // Now that we have checked the signature and hashes we can add the eventID and convert
// to our PduEvent type also finally verifying the first step listed above // to our PduEvent type also finally verifying the first step listed above
...@@ -959,11 +964,42 @@ fn validate_event<'a>( ...@@ -959,11 +964,42 @@ fn validate_event<'a>(
.await .await
.map_err(|_| "Event failed auth chain check".to_string())?; .map_err(|_| "Event failed auth chain check".to_string())?;
db.rooms let pdu = Arc::new(pdu.clone());
.append_pdu_outlier(pdu.event_id(), &pdu)
// 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
let previous = fetch_events(&db, origin, &pub_key_map, &pdu.prev_events, auth_cache)
.await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
Ok(pdu) // Check that the event passes auth based on the auth_events
let is_authed = state_res::event_auth::auth_check(
&RoomVersionId::Version6,
&pdu,
if previous.len() == 1 {
previous.first().cloned()
} else {
None
},
&pdu.auth_events
.iter()
.map(|id| {
auth_cache
.get(id)
.map(|pdu| ((pdu.kind(), pdu.state_key()), pdu.clone()))
.ok_or_else(|| {
"Auth event not found, event failed recursive auth checks.".to_string()
})
})
.collect::<StdResult<BTreeMap<_, _>, _>>()?,
None, // TODO: third party invite
)
.map_err(|_e| "Auth check failed".to_string())?;
if !is_authed {
return Err("Event has failed auth check with auth events".to_string());
}
Ok((pdu, previous))
}) })
} }
...@@ -990,7 +1026,10 @@ async fn fetch_check_auth_events( ...@@ -990,7 +1026,10 @@ async fn fetch_check_auth_events(
let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache) let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache)
.await .await
.map(|mut vec| vec.remove(0))?; .map(|mut vec| {
vec.pop()
.ok_or_else(|| Error::Conflict("Event was not found in fetch_events"))
})??;
stack.extend(ev.auth_events()); stack.extend(ev.auth_events());
auth_cache.insert(ev.event_id().clone(), ev); auth_cache.insert(ev.event_id().clone(), ev);
...@@ -1028,11 +1067,12 @@ async fn fetch_events( ...@@ -1028,11 +1067,12 @@ async fn fetch_events(
{ {
Ok(res) => { Ok(res) => {
let (event_id, value) = crate::pdu::gen_event_id_canonical_json(&res.pdu); let (event_id, value) = crate::pdu::gen_event_id_canonical_json(&res.pdu);
let pdu = validate_event(db, value, event_id, key_map, origin, auth_cache) let (pdu, _) =
.await validate_event(db, value, event_id, key_map, origin, auth_cache)
.map_err(|_| Error::Conflict("Authentication of event failed"))?; .await
.map_err(|_| Error::Conflict("Authentication of event failed"))?;
Arc::new(pdu) pdu
} }
Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")), Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")),
}, },
...@@ -1063,31 +1103,11 @@ async fn fetch_signing_keys( ...@@ -1063,31 +1103,11 @@ async fn fetch_signing_keys(
} }
} }
fn signature_and_hash_check( /// Gather all state snapshots needed to resolve the current state of the room.
pub_key_map: &ruma::signatures::PublicKeyMap, ///
value: CanonicalJsonObject, /// Step 11. ensure that the state is derived from the previous current state (i.e. we calculated by doing state res
) -> std::result::Result<CanonicalJsonObject, String> { /// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote)
Ok( async fn forward_extremities(
match ruma::signatures::verify_event(pub_key_map, &value, &RoomVersionId::Version6) {
Ok(ver) => {
if let ruma::signatures::Verified::Signatures = ver {
error!("CONTENT HASH FAILED");
match ruma::signatures::redact(&value, &RoomVersionId::Version6) {
Ok(obj) => obj,
Err(_) => return Err("Redaction failed".to_string()),
}
} else {
value
}
}
Err(_e) => {
return Err("Signature verification failed".to_string());
}
},
)
}
async fn forward_extremity_ids(
db: &Database, db: &Database,
pdu: &PduEvent, pdu: &PduEvent,
origin: &ServerName, origin: &ServerName,
...@@ -1102,6 +1122,8 @@ async fn forward_extremity_ids( ...@@ -1102,6 +1122,8 @@ async fn forward_extremity_ids(
} }
} }
let current_hash = db.rooms.current_state_hash(pdu.room_id())?;
let mut includes_current_state = false;
let mut fork_states = vec![]; let mut fork_states = vec![];
for id in &current_leaves { for id in &current_leaves {
if let Some(id) = db.rooms.get_pdu_id(id)? { if let Some(id) = db.rooms.get_pdu_id(id)? {
...@@ -1109,6 +1131,10 @@ async fn forward_extremity_ids( ...@@ -1109,6 +1131,10 @@ async fn forward_extremity_ids(
.rooms .rooms
.pdu_state_hash(&id)? .pdu_state_hash(&id)?
.expect("found pdu with no statehash"); .expect("found pdu with no statehash");
if current_hash.as_ref() == Some(&state_hash) {
includes_current_state = true;
}
let state = db let state = db
.rooms .rooms
.state_full(&pdu.room_id, &state_hash)? .state_full(&pdu.room_id, &state_hash)?
...@@ -1144,6 +1170,17 @@ async fn forward_extremity_ids( ...@@ -1144,6 +1170,17 @@ async fn forward_extremity_ids(
} }
} }
// This guarantees that our current room state is included
if !includes_current_state && current_hash.is_some() {
fork_states.push(
db.rooms
.state_full(pdu.room_id(), current_hash.as_ref().unwrap())?
.into_iter()
.map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
.collect(),
)
}
Ok(fork_states) Ok(fork_states)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment