Newer
Older
.iter()
.map(|pdu| pdu.to_sync_state_event())
.collect(),
},
ephemeral: Ephemeral {
events: edus,
},
unread_thread_notifications: BTreeMap::new(),
})
services: &Services, sender_user: &UserId, room_id: &RoomId, roomsincecount: PduCount, limit: u64,
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
.rooms
.timeline
.last_timeline_count(sender_user, room_id)?
> roomsincecount
{
.rooms
.timeline
.pdus_until(sender_user, room_id, PduCount::max())?
.filter_map(|r| {
// Filter out buggy events
if r.is_err() {
error!("Bad pdu in pdus_since: {:?}", r);
}
r.ok()
})
.take_while(|(pducount, _)| pducount > &roomsincecount);
// Take the last events for the timeline
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect::<Vec<_>>();
// They /sync response doesn't always return all messages, so we say the output
// is limited unless there are events in non_timeline_pdus
fn share_encrypted_room(
services: &Services, sender_user: &UserId, user_id: &UserId, ignore_room: &RoomId,
) -> Result<bool> {
Ok(services
.rooms
.user
.get_shared_rooms(vec![sender_user.to_owned(), user_id.to_owned()])?
.filter_map(Result::ok)
.filter(|room_id| room_id != ignore_room)
.filter_map(|other_room_id| {
Some(
.rooms
.state_accessor
.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
.ok()?
.is_some(),
)
})
.any(|encrypted| encrypted))
/// POST `/_matrix/client/unstable/org.matrix.msc3575/sync`
///
/// Sliding Sync endpoint (future endpoint: `/_matrix/client/v4/sync`)
pub(crate) async fn sync_events_v4_route(
State(services): State<crate::State>, body: Ruma<sync_events::v4::Request>,
) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
let sender_user = body.sender_user.expect("user is authenticated");
let sender_device = body.sender_device.expect("user is authenticated");
let mut body = body.body;
// Setup watchers, so if there's no response, we can wait for them
let watcher = services.globals.watch(&sender_user, &sender_device);
let next_batch = services.globals.next_count()?;
let globalsince = body
.pos
.as_ref()
.and_then(|string| string.parse().ok())
.unwrap_or(0);
if globalsince == 0 {
if let Some(conn_id) = &body.conn_id {
services
.users
.forget_sync_request_connection(sender_user.clone(), sender_device.clone(), conn_id.clone());
}
}
// Get sticky parameters from cache
let known_rooms =
.users
.update_sync_request_with_cache(sender_user.clone(), sender_device.clone(), &mut body);
.rooms
.state_cache
.rooms_joined(&sender_user)
.filter_map(Result::ok)
.collect::<Vec<_>>();
if body.extensions.to_device.enabled.unwrap_or(false) {
.users
.remove_to_device_events(&sender_user, &sender_device, globalsince)?;
}
let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
let mut device_list_changes = HashSet::new();
let mut device_list_left = HashSet::new();
if body.extensions.e2ee.enabled.unwrap_or(false) {
// Look for device list updates of this account
.users
.keys_changed(sender_user.as_ref(), globalsince, None)
.filter_map(Result::ok),
);
let Some(current_shortstatehash) = services.rooms.state.get_room_shortstatehash(room_id)? else {
error!("Room {} has no state", room_id);
continue;
};
.rooms
.user
.get_token_shortstatehash(room_id, globalsince)?;
let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
.and_then(|shortstatehash| {
.rooms
.state_accessor
.state_get(shortstatehash, &StateEventType::RoomMember, sender_user.as_str())
.transpose()
})
.transpose()?
.and_then(|pdu| {
serde_json::from_str(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid PDU in database."))
.ok()
});
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
.is_some();
if let Some(since_shortstatehash) = since_shortstatehash {
// Skip if there are only timeline changes
if since_shortstatehash == current_shortstatehash {
continue;
}
let since_encryption = services.rooms.state_accessor.state_get(
since_shortstatehash,
&StateEventType::RoomEncryption,
"",
)?;
let joined_since_last_sync =
since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
let new_encrypted_room = encrypted_room && since_encryption.is_none();
if encrypted_room {
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
.await?;
for (key, id) in current_state_ids {
if since_state_ids.get(&key) != Some(&id) {
let Some(pdu) = services.rooms.timeline.get_pdu(&id)? else {
error!("Pdu in state not found: {}", id);
continue;
};
if pdu.kind == TimelineEventType::RoomMember {
if let Some(state_key) = &pdu.state_key {
let user_id = UserId::parse(state_key.clone())
.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
if user_id == sender_user {
continue;
}
let new_membership =
serde_json::from_str::<RoomMemberEventContent>(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid PDU in database."))?
.membership;
match new_membership {
MembershipState::Join => {
// A new user joined an encrypted room
if !share_encrypted_room(&services, &sender_user, &user_id, room_id)? {
device_list_changes.insert(user_id);
}
},
MembershipState::Leave => {
// Write down users that have left encrypted rooms we are in
left_encrypted_users.insert(user_id);
},
_ => {},
}
}
}
}
}
if joined_since_last_sync || new_encrypted_room {
// If the user is in a new encrypted room, give them all joined users
device_list_changes.extend(
.rooms
.state_cache
.room_members(room_id)
.flatten()
.filter(|user_id| {
// Don't send key updates from the sender to the sender
&sender_user != user_id
})
.filter(|user_id| {
// Only send keys if the sender doesn't share an encrypted room with the target
// already
!share_encrypted_room(&services, &sender_user, user_id, room_id).unwrap_or(false)
}),
);
}
}
}
// Look for device list updates in this room
.users
.keys_changed(room_id.as_ref(), globalsince, None)
.filter_map(Result::ok),
);
}
for user_id in left_encrypted_users {
.rooms
.user
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
.filter_map(Result::ok)
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
.rooms
.state_accessor
.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
.ok()?
.is_some(),
)
})
.all(|encrypted| !encrypted);
// If the user doesn't share an encrypted room with the target anymore, we need
// to tell them
if dont_share_encrypted_room {
device_list_left.insert(user_id);
}
}
}
let mut lists = BTreeMap::new();
let mut todo_rooms = BTreeMap::new(); // and required state
for (list_id, list) in body.lists {
if list.filters.and_then(|f| f.is_invite).unwrap_or(false) {
continue;
}
let mut new_known_rooms = BTreeSet::new();
lists.insert(
list_id.clone(),
sync_events::v4::SyncList {
ops: list
.ranges
.into_iter()
.map(|mut r| {
r.0 = r.0.clamp(
uint!(0),
UInt::try_from(all_joined_rooms.len().saturating_sub(1)).unwrap_or(UInt::MAX),
);
r.1 = r.1.clamp(
r.0,
UInt::try_from(all_joined_rooms.len().saturating_sub(1)).unwrap_or(UInt::MAX),
);
let room_ids = all_joined_rooms[usize_from_ruma(r.0)..=usize_from_ruma(r.1)].to_vec();
new_known_rooms.extend(room_ids.iter().cloned());
for room_id in &room_ids {
let todo_room = todo_rooms
.entry(room_id.clone())
.or_insert((BTreeSet::new(), 0, u64::MAX));
let limit = list
.room_details
.timeline_limit
.map_or(10, u64::from)
.min(100);
todo_room
.0
.extend(list.room_details.required_state.iter().cloned());
todo_room.1 = todo_room.1.max(limit);
// 0 means unknown because it got out of date
todo_room.2 = todo_room.2.min(
known_rooms
.get(&list_id)
.and_then(|k| k.get(room_id))
.copied()
.unwrap_or(0),
);
}
sync_events::v4::SyncOp {
op: SlidingOp::Sync,
range: Some(r),
index: None,
room_ids,
room_id: None,
}
})
.collect(),
},
);
if let Some(conn_id) = &body.conn_id {
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
list_id,
new_known_rooms,
globalsince,
);
}
}
let mut known_subscription_rooms = BTreeSet::new();
for (room_id, room) in &body.room_subscriptions {
if !services.rooms.metadata.exists(room_id)? {
let todo_room = todo_rooms
.entry(room_id.clone())
.or_insert((BTreeSet::new(), 0, u64::MAX));
let limit = room.timeline_limit.map_or(10, u64::from).min(100);
todo_room.0.extend(room.required_state.iter().cloned());
todo_room.1 = todo_room.1.max(limit);
// 0 means unknown because it got out of date
todo_room.2 = todo_room.2.min(
known_rooms
.get("subscriptions")
.and_then(|k| k.get(room_id))
.copied()
.unwrap_or(0),
);
known_subscription_rooms.insert(room_id.clone());
}
for r in body.unsubscribe_rooms {
known_subscription_rooms.remove(&r);
body.room_subscriptions.remove(&r);
}
if let Some(conn_id) = &body.conn_id {
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
"subscriptions".to_owned(),
known_subscription_rooms,
globalsince,
);
}
if let Some(conn_id) = &body.conn_id {
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
body.room_subscriptions,
);
}
let mut rooms = BTreeMap::new();
for (room_id, (required_state_request, timeline_limit, roomsince)) in &todo_rooms {
let roomsincecount = PduCount::Normal(*roomsince);
let (timeline_pdus, limited) =
load_timeline(&services, &sender_user, room_id, roomsincecount, *timeline_limit)?;
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
if roomsince != &0 && timeline_pdus.is_empty() {
continue;
}
let prev_batch = timeline_pdus
.first()
.map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
Ok(Some(match pdu_count {
PduCount::Backfilled(_) => {
error!("timeline in backfill state?!");
"0".to_owned()
},
PduCount::Normal(c) => c.to_string(),
}))
})?
.or_else(|| {
if roomsince != &0 {
Some(roomsince.to_string())
} else {
None
}
});
let room_events: Vec<_> = timeline_pdus
.iter()
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect();
let required_state = required_state_request
.iter()
.rooms
.state_accessor
.room_state_get(room_id, &state.0, &state.1)
})
.filter_map(Result::ok)
.flatten()
.map(|state| state.to_sync_state_event())
.collect();
// Heroes
.rooms
.state_cache
.room_members(room_id)
.filter_map(Result::ok)
.filter(|member| member != &sender_user)
.map(|member| {
Ok::<_, Error>(
.rooms
.state_accessor
.get_member(room_id, &member)?
.map(|memberevent| {
(
memberevent
.displayname
.unwrap_or_else(|| member.to_string()),
memberevent.avatar_url,
)
}),
.filter_map(Result::ok)
.flatten()
.take(5)
.collect::<Vec<_>>();
let name = match heroes.len().cmp(&(1_usize)) {
Ordering::Greater => {
let firsts = heroes[1..]
.iter()
.map(|h| h.0.clone())
.collect::<Vec<_>>()
.join(", ");
Some(format!("{firsts} and {last}"))
},
Ordering::Equal => Some(heroes[0].0.clone()),
Ordering::Less => None,
};
let heroes_avatar = if heroes.len() == 1 {
heroes[0].1.clone()
} else {
None
};
rooms.insert(
room_id.clone(),
sync_events::v4::SlidingSyncRoom {
name: services.rooms.state_accessor.get_name(room_id)?.or(name),
avatar: if let Some(heroes_avatar) = heroes_avatar {
ruma::JsOption::Some(heroes_avatar)
} else {
match services.rooms.state_accessor.get_avatar(room_id)? {
ruma::JsOption::Some(avatar) => ruma::JsOption::from_option(avatar.url),
ruma::JsOption::Null => ruma::JsOption::Null,
ruma::JsOption::Undefined => ruma::JsOption::Undefined,
}
},
initial: Some(roomsince == &0),
is_dm: None,
invite_state: None,
unread_notifications: UnreadNotificationsCount {
highlight_count: Some(
.rooms
.user
.highlight_count(&sender_user, room_id)?
.try_into()
.expect("notification count can't go that high"),
),
notification_count: Some(
.rooms
.user
.notification_count(&sender_user, room_id)?
.try_into()
.expect("notification count can't go that high"),
),
},
timeline: room_events,
required_state,
prev_batch,
limited,
joined_count: Some(
.unwrap_or(0)
.try_into()
.unwrap_or_else(|_| uint!(0)),
.unwrap_or(0)
.try_into()
.unwrap_or_else(|_| uint!(0)),
),
num_live: None, // Count events in timeline greater than global sync counter
timestamp: None,
if rooms
.iter()
.all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty())
{
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let default = Duration::from_secs(30);
let duration = cmp::min(body.timeout.unwrap_or(default), default);
_ = tokio::time::timeout(duration, watcher).await;
}
Ok(sync_events::v4::Response {
initial: globalsince == 0,
txn_id: body.txn_id.clone(),
pos: next_batch.to_string(),
lists,
rooms,
extensions: sync_events::v4::Extensions {
to_device: if body.extensions.to_device.enabled.unwrap_or(false) {
Some(sync_events::v4::ToDevice {
.users
.get_to_device_events(&sender_user, &sender_device)?,
next_batch: next_batch.to_string(),
})
} else {
None
},
e2ee: sync_events::v4::E2EE {
device_lists: DeviceLists {
changed: device_list_changes.into_iter().collect(),
left: device_list_left.into_iter().collect(),
},
.users
.count_one_time_keys(&sender_user, &sender_device)?,
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
},
account_data: sync_events::v4::AccountData {
global: if body.extensions.account_data.enabled.unwrap_or(false) {
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
.account_data
.changes_since(None, &sender_user, globalsince)?
.into_iter()
.filter_map(|(_, v)| {
serde_json::from_str(v.json().get())
.map_err(|_| Error::bad_database("Invalid account event in database."))
.ok()
})
.collect()
} else {
Vec::new()
},
rooms: BTreeMap::new(),
},
receipts: sync_events::v4::Receipts {
rooms: BTreeMap::new(),
},
typing: sync_events::v4::Typing {
rooms: BTreeMap::new(),
},
},
delta_token: None,
})