use std::{ cmp, collections::{BTreeMap, HashMap, HashSet}, fmt::Debug, sync::Arc, time::Instant, }; use base64::{engine::general_purpose, Engine as _}; use conduit::{debug, error, utils::math::continue_exponential_backoff_secs, warn}; use federation::transactions::send_transaction_message; use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use ruma::{ api::federation::{ self, transactions::edu::{ DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap, }, }, device_id, events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, RoomVersionId, ServerName, UInt, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use super::{appservice, send, Destination, Msg, SendingEvent, Service}; use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, Result}; #[derive(Debug)] enum TransactionStatus { Running, Failed(u32, Instant), // number of times failed, time of last failure Retrying(u32), // number of times failed } type SendingError = (Destination, Error); type SendingResult = Result<Destination, SendingError>; type SendingFuture<'a> = BoxFuture<'a, SendingResult>; type SendingFutures<'a> = FuturesUnordered<SendingFuture<'a>>; type CurTransactionStatus = HashMap<Destination, TransactionStatus>; const DEQUEUE_LIMIT: usize = 48; const SELECT_EDU_LIMIT: usize = 16; impl Service { pub async fn start_handler(self: &Arc<Self>) { let self_ = Arc::clone(self); let handle = services().server.runtime().spawn(async move { self_ .handler() .await .expect("Failed to start sending handler"); }); _ = self.handler_join.lock().await.insert(handle); } #[tracing::instrument(skip_all, name = "sender")] async fn handler(&self) -> Result<()> { let receiver = self.receiver.lock().await; let mut futures: SendingFutures<'_> = FuturesUnordered::new(); let mut statuses: CurTransactionStatus = CurTransactionStatus::new(); self.initial_transactions(&futures, &mut statuses); loop { debug_assert!(!receiver.is_closed(), "channel error"); tokio::select! { request = receiver.recv_async() => match request { Ok(request) => self.handle_request(request, &futures, &mut statuses), Err(_) => return Ok(()), }, Some(response) = futures.next() => { self.handle_response(response, &mut futures, &mut statuses); }, } } } fn handle_response( &self, response: SendingResult, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus, ) { match response { Ok(dest) => self.handle_response_ok(&dest, futures, statuses), Err((dest, e)) => Self::handle_response_err(dest, futures, statuses, &e), }; } fn handle_response_err( dest: Destination, _futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus, e: &Error, ) { debug!(dest = ?dest, "{e:?}"); statuses.entry(dest).and_modify(|e| { *e = match e { TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n + 1, Instant::now()), TransactionStatus::Failed(..) => panic!("Request that was not even running failed?!"), } }); } fn handle_response_ok( &self, dest: &Destination, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus, ) { let _cork = services().db.cork(); self.db .delete_all_active_requests_for(dest) .expect("all active requests deleted"); // Find events that have been added since starting the last request let new_events = self .db .queued_requests(dest) .filter_map(Result::ok) .take(DEQUEUE_LIMIT) .collect::<Vec<_>>(); // Insert any pdus we found if !new_events.is_empty() { self.db .mark_as_active(&new_events) .expect("marked as active"); let new_events_vec = new_events.into_iter().map(|(event, _)| event).collect(); futures.push(Box::pin(send_events(dest.clone(), new_events_vec))); } else { statuses.remove(dest); } } fn handle_request(&self, msg: Msg, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus) { let iv = vec![(msg.event, msg.queue_id)]; if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses) { if !events.is_empty() { futures.push(Box::pin(send_events(msg.dest, events))); } else { statuses.remove(&msg.dest); } } } fn initial_transactions(&self, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus) { let keep = usize::try_from(self.startup_netburst_keep).unwrap_or(usize::MAX); let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new(); for (key, dest, event) in self.db.active_requests().filter_map(Result::ok) { let entry = txns.entry(dest.clone()).or_default(); if self.startup_netburst_keep >= 0 && entry.len() >= keep { warn!("Dropping unsent event {:?} {:?}", dest, String::from_utf8_lossy(&key)); self.db .delete_active_request(&key) .expect("active request deleted"); } else { entry.push(event); } } for (dest, events) in txns { if self.startup_netburst && !events.is_empty() { statuses.insert(dest.clone(), TransactionStatus::Running); futures.push(Box::pin(send_events(dest.clone(), events))); } } } #[tracing::instrument(skip_all)] fn select_events( &self, dest: &Destination, new_events: Vec<(SendingEvent, Vec<u8>)>, // Events we want to send: event and full key statuses: &mut CurTransactionStatus, ) -> Result<Option<Vec<SendingEvent>>> { let (allow, retry) = self.select_events_current(dest.clone(), statuses)?; // Nothing can be done for this remote, bail out. if !allow { return Ok(None); } let _cork = services().db.cork(); let mut events = Vec::new(); // Must retry any previous transaction for this remote. if retry { self.db .active_requests_for(dest) .filter_map(Result::ok) .for_each(|(_, e)| events.push(e)); return Ok(Some(events)); } // Compose the next transaction let _cork = services().db.cork(); if !new_events.is_empty() { self.db.mark_as_active(&new_events)?; for (e, _) in new_events { events.push(e); } } // Add EDU's into the transaction if let Destination::Normal(server_name) = dest { if let Ok((select_edus, last_count)) = self.select_edus(server_name) { events.extend(select_edus.into_iter().map(SendingEvent::Edu)); self.db.set_latest_educount(server_name, last_count)?; } } Ok(Some(events)) } #[tracing::instrument(skip_all)] fn select_events_current(&self, dest: Destination, statuses: &mut CurTransactionStatus) -> Result<(bool, bool)> { let (mut allow, mut retry) = (true, false); statuses .entry(dest) .and_modify(|e| match e { TransactionStatus::Failed(tries, time) => { // Fail if a request has failed recently (exponential backoff) let min = services().globals.config.sender_timeout; let max = services().globals.config.sender_retry_backoff_limit; if continue_exponential_backoff_secs(min, max, time.elapsed(), *tries) { allow = false; } else { retry = true; *e = TransactionStatus::Retrying(*tries); } }, TransactionStatus::Running | TransactionStatus::Retrying(_) => { allow = false; // already running }, }) .or_insert(TransactionStatus::Running); Ok((allow, retry)) } #[tracing::instrument(skip_all)] fn select_edus(&self, server_name: &ServerName) -> Result<(Vec<Vec<u8>>, u64)> { // u64: count of last edu let since = self.db.get_latest_educount(server_name)?; let mut events = Vec::new(); let mut max_edu_count = since; let mut device_list_changes = HashSet::new(); for room_id in services().rooms.state_cache.server_rooms(server_name) { let room_id = room_id?; // Look for device list updates in this room device_list_changes.extend( services() .users .keys_changed(room_id.as_ref(), since, None) .filter_map(Result::ok) .filter(|user_id| user_is_local(user_id)), ); if services().globals.allow_outgoing_read_receipts() && !select_edus_receipts(&room_id, since, &mut max_edu_count, &mut events)? { break; } } for user_id in device_list_changes { // Empty prev id forces synapse to resync; because synapse resyncs, // we can just insert placeholder data let edu = Edu::DeviceListUpdate(DeviceListUpdateContent { user_id, device_id: device_id!("placeholder").to_owned(), device_display_name: Some("Placeholder".to_owned()), stream_id: uint!(1), prev_id: Vec::new(), deleted: None, keys: None, }); events.push(serde_json::to_vec(&edu).expect("json can be serialized")); } if services().globals.allow_outgoing_presence() { select_edus_presence(server_name, since, &mut max_edu_count, &mut events)?; } Ok((events, max_edu_count)) } } /// Look for presence fn select_edus_presence( server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>, ) -> Result<bool> { // Look for presence updates for this server let mut presence_updates = Vec::new(); for (user_id, count, presence_bytes) in services().presence.presence_since(since) { *max_edu_count = cmp::max(count, *max_edu_count); if !user_is_local(&user_id) { continue; } if !services() .rooms .state_cache .server_sees_user(server_name, &user_id)? { continue; } let presence_event = Presence::from_json_bytes_to_event(&presence_bytes, &user_id)?; presence_updates.push(PresenceUpdate { user_id, presence: presence_event.content.presence, currently_active: presence_event.content.currently_active.unwrap_or(false), last_active_ago: presence_event .content .last_active_ago .unwrap_or_else(|| uint!(0)), status_msg: presence_event.content.status_msg, }); if presence_updates.len() >= SELECT_EDU_LIMIT { break; } } if !presence_updates.is_empty() { let presence_content = Edu::Presence(PresenceContent::new(presence_updates)); events.push(serde_json::to_vec(&presence_content).expect("PresenceEvent can be serialized")); } Ok(true) } /// Look for read receipts in this room fn select_edus_receipts( room_id: &RoomId, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>, ) -> Result<bool> { for r in services() .rooms .read_receipt .readreceipts_since(room_id, since) { let (user_id, count, read_receipt) = r?; *max_edu_count = cmp::max(count, *max_edu_count); if !user_is_local(&user_id) { continue; } let event = serde_json::from_str(read_receipt.json().get()) .map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?; let federation_event = if let AnySyncEphemeralRoomEvent::Receipt(r) = event { let mut read = BTreeMap::new(); let (event_id, mut receipt) = r .content .0 .into_iter() .next() .expect("we only use one event per read receipt"); let receipt = receipt .remove(&ReceiptType::Read) .expect("our read receipts always set this") .remove(&user_id) .expect("our read receipts always have the user here"); read.insert( user_id, ReceiptData { data: receipt.clone(), event_ids: vec![event_id.clone()], }, ); let receipt_map = ReceiptMap { read, }; let mut receipts = BTreeMap::new(); receipts.insert(room_id.to_owned(), receipt_map); Edu::Receipt(ReceiptContent { receipts, }) } else { Error::bad_database("Invalid event type in read_receipts"); continue; }; events.push(serde_json::to_vec(&federation_event).expect("json can be serialized")); if events.len() >= SELECT_EDU_LIMIT { return Ok(false); } } Ok(true) } async fn send_events(dest: Destination, events: Vec<SendingEvent>) -> SendingResult { //debug_assert!(!events.is_empty(), "sending empty transaction"); match dest { Destination::Normal(ref server) => send_events_dest_normal(&dest, server, events).await, Destination::Appservice(ref id) => send_events_dest_appservice(&dest, id, events).await, Destination::Push(ref userid, ref pushkey) => send_events_dest_push(&dest, userid, pushkey, events).await, } } #[tracing::instrument(skip(dest, events))] async fn send_events_dest_appservice(dest: &Destination, id: &str, events: Vec<SendingEvent>) -> SendingResult { let mut pdu_jsons = Vec::new(); for event in &events { match event { SendingEvent::Pdu(pdu_id) => { pdu_jsons.push( services() .rooms .timeline .get_pdu_from_id(pdu_id) .map_err(|e| (dest.clone(), e))? .ok_or_else(|| { ( dest.clone(), Error::bad_database("[Appservice] Event in servernameevent_data not found in db."), ) })? .to_room_event(), ); }, SendingEvent::Edu(_) | SendingEvent::Flush => { // Appservices don't need EDUs (?) and flush only; // no new content }, } } //debug_assert!(!pdu_jsons.is_empty(), "sending empty transaction"); match appservice::send_request( services() .appservice .get_registration(id) .await .ok_or_else(|| { ( dest.clone(), Error::bad_database("[Appservice] Could not load registration from db."), ) })?, ruma::api::appservice::event::push_events::v1::Request { events: pdu_jsons, txn_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash( &events .iter() .map(|e| match e { SendingEvent::Edu(b) | SendingEvent::Pdu(b) => &**b, SendingEvent::Flush => &[], }) .collect::<Vec<_>>(), ))) .into(), }, ) .await { Ok(_) => Ok(dest.clone()), Err(e) => Err((dest.clone(), e)), } } #[tracing::instrument(skip(dest, events))] async fn send_events_dest_push( dest: &Destination, userid: &OwnedUserId, pushkey: &str, events: Vec<SendingEvent>, ) -> SendingResult { let mut pdus = Vec::new(); for event in &events { match event { SendingEvent::Pdu(pdu_id) => { pdus.push( services() .rooms .timeline .get_pdu_from_id(pdu_id) .map_err(|e| (dest.clone(), e))? .ok_or_else(|| { ( dest.clone(), Error::bad_database("[Push] Event in servernameevent_data not found in db."), ) })?, ); }, SendingEvent::Edu(_) | SendingEvent::Flush => { // Push gateways don't need EDUs (?) and flush only; // no new content }, } } for pdu in pdus { // Redacted events are not notification targets (we don't send push for them) if let Some(unsigned) = &pdu.unsigned { if let Ok(unsigned) = serde_json::from_str::<serde_json::Value>(unsigned.get()) { if unsigned.get("redacted_because").is_some() { continue; } } } let Some(pusher) = services() .pusher .get_pusher(userid, pushkey) .map_err(|e| (dest.clone(), e))? else { continue; }; let rules_for_user = services() .account_data .get(None, userid, GlobalAccountDataEventType::PushRules.to_string().into()) .unwrap_or_default() .and_then(|event| serde_json::from_str::<PushRulesEvent>(event.get()).ok()) .map_or_else(|| push::Ruleset::server_default(userid), |ev: PushRulesEvent| ev.content.global); let unread: UInt = services() .rooms .user .notification_count(userid, &pdu.room_id) .map_err(|e| (dest.clone(), e))? .try_into() .expect("notification count can't go that high"); let _response = services() .pusher .send_push_notice(userid, unread, &pusher, rules_for_user, &pdu) .await .map(|_response| dest.clone()) .map_err(|e| (dest.clone(), e)); } Ok(dest.clone()) } #[tracing::instrument(skip(dest, events), name = "")] async fn send_events_dest_normal( dest: &Destination, server: &OwnedServerName, events: Vec<SendingEvent>, ) -> SendingResult { let mut edu_jsons = Vec::new(); let mut pdu_jsons = Vec::new(); for event in &events { match event { SendingEvent::Pdu(pdu_id) => pdu_jsons.push(convert_to_outgoing_federation_event( // TODO: check room version and remove event_id if needed services() .rooms .timeline .get_pdu_json_from_id(pdu_id) .map_err(|e| (dest.clone(), e))? .ok_or_else(|| { error!(?dest, ?server, ?pdu_id, "event not found"); ( dest.clone(), Error::bad_database("[Normal] Event in servernameevent_data not found in db."), ) })?, )), SendingEvent::Edu(edu) => { if let Ok(raw) = serde_json::from_slice(edu) { edu_jsons.push(raw); } }, SendingEvent::Flush => { // flush only; no new content }, } } let client = &services().globals.client.sender; //debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty // transaction"); send::send( client, server, send_transaction_message::v1::Request { origin: services().globals.server_name().to_owned(), pdus: pdu_jsons, edus: edu_jsons, origin_server_ts: MilliSecondsSinceUnixEpoch::now(), transaction_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash( &events .iter() .map(|e| match e { SendingEvent::Edu(b) | SendingEvent::Pdu(b) => &**b, SendingEvent::Flush => &[], }) .collect::<Vec<_>>(), ))) .into(), }, ) .await .map(|response| { for pdu in response.pdus { if pdu.1.is_err() { warn!("error for {} from remote: {:?}", pdu.0, pdu.1); } } dest.clone() }) .map_err(|e| (dest.clone(), e)) } /// This does not return a full `Pdu` it is only to satisfy ruma's types. #[tracing::instrument] pub fn convert_to_outgoing_federation_event(mut pdu_json: CanonicalJsonObject) -> Box<RawJsonValue> { if let Some(unsigned) = pdu_json .get_mut("unsigned") .and_then(|val| val.as_object_mut()) { unsigned.remove("transaction_id"); } // room v3 and above removed the "event_id" field from remote PDU format if let Some(room_id) = pdu_json .get("room_id") .and_then(|val| RoomId::parse(val.as_str()?).ok()) { match services().rooms.state.get_room_version(&room_id) { Ok(room_version_id) => match room_version_id { RoomVersionId::V1 | RoomVersionId::V2 => {}, _ => _ = pdu_json.remove("event_id"), }, Err(_) => _ = pdu_json.remove("event_id"), } } else { pdu_json.remove("event_id"); } // TODO: another option would be to convert it to a canonical string to validate // size and return a Result<Raw<...>> // serde_json::from_str::<Raw<_>>( // ruma::serde::to_canonical_json_string(pdu_json).expect("CanonicalJson is // valid serde_json::Value"), ) // .expect("Raw::from_value always works") to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value") }