Newer
Older
use super::State;
use crate::{ConduitResult, Database, Error, Ruma};
use ruma::{
api::client::r0::sync::sync_events,
events::{room::member::MembershipState, AnySyncEphemeralRoomEvent, EventType},
Raw, RoomId, UserId,
};
#[cfg(feature = "conduit_bin")]
use rocket::{get, tokio};
use std::{
collections::{hash_map, BTreeMap, HashMap, HashSet},
/// # `GET /_matrix/client/r0/sync`
///
/// Synchronize the client's state with the latest state on the server.
///
/// - This endpoint takes a `since` parameter which should be the `next_batch` value from a
/// previous request.
/// - Calling this endpoint without a `since` parameter will return all recent events, the state
/// of all rooms and more data. This should only be called on the initial login of the device.
/// - To get incremental updates, you can call this endpoint with a `since` parameter. This will
/// return all recent events, state updates and more data that happened since the last /sync
/// request.
#[cfg_attr(
feature = "conduit_bin",
get("/_matrix/client/r0/sync", data = "<body>")
)]
pub async fn sync_events_route(
db: State<'_, Database>,
) -> ConduitResult<sync_events::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let device_id = body.device_id.as_ref().expect("user is authenticated");
// TODO: match body.set_presence {
db.rooms.edus.ping_presence(&sender_id)?;
// Setup watchers, so if there's no response, we can wait for them
let watcher = db.watch(sender_id, device_id);
let next_batch = db.globals.current_count()?.to_string();
let mut joined_rooms = BTreeMap::new();
let since = body
.since
.clone()
.and_then(|string| string.parse().ok())
.unwrap_or(0);
let mut presence_updates = HashMap::new();
let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
let mut device_list_updates = HashSet::new();
// Look for device list updates of this account
device_list_updates.extend(
db.users
.keys_changed(&sender_id.to_string(), since, None)
.filter_map(|r| r.ok()),
);
for room_id in db.rooms.rooms_joined(&sender_id) {
let room_id = room_id?;
let mut non_timeline_pdus = db
.rooms
.pdus_since(&sender_id, &room_id, since)?
.filter_map(|r| r.ok()); // Filter out buggy events
// Take the last 10 events for the timeline
let timeline_pdus = non_timeline_pdus
.by_ref()
.rev()
.take(10)
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect::<Vec<_>>();
let send_notification_counts = !timeline_pdus.is_empty()
|| db
.rooms
.edus
.last_privateread_update(&sender_id, &room_id)?
> since;
// They /sync response doesn't always return all messages, so we say the output is
// limited unless there are events in non_timeline_pdus
for (_, pdu) in non_timeline_pdus {
if pdu.state_key.is_some() {
state_pdus.push(pdu);
}
let encrypted_room = db
.rooms
.room_state_get(&room_id, &EventType::RoomEncryption, "")?
.is_some();
// These type is Option<Option<_>>. The outer Option is None when there is no event between
// since and the current room state, meaning there should be no updates.
// The inner Option is None when there is an event, but there is no state hash associated
// with it. This can happen for the RoomCreate event, so all updates should arrive.
let first_pdu_after_since = db.rooms.pdus_after(sender_id, &room_id, since).next();
let since_state_hash = first_pdu_after_since
.as_ref()
.map(|pdu| db.rooms.pdu_state_hash(&pdu.as_ref().ok()?.0).ok()?);
let since_members = since_state_hash.as_ref().map(|state_hash| {
state_hash.as_ref().and_then(|state_hash| {
db.rooms
.state_type(&state_hash, &EventType::RoomMember)
.ok()
})
});
let since_encryption = since_state_hash.as_ref().map(|state_hash| {
state_hash.as_ref().and_then(|state_hash| {
db.rooms
.state_get(&state_hash, &EventType::RoomEncryption, "")
.ok()
})
});
let current_members = db.rooms.room_state_type(&room_id, &EventType::RoomMember)?;
// Calculations:
let new_encrypted_room =
encrypted_room && since_encryption.map_or(false, |encryption| encryption.is_none());
let send_member_count = since_members.as_ref().map_or(false, |since_members| {
since_members.as_ref().map_or(true, |since_members| {
current_members.len() != since_members.len()
})
let since_sender_member = since_members.as_ref().map(|since_members| {
since_members.as_ref().and_then(|members| {
members.get(sender_id.as_str()).and_then(|pdu| {
serde_json::from_value::<Raw<ruma::events::room::member::MemberEventContent>>(
pdu.content.clone(),
)
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database."))
.ok()
})
})
});
if encrypted_room {
for (user_id, current_member) in current_members {
let current_membership = serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>,
>(current_member.content.clone())
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database."))?
.membership;
let since_membership =
since_members
.as_ref()
.map_or(MembershipState::Join, |members| {
members
.as_ref()
.and_then(|members| {
members.get(&user_id).and_then(|since_member| {
serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>,
>(
since_member.content.clone()
)
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| {
Error::bad_database("Invalid PDU in database.")
})
.ok()
})
})
.map_or(MembershipState::Leave, |member| member.membership)
});
let user_id = UserId::try_from(user_id)
.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
match (since_membership, current_membership) {
(MembershipState::Leave, MembershipState::Join) => {
// A new user joined an encrypted room
if !share_encrypted_room(&db, &sender_id, &user_id, &room_id) {
device_list_updates.insert(user_id);
}
}
(MembershipState::Join, MembershipState::Leave) => {
// Write down users that have left encrypted rooms we are in
left_encrypted_users.insert(user_id);
let joined_since_last_sync = since_sender_member.map_or(false, |member| {
member.map_or(true, |member| member.membership != MembershipState::Join)
});
if joined_since_last_sync && encrypted_room || new_encrypted_room {
// If the user is in a new encrypted room, give them all joined users
device_list_updates.extend(
db.rooms
.room_members(&room_id)
.filter(|user_id| {
// Don't send key updates from the sender to the sender
sender_id != user_id
})
.filter(|user_id| {
// Only send keys if the sender doesn't share an encrypted room with the target already
!share_encrypted_room(&db, sender_id, user_id, &room_id)
}),
);
}
// Look for device list updates in this room
device_list_updates.extend(
db.users
.keys_changed(&room_id.to_string(), since, None)
.filter_map(|r| r.ok()),
);
let (joined_member_count, invited_member_count, heroes) = if send_member_count {
let joined_member_count = db.rooms.room_members(&room_id).count();
let invited_member_count = db.rooms.room_members_invited(&room_id).count();
// Recalculate heroes (first 5 members)
let mut heroes = Vec::new();
if joined_member_count + invited_member_count <= 5 {
// Go through all PDUs and for each member event, check if the user is still joined or
// invited until we have 5 or we reach the end
for hero in db
.rooms
.all_pdus(&sender_id, &room_id)?
.filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
.filter(|(_, pdu)| pdu.kind == EventType::RoomMember)
.map(|(_, pdu)| {
let content = serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>,
>(pdu.content.clone())
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid member event in database."))?;
if let Some(state_key) = &pdu.state_key {
let user_id = UserId::try_from(state_key.clone()).map_err(|_| {
Error::bad_database("Invalid UserId in member PDU.")
})?;
// The membership was and still is invite or join
if matches!(
content.membership,
MembershipState::Join | MembershipState::Invite
) && (db.rooms.is_joined(&user_id, &room_id)?
|| db.rooms.is_invited(&user_id, &room_id)?)
{
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
Ok::<_, Error>(Some(state_key.clone()))
} else {
Ok(None)
}
} else {
Ok(None)
}
})
.filter_map(|u| u.ok()) // Filter out buggy users
// Filter for possible heroes
.filter_map(|u| u)
{
if heroes.contains(&hero) || hero == sender_id.as_str() {
continue;
}
heroes.push(hero);
}
}
(
Some(joined_member_count),
Some(invited_member_count),
heroes,
)
} else {
(None, None, Vec::new())
};
let notification_count = if send_notification_counts {
if let Some(last_read) = db.rooms.edus.private_read_get(&room_id, &sender_id)? {
Some(
(db.rooms
.pdus_since(&sender_id, &room_id, last_read)?
.filter_map(|pdu| pdu.ok()) // Filter out buggy events
matches!(
pdu.kind.clone(),
EventType::RoomMessage | EventType::RoomEncrypted
)
})
.count() as u32)
.into(),
)
} else {
None
}
} else {
None
};
let prev_batch = timeline_pdus
.first()
.map_or(Ok::<_, Error>(None), |(pdu_id, _)| {
Ok(Some(db.rooms.pdu_count(pdu_id)?.to_string()))
})?;
let room_events = timeline_pdus
.into_iter()
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect::<Vec<_>>();
let mut edus = db
.rooms
.edus
.readreceipts_since(&room_id, since)?
.filter_map(|r| r.ok()) // Filter out buggy events
.collect::<Vec<_>>();
if db.rooms.edus.last_typing_update(&room_id, &db.globals)? > since {
edus.push(
serde_json::from_str(
&serde_json::to_string(&AnySyncEphemeralRoomEvent::Typing(
db.rooms.edus.typings_all(&room_id)?,
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
))
.expect("event is valid, we just created it"),
)
.expect("event is valid, we just created it"),
);
}
let joined_room = sync_events::JoinedRoom {
account_data: sync_events::AccountData {
events: db
.account_data
.changes_since(Some(&room_id), &sender_id, since)?
.into_iter()
.filter_map(|(_, v)| {
serde_json::from_str(v.json().get())
.map_err(|_| Error::bad_database("Invalid account event in database."))
.ok()
})
.collect::<Vec<_>>(),
},
summary: sync_events::RoomSummary {
heroes,
joined_member_count: joined_member_count.map(|n| (n as u32).into()),
invited_member_count: invited_member_count.map(|n| (n as u32).into()),
},
unread_notifications: sync_events::UnreadNotificationsCount {
highlight_count: None,
notification_count,
},
timeline: sync_events::Timeline {
limited: limited || joined_since_last_sync,
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
prev_batch,
events: room_events,
},
// TODO: state before timeline
state: sync_events::State {
events: if joined_since_last_sync {
db.rooms
.room_state_full(&room_id)?
.into_iter()
.map(|(_, pdu)| pdu.to_sync_state_event())
.collect()
} else {
Vec::new()
},
},
ephemeral: sync_events::Ephemeral { events: edus },
};
if !joined_room.is_empty() {
joined_rooms.insert(room_id.clone(), joined_room);
}
// Take presence updates from this room
for (user_id, presence) in
db.rooms
.edus
.presence_since(&room_id, since, &db.rooms, &db.globals)?
{
match presence_updates.entry(user_id) {
hash_map::Entry::Vacant(v) => {
v.insert(presence);
}
hash_map::Entry::Occupied(mut o) => {
let p = o.get_mut();
// Update existing presence event with more info
p.content.presence = presence.content.presence;
if let Some(status_msg) = presence.content.status_msg {
p.content.status_msg = Some(status_msg);
}
if let Some(last_active_ago) = presence.content.last_active_ago {
p.content.last_active_ago = Some(last_active_ago);
}
if let Some(displayname) = presence.content.displayname {
p.content.displayname = Some(displayname);
}
if let Some(avatar_url) = presence.content.avatar_url {
p.content.avatar_url = Some(avatar_url);
}
if let Some(currently_active) = presence.content.currently_active {
p.content.currently_active = Some(currently_active);
}
}
}
}
}
let mut left_rooms = BTreeMap::new();
for room_id in db.rooms.rooms_left(&sender_id) {
let room_id = room_id?;
let pdus = db.rooms.pdus_since(&sender_id, &room_id, since)?;
let room_events = pdus
.filter_map(|pdu| pdu.ok()) // Filter out buggy events
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect();
let left_room = sync_events::LeftRoom {
account_data: sync_events::AccountData { events: Vec::new() },
timeline: sync_events::Timeline {
limited: false,
prev_batch: Some(next_batch.clone()),
events: room_events,
},
state: sync_events::State { events: Vec::new() },
};
let since_member = db
.rooms
.next()
.and_then(|pdu| pdu.ok())
.and_then(|pdu| {
db.rooms
.pdu_state_hash(&pdu.0)
.ok()?
.ok_or_else(|| Error::bad_database("Pdu in db doesn't have a state hash."))
.ok()
})
.and_then(|state_hash| {
db.rooms
.state_get(&state_hash, &EventType::RoomMember, sender_id.as_str())
.ok()?
.ok_or_else(|| Error::bad_database("State hash in db doesn't have a state."))
.ok()
})
.and_then(|pdu| {
serde_json::from_value::<Raw<ruma::events::room::member::MemberEventContent>>(
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database."))
.ok()
});
let left_since_last_sync =
since_member.map_or(false, |member| member.membership == MembershipState::Join);
if left_since_last_sync {
device_list_left.extend(
db.rooms
.room_members(&room_id)
.filter_map(|user_id| Some(user_id.ok()?))
.filter(|user_id| {
// Don't send key updates from the sender to the sender
sender_id != user_id
})
.filter(|user_id| {
// Only send if the sender doesn't share any encrypted room with the target
// anymore
!share_encrypted_room(&db, sender_id, user_id, &room_id)
}),
);
}
if !left_room.is_empty() {
left_rooms.insert(room_id.clone(), left_room);
}
}
let mut invited_rooms = BTreeMap::new();
for room_id in db.rooms.rooms_invited(&sender_id) {
let room_id = room_id?;
let mut invited_since_last_sync = false;
if pdu.kind == EventType::RoomMember && pdu.state_key == Some(sender_id.to_string()) {
let content = serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>,
>(pdu.content.clone())
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database."))?;
if content.membership == MembershipState::Invite {
invited_since_last_sync = true;
break;
}
}
}
if !invited_since_last_sync {
continue;
}
let invited_room = sync_events::InvitedRoom {
invite_state: sync_events::InviteState {
events: db
.rooms
.room_state_full(&room_id)?
.into_iter()
.map(|(_, pdu)| pdu.to_stripped_state_event())
.collect(),
},
};
if !invited_room.is_empty() {
invited_rooms.insert(room_id.clone(), invited_room);
}
}
let still_share_encrypted_room = db
.rooms
.get_shared_rooms(vec![sender_id.clone(), user_id.clone()])
.filter_map(|r| r.ok())
.filter_map(|other_room_id| {
Some(
db.rooms
.room_state_get(&other_room_id, &EventType::RoomEncryption, "")
.ok()?
.is_some(),
)
})
.all(|encrypted| !encrypted);
// If the user doesn't share an encrypted room with the target anymore, we need to tell
// them
if still_share_encrypted_room {
device_list_left.insert(user_id);
}
}
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
// Remove all to-device events the device received *last time*
db.users
.remove_to_device_events(sender_id, device_id, since)?;
let response = sync_events::Response {
next_batch,
rooms: sync_events::Rooms {
leave: left_rooms,
join: joined_rooms,
invite: invited_rooms,
},
presence: sync_events::Presence {
events: presence_updates
.into_iter()
.map(|(_, v)| Raw::from(v))
.collect(),
},
account_data: sync_events::AccountData {
events: db
.account_data
.changes_since(None, &sender_id, since)?
.into_iter()
.filter_map(|(_, v)| {
serde_json::from_str(v.json().get())
.map_err(|_| Error::bad_database("Invalid account event in database."))
.ok()
})
.collect::<Vec<_>>(),
},
device_lists: sync_events::DeviceLists {
changed: device_list_updates.into_iter().collect(),
left: device_list_left.into_iter().collect(),
device_one_time_keys_count: if db.users.last_one_time_keys_update(sender_id)? > since
|| since == 0
{
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
db.users.count_one_time_keys(sender_id, device_id)?
} else {
BTreeMap::new()
},
to_device: sync_events::ToDevice {
events: db.users.get_to_device_events(sender_id, device_id)?,
},
};
// TODO: Retry the endpoint instead of returning (waiting for #118)
if !body.full_state
&& response.rooms.is_empty()
&& response.presence.is_empty()
&& response.account_data.is_empty()
&& response.device_lists.is_empty()
&& response.device_one_time_keys_count.is_empty()
&& response.to_device.is_empty()
{
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let mut duration = body.timeout.unwrap_or_default();
if duration.as_secs() > 30 {
duration = Duration::from_secs(30);
}
let mut delay = tokio::time::delay_for(duration);
tokio::select! {
_ = &mut delay => {}
_ = watcher => {}
}
}
Ok(response.into())
}
fn share_encrypted_room(
db: &Database,
sender_id: &UserId,
user_id: &UserId,
ignore_room: &RoomId,
) -> bool {
db.rooms
.get_shared_rooms(vec![sender_id.clone(), user_id.clone()])
.filter_map(|r| r.ok())
.filter(|room_id| room_id != ignore_room)
.filter_map(|other_room_id| {
Some(
db.rooms
.room_state_get(&other_room_id, &EventType::RoomEncryption, "")
.ok()?
.is_some(),
)
})
.any(|encrypted| encrypted)
}