Skip to content
Snippets Groups Projects
sync.rs 62.6 KiB
Newer Older
  • Learn to ignore specific revisions
  •             services()
                    .rooms
                    .user
                    .notification_count(&sender_user, &room_id)?
                    .try_into()
                    .expect("notification count can't go that high"),
            )
        } else {
            None
        };
    
        let highlight_count = if send_notification_counts {
            Some(
                services()
                    .rooms
                    .user
                    .highlight_count(&sender_user, &room_id)?
                    .try_into()
                    .expect("highlight count can't go that high"),
            )
        } else {
            None
        };
    
        let prev_batch = timeline_pdus
            .first()
            .map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
                Ok(Some(match pdu_count {
                    PduCount::Backfilled(_) => {
                        error!("timeline in backfill state?!");
                        "0".to_owned()
                    }
                    PduCount::Normal(c) => c.to_string(),
                }))
            })?;
    
        let room_events: Vec<_> = timeline_pdus
            .iter()
            .map(|(_, pdu)| pdu.to_sync_room_event())
            .collect();
    
        let mut edus: Vec<_> = services()
            .rooms
            .edus
            .read_receipt
            .readreceipts_since(&room_id, since)
            .filter_map(|r| r.ok()) // Filter out buggy events
            .map(|(_, _, v)| v)
            .collect();
    
        if services().rooms.edus.typing.last_typing_update(&room_id)? > since {
            edus.push(
                serde_json::from_str(
                    &serde_json::to_string(&services().rooms.edus.typing.typings_all(&room_id)?)
                        .expect("event is valid, we just created it"),
                )
                .expect("event is valid, we just created it"),
            );
    
        // Save the state after this sync so we can send the correct state diff next sync
        services().rooms.user.associate_token_shortstatehash(
            &room_id,
            next_batch,
            current_shortstatehash,
        )?;
    
        Ok(JoinedRoom {
            account_data: RoomAccountData {
    
                events: services()
    
                    .account_data
    
                    .changes_since(Some(&room_id), &sender_user, since)?
    
                    .into_iter()
                    .filter_map(|(_, v)| {
                        serde_json::from_str(v.json().get())
                            .map_err(|_| Error::bad_database("Invalid account event in database."))
                            .ok()
                    })
    
    Jonas Platte's avatar
    Jonas Platte committed
                    .collect(),
    
            summary: RoomSummary {
                heroes,
                joined_member_count: joined_member_count.map(|n| (n as u32).into()),
                invited_member_count: invited_member_count.map(|n| (n as u32).into()),
    
            unread_notifications: UnreadNotificationsCount {
                highlight_count,
                notification_count,
    
            timeline: Timeline {
                limited: limited || joined_since_last_sync,
                prev_batch,
                events: room_events,
            },
            state: State {
                events: state_events
                    .iter()
                    .map(|pdu| pdu.to_sync_state_event())
                    .collect(),
            },
            ephemeral: Ephemeral { events: edus },
            unread_thread_notifications: BTreeMap::new(),
        })
    
    fn load_timeline(
        sender_user: &UserId,
        room_id: &RoomId,
    
        roomsincecount: PduCount,
    
        limit: u64,
    ) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
        let timeline_pdus;
        let limited;
        if services()
            .rooms
            .timeline
            .last_timeline_count(&sender_user, &room_id)?
    
        {
            let mut non_timeline_pdus = services()
                .rooms
                .timeline
                .pdus_until(&sender_user, &room_id, PduCount::max())?
                .filter_map(|r| {
                    // Filter out buggy events
                    if r.is_err() {
                        error!("Bad pdu in pdus_since: {:?}", r);
                    }
                    r.ok()
                })
    
                .take_while(|(pducount, _)| pducount > &roomsincecount);
    
    
            // Take the last events for the timeline
            timeline_pdus = non_timeline_pdus
                .by_ref()
                .take(limit as usize)
                .collect::<Vec<_>>()
                .into_iter()
                .rev()
                .collect::<Vec<_>>();
    
            // They /sync response doesn't always return all messages, so we say the output is
            // limited unless there are events in non_timeline_pdus
            limited = non_timeline_pdus.next().is_some();
        } else {
            timeline_pdus = Vec::new();
            limited = false;
        }
        Ok((timeline_pdus, limited))
    }
    
    
    fn share_encrypted_room(
    
        sender_user: &UserId,
    
        user_id: &UserId,
        ignore_room: &RoomId,
    
    ) -> Result<bool> {
    
        Ok(services()
    
    Timo Kösters's avatar
    Timo Kösters committed
            .user
    
    Jonas Platte's avatar
    Jonas Platte committed
            .get_shared_rooms(vec![sender_user.to_owned(), user_id.to_owned()])?
    
            .filter_map(|r| r.ok())
            .filter(|room_id| room_id != ignore_room)
            .filter_map(|other_room_id| {
                Some(
    
    Timo Kösters's avatar
    Timo Kösters committed
                    services()
                        .rooms
                        .state_accessor
    
    Timo Kösters's avatar
    Timo Kösters committed
                        .room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
    
                        .ok()?
                        .is_some(),
                )
            })
    
            .any(|encrypted| encrypted))
    
    
    pub async fn sync_events_v4_route(
        body: Ruma<sync_events::v4::Request>,
    ) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
        let sender_user = body.sender_user.expect("user is authenticated");
        let sender_device = body.sender_device.expect("user is authenticated");
    
        let mut body = body.body;
    
        // Setup watchers, so if there's no response, we can wait for them
        let watcher = services().globals.watch(&sender_user, &sender_device);
    
    
        let next_batch = services().globals.next_count()?;
    
        let globalsince = body
    
            .pos
            .as_ref()
            .and_then(|string| string.parse().ok())
            .unwrap_or(0);
    
    
        if globalsince == 0 {
    
            if let Some(conn_id) = &body.conn_id {
                services().users.forget_sync_request_connection(
                    sender_user.clone(),
                    sender_device.clone(),
                    conn_id.clone(),
                )
            }
        }
    
    
        // Get sticky parameters from cache
    
        let known_rooms = services().users.update_sync_request_with_cache(
            sender_user.clone(),
            sender_device.clone(),
            &mut body,
        );
    
    
        let all_joined_rooms = services()
            .rooms
            .state_cache
            .rooms_joined(&sender_user)
            .filter_map(|r| r.ok())
            .collect::<Vec<_>>();
    
    
        if body.extensions.to_device.enabled.unwrap_or(false) {
            services()
                .users
    
                .remove_to_device_events(&sender_user, &sender_device, globalsince)?;
    
        }
    
        let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
        let mut device_list_changes = HashSet::new();
        let mut device_list_left = HashSet::new();
    
        if body.extensions.e2ee.enabled.unwrap_or(false) {
            // Look for device list updates of this account
            device_list_changes.extend(
                services()
                    .users
    
                    .keys_changed(sender_user.as_ref(), globalsince, None)
    
                    .filter_map(|r| r.ok()),
            );
    
            for room_id in &all_joined_rooms {
                let current_shortstatehash =
                    if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? {
                        s
                    } else {
                        error!("Room {} has no state", room_id);
                        continue;
                    };
    
                let since_shortstatehash = services()
                    .rooms
                    .user
    
                    .get_token_shortstatehash(&room_id, globalsince)?;
    
    
                let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
                    .and_then(|shortstatehash| {
                        services()
                            .rooms
                            .state_accessor
                            .state_get(
                                shortstatehash,
                                &StateEventType::RoomMember,
                                sender_user.as_str(),
                            )
                            .transpose()
                    })
                    .transpose()?
                    .and_then(|pdu| {
                        serde_json::from_str(pdu.content.get())
                            .map_err(|_| Error::bad_database("Invalid PDU in database."))
                            .ok()
                    });
    
                let encrypted_room = services()
                    .rooms
                    .state_accessor
                    .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
                    .is_some();
    
                if let Some(since_shortstatehash) = since_shortstatehash {
                    // Skip if there are only timeline changes
                    if since_shortstatehash == current_shortstatehash {
                        continue;
                    }
    
                    let since_encryption = services().rooms.state_accessor.state_get(
                        since_shortstatehash,
                        &StateEventType::RoomEncryption,
                        "",
                    )?;
    
                    let joined_since_last_sync = since_sender_member
                        .map_or(true, |member| member.membership != MembershipState::Join);
    
                    let new_encrypted_room = encrypted_room && since_encryption.is_none();
                    if encrypted_room {
                        let current_state_ids = services()
                            .rooms
                            .state_accessor
                            .state_full_ids(current_shortstatehash)
                            .await?;
                        let since_state_ids = services()
                            .rooms
                            .state_accessor
                            .state_full_ids(since_shortstatehash)
                            .await?;
    
                        for (key, id) in current_state_ids {
                            if since_state_ids.get(&key) != Some(&id) {
                                let pdu = match services().rooms.timeline.get_pdu(&id)? {
                                    Some(pdu) => pdu,
                                    None => {
                                        error!("Pdu in state not found: {}", id);
                                        continue;
                                    }
                                };
                                if pdu.kind == TimelineEventType::RoomMember {
                                    if let Some(state_key) = &pdu.state_key {
                                        let user_id =
                                            UserId::parse(state_key.clone()).map_err(|_| {
                                                Error::bad_database("Invalid UserId in member PDU.")
                                            })?;
    
                                        if user_id == sender_user {
                                            continue;
                                        }
    
                                        let new_membership = serde_json::from_str::<
                                            RoomMemberEventContent,
                                        >(
                                            pdu.content.get()
                                        )
                                        .map_err(|_| Error::bad_database("Invalid PDU in database."))?
                                        .membership;
    
                                        match new_membership {
                                            MembershipState::Join => {
                                                // A new user joined an encrypted room
                                                if !share_encrypted_room(
                                                    &sender_user,
                                                    &user_id,
                                                    &room_id,
                                                )? {
                                                    device_list_changes.insert(user_id);
                                                }
                                            }
                                            MembershipState::Leave => {
                                                // Write down users that have left encrypted rooms we are in
                                                left_encrypted_users.insert(user_id);
                                            }
                                            _ => {}
                                        }
                                    }
                                }
                            }
                        }
                        if joined_since_last_sync || new_encrypted_room {
                            // If the user is in a new encrypted room, give them all joined users
                            device_list_changes.extend(
                                services()
                                    .rooms
                                    .state_cache
                                    .room_members(&room_id)
                                    .flatten()
                                    .filter(|user_id| {
                                        // Don't send key updates from the sender to the sender
                                        &sender_user != user_id
                                    })
                                    .filter(|user_id| {
                                        // Only send keys if the sender doesn't share an encrypted room with the target already
                                        !share_encrypted_room(&sender_user, user_id, &room_id)
                                            .unwrap_or(false)
                                    }),
                            );
                        }
                    }
                }
                // Look for device list updates in this room
                device_list_changes.extend(
                    services()
                        .users
    
                        .keys_changed(room_id.as_ref(), globalsince, None)
    
                        .filter_map(|r| r.ok()),
                );
            }
            for user_id in left_encrypted_users {
                let dont_share_encrypted_room = services()
                    .rooms
                    .user
                    .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
                    .filter_map(|r| r.ok())
                    .filter_map(|other_room_id| {
                        Some(
                            services()
                                .rooms
                                .state_accessor
                                .room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
                                .ok()?
                                .is_some(),
                        )
                    })
                    .all(|encrypted| !encrypted);
                // If the user doesn't share an encrypted room with the target anymore, we need to tell
                // them
                if dont_share_encrypted_room {
                    device_list_left.insert(user_id);
                }
            }
        }
    
    
        let mut lists = BTreeMap::new();
        let mut todo_rooms = BTreeMap::new(); // and required state
    
        for (list_id, list) in body.lists {
            if list.filters.and_then(|f| f.is_invite).unwrap_or(false) {
                continue;
            }
    
    
            let mut new_known_rooms = BTreeSet::new();
    
                list_id.clone(),
    
                sync_events::v4::SyncList {
                    ops: list
                        .ranges
                        .into_iter()
                        .map(|mut r| {
                            r.0 =
                                r.0.clamp(uint!(0), UInt::from(all_joined_rooms.len() as u32 - 1));
                            r.1 =
                                r.1.clamp(r.0, UInt::from(all_joined_rooms.len() as u32 - 1));
                            let room_ids = all_joined_rooms
                                [(u64::from(r.0) as usize)..=(u64::from(r.1) as usize)]
                                .to_vec();
    
                            new_known_rooms.extend(room_ids.iter().cloned());
    
                            for room_id in &room_ids {
                                let todo_room = todo_rooms.entry(room_id.clone()).or_insert((
                                    BTreeSet::new(),
                                    0,
    
                                let limit = list
                                    .room_details
                                    .timeline_limit
                                    .map_or(10, u64::from)
                                    .min(100);
    
                                todo_room
                                    .0
                                    .extend(list.room_details.required_state.iter().cloned());
    
                                todo_room.1 = todo_room.1.max(limit);
    
                                // 0 means unknown because it got out of date
                                todo_room.2 = todo_room.2.min(
                                    known_rooms
                                        .get(&list_id)
                                        .and_then(|k| k.get(room_id))
                                        .copied()
                                        .unwrap_or(0),
                                );
    
                            sync_events::v4::SyncOp {
                                op: SlidingOp::Sync,
                                range: Some(r.clone()),
                                index: None,
                                room_ids,
                                room_id: None,
                            }
                        })
                        .collect(),
                    count: UInt::from(all_joined_rooms.len() as u32),
                },
            );
    
    
            if let Some(conn_id) = &body.conn_id {
                services().users.update_sync_known_rooms(
                    sender_user.clone(),
                    sender_device.clone(),
                    conn_id.clone(),
                    list_id,
                    new_known_rooms,
    
        let mut known_subscription_rooms = BTreeSet::new();
    
        for (room_id, room) in &body.room_subscriptions {
    
            let todo_room = todo_rooms
                .entry(room_id.clone())
    
                .or_insert((BTreeSet::new(), 0, u64::MAX));
    
            let limit = room.timeline_limit.map_or(10, u64::from).min(100);
            todo_room.0.extend(room.required_state.iter().cloned());
    
            todo_room.1 = todo_room.1.max(limit);
    
            // 0 means unknown because it got out of date
            todo_room.2 = todo_room.2.min(
                known_rooms
                    .get("subscriptions")
                    .and_then(|k| k.get(room_id))
                    .copied()
                    .unwrap_or(0),
            );
            known_subscription_rooms.insert(room_id.clone());
    
        }
    
        for r in body.unsubscribe_rooms {
            known_subscription_rooms.remove(&r);
            body.room_subscriptions.remove(&r);
        }
    
        if let Some(conn_id) = &body.conn_id {
            services().users.update_sync_known_rooms(
                sender_user.clone(),
                sender_device.clone(),
                conn_id.clone(),
                "subscriptions".to_owned(),
                known_subscription_rooms,
    
            );
        }
    
        if let Some(conn_id) = &body.conn_id {
            services().users.update_sync_subscriptions(
                sender_user.clone(),
                sender_device.clone(),
                conn_id.clone(),
                body.room_subscriptions,
            );
    
        }
    
        let mut rooms = BTreeMap::new();
    
        for (room_id, (required_state_request, timeline_limit, roomsince)) in &todo_rooms {
            let roomsincecount = PduCount::Normal(*roomsince);
    
    
            let (timeline_pdus, limited) =
    
                load_timeline(&sender_user, &room_id, roomsincecount, *timeline_limit)?;
    
            if roomsince != &0 && timeline_pdus.is_empty() {
    
            let prev_batch = timeline_pdus
                .first()
                .map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
                    Ok(Some(match pdu_count {
                        PduCount::Backfilled(_) => {
                            error!("timeline in backfill state?!");
                            "0".to_owned()
                        }
                        PduCount::Normal(c) => c.to_string(),
                    }))
    
                })?
                .or_else(|| {
    
                    if roomsince != &0 {
                        Some(roomsince.to_string())
    
            let room_events: Vec<_> = timeline_pdus
                .iter()
                .map(|(_, pdu)| pdu.to_sync_room_event())
                .collect();
    
            let required_state = required_state_request
                .iter()
                .map(|state| {
                    services()
                        .rooms
                        .state_accessor
                        .room_state_get(&room_id, &state.0, &state.1)
                })
                .filter_map(|r| r.ok())
                .filter_map(|o| o)
                .map(|state| state.to_sync_state_event())
                .collect();
    
    
            // Heroes
            let heroes = services()
                .rooms
                .state_cache
                .room_members(&room_id)
                .filter_map(|r| r.ok())
                .filter(|member| member != &sender_user)
                .map(|member| {
                    Ok::<_, Error>(
                        services()
                            .rooms
                            .state_accessor
                            .get_member(&room_id, &member)?
                            .map(|memberevent| {
                                (
                                    memberevent
                                        .displayname
                                        .unwrap_or_else(|| member.to_string()),
                                    memberevent.avatar_url,
                                )
                            }),
                    )
                })
                .filter_map(|r| r.ok())
                .filter_map(|o| o)
                .take(5)
                .collect::<Vec<_>>();
            let name = if heroes.len() > 1 {
                let last = heroes[0].0.clone();
                Some(
                    heroes[1..]
                        .iter()
                        .map(|h| h.0.clone())
                        .collect::<Vec<_>>()
                        .join(", ")
                        + " and "
                        + &last,
                )
            } else if heroes.len() == 1 {
                Some(heroes[0].0.clone())
            } else {
                None
            };
    
            let avatar = if heroes.len() == 1 {
                heroes[0].1.clone()
            } else {
                None
            };
    
    
            rooms.insert(
                room_id.clone(),
                sync_events::v4::SlidingSyncRoom {
    
                    name: services()
                        .rooms
                        .state_accessor
                        .get_name(&room_id)?
    
                        .or_else(|| name),
                    avatar: services()
                        .rooms
                        .state_accessor
                        .get_avatar(&room_id)?
                        .map_or(avatar, |a| a.url),
    
                    initial: Some(roomsince == &0),
    
                    is_dm: None,
                    invite_state: None,
                    unread_notifications: UnreadNotificationsCount {
    
                        highlight_count: Some(
                            services()
                                .rooms
                                .user
                                .highlight_count(&sender_user, &room_id)?
                                .try_into()
                                .expect("notification count can't go that high"),
                        ),
                        notification_count: Some(
                            services()
                                .rooms
                                .user
                                .notification_count(&sender_user, &room_id)?
                                .try_into()
                                .expect("notification count can't go that high"),
                        ),
    
                    },
                    timeline: room_events,
                    required_state,
    
                    prev_batch,
    
                    limited,
                    joined_count: Some(
                        (services()
                            .rooms
                            .state_cache
                            .room_joined_count(&room_id)?
                            .unwrap_or(0) as u32)
                            .into(),
                    ),
                    invited_count: Some(
                        (services()
                            .rooms
                            .state_cache
                            .room_invited_count(&room_id)?
                            .unwrap_or(0) as u32)
                            .into(),
                    ),
    
                    num_live: None, // Count events in timeline greater than global sync counter
    
                },
            );
        }
    
        if rooms
            .iter()
            .all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty())
        {
            // Hang a few seconds so requests are not spammed
            // Stop hanging if new info arrives
            let mut duration = body.timeout.unwrap_or(Duration::from_secs(30));
            if duration.as_secs() > 30 {
                duration = Duration::from_secs(30);
            }
            let _ = tokio::time::timeout(duration, watcher).await;
        }
    
    
        Ok(sync_events::v4::Response {
    
            initial: globalsince == 0,
    
            txn_id: body.txn_id.clone(),
            pos: next_batch.to_string(),
            lists,
            rooms,
            extensions: sync_events::v4::Extensions {
    
                to_device: if body.extensions.to_device.enabled.unwrap_or(false) {
                    Some(sync_events::v4::ToDevice {
                        events: services()
                            .users
                            .get_to_device_events(&sender_user, &sender_device)?,
                        next_batch: next_batch.to_string(),
                    })
                } else {
                    None
                },
    
                e2ee: sync_events::v4::E2EE {
                    device_lists: DeviceLists {
    
                        changed: device_list_changes.into_iter().collect(),
                        left: device_list_left.into_iter().collect(),
    
                    device_one_time_keys_count: services()
                        .users
                        .count_one_time_keys(&sender_user, &sender_device)?,
                    // Fallback keys are not yet supported
    
                    device_unused_fallback_key_types: None,
                },
                account_data: sync_events::v4::AccountData {
    
                    global: if body.extensions.account_data.enabled.unwrap_or(false) {
                        services()
                            .account_data
    
                            .changes_since(None, &sender_user, globalsince)?
    
                            .into_iter()
                            .filter_map(|(_, v)| {
                                serde_json::from_str(v.json().get())
                                    .map_err(|_| {
                                        Error::bad_database("Invalid account event in database.")
                                    })
                                    .ok()
                            })
                            .collect()
                    } else {
                        Vec::new()
                    },
    
                    rooms: BTreeMap::new(),
                },
                receipts: sync_events::v4::Receipts {
                    rooms: BTreeMap::new(),
                },
                typing: sync_events::v4::Typing {
                    rooms: BTreeMap::new(),
                },
            },
            delta_token: None,