Newer
Older
use crate::{
service::rooms::timeline::PduCount, services, Error, PduEvent, Result, Ruma, RumaResponse,
};
sync::sync_events::{
self,
v3::{
Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom,
LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
},
DeviceLists, UnreadNotificationsCount,
},
serde::Raw,
uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
/// # `GET /_matrix/client/r0/sync`
///
/// Synchronize the client's state with the latest state on the server.
///
/// - This endpoint takes a `since` parameter which should be the `next_batch` value from a
/// previous request for incremental syncs.
///
/// Calling this endpoint without a `since` parameter returns:
/// - Some of the most recent events of each timeline
/// - Notification counts for each room
/// - Joined and invited member counts, heroes
/// - All state events
///
/// Calling this endpoint with a `since` parameter from a previous `next_batch` returns:
/// For joined rooms:
/// - Some of the most recent events of each timeline that happened after since
/// - If user joined the room after since: All state events (unless lazy loading is activated) and
/// all device list updates in that room
/// - If the user was already in the room: A list of all events that are in the state now, but were
/// not in the state at `since`
/// - If the state we send contains a member event: Joined and invited member counts, heroes
/// - Device list updates that happened after `since`
/// - If there are events in the timeline we send or the user send updated his read mark: Notification counts
/// - EDUs that are active now (read receipts, typing updates, presence)
/// - TODO: Allow multiple sync streams to support Pantalaimon
///
/// For invited rooms:
/// - If the user was invited after `since`: A subset of the state of the room at the point of the invite
///
/// For left rooms:
/// - If the user left after `since`: prev_batch token, empty state (TODO: subset of the state at the point of the leave)
///
/// - Sync is handled in an async task, multiple requests from the same device with the same
/// `since` will be cached
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
let sender_user = body.sender_user.expect("user is authenticated");
let sender_device = body.sender_device.expect("user is authenticated");
let body = body.body;
.globals
.sync_receivers
.write()
.unwrap()
.entry((sender_user.clone(), sender_device.clone()))
{
Entry::Vacant(v) => {
let (tx, rx) = tokio::sync::watch::channel(None);
v.insert((body.since.clone(), rx.clone()));
tokio::spawn(sync_helper_wrapper(
sender_user.clone(),
sender_device.clone(),
}
Entry::Occupied(mut o) => {
if o.get().0 != body.since {
let (tx, rx) = tokio::sync::watch::channel(None);
o.insert((body.since.clone(), rx.clone()));
tokio::spawn(sync_helper_wrapper(
sender_user.clone(),
sender_device.clone(),
tx,
));
rx
} else {
o.get().1.clone()
}
}
};
let we_have_to_wait = rx.borrow().is_none();
if we_have_to_wait {
if let Err(e) = rx.changed().await {
error!("Error waiting for sync: {}", e);
}
}
let result = match rx
.borrow()
.as_ref()
.expect("When sync channel changes it's always set to some")
{
Ok(response) => Ok(response.clone()),
Err(error) => Err(error.to_response()),
};
result
}
let r = sync_helper(sender_user.clone(), sender_device.clone(), body).await;
if let Ok((_, caching_allowed)) = r {
if !caching_allowed {
.globals
.sync_receivers
.write()
.unwrap()
.entry((sender_user, sender_device))
{
Entry::Occupied(o) => {
// Only remove if the device didn't start a different /sync already
if o.get().0 == since {
o.remove();
}
}
Entry::Vacant(_) => {}
}
}
}
let _ = tx.send(Some(r.map(|(r, _)| r)));
if services().globals.allow_local_presence() {
services()
.rooms
.edus
.presence
.ping_presence(&sender_user, body.set_presence)?;
}
// Setup watchers, so if there's no response, we can wait for them
let watcher = services().globals.watch(&sender_user, &sender_device);
let next_batch = services().globals.current_count()?;
let next_batchcount = PduCount::Normal(next_batch);
let next_batch_string = next_batch.to_string();
// Load filter
let filter = match body.filter {
None => FilterDefinition::default(),
Some(Filter::FilterDefinition(filter)) => filter,
Some(Filter::FilterId(filter_id)) => services()
.users
.get_filter(&sender_user, &filter_id)?
.unwrap_or_default(),
};
let (lazy_load_enabled, lazy_load_send_redundant) = match filter.room.state.lazy_load_options {
LazyLoadOptions::Enabled {
include_redundant_members: redundant,
} => (true, redundant),
LazyLoadOptions::Disabled => (false, false),
let mut joined_rooms = BTreeMap::new();
.and_then(|string| string.parse().ok())
.unwrap_or(0);
let mut presence_updates = HashMap::new();
let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
let mut device_list_updates = HashSet::new();
// Look for device list updates of this account
device_list_updates.extend(
let all_joined_rooms = services()
.rooms
.state_cache
.rooms_joined(&sender_user)
.collect::<Vec<_>>();
for room_id in all_joined_rooms {
if let Ok(joined_room) = load_joined_room(
&sender_user,
&sender_device,
&room_id,
since,
sincecount,
next_batch,
next_batchcount,
lazy_load_enabled,
lazy_load_send_redundant,
full_state,
&mut device_list_updates,
&mut left_encrypted_users,
)
.await
{
if !joined_room.is_empty() {
joined_rooms.insert(room_id.clone(), joined_room);
}
if services().globals.allow_local_presence() {
process_room_presence_updates(&mut presence_updates, &room_id, since).await?;
}
}
}
let mut left_rooms = BTreeMap::new();
let all_left_rooms: Vec<_> = services()
.rooms
.state_cache
.rooms_left(&sender_user)
.collect();
for result in all_left_rooms {
let (room_id, _) = result?;
let mut left_state_events = Vec::new();
{
// Get and drop the lock to wait for remaining operations to finish
let mutex_insert = Arc::clone(
.roomid_mutex_insert
.write()
.unwrap()
.entry(room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
.state_cache
.get_left_count(&room_id, &sender_user)?;
// Left before last sync
if Some(since) >= left_count {
continue;
if !services().rooms.metadata.exists(&room_id)? {
// This is just a rejected invite, not a room we know
continue;
let since_shortstatehash = services()
.rooms
.user
.get_token_shortstatehash(&room_id, since)?;
let since_state_ids = match since_shortstatehash {
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
None => HashMap::new(),
};
let left_event_id = match services().rooms.state_accessor.room_state_get_id(
&room_id,
&StateEventType::RoomMember,
sender_user.as_str(),
)? {
Some(e) => e,
None => {
error!("Left room but no left state event");
continue;
let left_shortstatehash = match services()
.rooms
.state_accessor
.pdu_shortstatehash(&left_event_id)?
{
Some(s) => s,
None => {
error!("Leave event has no state");
continue;
}
};
let mut left_state_ids = services()
.rooms
.state_accessor
.state_full_ids(left_shortstatehash)
.await?;
let leave_shortstatekey = services()
.rooms
.short
.get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?;
left_state_ids.insert(leave_shortstatekey, left_event_id);
let mut i = 0;
for (key, id) in left_state_ids {
if full_state || since_state_ids.get(&key) != Some(&id) {
let (event_type, state_key) =
services().rooms.short.get_statekey_from_short(key)?;
if !lazy_load_enabled
|| event_type != StateEventType::RoomMember
|| full_state
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|| *sender_user == state_key
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
error!("Pdu in state not found: {}", id);
continue;
}
};
left_state_events.push(pdu.to_sync_state_event());
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
}
left_rooms.insert(
room_id.clone(),
LeftRoom {
account_data: RoomAccountData { events: Vec::new() },
timeline: Timeline {
limited: false,
prev_batch: Some(next_batch_string.clone()),
events: Vec::new(),
},
state: State {
events: left_state_events,
},
},
);
}
let mut invited_rooms = BTreeMap::new();
let all_invited_rooms: Vec<_> = services()
.rooms
.state_cache
.rooms_invited(&sender_user)
.collect();
for result in all_invited_rooms {
let (room_id, invite_state_events) = result?;
{
// Get and drop the lock to wait for remaining operations to finish
let mutex_insert = Arc::clone(
services()
.globals
.roomid_mutex_insert
.write()
.unwrap()
.entry(room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
let invite_count = services()
.rooms
.state_cache
.get_invite_count(&room_id, &sender_user)?;
// Invited before last sync
if Some(since) >= invite_count {
continue;
}
invited_rooms.insert(
room_id.clone(),
InvitedRoom {
invite_state: InviteState {
events: invite_state_events,
},
},
);
}
for user_id in left_encrypted_users {
let dont_share_encrypted_room = services()
.rooms
.user
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
.filter_map(|other_room_id| {
Some(
services()
.rooms
.state_accessor
.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
.ok()?
.is_some(),
)
})
.all(|encrypted| !encrypted);
// If the user doesn't share an encrypted room with the target anymore, we need to tell
// them
if dont_share_encrypted_room {
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
device_list_left.insert(user_id);
}
}
// Remove all to-device events the device received *last time*
services()
.users
.remove_to_device_events(&sender_user, &sender_device, since)?;
let response = sync_events::v3::Response {
next_batch: next_batch_string,
rooms: Rooms {
leave: left_rooms,
join: joined_rooms,
invite: invited_rooms,
knock: BTreeMap::new(), // TODO
},
presence: Presence {
events: presence_updates
.into_values()
.map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully"))
.collect(),
},
account_data: GlobalAccountData {
events: services()
.account_data
.changes_since(None, &sender_user, since)?
.into_iter()
.filter_map(|(_, v)| {
serde_json::from_str(v.json().get())
.map_err(|_| Error::bad_database("Invalid account event in database."))
.ok()
})
.collect(),
},
device_lists: DeviceLists {
changed: device_list_updates.into_iter().collect(),
left: device_list_left.into_iter().collect(),
},
device_one_time_keys_count: services()
.users
.count_one_time_keys(&sender_user, &sender_device)?,
to_device: ToDevice {
events: services()
.users
.get_to_device_events(&sender_user, &sender_device)?,
},
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
};
// TODO: Retry the endpoint instead of returning (waiting for #118)
if !full_state
&& response.rooms.is_empty()
&& response.presence.is_empty()
&& response.account_data.is_empty()
&& response.device_lists.is_empty()
&& response.to_device.is_empty()
{
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let mut duration = body.timeout.unwrap_or_default();
if duration.as_secs() > 30 {
duration = Duration::from_secs(30);
}
let _ = tokio::time::timeout(duration, watcher).await;
Ok((response, false))
} else {
Ok((response, since != next_batch)) // Only cache if we made progress
}
}
async fn process_room_presence_updates(
presence_updates: &mut HashMap<OwnedUserId, PresenceEvent>,
room_id: &RoomId,
since: u64,
) -> Result<()> {
// Take presence updates from this room
for (user_id, _, presence_event) in services()
.rooms
.edus
.presence
.presence_since(room_id, since)
{
match presence_updates.entry(user_id) {
Entry::Vacant(slot) => {
slot.insert(presence_event);
}
Entry::Occupied(mut slot) => {
let curr_event = slot.get_mut();
let curr_content = &mut curr_event.content;
let new_content = presence_event.content;
// Update existing presence event with more info
curr_content.presence = new_content.presence;
curr_content.status_msg = new_content
.status_msg
.or_else(|| curr_content.status_msg.take());
curr_content.last_active_ago =
new_content.last_active_ago.or(curr_content.last_active_ago);
curr_content.displayname = new_content
.displayname
.or_else(|| curr_content.displayname.take());
curr_content.avatar_url = new_content
.avatar_url
.or_else(|| curr_content.avatar_url.take());
curr_content.currently_active = new_content
.or(curr_content.currently_active);
}
}
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
async fn load_joined_room(
sender_user: &UserId,
sender_device: &DeviceId,
room_id: &RoomId,
since: u64,
sincecount: PduCount,
next_batch: u64,
next_batchcount: PduCount,
lazy_load_enabled: bool,
lazy_load_send_redundant: bool,
full_state: bool,
device_list_updates: &mut HashSet<OwnedUserId>,
left_encrypted_users: &mut HashSet<OwnedUserId>,
) -> Result<JoinedRoom> {
{
// Get and drop the lock to wait for remaining operations to finish
// This will make sure the we have all events until next_batch
let mutex_insert = Arc::clone(
services()
.globals
.roomid_mutex_insert
.write()
.unwrap()
.entry(room_id.to_owned())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
let (timeline_pdus, limited) = load_timeline(sender_user, room_id, sincecount, 10)?;
let send_notification_counts = !timeline_pdus.is_empty()
|| services()
.rooms
.user
.last_notification_read(sender_user, room_id)?
> since;
let mut timeline_users = HashSet::new();
for (_, event) in &timeline_pdus {
timeline_users.insert(event.sender.as_str().to_owned());
}
services().rooms.lazy_loading.lazy_load_confirm_delivery(
sender_user,
sender_device,
room_id,
sincecount,
)?;
// Database queries:
let current_shortstatehash =
if let Some(s) = services().rooms.state.get_room_shortstatehash(room_id)? {
s
} else {
error!("Room {} has no state", room_id);
return Err(Error::BadDatabase("Room has no state"));
};
let since_shortstatehash = services()
.rooms
.user
.get_token_shortstatehash(room_id, since)?;
let (heroes, joined_member_count, invited_member_count, joined_since_last_sync, state_events) =
if timeline_pdus.is_empty() && since_shortstatehash == Some(current_shortstatehash) {
// No state changes
(Vec::new(), None, None, false, Vec::new())
} else {
// Calculates joined_member_count, invited_member_count and heroes
let calculate_counts = || {
let joined_member_count = services()
.rooms
.state_cache
.room_joined_count(room_id)?
.unwrap_or(0);
let invited_member_count = services()
.rooms
.state_cache
.room_invited_count(room_id)?
.unwrap_or(0);
// Recalculate heroes (first 5 members)
let mut heroes = Vec::new();
if joined_member_count + invited_member_count <= 5 {
// Go through all PDUs and for each member event, check if the user is still joined or
// invited until we have 5 or we reach the end
for hero in services()
.rooms
.timeline
.all_pdus(sender_user, room_id)?
.filter_map(std::result::Result::ok) // Ignore all broken pdus
.filter(|(_, pdu)| pdu.kind == TimelineEventType::RoomMember)
.map(|(_, pdu)| {
let content: RoomMemberEventContent =
serde_json::from_str(pdu.content.get()).map_err(|_| {
Error::bad_database("Invalid member event in database.")
})?;
if let Some(state_key) = &pdu.state_key {
let user_id = UserId::parse(state_key.clone()).map_err(|_| {
Error::bad_database("Invalid UserId in member PDU.")
})?;
// The membership was and still is invite or join
if matches!(
content.membership,
MembershipState::Join | MembershipState::Invite
) && (services()
.rooms
.state_cache
.is_joined(&user_id, room_id)?
|| services()
.rooms
.state_cache
.is_invited(&user_id, room_id)?)
{
Ok::<_, Error>(Some(state_key.clone()))
} else {
Ok(None)
}
} else {
Ok(None)
}
})
// Filter out buggy users
// Filter for possible heroes
.flatten()
{
if heroes.contains(&hero) || hero == sender_user.as_str() {
heroes.push(hero);
Ok::<_, Error>((
Some(joined_member_count),
Some(invited_member_count),
heroes,
))
};
let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
.and_then(|shortstatehash| {
services()
.rooms
.state_accessor
.state_get(
shortstatehash,
&StateEventType::RoomMember,
sender_user.as_str(),
)
.transpose()
})
.transpose()?
.and_then(|pdu| {
serde_json::from_str(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid PDU in database."))
.ok()
});
let joined_since_last_sync = since_sender_member
.map_or(true, |member| member.membership != MembershipState::Join);
if since_shortstatehash.is_none() || joined_since_last_sync {
// Probably since = 0, we will do an initial sync
let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
let current_state_ids = services()
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
let mut state_events = Vec::new();
let mut lazy_loaded = HashSet::new();
let mut i = 0;
for (shortstatekey, id) in current_state_ids {
let (event_type, state_key) = services()
.rooms
.short
.get_statekey_from_short(shortstatekey)?;
if event_type != StateEventType::RoomMember {
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
error!("Pdu in state not found: {}", id);
continue;
}
};
state_events.push(pdu);
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
} else if !lazy_load_enabled
|| full_state
|| timeline_users.contains(&state_key)
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|| *sender_user == state_key
{
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
error!("Pdu in state not found: {}", id);
continue;
}
};
// This check is in case a bad user ID made it into the database
if let Ok(uid) = UserId::parse(&state_key) {
lazy_loaded.insert(uid);
}
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
}
// Reset lazy loading because this is an initial sync
services().rooms.lazy_loading.lazy_load_reset(
sender_user,
sender_device,
room_id,
// The state_events above should contain all timeline_users, let's mark them as lazy
// loaded.
services().rooms.lazy_loading.lazy_load_mark_sent(
sender_user,
sender_device,
room_id,
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
lazy_loaded,
next_batchcount,
);
(
heroes,
joined_member_count,
invited_member_count,
true,
state_events,
)
} else {
// Incremental /sync
let since_shortstatehash = since_shortstatehash.unwrap();
let mut state_events = Vec::new();
let mut lazy_loaded = HashSet::new();
if since_shortstatehash != current_shortstatehash {
let current_state_ids = services()
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
let since_state_ids = services()
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
.await?;
for (key, id) in current_state_ids {
if full_state || since_state_ids.get(&key) != Some(&id) {
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
error!("Pdu in state not found: {}", id);
continue;
}
};
if pdu.kind == TimelineEventType::RoomMember {
match UserId::parse(
pdu.state_key
.as_ref()
.expect("State event has state key")
.clone(),
) {
Ok(state_key_userid) => {
lazy_loaded.insert(state_key_userid);
}
Err(e) => error!("Invalid state key for member event: {}", e),
}
}
state_events.push(pdu);
tokio::task::yield_now().await;
}
for (_, event) in &timeline_pdus {
if lazy_loaded.contains(&event.sender) {
if !services().rooms.lazy_loading.lazy_load_was_sent_before(
sender_user,
sender_device,
room_id,
&event.sender,
)? || lazy_load_send_redundant
{
if let Some(member_event) = services().rooms.state_accessor.room_state_get(
&StateEventType::RoomMember,
event.sender.as_str(),
)? {
lazy_loaded.insert(event.sender.clone());
state_events.push(member_event);
}
}
}
services().rooms.lazy_loading.lazy_load_mark_sent(
sender_user,
sender_device,
room_id,
lazy_loaded,
next_batchcount,
);
let encrypted_room = services()
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
.is_some();
let since_encryption = services().rooms.state_accessor.state_get(
since_shortstatehash,
&StateEventType::RoomEncryption,
"",
)?;
// Calculations:
let new_encrypted_room = encrypted_room && since_encryption.is_none();
let send_member_count = state_events
.iter()
.any(|event| event.kind == TimelineEventType::RoomMember);
if encrypted_room {
for state_event in &state_events {
if state_event.kind != TimelineEventType::RoomMember {
if let Some(state_key) = &state_event.state_key {
let user_id = UserId::parse(state_key.clone()).map_err(|_| {
Error::bad_database("Invalid UserId in member PDU.")
})?;
if user_id == sender_user {
continue;
let new_membership = serde_json::from_str::<RoomMemberEventContent>(
state_event.content.get(),
)
.map_err(|_| Error::bad_database("Invalid PDU in database."))?
.membership;
match new_membership {
MembershipState::Join => {
// A new user joined an encrypted room
if !share_encrypted_room(sender_user, &user_id, room_id)? {
device_list_updates.insert(user_id);
}
}
MembershipState::Leave => {
// Write down users that have left encrypted rooms we are in
left_encrypted_users.insert(user_id);
}
_ => {}
if joined_since_last_sync && encrypted_room || new_encrypted_room {
// If the user is in a new encrypted room, give them all joined users
device_list_updates.extend(
services()
.rooms
.state_cache
.room_members(room_id)
.flatten()
.filter(|user_id| {
// Don't send key updates from the sender to the sender
sender_user != user_id
})
.filter(|user_id| {
// Only send keys if the sender doesn't share an encrypted room with the target already
!share_encrypted_room(sender_user, user_id, room_id)
.unwrap_or(false)
}),
);
}
let (joined_member_count, invited_member_count, heroes) = if send_member_count {
calculate_counts()?
} else {
(None, None, Vec::new())
};
(
heroes,