Skip to content
Snippets Groups Projects
sync.rs 44.8 KiB
Newer Older
  • Learn to ignore specific revisions
  • 	Ok(JoinedRoom {
    		account_data: RoomAccountData {
    			events: services()
    				.account_data
    				.changes_since(Some(room_id), sender_user, since)?
    				.into_iter()
    				.filter_map(|(_, v)| {
    					serde_json::from_str(v.json().get())
    						.map_err(|_| Error::bad_database("Invalid account event in database."))
    						.ok()
    				})
    				.collect(),
    		},
    		summary: RoomSummary {
    			heroes,
    			joined_member_count: joined_member_count.map(|n| (n as u32).into()),
    			invited_member_count: invited_member_count.map(|n| (n as u32).into()),
    		},
    		unread_notifications: UnreadNotificationsCount {
    			highlight_count,
    			notification_count,
    		},
    		timeline: Timeline {
    			limited: limited || joined_since_last_sync,
    			prev_batch,
    			events: room_events,
    		},
    		state: State {
    
    🥺's avatar
    🥺 committed
    			events: state_events
    				.iter()
    				.map(|pdu| pdu.to_sync_state_event())
    				.collect(),
    
    		},
    		ephemeral: Ephemeral {
    			events: edus,
    		},
    		unread_thread_notifications: BTreeMap::new(),
    	})
    
    	sender_user: &UserId, room_id: &RoomId, roomsincecount: PduCount, limit: u64,
    
    ) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
    
    	let timeline_pdus;
    	let limited;
    
    🥺's avatar
    🥺 committed
    	if services()
    		.rooms
    		.timeline
    		.last_timeline_count(sender_user, room_id)?
    		> roomsincecount
    	{
    
    		let mut non_timeline_pdus = services()
    			.rooms
    			.timeline
    			.pdus_until(sender_user, room_id, PduCount::max())?
    			.filter_map(|r| {
    				// Filter out buggy events
    				if r.is_err() {
    					error!("Bad pdu in pdus_since: {:?}", r);
    				}
    				r.ok()
    			})
    			.take_while(|(pducount, _)| pducount > &roomsincecount);
    
    		// Take the last events for the timeline
    
    🥺's avatar
    🥺 committed
    		timeline_pdus = non_timeline_pdus
    			.by_ref()
    			.take(limit as usize)
    			.collect::<Vec<_>>()
    			.into_iter()
    			.rev()
    			.collect::<Vec<_>>();
    
    
    		// The /sync response doesn't always return all messages, so we mark the
    		// output as limited if there are still events left in non_timeline_pdus
    		limited = non_timeline_pdus.next().is_some();
    	} else {
    		timeline_pdus = Vec::new();
    		limited = false;
    	}
    	Ok((timeline_pdus, limited))
    
    /// Reports whether `sender_user` and `user_id` have at least one encrypted
    /// room in common, not counting `ignore_room`.
    ///
    /// Candidate rooms come from `get_shared_rooms` for the two users. A room
    /// counts as encrypted when `room_state_get` finds a
    /// `StateEventType::RoomEncryption` state event with an empty state key.
    ///
    /// # Errors
    ///
    /// Returns the error from `get_shared_rooms` if that lookup fails. A failure
    /// while reading the state of an individual room is not propagated; that room
    /// is simply left out of the check.
    fn share_encrypted_room(sender_user: &UserId, user_id: &UserId, ignore_room: &RoomId) -> Result<bool> {
    	Ok(services()
    		.rooms
    		.user
    		.get_shared_rooms(vec![sender_user.to_owned(), user_id.to_owned()])?
    
    		// Leave out the room the caller asked us to ignore
    		.filter(|room_id| room_id != ignore_room)
    		.filter_map(|other_room_id| {
    			Some(
    				services()
    					.rooms
    					.state_accessor
    					.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
    					// A failed state lookup drops this room instead of failing the call
    					.ok()?
    					.is_some(),
    			)
    		})
    		// One encrypted shared room is enough
    		.any(|encrypted| encrypted))
    
    /// POST `/_matrix/client/unstable/org.matrix.msc3575/sync`
    ///
    /// Sliding Sync endpoint (future endpoint: `/_matrix/client/v4/sync`)
    
    pub(crate) async fn sync_events_v4_route(
    
    	body: Ruma<sync_events::v4::Request>,
    
    ) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
    
    	let sender_user = body.sender_user.expect("user is authenticated");
    	let sender_device = body.sender_device.expect("user is authenticated");
    	let mut body = body.body;
    	// Set up the watcher first, so that if there is nothing to respond with yet we can wait on it
    	let watcher = services().globals.watch(&sender_user, &sender_device);
    
    	let next_batch = services().globals.next_count()?;
    
    
    🥺's avatar
    🥺 committed
    	let globalsince = body
    		.pos
    		.as_ref()
    		.and_then(|string| string.parse().ok())
    		.unwrap_or(0);
    
    
    	if globalsince == 0 {
    		if let Some(conn_id) = &body.conn_id {
    			services().users.forget_sync_request_connection(
    				sender_user.clone(),
    				sender_device.clone(),
    				conn_id.clone(),
    			);
    		}
    	}
    
    	// Get sticky parameters from cache
    	let known_rooms =
    
    🥺's avatar
    🥺 committed
    		services()
    			.users
    			.update_sync_request_with_cache(sender_user.clone(), sender_device.clone(), &mut body);
    
    🥺's avatar
    🥺 committed
    	let all_joined_rooms = services()
    		.rooms
    		.state_cache
    		.rooms_joined(&sender_user)
    		.filter_map(Result::ok)
    		.collect::<Vec<_>>();
    
    
    	if body.extensions.to_device.enabled.unwrap_or(false) {
    
    🥺's avatar
    🥺 committed
    		services()
    			.users
    			.remove_to_device_events(&sender_user, &sender_device, globalsince)?;
    
    	}
    
    	let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
    	let mut device_list_changes = HashSet::new();
    	let mut device_list_left = HashSet::new();
    
    	if body.extensions.e2ee.enabled.unwrap_or(false) {
    		// Look for device list updates of this account
    
    🥺's avatar
    🥺 committed
    		device_list_changes.extend(
    			services()
    				.users
    				.keys_changed(sender_user.as_ref(), globalsince, None)
    				.filter_map(Result::ok),
    		);
    
    
    		for room_id in &all_joined_rooms {
    
    			let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? else {
    
    				error!("Room {} has no state", room_id);
    				continue;
    			};
    
    
    🥺's avatar
    🥺 committed
    			let since_shortstatehash = services()
    				.rooms
    				.user
    				.get_token_shortstatehash(room_id, globalsince)?;
    
    
    			let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
    				.and_then(|shortstatehash| {
    					services()
    						.rooms
    						.state_accessor
    						.state_get(shortstatehash, &StateEventType::RoomMember, sender_user.as_str())
    						.transpose()
    				})
    				.transpose()?
    				.and_then(|pdu| {
    					serde_json::from_str(pdu.content.get())
    						.map_err(|_| Error::bad_database("Invalid PDU in database."))
    						.ok()
    				});
    
    			let encrypted_room = services()
    				.rooms
    				.state_accessor
    				.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
    				.is_some();
    
    			if let Some(since_shortstatehash) = since_shortstatehash {
    				// Skip if there are only timeline changes
    				if since_shortstatehash == current_shortstatehash {
    					continue;
    				}
    
    				let since_encryption = services().rooms.state_accessor.state_get(
    					since_shortstatehash,
    					&StateEventType::RoomEncryption,
    					"",
    				)?;
    
    				let joined_since_last_sync =
    					since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
    
    				let new_encrypted_room = encrypted_room && since_encryption.is_none();
    				if encrypted_room {
    
    🥺's avatar
    🥺 committed
    					let current_state_ids = services()
    						.rooms
    						.state_accessor
    						.state_full_ids(current_shortstatehash)
    						.await?;
    					let since_state_ids = services()
    						.rooms
    						.state_accessor
    						.state_full_ids(since_shortstatehash)
    						.await?;
    
    
    					for (key, id) in current_state_ids {
    						if since_state_ids.get(&key) != Some(&id) {
    
    							let Some(pdu) = services().rooms.timeline.get_pdu(&id)? else {
    								error!("Pdu in state not found: {}", id);
    								continue;
    
    							};
    							if pdu.kind == TimelineEventType::RoomMember {
    								if let Some(state_key) = &pdu.state_key {
    									let user_id = UserId::parse(state_key.clone())
    										.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
    
    									if user_id == sender_user {
    										continue;
    									}
    
    									let new_membership =
    										serde_json::from_str::<RoomMemberEventContent>(pdu.content.get())
    											.map_err(|_| Error::bad_database("Invalid PDU in database."))?
    											.membership;
    
    									match new_membership {
    										MembershipState::Join => {
    											// A new user joined an encrypted room
    											if !share_encrypted_room(&sender_user, &user_id, room_id)? {
    												device_list_changes.insert(user_id);
    											}
    										},
    										MembershipState::Leave => {
    											// Write down users that have left encrypted rooms we are in
    											left_encrypted_users.insert(user_id);
    										},
    										_ => {},
    									}
    								}
    							}
    						}
    					}
    					if joined_since_last_sync || new_encrypted_room {
    						// If the user is in a new encrypted room, give them all joined users
    						device_list_changes.extend(
    							services()
    								.rooms
    								.state_cache
    								.room_members(room_id)
    								.flatten()
    								.filter(|user_id| {
    									// Don't send key updates from the sender to the sender
    									&sender_user != user_id
    								})
    								.filter(|user_id| {
    									// Only send keys if the sender doesn't share an encrypted room with the target
    									// already
    									!share_encrypted_room(&sender_user, user_id, room_id).unwrap_or(false)
    								}),
    						);
    					}
    				}
    			}
    			// Look for device list updates in this room
    
    🥺's avatar
    🥺 committed
    			device_list_changes.extend(
    				services()
    					.users
    					.keys_changed(room_id.as_ref(), globalsince, None)
    					.filter_map(Result::ok),
    			);
    
    		}
    		for user_id in left_encrypted_users {
    			let dont_share_encrypted_room = services()
    				.rooms
    				.user
    				.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
    
    				.filter_map(|other_room_id| {
    					Some(
    						services()
    							.rooms
    							.state_accessor
    							.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
    							.ok()?
    							.is_some(),
    					)
    				})
    				.all(|encrypted| !encrypted);
    			// If the user doesn't share an encrypted room with the target anymore, we need
    			// to tell them
    			if dont_share_encrypted_room {
    				device_list_left.insert(user_id);
    			}
    		}
    	}
    
    	let mut lists = BTreeMap::new();
    	let mut todo_rooms = BTreeMap::new(); // and required state
    
    	for (list_id, list) in body.lists {
    		if list.filters.and_then(|f| f.is_invite).unwrap_or(false) {
    			continue;
    		}
    
    		let mut new_known_rooms = BTreeSet::new();
    
    		lists.insert(
    			list_id.clone(),
    			sync_events::v4::SyncList {
    				ops: list
    					.ranges
    					.into_iter()
    					.map(|mut r| {
    
    						r.0 = r.0.clamp(
    							uint!(0),
    							UInt::try_from(all_joined_rooms.len().saturating_sub(1)).unwrap_or(UInt::MAX),
    						);
    						r.1 = r.1.clamp(
    							r.0,
    							UInt::try_from(all_joined_rooms.len().saturating_sub(1)).unwrap_or(UInt::MAX),
    						);
    
    						let room_ids = all_joined_rooms[(u64::from(r.0) as usize)..=(u64::from(r.1) as usize)].to_vec();
    						new_known_rooms.extend(room_ids.iter().cloned());
    						for room_id in &room_ids {
    
    🥺's avatar
    🥺 committed
    							let todo_room = todo_rooms
    								.entry(room_id.clone())
    								.or_insert((BTreeSet::new(), 0, u64::MAX));
    							let limit = list
    								.room_details
    								.timeline_limit
    								.map_or(10, u64::from)
    								.min(100);
    							todo_room
    								.0
    								.extend(list.room_details.required_state.iter().cloned());
    
    							todo_room.1 = todo_room.1.max(limit);
    							// 0 means unknown because it got out of date
    
    🥺's avatar
    🥺 committed
    							todo_room.2 = todo_room.2.min(
    								known_rooms
    									.get(&list_id)
    									.and_then(|k| k.get(room_id))
    									.copied()
    									.unwrap_or(0),
    							);
    
    						}
    						sync_events::v4::SyncOp {
    							op: SlidingOp::Sync,
    							range: Some(r),
    							index: None,
    							room_ids,
    							room_id: None,
    						}
    					})
    					.collect(),
    				count: UInt::from(all_joined_rooms.len() as u32),
    			},
    		);
    
    		if let Some(conn_id) = &body.conn_id {
    			services().users.update_sync_known_rooms(
    				sender_user.clone(),
    				sender_device.clone(),
    				conn_id.clone(),
    				list_id,
    				new_known_rooms,
    				globalsince,
    			);
    		}
    	}
    
    	let mut known_subscription_rooms = BTreeSet::new();
    	for (room_id, room) in &body.room_subscriptions {
    		if !services().rooms.metadata.exists(room_id)? {
    			continue;
    		}
    
    🥺's avatar
    🥺 committed
    		let todo_room = todo_rooms
    			.entry(room_id.clone())
    			.or_insert((BTreeSet::new(), 0, u64::MAX));
    
    		let limit = room.timeline_limit.map_or(10, u64::from).min(100);
    		todo_room.0.extend(room.required_state.iter().cloned());
    		todo_room.1 = todo_room.1.max(limit);
    		// 0 means unknown because it got out of date
    
    🥺's avatar
    🥺 committed
    		todo_room.2 = todo_room.2.min(
    			known_rooms
    				.get("subscriptions")
    				.and_then(|k| k.get(room_id))
    				.copied()
    				.unwrap_or(0),
    		);
    
    		known_subscription_rooms.insert(room_id.clone());
    	}
    
    	for r in body.unsubscribe_rooms {
    		known_subscription_rooms.remove(&r);
    		body.room_subscriptions.remove(&r);
    	}
    
    	if let Some(conn_id) = &body.conn_id {
    		services().users.update_sync_known_rooms(
    			sender_user.clone(),
    			sender_device.clone(),
    			conn_id.clone(),
    			"subscriptions".to_owned(),
    			known_subscription_rooms,
    			globalsince,
    		);
    	}
    
    	if let Some(conn_id) = &body.conn_id {
    		services().users.update_sync_subscriptions(
    			sender_user.clone(),
    			sender_device.clone(),
    			conn_id.clone(),
    			body.room_subscriptions,
    		);
    	}
    
    	let mut rooms = BTreeMap::new();
    	for (room_id, (required_state_request, timeline_limit, roomsince)) in &todo_rooms {
    		let roomsincecount = PduCount::Normal(*roomsince);
    
    		let (timeline_pdus, limited) = load_timeline(&sender_user, room_id, roomsincecount, *timeline_limit)?;
    
    		if roomsince != &0 && timeline_pdus.is_empty() {
    			continue;
    		}
    
    		let prev_batch = timeline_pdus
    			.first()
    			.map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
    				Ok(Some(match pdu_count {
    					PduCount::Backfilled(_) => {
    						error!("timeline in backfill state?!");
    						"0".to_owned()
    					},
    					PduCount::Normal(c) => c.to_string(),
    				}))
    			})?
    			.or_else(|| {
    				if roomsince != &0 {
    					Some(roomsince.to_string())
    				} else {
    					None
    				}
    			});
    
    
    🥺's avatar
    🥺 committed
    		let room_events: Vec<_> = timeline_pdus
    			.iter()
    			.map(|(_, pdu)| pdu.to_sync_room_event())
    			.collect();
    
    
    		let required_state = required_state_request
    			.iter()
    
    🥺's avatar
    🥺 committed
    			.map(|state| {
    				services()
    					.rooms
    					.state_accessor
    					.room_state_get(room_id, &state.0, &state.1)
    			})
    
    			.flatten()
    			.map(|state| state.to_sync_state_event())
    			.collect();
    
    		// Heroes
    		let heroes = services()
    			.rooms
    			.state_cache
    			.room_members(room_id)
    
    			.filter(|member| member != &sender_user)
    			.map(|member| {
    				Ok::<_, Error>(
    
    🥺's avatar
    🥺 committed
    					services()
    						.rooms
    						.state_accessor
    						.get_member(room_id, &member)?
    						.map(|memberevent| {
    							(
    								memberevent
    									.displayname
    									.unwrap_or_else(|| member.to_string()),
    								memberevent.avatar_url,
    							)
    						}),
    
    			.flatten()
    			.take(5)
    			.collect::<Vec<_>>();
    		let name = match heroes.len().cmp(&(1_usize)) {
    			Ordering::Greater => {
    
    				let firsts = heroes[1..]
    					.iter()
    					.map(|h| h.0.clone())
    					.collect::<Vec<_>>()
    					.join(", ");
    
    				let last = heroes[0].0.clone();
    
    				Some(format!("{firsts} and {last}"))
    
    			},
    			Ordering::Equal => Some(heroes[0].0.clone()),
    			Ordering::Less => None,
    		};
    
    		let heroes_avatar = if heroes.len() == 1 {
    			heroes[0].1.clone()
    		} else {
    			None
    		};
    
    		rooms.insert(
    			room_id.clone(),
    			sync_events::v4::SlidingSyncRoom {
    				name: services().rooms.state_accessor.get_name(room_id)?.or(name),
    				avatar: if let Some(heroes_avatar) = heroes_avatar {
    					ruma::JsOption::Some(heroes_avatar)
    				} else {
    					match services().rooms.state_accessor.get_avatar(room_id)? {
    
    🥺's avatar
    🥺 committed
    						ruma::JsOption::Some(avatar) => ruma::JsOption::from_option(avatar.url),
    
    						ruma::JsOption::Null => ruma::JsOption::Null,
    						ruma::JsOption::Undefined => ruma::JsOption::Undefined,
    					}
    				},
    				initial: Some(roomsince == &0),
    				is_dm: None,
    				invite_state: None,
    				unread_notifications: UnreadNotificationsCount {
    					highlight_count: Some(
    						services()
    							.rooms
    							.user
    							.highlight_count(&sender_user, room_id)?
    							.try_into()
    							.expect("notification count can't go that high"),
    					),
    					notification_count: Some(
    						services()
    							.rooms
    							.user
    							.notification_count(&sender_user, room_id)?
    							.try_into()
    							.expect("notification count can't go that high"),
    					),
    				},
    				timeline: room_events,
    				required_state,
    				prev_batch,
    				limited,
    				joined_count: Some(
    
    🥺's avatar
    🥺 committed
    					(services()
    						.rooms
    						.state_cache
    						.room_joined_count(room_id)?
    						.unwrap_or(0) as u32)
    						.into(),
    
    				),
    				invited_count: Some(
    
    🥺's avatar
    🥺 committed
    					(services()
    						.rooms
    						.state_cache
    						.room_invited_count(room_id)?
    						.unwrap_or(0) as u32)
    						.into(),
    
    				),
    				num_live: None, // Count events in timeline greater than global sync counter
    				timestamp: None,
    			},
    		);
    	}
    
    
    🥺's avatar
    🥺 committed
    	if rooms
    		.iter()
    		.all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty())
    	{
    
    		// Hang for the requested timeout (30 seconds by default and at most) so
    		// requests are not spammed; stop hanging as soon as new info arrives
    		let mut duration = body.timeout.unwrap_or(Duration::from_secs(30));
    		if duration.as_secs() > 30 {
    			duration = Duration::from_secs(30);
    		}
    
    		#[allow(clippy::let_underscore_must_use)]
    		{
    			_ = tokio::time::timeout(duration, watcher).await;
    		}
    
    	}
    
    	Ok(sync_events::v4::Response {
    		initial: globalsince == 0,
    		txn_id: body.txn_id.clone(),
    		pos: next_batch.to_string(),
    		lists,
    		rooms,
    		extensions: sync_events::v4::Extensions {
    			to_device: if body.extensions.to_device.enabled.unwrap_or(false) {
    				Some(sync_events::v4::ToDevice {
    
    🥺's avatar
    🥺 committed
    					events: services()
    						.users
    						.get_to_device_events(&sender_user, &sender_device)?,
    
    					next_batch: next_batch.to_string(),
    				})
    			} else {
    				None
    			},
    			e2ee: sync_events::v4::E2EE {
    				device_lists: DeviceLists {
    					changed: device_list_changes.into_iter().collect(),
    					left: device_list_left.into_iter().collect(),
    				},
    
    🥺's avatar
    🥺 committed
    				device_one_time_keys_count: services()
    					.users
    					.count_one_time_keys(&sender_user, &sender_device)?,
    
    				// Fallback keys are not yet supported
    				device_unused_fallback_key_types: None,
    			},
    			account_data: sync_events::v4::AccountData {
    				global: if body.extensions.account_data.enabled.unwrap_or(false) {
    					services()
    						.account_data
    						.changes_since(None, &sender_user, globalsince)?
    						.into_iter()
    						.filter_map(|(_, v)| {
    							serde_json::from_str(v.json().get())
    								.map_err(|_| Error::bad_database("Invalid account event in database."))
    								.ok()
    						})
    						.collect()
    				} else {
    					Vec::new()
    				},
    				rooms: BTreeMap::new(),
    			},
    			receipts: sync_events::v4::Receipts {
    				rooms: BTreeMap::new(),
    			},
    			typing: sync_events::v4::Typing {
    				rooms: BTreeMap::new(),
    			},
    		},
    		delta_token: None,
    	})