Skip to content
Snippets Groups Projects
server_server.rs 56.8 KiB
Newer Older
  • Learn to ignore specific revisions
  •         let is_authed = state_res::event_auth::auth_check(
                &RoomVersionId::Version6,
                &pdu,
                if previous.len() == 1 {
                    previous.first().cloned()
                } else {
                    None
                },
                &pdu.auth_events
                    .iter()
                    .map(|id| {
                        auth_cache
                            .get(id)
                            .map(|pdu| ((pdu.kind(), pdu.state_key()), pdu.clone()))
                            .ok_or_else(|| {
                                "Auth event not found, event failed recursive auth checks.".to_string()
                            })
                    })
                    .collect::<StdResult<BTreeMap<_, _>, _>>()?,
                None, // TODO: third party invite
            )
            .map_err(|_e| "Auth check failed".to_string())?;
    
            if !is_authed {
                return Err("Event has failed auth check with auth events".to_string());
            }
    
            Ok((pdu, previous))
    
    /// TODO: don't add as outlier if event is fetched as a result of gathering auth_events
    /// The check in `fetch_check_auth_events` is that a complete chain is found for the
    /// events `auth_events`. If the chain is found to have any missing events it fails.
    async fn fetch_check_auth_events(
        db: &Database,
        origin: &ServerName,
        key_map: &PublicKeyMap,
        event_ids: &[EventId],
        auth_cache: &mut EventMap<Arc<PduEvent>>,
    ) -> Result<()> {
        let mut stack = event_ids.to_vec();
    
        // DFS for auth event chain
        while !stack.is_empty() {
            let ev_id = stack.pop().unwrap();
            if auth_cache.contains_key(&ev_id) {
                continue;
            }
    
    
            // TODO: Batch these async calls so we can wait on multiple at once
    
            let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache)
                .await
    
                .map(|mut vec| {
                    vec.pop()
                        .ok_or_else(|| Error::Conflict("Event was not found in fetch_events"))
                })??;
    
    /// Find the event and auth it. Once the event is validated (steps 1 - 8)
    /// it is appended to the outliers Tree.
    
    /// 1. Look in the main timeline (pduid_pdu tree)
    /// 2. Look at outlier pdu tree
    /// 3. Ask origin server over federation
    /// 4. TODO: Ask other servers over federation?
    
    ///
    /// If the event is unknown to the `auth_cache` it is added. This guarantees that any
    /// event we need to know of will be present.
    
        db: &Database,
        origin: &ServerName,
        key_map: &PublicKeyMap,
        events: &[EventId],
        auth_cache: &mut EventMap<Arc<PduEvent>>,
    ) -> Result<Vec<Arc<PduEvent>>> {
        let mut pdus = vec![];
        for id in events {
    
            let pdu = match auth_cache.get(id) {
                Some(pdu) => pdu.clone(),
                // `get_pdu` checks the outliers tree for us
                None => match db.rooms.get_pdu(&id)? {
                    Some(pdu) => Arc::new(pdu),
                    None => match db
                        .sending
                        .send_federation_request(
                            &db.globals,
                            origin,
                            get_event::v1::Request { event_id: &id },
                        )
                        .await
                    {
                        Ok(res) => {
                            let (event_id, value) = crate::pdu::gen_event_id_canonical_json(&res.pdu);
                            let (pdu, _) =
                                validate_event(db, value, event_id, key_map, origin, auth_cache)
                                    .await
                                    .map_err(|e| {
                                        error!("{:?}", e);
                                        Error::Conflict("Authentication of event failed")
                                    })?;
    
                            db.rooms.append_pdu_outlier(&pdu)?;
                            pdu
                        }
                        Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")),
                    },
    
            auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone());
    
    /// Search the DB for the signing keys of the given server, if we don't have them
    /// fetch them from the server and save to our DB.
    
    pub(crate) async fn fetch_signing_keys(
    
        origin: &ServerName,
    
    ) -> Result<BTreeMap<ServerSigningKeyId, VerifyKey>> {
        match db.globals.signing_keys_for(origin)? {
            keys if !keys.is_empty() => Ok(keys),
            _ => {
    
                    .send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new())
    
                {
                    Ok(keys) => {
                        db.globals.add_signing_key(origin, &keys.server_key)?;
                        Ok(keys.server_key.verify_keys)
                    }
                    _ => {
                        for server in db.rooms.room_servers(room_id) {
                            let server = server?;
                            if let Ok(keys) = db
                                .sending
                                .send_federation_request(
                                    &db.globals,
                                    &server,
                                    get_remote_server_keys::v2::Request::new(
                                        &server,
                                        SystemTime::now()
                                            .checked_add(Duration::from_secs(3600))
                                            .expect("SystemTime to large"),
                                    ),
                                )
                                .await
                            {
                                let keys: Vec<ServerSigningKeys> = keys.server_keys;
                                let key = keys.into_iter().fold(None, |mut key, next| {
                                    if let Some(verified) = &key {
                                        // rustc cannot elide this type for some reason
                                        let v: &ServerSigningKeys = verified;
                                        if v.verify_keys
                                            .iter()
                                            .zip(next.verify_keys.iter())
                                            .all(|(a, b)| a.1.key == b.1.key)
                                        {
                                        }
                                    } else {
                                        key = Some(next)
                                    }
                                    key
                                });
                            }
                        }
                        Err(Error::BadServerResponse(
                            "Failed to find public key for server",
                        ))
                    }
                }
    
    /// Gather all state snapshots needed to resolve the current state of the room.
    ///
    /// Step 11. ensure that the state is derived from the previous current state (i.e. we calculated by doing state res
    
    /// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote).
    ///
    /// The state snapshot of the incoming event __needs__ to be added to the resulting list.
    
    pub(crate) async fn calculate_forward_extremities(
    
        let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?;
    
    
        let mut is_incoming_leaf = true;
        // Make sure the incoming event is not already a forward extremity
        // FIXME: I think this could happen if different servers send us the same event??
    
        if current_leaves.contains(pdu.event_id()) {
    
            error!("The incoming event is already present in get_pdu_leaves BUG");
    
            is_incoming_leaf = false;
            // Not sure what to do here
        }
    
    
        // If the incoming event is already referenced by an existing event
        // then do nothing - it's not a candidate to be a new extremity if
        // it has been referenced.
    
        if db.rooms.is_pdu_referenced(pdu)? {
            is_incoming_leaf = false;
        }
    
    
        // TODO:
        // [dendrite] Checks if any other leaves have been referenced and removes them
        // but as long as we update the pdu leaves here and for events on our server this
        // should not be possible.
    
        // Remove any forward extremities that are referenced by this incoming event's `prev_events`
    
        for incoming_leaf in &pdu.prev_events {
    
            if current_leaves.contains(incoming_leaf) {
                if let Some(pos) = current_leaves.iter().position(|x| *x == *incoming_leaf) {
                    current_leaves.remove(pos);
                }
    
        // Add the incoming event only if it is a leaf, we do this after fetching all the
        // state since we know we have already fetched the state of the incoming event so lets
        // not do it again!
        if is_incoming_leaf {
            current_leaves.push(pdu.event_id().clone());
        }
    
        Ok(current_leaves)
    }
    
    /// This should always be called after the incoming event has been appended to the DB.
    ///
    
    /// This guarantees that the incoming event will be in the state sets (at least our servers
    
    /// and the sending server).
    pub(crate) async fn build_forward_extremity_snapshots(
        db: &Database,
    
        mut current_state: StateMap<Arc<PduEvent>>,
    
        current_leaves: &[EventId],
        pub_key_map: &PublicKeyMap,
        auth_cache: &mut EventMap<Arc<PduEvent>>,
    ) -> Result<BTreeSet<StateMap<Arc<PduEvent>>>> {
    
        let current_hash = db.rooms.current_state_hash(pdu.room_id())?;
    
        let mut includes_current_state = false;
    
        let mut fork_states = BTreeSet::new();
    
            match db.rooms.get_pdu_id(id)? {
                // We can skip this because it is handled outside of this function
                // The current server state and incoming event state are built to be
                // the state after.
                // This would be the incoming state from the server.
                Some(_) if id == pdu.event_id() => {}
                Some(pduid) if db.rooms.get_pdu_from_id(&pduid)?.is_some() => {
                    let state_hash = db
                        .rooms
                        .pdu_state_hash(&pduid)?
                        .expect("found pdu with no statehash");
    
                    if current_hash.as_ref() == Some(&state_hash) {
                        includes_current_state = true;
                    }
    
                    let mut state_before = db
                        .rooms
                        .state_full(pdu.room_id(), &state_hash)?
                        .into_iter()
                        .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
                        .collect::<StateMap<_>>();
    
                    // Now it's the state after
                    if let Some(pdu) = db.rooms.get_pdu_from_id(&pduid)? {
                        let key = (pdu.kind.clone(), pdu.state_key());
                        state_before.insert(key, Arc::new(pdu));
                    }
    
                    fork_states.insert(state_before);
    
                _ => {
                    error!("Missing state snapshot for {:?} - {:?}", id, pdu.kind());
    
                    let res = db
                        .sending
                        .send_federation_request(
                            &db.globals,
                            origin,
                            get_room_state_ids::v1::Request {
                                room_id: pdu.room_id(),
                                event_id: id,
                            },
                        )
                        .await?;
    
                    // TODO: This only adds events to the auth_cache, there is for sure a better way to
                    // do this...
                    fetch_events(&db, origin, pub_key_map, &res.auth_chain_ids, auth_cache).await?;
    
                    let mut state_before =
                        fetch_events(&db, origin, pub_key_map, &res.pdu_ids, auth_cache)
                            .await?
                            .into_iter()
                            .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
                            .collect::<StateMap<_>>();
    
                    if let Some(pdu) = fetch_events(db, origin, pub_key_map, &[id.clone()], auth_cache)
                        .await?
                        .pop()
                    {
                        let key = (pdu.kind.clone(), pdu.state_key());
                        state_before.insert(key, pdu);
                    }
    
                    // Now it's the state after
                    fork_states.insert(state_before);
                }
    
        // This guarantees that our current room state is included
    
        if !includes_current_state {
    
            current_state.insert((pdu.kind(), pdu.state_key()), pdu);
    
    
    pub(crate) fn update_resolved_state(
    
        state: Option<StateMap<Arc<PduEvent>>>,
    ) -> Result<()> {
        // Update the state of the room if needed
        // We can tell if we need to do this based on whether state resolution took place or not
        if let Some(state) = state {
    
            let mut new_state = HashMap::new();
            for ((ev_type, state_k), pdu) in state {
                match db.rooms.get_pdu_id(pdu.event_id())? {
                    Some(pduid) => {
                        new_state.insert(
                            (
                                ev_type,
                                state_k.ok_or_else(|| {
                                    Error::Conflict("State contained non state event")
                                })?,
                            ),
                            pduid.to_vec(),
                        );
                    }
                    None => {
    
                        error!("We are missing a state event for the current room state.");
    
            db.rooms.force_state(room_id, new_state, &db.globals)?;
        }
    
        Ok(())
    }
    
    /// Append the incoming event setting the state snapshot to the state from the
    /// server that sent the event.
    pub(crate) fn append_incoming_pdu(
        db: &Database,
        pdu: &PduEvent,
        new_room_leaves: &[EventId],
        state: &StateMap<Arc<PduEvent>>,
    ) -> Result<()> {
        // Update the state of the room if needed
        // We can tell if we need to do this based on whether state resolution took place or not
        let mut new_state = HashMap::new();
    
        for ((ev_type, state_k), state_pdu) in state {
            match db.rooms.get_pdu_id(state_pdu.event_id())? {
                Some(state_pduid) => {
    
                    new_state.insert(
                        (
                            ev_type.clone(),
                            state_k
                                .clone()
                                .ok_or_else(|| Error::Conflict("State contained non state event"))?,
                        ),
    
                        state_pduid.to_vec(),
    
                None => error!("We are missing a state event for the incoming event snapshot"),
    
        db.rooms
            .force_state(pdu.room_id(), new_state, &db.globals)?;
    
    
        let count = db.globals.next_count()?;
        let mut pdu_id = pdu.room_id.as_bytes().to_vec();
        pdu_id.push(0xff);
        pdu_id.extend_from_slice(&count.to_be_bytes());
    
    
        // We append to state before appending the pdu, so we don't have a moment in time with the
        // pdu without its state. This is okay because append_pdu can't fail.
    
        let state_hash = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;
    
    
        db.rooms.append_pdu(
    
            utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"),
    
            count,
            pdu_id.clone().into(),
    
    Devin Ragotzy's avatar
    Devin Ragotzy committed
            &db,
    
        db.rooms.set_room_state(pdu.room_id(), &state_hash)?;
    
        for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) {
            db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
        }
    
        Ok(())
    }
    
    
    #[cfg_attr(
        feature = "conduit_bin",
        post("/_matrix/federation/v1/get_missing_events/<_>", data = "<body>")
    )]
    pub fn get_missing_events_route<'a>(
        db: State<'a, Database>,
        body: Ruma<get_missing_events::v1::Request<'_>>,
    ) -> ConduitResult<get_missing_events::v1::Response> {
    
        if !db.globals.allow_federation() {
    
            return Err(Error::bad_config("Federation is disabled."));
    
        let mut queued_events = body.latest_events.clone();
        let mut events = Vec::new();
    
        let mut i = 0;
        while i < queued_events.len() && events.len() < u64::from(body.limit) as usize {
            if let Some(pdu) = db.rooms.get_pdu_json(&queued_events[i])? {
                if body.earliest_events.contains(
                    &serde_json::from_value(
                        pdu.get("event_id")
                            .cloned()
                            .ok_or_else(|| Error::bad_database("Event in db has no event_id field."))?,
                    )
                    .map_err(|_| Error::bad_database("Invalid event_id field in pdu in db."))?,
                ) {
                    i += 1;
                    continue;
                }
                queued_events.extend_from_slice(
                    &serde_json::from_value::<Vec<EventId>>(
                        pdu.get("prev_events").cloned().ok_or_else(|| {
                            Error::bad_database("Invalid prev_events field of pdu in db.")
                        })?,
                    )
                    .map_err(|_| Error::bad_database("Invalid prev_events content in pdu in db."))?,
                );
    
                events.push(serde_json::from_value(pdu).expect("Raw<..> is always valid"));
    
            }
            i += 1;
        }
    
        Ok(get_missing_events::v1::Response { events }.into())
    }
    
    Timo Kösters's avatar
    Timo Kösters committed
    
    #[cfg_attr(
        feature = "conduit_bin",
        get("/_matrix/federation/v1/query/profile", data = "<body>")
    )]
    pub fn get_profile_information_route<'a>(
        db: State<'a, Database>,
        body: Ruma<get_profile_information::v1::Request<'_>>,
    ) -> ConduitResult<get_profile_information::v1::Response> {
    
        if !db.globals.allow_federation() {
    
            return Err(Error::bad_config("Federation is disabled."));
    
    Timo Kösters's avatar
    Timo Kösters committed
        let mut displayname = None;
        let mut avatar_url = None;
    
    
        match &body.field {
            // TODO: what to do with custom
            Some(ProfileField::_Custom(_s)) => {}
    
    Timo Kösters's avatar
    Timo Kösters committed
            Some(ProfileField::DisplayName) => displayname = db.users.displayname(&body.user_id)?,
            Some(ProfileField::AvatarUrl) => avatar_url = db.users.avatar_url(&body.user_id)?,
            None => {
                displayname = db.users.displayname(&body.user_id)?;
                avatar_url = db.users.avatar_url(&body.user_id)?;
            }
        }
    
        Ok(get_profile_information::v1::Response {
            displayname,
            avatar_url,
        }
        .into())
    }
    
    /*
    #[cfg_attr(
        feature = "conduit_bin",
        get("/_matrix/federation/v2/invite/<_>/<_>", data = "<body>")
    )]
    pub fn get_user_devices_route<'a>(
        db: State<'a, Database>,
        body: Ruma<membership::v1::Request<'_>>,
    ) -> ConduitResult<get_profile_information::v1::Response> {
    
        if !db.globals.allow_federation() {
    
            return Err(Error::bad_config("Federation is disabled."));
    
    Timo Kösters's avatar
    Timo Kösters committed
        let mut displayname = None;
        let mut avatar_url = None;
    
        match body.field {
            Some(ProfileField::DisplayName) => displayname = db.users.displayname(&body.user_id)?,
            Some(ProfileField::AvatarUrl) => avatar_url = db.users.avatar_url(&body.user_id)?,
            None => {
                displayname = db.users.displayname(&body.user_id)?;
                avatar_url = db.users.avatar_url(&body.user_id)?;
            }
        }
    
        Ok(get_profile_information::v1::Response {
            displayname,
            avatar_url,
        }
        .into())
    }
    */
    
    
    #[cfg(test)]
    mod tests {
        use super::{add_port_to_hostname, get_ip_with_port};

        /// Bare IPv4 and IPv6 literals receive the default federation port; IPv6
        /// literals are additionally wrapped in brackets.
        #[test]
        fn ips_get_default_ports() {
            let cases = [
                ("1.1.1.1", "1.1.1.1:8448"),
                ("dead:beef::", "[dead:beef::]:8448"),
            ];

            for (input, expected) in cases.iter() {
                assert_eq!(
                    get_ip_with_port((*input).to_owned()),
                    Some((*expected).to_owned())
                );
            }
        }

        /// IP literals that already carry a port are returned unchanged.
        #[test]
        fn ips_keep_custom_ports() {
            let cases = ["1.1.1.1:1234", "[dead::beef]:8933"];

            for input in cases.iter() {
                assert_eq!(
                    get_ip_with_port((*input).to_owned()),
                    Some((*input).to_owned())
                );
            }
        }

        /// Hostnames without a port receive the default federation port.
        #[test]
        fn hostnames_get_default_ports() {
            let with_port = add_port_to_hostname("example.com".to_owned());
            assert_eq!(with_port, "example.com:8448");
        }

        /// Hostnames that already carry a port are returned unchanged.
        #[test]
        fn hostnames_keep_custom_ports() {
            let with_port = add_port_to_hostname("example.com:1337".to_owned());
            assert_eq!(with_port, "example.com:1337");
        }
    }