// server_server.rs

                            Err(_) => return Err("Redaction failed".to_string()),
                            }
                        } else {
                            value
                        }
                    }
                    Err(_e) => {
    
                        return Err("Signature verification failed".to_string());
                    }
                };
    
            // Now that we have checked the signature and hashes, we can add the event_id and convert
            // to our PduEvent type, also finally verifying the first step listed above.
            val.insert(
                "event_id".to_owned(),
                to_canonical_value(&event_id).expect("EventId is a valid CanonicalJsonValue"),
            );
            let pdu = serde_json::from_value::<PduEvent>(
                serde_json::to_value(val).expect("CanonicalJsonObj is a valid JsonValue"),
            )
            .map_err(|_| "Event is not a valid PDU".to_string())?;
    
            debug!("Fetching auth events.");
    
            fetch_check_auth_events(db, origin, pub_key_map, &pdu.auth_events, auth_cache)
                .await
                .map_err(|e| e.to_string())?;

            let pdu = Arc::new(pdu.clone());
    
    
            // 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
            /*
            debug!("Fetching prev events.");

            let previous = fetch_events(&db, origin, pub_key_map, &pdu.prev_events, auth_cache)
                .await
                .map_err(|e| e.to_string())?;
            */
    
            // if the previous event was the create event special rules apply
            let previous_create = if pdu.auth_events.len() == 1 && pdu.prev_events == pdu.auth_events {
                auth_cache.get(&pdu.auth_events[0]).cloned()
            } else {
                None
            };
    
            // Check that the event passes auth based on the auth_events
    
            debug!("Checking auth.");
    
            let is_authed = state_res::event_auth::auth_check(
                &RoomVersionId::Version6,
                &pdu,
    
                previous_create.clone(),
    
                &pdu.auth_events
                    .iter()
                    .map(|id| {
                        auth_cache
                            .get(id)
                            .map(|pdu| ((pdu.kind(), pdu.state_key()), pdu.clone()))
                            .ok_or_else(|| {
                                "Auth event not found, event failed recursive auth checks.".to_string()
                            })
                    })
                    .collect::<StdResult<BTreeMap<_, _>, _>>()?,
                None, // TODO: third party invite
            )
            .map_err(|_e| "Auth check failed".to_string())?;
    
            if !is_authed {
                return Err("Event has failed auth check with auth events".to_string());
            }
    
    
            debug!("Validation successful.");
    
        Ok((pdu, previous_create))
    }

    #[tracing::instrument(skip(db))]
    
    async fn fetch_check_auth_events(
        db: &Database,
        origin: &ServerName,
        key_map: &mut PublicKeyMap,
        event_ids: &[EventId],
        auth_cache: &mut EventMap<Arc<PduEvent>>,
    ) -> Result<()> {
        fetch_events(db, origin, key_map, event_ids, auth_cache).await?;
        Ok(())
    }

    
    /// Find the event and auth it. Once the event is validated (steps 1 - 8)
    /// it is appended to the outliers Tree.
    
    /// 1. Look in the main timeline (pduid_pdu tree)
    /// 2. Look at outlier pdu tree
    /// 3. Ask origin server over federation
    /// 4. TODO: Ask other servers over federation?
    
    ///
    /// If the event is unknown to the `auth_cache` it is added. This guarantees that any
    /// event we need to know of will be present.
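    ///
    /// Illustrative usage sketch (not a doctest): it assumes a `Database`, the sending
    /// server's name, a `PublicKeyMap`, and an incoming PDU whose auth events we want,
    /// all of which are in scope at the real call sites in this file.
    ///
    /// ```ignore
    /// let mut auth_cache = EventMap::new();
    /// // Looks in the cache, then the DB/outliers, then asks `origin` over federation.
    /// let auth_pdus =
    ///     fetch_events(&db, origin, &mut pub_key_map, &incoming_pdu.auth_events, &mut auth_cache).await?;
    /// ```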
    
    #[tracing::instrument(skip(db))]
    pub(crate) async fn fetch_events(
        db: &Database,
        origin: &ServerName,
        key_map: &mut PublicKeyMap,
        events: &[EventId],
        auth_cache: &mut EventMap<Arc<PduEvent>>,
    ) -> Result<Vec<Arc<PduEvent>>> {
        let mut pdus = vec![];
        for id in events {
    
            let pdu = match auth_cache.get(id) {
                Some(pdu) => {
                    debug!("Event found in cache");
                    pdu.clone()
                }
                // `get_pdu` checks the outliers tree for us
                None => match db.rooms.get_pdu(&id)? {
                    Some(pdu) => {
                        debug!("Event found in outliers");
                        Arc::new(pdu)
                    }
                    None => {
                        debug!("Fetching event over federation");
    
                        match db
                            .sending
                            .send_federation_request(
                                &db.globals,
                                origin,
                                get_event::v1::Request { event_id: &id },
                            )
                            .await
                        {
                            Ok(res) => {
    
                                debug!("Got event over federation: {:?}", res);
    
                                let (event_id, value) =
                                    crate::pdu::gen_event_id_canonical_json(&res.pdu);
                                let (pdu, _) =
                                    validate_event(db, value, event_id, key_map, origin, auth_cache)
                                        .await
                                        .map_err(|e| {
                                            error!("ERROR: {:?}", e);
                                            Error::Conflict("Authentication of event failed")
                                        })?;
    
    
                                debug!("Added fetched pdu as outlier.");
    
                                db.rooms.add_pdu_outlier(&pdu)?;
                                pdu
                            }
                            Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")),
                        }
                    }
                },
            };

            auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone());
            pdus.push(pdu);
        }

        Ok(pdus)
    }

    /// Search the DB for the signing keys of the given server, if we don't have them
    /// fetch them from the server and save to our DB.
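    ///
    /// Illustrative usage sketch (not a doctest): the returned map goes from key id to
    /// base64-encoded public key; `"ed25519:abc"` below is a made-up key id.
    ///
    /// ```ignore
    /// let keys = fetch_signing_keys(&db, origin).await?;
    /// if let Some(b64_key) = keys.get("ed25519:abc") {
    ///     // pass `b64_key` (together with the event JSON) on to signature verification
    /// }
    /// ```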
    
    #[tracing::instrument(skip(db))]
    
    pub(crate) async fn fetch_signing_keys(
        db: &Database,
        origin: &ServerName,
    ) -> Result<BTreeMap<String, String>> {
        let mut result = BTreeMap::new();
    
    
        match db.globals.signing_keys_for(origin)? {
    
            keys if !keys.is_empty() => Ok(keys
                .into_iter()
                .map(|(k, v)| (k.to_string(), v.key))
                .collect()),
    
            _ => {
                match db
                    .sending
                    .send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new())
                    .await
                {
                    Ok(keys) => {
                        db.globals.add_signing_key(origin, &keys.server_key)?;
    
    
                        result.extend(
                            keys.server_key
                                .verify_keys
                                .into_iter()
                                .map(|(k, v)| (k.to_string(), v.key)),
                        );
                        result.extend(
                            keys.server_key
                                .old_verify_keys
                                .into_iter()
                                .map(|(k, v)| (k.to_string(), v.key)),
                        );
                        return Ok(result);
                    }
                    _ => {
                        for server in db.globals.trusted_servers() {
    
                            debug!("Asking {} for {}'s signing key", server, origin);
    
                            if let Ok(keys) = db
                                .sending
                                .send_federation_request(
                                    &db.globals,
                                    &server,
                                    get_remote_server_keys::v2::Request::new(
    
                                        origin,
    
                                        SystemTime::now()
                                            .checked_add(Duration::from_secs(3600))
                                            .expect("SystemTime to large"),
                                    ),
                                )
                                .await
                            {
    
                                debug!("Got signing keys: {:?}", keys);
    
                                for k in keys.server_keys.into_iter() {
    
                                    db.globals.add_signing_key(origin, &k)?;
    
                                    result.extend(
                                        k.verify_keys
                                            .into_iter()
                                            .map(|(k, v)| (k.to_string(), v.key)),
                                    );
                                    result.extend(
                                        k.old_verify_keys
                                            .into_iter()
                                            .map(|(k, v)| (k.to_string(), v.key)),
                                    );
                                }

                                return Ok(result);
    
                            }
                        }
                        Err(Error::BadServerResponse(
                            "Failed to find public key for server",
                        ))
                    }
                }
            }
        }
    }

    /// Gather all state snapshots needed to resolve the current state of the room.
    ///
    /// Step 11. Ensure that the state is derived from the previous current state (i.e. we calculated it
    /// by doing state res where one of the inputs was a previously trusted set of state; don't just
    /// trust a set of state we got from a remote).
    ///
    /// The state snapshot of the incoming event __needs__ to be added to the resulting list.
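    ///
    /// Rough call-order sketch (illustrative, not a doctest); `server_state` stands in for the
    /// state handed to us by the sending server and is assumed to be in scope:
    ///
    /// ```ignore
    /// let extremities = calculate_forward_extremities(&db, &pdu).await?;
    /// let fork_states = build_forward_extremity_snapshots(
    ///     &db, pdu.clone(), server_state, &extremities, &pub_key_map, &mut auth_cache,
    /// ).await?;
    /// ```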
    
    #[tracing::instrument(skip(db))]
    
    pub(crate) async fn calculate_forward_extremities(
        db: &Database,
        pdu: &PduEvent,
    ) -> Result<Vec<EventId>> {
        let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?;
    
    
        let mut is_incoming_leaf = true;
        // Make sure the incoming event is not already a forward extremity
        // FIXME: I think this could happen if different servers send us the same event??
    
        if current_leaves.contains(pdu.event_id()) {
    
            error!("The incoming event is already present in get_pdu_leaves BUG");
    
            is_incoming_leaf = false;
            // Not sure what to do here
        }
    
    
        // If the incoming event is already referenced by an existing event
        // then do nothing - it's not a candidate to be a new extremity if
        // it has been referenced.
    
        if db.rooms.is_pdu_referenced(pdu)? {
            is_incoming_leaf = false;
        }
    
    
        // TODO:
        // [dendrite] Checks if any other leaves have been referenced and removes them
        // but as long as we update the pdu leaves here and for events on our server this
        // should not be possible.
    
        // Remove any forward extremities that are referenced by this incoming event's prev_events
    
        for incoming_leaf in &pdu.prev_events {
    
            if current_leaves.contains(incoming_leaf) {
                if let Some(pos) = current_leaves.iter().position(|x| *x == *incoming_leaf) {
                    current_leaves.remove(pos);
                }
            }
        }

        // Add the incoming event only if it is a leaf. We do this after fetching all the
        // state, since we know we have already fetched the state of the incoming event, so let's
        // not do it again!
        if is_incoming_leaf {
            current_leaves.push(pdu.event_id().clone());
        }
    
        Ok(current_leaves)
    }
    
    /// This should always be called after the incoming event has been appended to the DB.
    ///
    
    /// This guarantees that the incoming event will be in the state sets (at least our server's
    /// and the sending server's).
    
    #[tracing::instrument(skip(db))]
    
    pub(crate) async fn build_forward_extremity_snapshots(
        db: &Database,
        pdu: Arc<PduEvent>,
        mut current_state: StateMap<Arc<PduEvent>>,
    
        current_leaves: &[EventId],
        pub_key_map: &PublicKeyMap,
        auth_cache: &mut EventMap<Arc<PduEvent>>,
    ) -> Result<BTreeSet<StateMap<Arc<PduEvent>>>> {
    
        let current_shortstatehash = db.rooms.current_shortstatehash(pdu.room_id())?;
    
        let mut includes_current_state = false;
    
        let mut fork_states = BTreeSet::new();
        for id in current_leaves {
            if id == &pdu.event_id {
                continue;
            }
    
            match db.rooms.get_pdu(id)? {
    
                // We can skip this because it is handled outside of this function
                // The current server state and incoming event state are built to be
                // the state after.
                // This would be the incoming state from the server.
    
                Some(leave_pdu) => {
                    let pdu_shortstatehash = db
                        .rooms
                        .pdu_shortstatehash(&leave_pdu.event_id)?
                        .ok_or_else(|| Error::bad_database("Found pdu with no statehash in db."))?;
    
                    if current_shortstatehash.as_ref() == Some(&pdu_shortstatehash) {
    
                        includes_current_state = true;
                    }
    
                    let mut state_before = db
                        .rooms
    
                        .state_full(pdu.room_id(), pdu_shortstatehash)?
    
                        .into_iter()
                        .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
                        .collect::<StateMap<_>>();
    
                    // Now it's the state after
    
                    let key = (leave_pdu.kind.clone(), leave_pdu.state_key.clone());
                    state_before.insert(key, Arc::new(leave_pdu));
    
                    fork_states.insert(state_before);
                }
                None => {
                    error!("Missing state snapshot for {:?}", id);
                    return Err(Error::bad_database("Missing state snapshot."));
                }
            }
        }

        // This guarantees that our current room state is included
    
        if !includes_current_state {
    
            current_state.insert((pdu.kind(), pdu.state_key()), pdu);

            fork_states.insert(current_state);
        }

        Ok(fork_states)
    }

    #[tracing::instrument(skip(db))]
    
    pub(crate) fn update_resolved_state(
        db: &Database,
        room_id: &RoomId,
        state: Option<StateMap<Arc<PduEvent>>>,
    ) -> Result<()> {
        // Update the state of the room if needed
        // We can tell if we need to do this based on whether state resolution took place or not
        if let Some(state) = state {
    
            let mut new_state = HashMap::new();
            for ((ev_type, state_k), pdu) in state {
    
                new_state.insert(
                    (
                        ev_type,
                        state_k.ok_or_else(|| Error::Conflict("State contained non state event"))?,
                    ),
    
                    pdu.event_id.clone(),
                );
            }

            db.rooms.force_state(room_id, new_state, &db.globals)?;
        }
    
        Ok(())
    }
    
    /// Append the incoming event setting the state snapshot to the state from the
    /// server that sent the event.
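    ///
    /// Illustrative usage sketch (not a doctest); `extremities` and `state_at_event` are
    /// assumed to be the new room leaves and the sending server's state for this event:
    ///
    /// ```ignore
    /// append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?;
    /// ```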
    
    #[tracing::instrument(skip(db))]
    
    pub(crate) fn append_incoming_pdu(
        db: &Database,
        pdu: &PduEvent,
        new_room_leaves: &[EventId],
        state: &StateMap<Arc<PduEvent>>,
    ) -> Result<()> {
        // Update the state of the room if needed
        // We can tell if we need to do this based on whether state resolution took place or not
        let mut new_state = HashMap::new();
    
        for ((ev_type, state_k), state_pdu) in state {
    
            new_state.insert(
                (
                    ev_type.clone(),
                    state_k
                        .clone()
                        .ok_or_else(|| Error::Conflict("State contained non state event"))?,
                ),
    
                state_pdu.event_id.clone(),
            );
        }

        db.rooms
            .force_state(pdu.room_id(), new_state, &db.globals)?;
    
    
        let count = db.globals.next_count()?;
        let mut pdu_id = pdu.room_id.as_bytes().to_vec();
        pdu_id.push(0xff);
        pdu_id.extend_from_slice(&count.to_be_bytes());
    
    
        // We append to state before appending the pdu, so we don't have a moment in time with the
        // pdu without its state. This is okay because append_pdu can't fail.
    
        let state_hash = db.rooms.append_to_state(&pdu, &db.globals)?;
    
    
        db.rooms.append_pdu(
            &pdu,
            utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"),
            count,
            pdu_id.clone().into(),
            new_room_leaves,
            &db,
        )?;
    
        db.rooms.set_room_state(pdu.room_id(), state_hash)?;
    
        for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) {
    
            if let Some(namespaces) = appservice.1.get("namespaces") {
                let users = namespaces
                    .get("users")
                    .and_then(|users| users.as_sequence())
                    .map_or_else(Vec::new, |users| {
                        users
                            .iter()
                            .map(|users| {
                                users
                                    .get("regex")
                                    .and_then(|regex| regex.as_str())
                                    .and_then(|regex| Regex::new(regex).ok())
                            })
                            .filter_map(|o| o)
                            .collect::<Vec<_>>()
                    });
                let aliases = namespaces
                    .get("aliases")
                    .and_then(|users| users.get("regex"))
                    .and_then(|regex| regex.as_str())
                    .and_then(|regex| Regex::new(regex).ok());
                let rooms = namespaces
                    .get("rooms")
                    .and_then(|rooms| rooms.as_sequence());
    
                let room_aliases = db.rooms.room_aliases(&pdu.room_id);
    
                let bridge_user_id = appservice
                    .1
                    .get("sender_localpart")
                    .and_then(|string| string.as_str())
                    .and_then(|string| {
                        UserId::parse_with_server_name(string, db.globals.server_name()).ok()
                    });
    
                #[allow(clippy::blocks_in_if_conditions)]
                if bridge_user_id.map_or(false, |bridge_user_id| {
                    db.rooms
                        .is_joined(&bridge_user_id, &pdu.room_id)
                        .unwrap_or(false)
                }) || users.iter().any(|users| {
                    users.is_match(pdu.sender.as_str())
                        || pdu.kind == EventType::RoomMember
                            && pdu
                                .state_key
                                .as_ref()
                                .map_or(false, |state_key| users.is_match(&state_key))
                }) || aliases.map_or(false, |aliases| {
                    room_aliases
                        .filter_map(|r| r.ok())
                        .any(|room_alias| aliases.is_match(room_alias.as_str()))
                }) || rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into()))
                    || db
                        .rooms
                        .room_members(&pdu.room_id)
                        .filter_map(|r| r.ok())
                        .any(|member| users.iter().any(|regex| regex.is_match(member.as_str())))
                {
                    db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
                }
            }
        }

        Ok(())
    }

    #[cfg_attr(
        feature = "conduit_bin",
        post("/_matrix/federation/v1/get_missing_events/<_>", data = "<body>")
    )]
    
    #[tracing::instrument(skip(db, body))]
    
    pub fn get_missing_events_route<'a>(
        db: State<'a, Database>,
        body: Ruma<get_missing_events::v1::Request<'_>>,
    ) -> ConduitResult<get_missing_events::v1::Response> {
    
        if !db.globals.allow_federation() {
    
            return Err(Error::bad_config("Federation is disabled."));
    
        let mut queued_events = body.latest_events.clone();
        let mut events = Vec::new();
    
        let mut i = 0;
        while i < queued_events.len() && events.len() < u64::from(body.limit) as usize {
            if let Some(pdu) = db.rooms.get_pdu_json(&queued_events[i])? {
                if body.earliest_events.contains(
                    &serde_json::from_value(
                        pdu.get("event_id")
                            .cloned()
                            .ok_or_else(|| Error::bad_database("Event in db has no event_id field."))?,
                    )
                    .map_err(|_| Error::bad_database("Invalid event_id field in pdu in db."))?,
                ) {
                    i += 1;
                    continue;
                }
                queued_events.extend_from_slice(
                    &serde_json::from_value::<Vec<EventId>>(
                        pdu.get("prev_events").cloned().ok_or_else(|| {
                            Error::bad_database("Invalid prev_events field of pdu in db.")
                        })?,
                    )
                    .map_err(|_| Error::bad_database("Invalid prev_events content in pdu in db."))?,
                );
    
                events.push(PduEvent::convert_to_outgoing_federation_event(
                    serde_json::from_value(pdu)
                        .map_err(|_| Error::bad_database("Invalid pdu in database."))?,
                ));
    
            }
            i += 1;
        }
    
        Ok(get_missing_events::v1::Response { events }.into())
    }
    
    #[cfg_attr(
        feature = "conduit_bin",
        get("/_matrix/federation/v1/state_ids/<_>", data = "<body>")
    )]
    #[tracing::instrument(skip(db, body))]
    pub fn get_room_state_ids_route<'a>(
        db: State<'a, Database>,
        body: Ruma<get_room_state_ids::v1::Request<'_>>,
    ) -> ConduitResult<get_room_state_ids::v1::Response> {
        if !db.globals.allow_federation() {
            return Err(Error::bad_config("Federation is disabled."));
        }
    
        let shortstatehash = db
            .rooms
            .pdu_shortstatehash(&body.event_id)?
            .ok_or(Error::BadRequest(
                ErrorKind::NotFound,
                "Pdu state not found.",
            ))?;
    
        let pdu_ids = db.rooms.state_full_ids(shortstatehash)?;
    
        let mut auth_chain_ids = BTreeSet::<EventId>::new();
        let mut todo = BTreeSet::new();
        todo.insert(body.event_id.clone());
    
        loop {
            if let Some(event_id) = todo.iter().next().cloned() {
                if let Some(pdu) = db.rooms.get_pdu(&event_id)? {
                    todo.extend(
                        pdu.auth_events
                            .clone()
                            .into_iter()
                            .collect::<BTreeSet<_>>()
                            .difference(&auth_chain_ids)
                            .cloned(),
                    );
                    auth_chain_ids.extend(pdu.auth_events.into_iter());
                } else {
                    warn!("Could not find pdu mentioned in auth events.");
                }
    
                todo.remove(&event_id);
            } else {
                break;
            }
        }
    
        Ok(get_room_state_ids::v1::Response {
            auth_chain_ids: auth_chain_ids.into_iter().collect(),
            pdu_ids,
        }
        .into())
    }
    
    
    #[cfg_attr(
        feature = "conduit_bin",
        get("/_matrix/federation/v1/query/profile", data = "<body>")
    )]
    
    #[tracing::instrument(skip(db, body))]
    
    pub fn get_profile_information_route<'a>(
        db: State<'a, Database>,
        body: Ruma<get_profile_information::v1::Request<'_>>,
    ) -> ConduitResult<get_profile_information::v1::Response> {
    
        if !db.globals.allow_federation() {
    
            return Err(Error::bad_config("Federation is disabled."));
    
    Timo Kösters's avatar
    Timo Kösters committed
        let mut displayname = None;
        let mut avatar_url = None;
    
    
        match &body.field {
            // TODO: what to do with custom
            Some(ProfileField::_Custom(_s)) => {}
    
            Some(ProfileField::DisplayName) => displayname = db.users.displayname(&body.user_id)?,
            Some(ProfileField::AvatarUrl) => avatar_url = db.users.avatar_url(&body.user_id)?,
            None => {
                displayname = db.users.displayname(&body.user_id)?;
                avatar_url = db.users.avatar_url(&body.user_id)?;
            }
        }
    
        Ok(get_profile_information::v1::Response {
            displayname,
            avatar_url,
        }
        .into())
    }
    
    /*
    #[cfg_attr(
        feature = "conduit_bin",
        get("/_matrix/federation/v2/invite/<_>/<_>", data = "<body>")
    )]
    pub fn get_user_devices_route<'a>(
        db: State<'a, Database>,
        body: Ruma<membership::v1::Request<'_>>,
    ) -> ConduitResult<get_profile_information::v1::Response> {
    
        if !db.globals.allow_federation() {
    
            return Err(Error::bad_config("Federation is disabled."));
    
    Timo Kösters's avatar
    Timo Kösters committed
        let mut displayname = None;
        let mut avatar_url = None;
    
        match body.field {
            Some(ProfileField::DisplayName) => displayname = db.users.displayname(&body.user_id)?,
            Some(ProfileField::AvatarUrl) => avatar_url = db.users.avatar_url(&body.user_id)?,
            None => {
                displayname = db.users.displayname(&body.user_id)?;
                avatar_url = db.users.avatar_url(&body.user_id)?;
            }
        }
    
        Ok(get_profile_information::v1::Response {
            displayname,
            avatar_url,
        }
        .into())
    }
    */
    
    
    #[cfg(test)]
    mod tests {
        use super::{add_port_to_hostname, get_ip_with_port};
    
        #[test]
        fn ips_get_default_ports() {
            assert_eq!(
                get_ip_with_port(String::from("1.1.1.1")),
                Some(String::from("1.1.1.1:8448"))
            );
            assert_eq!(
                get_ip_with_port(String::from("dead:beef::")),
                Some(String::from("[dead:beef::]:8448"))
            );
        }
    
        #[test]
        fn ips_keep_custom_ports() {
            assert_eq!(
                get_ip_with_port(String::from("1.1.1.1:1234")),
                Some(String::from("1.1.1.1:1234"))
            );
            assert_eq!(
                get_ip_with_port(String::from("[dead::beef]:8933")),
                Some(String::from("[dead::beef]:8933"))
            );
        }
    
        #[test]
        fn hostnames_get_default_ports() {
            assert_eq!(
                add_port_to_hostname(String::from("example.com")),
                "example.com:8448"
            )
        }
    
        #[test]
        fn hostnames_keep_custom_ports() {
            assert_eq!(
                add_port_to_hostname(String::from("example.com:1337")),
                "example.com:1337"
            )
        }
    }