diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs
index e4482f5baa8620844ea12aee26e44dbfc121ac7c..a799251d3e0e46564a6f305b5cb49593dc215b2f 100644
--- a/src/service/sending/mod.rs
+++ b/src/service/sending/mod.rs
@@ -55,34 +55,6 @@ pub enum OutgoingKind {
 	Normal(OwnedServerName),
 }
 
-impl OutgoingKind {
-	#[tracing::instrument(skip(self))]
-	pub fn get_prefix(&self) -> Vec<u8> {
-		let mut prefix = match self {
-			OutgoingKind::Appservice(server) => {
-				let mut p = b"+".to_vec();
-				p.extend_from_slice(server.as_bytes());
-				p
-			},
-			OutgoingKind::Push(user, pushkey) => {
-				let mut p = b"$".to_vec();
-				p.extend_from_slice(user.as_bytes());
-				p.push(0xFF);
-				p.extend_from_slice(pushkey.as_bytes());
-				p
-			},
-			OutgoingKind::Normal(server) => {
-				let mut p = Vec::new();
-				p.extend_from_slice(server.as_bytes());
-				p
-			},
-		};
-		prefix.push(0xFF);
-
-		prefix
-	}
-}
-
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 #[allow(clippy::module_name_repetitions)]
 pub enum SendingEventType {
@@ -106,6 +78,7 @@ pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
 			receiver: Mutex::new(receiver),
 			maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)),
 			startup_netburst: config.startup_netburst,
+			timeout: config.sender_timeout,
 		})
 	}
 
@@ -251,6 +224,41 @@ pub fn cleanup_events(&self, appservice_id: String) -> Result<()> {
 		Ok(())
 	}
 
+	#[tracing::instrument(skip(self, destination, request))]
+	pub async fn send_federation_request<T>(&self, destination: &ServerName, request: T) -> Result<T::IncomingResponse>
+	where
+		T: OutgoingRequest + Debug,
+	{
+		let permit = self.maximum_requests.acquire().await;
+		let timeout = Duration::from_secs(self.timeout);
+		let response = tokio::time::timeout(timeout, send::send_request(destination, request))
+			.await
+			.map_err(|_| {
+				warn!("Timeout after 300 seconds waiting for server response of {destination}");
+				Error::BadServerResponse("Timeout after 300 seconds waiting for server response")
+			})?;
+		drop(permit);
+
+		response
+	}
+
+	/// Sends a request to an appservice
+	///
+	/// Only returns None if there is no url specified in the appservice
+	/// registration file
+	pub async fn send_appservice_request<T>(
+		&self, registration: Registration, request: T,
+	) -> Option<Result<T::IncomingResponse>>
+	where
+		T: OutgoingRequest + Debug,
+	{
+		let permit = self.maximum_requests.acquire().await;
+		let response = appservice::send_request(registration, request).await;
+		drop(permit);
+
+		response
+	}
+
 	pub fn start_handler(self: &Arc<Self>) {
 		let self2 = Arc::clone(self);
 		tokio::spawn(async move {
@@ -286,7 +294,7 @@ async fn handler(&self) -> Result<()> {
 
 			for (outgoing_kind, events) in initial_transactions {
 				current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
-				futures.push(Self::handle_events(outgoing_kind.clone(), events));
+				futures.push(handle_events(outgoing_kind.clone(), events));
 			}
 		}
 
@@ -299,18 +307,19 @@ async fn handler(&self) -> Result<()> {
 							self.db.delete_all_active_requests_for(&outgoing_kind)?;
 
 							// Find events that have been added since starting the last request
-							let new_events = self.db.queued_requests(&outgoing_kind).filter_map(Result::ok).take(30).collect::<Vec<_>>();
+							let new_events = self
+								.db
+								.queued_requests(&outgoing_kind)
+								.filter_map(Result::ok)
+								.take(30).collect::<Vec<_>>();
 
 							if !new_events.is_empty() {
 								// Insert pdus we found
 								self.db.mark_as_active(&new_events)?;
-
-								futures.push(
-									Self::handle_events(
-										outgoing_kind.clone(),
-										new_events.into_iter().map(|(event, _)| event).collect(),
-									)
-								);
+								futures.push(handle_events(
+									outgoing_kind.clone(),
+									new_events.into_iter().map(|(event, _)| event).collect(),
+								));
 							} else {
 								current_transaction_status.remove(&outgoing_kind);
 							}
@@ -333,7 +342,7 @@ async fn handler(&self) -> Result<()> {
 							vec![(event, key)],
 							&mut current_transaction_status,
 						) {
-							futures.push(Self::handle_events(outgoing_kind, events));
+							futures.push(handle_events(outgoing_kind, events));
 						}
 					}
 				}
@@ -347,69 +356,75 @@ fn select_events(
 		new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key
 		current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>,
 	) -> Result<Option<Vec<SendingEventType>>> {
-		let mut retry = false;
-		let mut allow = true;
-
-		let _cork = services().globals.db.cork();
-		let entry = current_transaction_status.entry(outgoing_kind.clone());
-
-		entry
-			.and_modify(|e| match e {
-				TransactionStatus::Running | TransactionStatus::Retrying(_) => {
-					allow = false; // already running
-				},
-				TransactionStatus::Failed(tries, time) => {
-					// Fail if a request has failed recently (exponential backoff)
-					let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries);
-					if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
-						min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
-					}
-
-					if time.elapsed() < min_elapsed_duration {
-						allow = false;
-					} else {
-						retry = true;
-						*e = TransactionStatus::Retrying(*tries);
-					}
-				},
-			})
-			.or_insert(TransactionStatus::Running);
+		let (allow, retry) = self.select_events_current(outgoing_kind.clone(), current_transaction_status)?;
 
+		// Nothing can be done for this remote, bail out.
 		if !allow {
 			return Ok(None);
 		}
 
-		let _cork = services().globals.db.cork();
+		let _cork = services().globals.db.cork();
 		let mut events = Vec::new();
 
+		// Must retry any previous transaction for this remote.
 		if retry {
-			// We retry the previous transaction
-			for (_, e) in self
-				.db
+			self.db
 				.active_requests_for(outgoing_kind)
 				.filter_map(Result::ok)
-			{
+				.for_each(|(_, e)| events.push(e));
+
+			return Ok(Some(events));
+		}
+
+		// Compose the next transaction
+		let _cork = services().globals.db.cork();
+		if !new_events.is_empty() {
+			self.db.mark_as_active(&new_events)?;
+			for (e, _) in new_events {
 				events.push(e);
 			}
-		} else {
-			if !new_events.is_empty() {
-				self.db.mark_as_active(&new_events)?;
-				for (e, _) in new_events {
-					events.push(e);
-				}
-			}
-
-			if let OutgoingKind::Normal(server_name) = outgoing_kind {
-				if let Ok((select_edus, last_count)) = self.select_edus(server_name) {
-					events.extend(select_edus.into_iter().map(SendingEventType::Edu));
+		}
 
-					self.db.set_latest_educount(server_name, last_count)?;
-				}
+		// Add EDU's into the transaction
+		if let OutgoingKind::Normal(server_name) = outgoing_kind {
+			if let Ok((select_edus, last_count)) = self.select_edus(server_name) {
+				events.extend(select_edus.into_iter().map(SendingEventType::Edu));
+				self.db.set_latest_educount(server_name, last_count)?;
 			}
 		}
 
 		Ok(Some(events))
 	}
 
+	#[tracing::instrument(skip(self, outgoing_kind, current_transaction_status))]
+	fn select_events_current(
+		&self, outgoing_kind: OutgoingKind, current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>,
+	) -> Result<(bool, bool)> {
+		let (mut allow, mut retry) = (true, false);
+		current_transaction_status
+			.entry(outgoing_kind)
+			.and_modify(|e| match e {
+				TransactionStatus::Failed(tries, time) => {
+					// Fail if a request has failed recently (exponential backoff)
+					const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24);
+					let mut min_elapsed_duration = Duration::from_secs(self.timeout) * (*tries) * (*tries);
+					min_elapsed_duration = std::cmp::min(min_elapsed_duration, MAX_DURATION);
+					if time.elapsed() < min_elapsed_duration {
+						allow = false;
+					} else {
+						retry = true;
+						*e = TransactionStatus::Retrying(*tries);
+					}
+				},
+				TransactionStatus::Running | TransactionStatus::Retrying(_) => {
+					allow = false; // already running
+				},
+			})
+			.or_insert(TransactionStatus::Running);
+
+		Ok((allow, retry))
+	}
+
 	#[tracing::instrument(skip(self, server_name))]
 	pub fn select_edus(&self, server_name: &ServerName) -> Result<(Vec<Vec<u8>>, u64)> {
 		// u64: count of last edu
@@ -418,7 +433,7 @@ pub fn select_edus(&self, server_name: &ServerName) -> Result<(Vec<Vec<u8>>, u64
 		let mut max_edu_count = since;
 		let mut device_list_changes = HashSet::new();
 
-		'outer: for room_id in services().rooms.state_cache.server_rooms(server_name) {
+		for room_id in services().rooms.state_cache.server_rooms(server_name) {
 			let room_id = room_id?;
 			// Look for device list updates in this room
 			device_list_changes.extend(
@@ -428,109 +443,17 @@ pub fn select_edus(&self, server_name: &ServerName) -> Result<(Vec<Vec<u8>>, u64
 					.filter_map(Result::ok)
 					.filter(|user_id| user_id.server_name() == services().globals.server_name()),
 			);
-
-			if services().globals.allow_outgoing_presence() {
-				// Look for presence updates in this room
-				let mut presence_updates = Vec::new();
-
-				for (user_id, count, presence_event) in services()
-					.rooms
-					.edus
-					.presence
-					.presence_since(&room_id, since)
-				{
-					if count > max_edu_count {
-						max_edu_count = count;
-					}
-
-					if user_id.server_name() != services().globals.server_name() {
-						continue;
-					}
-
-					presence_updates.push(PresenceUpdate {
-						user_id,
-						presence: presence_event.content.presence,
-						currently_active: presence_event.content.currently_active.unwrap_or(false),
-						last_active_ago: presence_event
-							.content
-							.last_active_ago
-							.unwrap_or_else(|| uint!(0)),
-						status_msg: presence_event.content.status_msg,
-					});
-				}
-
-				let presence_content = Edu::Presence(PresenceContent::new(presence_updates));
-				events.push(serde_json::to_vec(&presence_content).expect("PresenceEvent can be serialized"));
+			if !select_edus_presence(&room_id, since, &mut max_edu_count, &mut events)? {
+				break;
 			}
-
-			// Look for read receipts in this room
-			for r in services()
-				.rooms
-				.edus
-				.read_receipt
-				.readreceipts_since(&room_id, since)
-			{
-				let (user_id, count, read_receipt) = r?;
-
-				if count > max_edu_count {
-					max_edu_count = count;
-				}
-
-				if user_id.server_name() != services().globals.server_name() {
-					continue;
-				}
-
-				let event = serde_json::from_str(read_receipt.json().get())
-					.map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?;
-				let federation_event = if let AnySyncEphemeralRoomEvent::Receipt(r) = event {
-					let mut read = BTreeMap::new();
-
-					let (event_id, mut receipt) = r
-						.content
-						.0
-						.into_iter()
-						.next()
-						.expect("we only use one event per read receipt");
-					let receipt = receipt
-						.remove(&ReceiptType::Read)
-						.expect("our read receipts always set this")
-						.remove(&user_id)
-						.expect("our read receipts always have the user here");
-
-					read.insert(
-						user_id,
-						ReceiptData {
-							data: receipt.clone(),
-							event_ids: vec![event_id.clone()],
-						},
-					);
-
-					let receipt_map = ReceiptMap {
-						read,
-					};
-
-					let mut receipts = BTreeMap::new();
-					receipts.insert(room_id.clone(), receipt_map);
-
-					Edu::Receipt(ReceiptContent {
-						receipts,
-					})
-				} else {
-					Error::bad_database("Invalid event type in read_receipts");
-					continue;
-				};
-
-				events.push(serde_json::to_vec(&federation_event).expect("json can be serialized"));
-
-				if events.len() >= 20 {
-					break 'outer;
-				}
+			if !select_edus_receipts(&room_id, since, &mut max_edu_count, &mut events)? {
+				break;
 			}
 		}
 
 		for user_id in device_list_changes {
-			// Empty prev id forces synapse to resync: https://github.com/matrix-org/synapse/blob/98aec1cc9da2bd6b8e34ffb282c85abf9b8b42ca/synapse/handlers/device.py#L767
-			// Because synapse resyncs, we can just insert placeholder data
+			// Empty prev id forces synapse to resync; because synapse resyncs,
+			// we can just insert placeholder data
 			let edu = Edu::DeviceListUpdate(DeviceListUpdateContent {
 				user_id,
 				device_id: device_id!("placeholder").to_owned(),
@@ -546,297 +469,379 @@ pub fn select_edus(&self, server_name: &ServerName) -> Result<(Vec<Vec<u8>>, u64
 		Ok((events, max_edu_count))
 	}
+}
-
-	#[tracing::instrument(skip(events, kind))]
-	async fn handle_events(
-		kind: OutgoingKind, events: Vec<SendingEventType>,
-	) -> Result<OutgoingKind, (OutgoingKind, Error)> {
-		match &kind {
-			OutgoingKind::Appservice(id) => {
-				let mut pdu_jsons = Vec::new();
-
-				for event in &events {
-					match event {
-						SendingEventType::Pdu(pdu_id) => {
-							pdu_jsons.push(
-								services()
-									.rooms
-									.timeline
-									.get_pdu_from_id(pdu_id)
-									.map_err(|e| (kind.clone(), e))?
-									.ok_or_else(|| {
-										(
-											kind.clone(),
-											Error::bad_database(
-												"[Appservice] Event in servernameevent_data not found in db.",
-											),
-										)
-									})?
-									.to_room_event(),
-							);
-						},
-						SendingEventType::Edu(_) | SendingEventType::Flush => {
-							// Appservices don't need EDUs (?) and flush only;
-							// no new content
-						},
-					}
-				}
+/// Look for presence [in this room] <--- XXX
+#[tracing::instrument(skip(room_id, since, max_edu_count, events))]
+pub fn select_edus_presence(
+	room_id: &RoomId, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
+) -> Result<bool> {
+	if !services().globals.allow_outgoing_presence() {
+		return Ok(true);
+	}
 
-				let permit = services().sending.maximum_requests.acquire().await;
+	// Look for presence updates in this room
+	let mut presence_updates = Vec::new();
+	for (user_id, count, presence_event) in services()
+		.rooms
+		.edus
+		.presence
+		.presence_since(room_id, since)
+	{
+		if count > *max_edu_count {
+			*max_edu_count = count;
+		}
+		if user_id.server_name() != services().globals.server_name() {
+			continue;
+		}
 
-				let response = match appservice::send_request(
-					services()
-						.appservice
-						.get_registration(id)
-						.await
-						.ok_or_else(|| {
-							(
-								kind.clone(),
-								Error::bad_database("[Appservice] Could not load registration from db."),
-							)
-						})?,
-					ruma::api::appservice::event::push_events::v1::Request {
-						events: pdu_jsons,
-						txn_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash(
-							&events
-								.iter()
-								.map(|e| match e {
-									SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b,
-									SendingEventType::Flush => &[],
-								})
-								.collect::<Vec<_>>(),
-						)))
-						.into(),
-					},
-				)
-				.await
-				{
-					None => Ok(kind.clone()),
-					Some(op_resp) => op_resp
-						.map(|_response| kind.clone())
-						.map_err(|e| (kind.clone(), e)),
-				};
+		presence_updates.push(PresenceUpdate {
+			user_id,
+			presence: presence_event.content.presence,
+			currently_active: presence_event.content.currently_active.unwrap_or(false),
+			last_active_ago: presence_event
+				.content
+				.last_active_ago
+				.unwrap_or_else(|| uint!(0)),
+			status_msg: presence_event.content.status_msg,
+		});
+	}
 
-				drop(permit);
+	let presence_content = Edu::Presence(PresenceContent::new(presence_updates));
+	events.push(serde_json::to_vec(&presence_content).expect("PresenceEvent can be serialized"));
 
-				response
-			},
-			OutgoingKind::Push(userid, pushkey) => {
-				let mut pdus = Vec::new();
-
-				for event in &events {
-					match event {
-						SendingEventType::Pdu(pdu_id) => {
-							pdus.push(
-								services()
-									.rooms
-									.timeline
-									.get_pdu_from_id(pdu_id)
-									.map_err(|e| (kind.clone(), e))?
-									.ok_or_else(|| {
-										(
-											kind.clone(),
-											Error::bad_database(
-												"[Push] Event in servernamevent_datas not found in db.",
-											),
-										)
-									})?,
-							);
-						},
-						SendingEventType::Edu(_) | SendingEventType::Flush => {
-							// Push gateways don't need EDUs (?) and flush only;
-							// no new content
-						},
-					}
-				}
+	Ok(true)
+}
 
-				for pdu in pdus {
-					// Redacted events are not notification targets (we don't send push for them)
-					if let Some(unsigned) = &pdu.unsigned {
-						if let Ok(unsigned) = serde_json::from_str::<serde_json::Value>(unsigned.get()) {
-							if unsigned.get("redacted_because").is_some() {
-								continue;
-							}
-						}
-					}
+/// Look for read receipts in this room
+#[tracing::instrument(skip(room_id, since, max_edu_count, events))]
+pub fn select_edus_receipts(
+	room_id: &RoomId, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
+) -> Result<bool> {
+	for r in services()
+		.rooms
+		.edus
+		.read_receipt
+		.readreceipts_since(room_id, since)
+	{
+		let (user_id, count, read_receipt) = r?;
 
-					let Some(pusher) = services()
-						.pusher
-						.get_pusher(userid, pushkey)
-						.map_err(|e| (OutgoingKind::Push(userid.clone(), pushkey.clone()), e))?
-					else {
-						continue;
-					};
+		if count > *max_edu_count {
+			*max_edu_count = count;
+		}
+		if user_id.server_name() != services().globals.server_name() {
+			continue;
+		}
 
-					let rules_for_user = services()
-						.account_data
-						.get(None, userid, GlobalAccountDataEventType::PushRules.to_string().into())
-						.unwrap_or_default()
-						.and_then(|event| serde_json::from_str::<PushRulesEvent>(event.get()).ok())
-						.map_or_else(|| push::Ruleset::server_default(userid), |ev: PushRulesEvent| ev.content.global);
+		let event = serde_json::from_str(read_receipt.json().get())
+			.map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?;
+		let federation_event = if let AnySyncEphemeralRoomEvent::Receipt(r) = event {
+			let mut read = BTreeMap::new();
+
+			let (event_id, mut receipt) = r
+				.content
+				.0
+				.into_iter()
+				.next()
+				.expect("we only use one event per read receipt");
+			let receipt = receipt
+				.remove(&ReceiptType::Read)
+				.expect("our read receipts always set this")
+				.remove(&user_id)
+				.expect("our read receipts always have the user here");
+
+			read.insert(
+				user_id,
+				ReceiptData {
+					data: receipt.clone(),
+					event_ids: vec![event_id.clone()],
+				},
+			);

-					let unread: UInt = services()
-						.rooms
-						.user
-						.notification_count(userid, &pdu.room_id)
-						.map_err(|e| (kind.clone(), e))?
-						.try_into()
-						.expect("notification count can't go that high");
+			let receipt_map = ReceiptMap {
+				read,
+			};
 
-					let permit = services().sending.maximum_requests.acquire().await;
+			let mut receipts = BTreeMap::new();
+			receipts.insert(room_id.to_owned(), receipt_map);
 
-					let _response = services()
-						.pusher
-						.send_push_notice(userid, unread, &pusher, rules_for_user, &pdu)
-						.await
-						.map(|_response| kind.clone())
-						.map_err(|e| (kind.clone(), e));
+			Edu::Receipt(ReceiptContent {
+				receipts,
+			})
+		} else {
+			Error::bad_database("Invalid event type in read_receipts");
+			continue;
+		};
 
-					drop(permit);
-				}
-				Ok(OutgoingKind::Push(userid.clone(), pushkey.clone()))
-			},
-			OutgoingKind::Normal(server) => {
-				let mut edu_jsons = Vec::new();
-				let mut pdu_jsons = Vec::new();
-
-				for event in &events {
-					match event {
-						SendingEventType::Pdu(pdu_id) => {
-							// TODO: check room version and remove event_id if needed
-							let raw = PduEvent::convert_to_outgoing_federation_event(
-								services()
-									.rooms
-									.timeline
-									.get_pdu_json_from_id(pdu_id)
-									.map_err(|e| (OutgoingKind::Normal(server.clone()), e))?
-									.ok_or_else(|| {
-										error!("event not found: {server} {pdu_id:?}");
-										(
-											OutgoingKind::Normal(server.clone()),
-											Error::bad_database(
-												"[Normal] Event in servernamevent_datas not found in db.",
-											),
-										)
-									})?,
-							);
-							pdu_jsons.push(raw);
-						},
-						SendingEventType::Edu(edu) => {
-							if let Ok(raw) = serde_json::from_slice(edu) {
-								edu_jsons.push(raw);
-							}
-						},
-						SendingEventType::Flush => {
-							// flush only; no new content
-						},
-					}
-				}
+		events.push(serde_json::to_vec(&federation_event).expect("json can be serialized"));
 
-				let permit = services().sending.maximum_requests.acquire().await;
-
-				let response = send::send_request(
-					server,
-					send_transaction_message::v1::Request {
-						origin: services().globals.server_name().to_owned(),
-						pdus: pdu_jsons,
-						edus: edu_jsons,
-						origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
-						transaction_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash(
-							&events
-								.iter()
-								.map(|e| match e {
-									SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b,
-									SendingEventType::Flush => &[],
-								})
-								.collect::<Vec<_>>(),
-						)))
-						.into(),
-					},
-				)
-				.await
-				.map(|response| {
-					for pdu in response.pdus {
-						if pdu.1.is_err() {
-							warn!("Failed to send to {}: {:?}", server, pdu);
-						}
-					}
-					kind.clone()
-				})
-				.map_err(|e| (kind, e));
+		if events.len() >= 20 {
+			return Ok(false);
+		}
+	}
+
+	Ok(true)
+}
+
+#[tracing::instrument(skip(events, kind))]
+async fn handle_events(
+	kind: OutgoingKind, events: Vec<SendingEventType>,
+) -> Result<OutgoingKind, (OutgoingKind, Error)> {
+	match kind {
+		OutgoingKind::Appservice(ref id) => handle_events_kind_appservice(&kind, id, events).await,
+		OutgoingKind::Push(ref userid, ref pushkey) => handle_events_kind_push(&kind, userid, pushkey, events).await,
+		OutgoingKind::Normal(ref server) => handle_events_kind_normal(&kind, server, events).await,
+	}
+}
 
-				drop(permit);
+#[tracing::instrument(skip(kind, events))]
+async fn handle_events_kind_appservice(
+	kind: &OutgoingKind, id: &String, events: Vec<SendingEventType>,
+) -> Result<OutgoingKind, (OutgoingKind, Error)> {
+	let mut pdu_jsons = Vec::new();
 
-				response
+	for event in &events {
+		match event {
+			SendingEventType::Pdu(pdu_id) => {
+				pdu_jsons.push(
+					services()
+						.rooms
+						.timeline
+						.get_pdu_from_id(pdu_id)
+						.map_err(|e| (kind.clone(), e))?
+						.ok_or_else(|| {
+							(
+								kind.clone(),
+								Error::bad_database("[Appservice] Event in servernameevent_data not found in db."),
+							)
+						})?
+						.to_room_event(),
+				);
+			},
+			SendingEventType::Edu(_) | SendingEventType::Flush => {
+				// Appservices don't need EDUs (?) and flush only;
+				// no new content
 			},
 		}
 	}
 
-	#[tracing::instrument(skip(self, destination, request))]
-	pub async fn send_federation_request<T>(&self, destination: &ServerName, request: T) -> Result<T::IncomingResponse>
-	where
-		T: OutgoingRequest + Debug,
+	let permit = services().sending.maximum_requests.acquire().await;
+
+	let response = match appservice::send_request(
+		services()
+			.appservice
+			.get_registration(id)
+			.await
+			.ok_or_else(|| {
+				(
+					kind.clone(),
+					Error::bad_database("[Appservice] Could not load registration from db."),
+				)
+			})?,
+		ruma::api::appservice::event::push_events::v1::Request {
+			events: pdu_jsons,
+			txn_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash(
+				&events
+					.iter()
+					.map(|e| match e {
+						SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b,
+						SendingEventType::Flush => &[],
+					})
+					.collect::<Vec<_>>(),
+			)))
+			.into(),
+		},
+	)
+	.await
 	{
-		if !services().globals.allow_federation() {
-			return Err(Error::bad_config("Federation is disabled."));
-		}
+		None => Ok(kind.clone()),
+		Some(op_resp) => op_resp
+			.map(|_response| kind.clone())
+			.map_err(|e| (kind.clone(), e)),
+	};
 
-		if destination.is_ip_literal() || IPAddress::is_valid(destination.host()) {
-			info!(
-				"Destination {} is an IP literal, checking against IP range denylist.",
-				destination
-			);
-			let ip = IPAddress::parse(destination.host()).map_err(|e| {
-				warn!("Failed to parse IP literal from string: {}", e);
-				Error::BadServerResponse("Invalid IP address")
-			})?;
+	drop(permit);
 
-			let cidr_ranges_s = services().globals.ip_range_denylist().to_vec();
-			let mut cidr_ranges: Vec<IPAddress> = Vec::new();
+	response
+}
 
-			for cidr in cidr_ranges_s {
-				cidr_ranges.push(IPAddress::parse(cidr).expect("we checked this at startup"));
-			}
+#[tracing::instrument(skip(kind, events))]
+async fn handle_events_kind_push(
+	kind: &OutgoingKind, userid: &OwnedUserId, pushkey: &String, events: Vec<SendingEventType>,
+) -> Result<OutgoingKind, (OutgoingKind, Error)> {
+	let mut pdus = Vec::new();
 
-			debug!("List of pushed CIDR ranges: {:?}", cidr_ranges);
+	for event in &events {
+		match event {
+			SendingEventType::Pdu(pdu_id) => {
+				pdus.push(
+					services()
+						.rooms
+						.timeline
+						.get_pdu_from_id(pdu_id)
+						.map_err(|e| (kind.clone(), e))?
+						.ok_or_else(|| {
+							(
+								kind.clone(),
+								Error::bad_database("[Push] Event in servernamevent_datas not found in db."),
+							)
+						})?,
+				);
+			},
+			SendingEventType::Edu(_) | SendingEventType::Flush => {
+				// Push gateways don't need EDUs (?) and flush only;
+				// no new content
+			},
+		}
+	}
 
-			for cidr in cidr_ranges {
-				if cidr.includes(&ip) {
-					return Err(Error::BadServerResponse("Not allowed to send requests to this IP"));
+	for pdu in pdus {
+		// Redacted events are not notification targets (we don't send push for them)
+		if let Some(unsigned) = &pdu.unsigned {
+			if let Ok(unsigned) = serde_json::from_str::<serde_json::Value>(unsigned.get()) {
+				if unsigned.get("redacted_because").is_some() {
+					continue;
 				}
 			}
-
-			info!("IP literal {} is allowed.", destination);
 		}
 
-		debug!("Waiting for permit");
-		let permit = self.maximum_requests.acquire().await;
-		debug!("Got permit");
-		let response = tokio::time::timeout(Duration::from_secs(5 * 60), send::send_request(destination, request))
+		let Some(pusher) = services()
+			.pusher
+			.get_pusher(userid, pushkey)
+			.map_err(|e| (kind.clone(), e))?
+		else {
+			continue;
+		};
+
+		let rules_for_user = services()
+			.account_data
+			.get(None, userid, GlobalAccountDataEventType::PushRules.to_string().into())
+			.unwrap_or_default()
+			.and_then(|event| serde_json::from_str::<PushRulesEvent>(event.get()).ok())
+			.map_or_else(|| push::Ruleset::server_default(userid), |ev: PushRulesEvent| ev.content.global);
+
+		let unread: UInt = services()
+			.rooms
+			.user
+			.notification_count(userid, &pdu.room_id)
+			.map_err(|e| (kind.clone(), e))?
+			.try_into()
+			.expect("notification count can't go that high");
+
+		let permit = services().sending.maximum_requests.acquire().await;
+
+		let _response = services()
+			.pusher
+			.send_push_notice(userid, unread, &pusher, rules_for_user, &pdu)
 			.await
-			.map_err(|_| {
-				warn!("Timeout after 300 seconds waiting for server response of {destination}");
-				Error::BadServerResponse("Timeout after 300 seconds waiting for server response")
-			})?;
+			.map(|_response| kind.clone())
+			.map_err(|e| (kind.clone(), e));
+		drop(permit);
+	}
 
-		drop(permit);
+	Ok(kind.clone())
+}
+
+#[tracing::instrument(skip(kind, events))]
+async fn handle_events_kind_normal(
+	kind: &OutgoingKind, server: &OwnedServerName, events: Vec<SendingEventType>,
+) -> Result<OutgoingKind, (OutgoingKind, Error)> {
+	let mut edu_jsons = Vec::new();
+	let mut pdu_jsons = Vec::new();
+
+	for event in &events {
+		match event {
+			SendingEventType::Pdu(pdu_id) => {
+				// TODO: check room version and remove event_id if needed
+				let raw = PduEvent::convert_to_outgoing_federation_event(
+					services()
+						.rooms
+						.timeline
+						.get_pdu_json_from_id(pdu_id)
+						.map_err(|e| (kind.clone(), e))?
+						.ok_or_else(|| {
+							error!("event not found: {server} {pdu_id:?}");
+							(
+								kind.clone(),
+								Error::bad_database("[Normal] Event in servernamevent_datas not found in db."),
+							)
+						})?,
+				);
+				pdu_jsons.push(raw);
+			},
+			SendingEventType::Edu(edu) => {
+				if let Ok(raw) = serde_json::from_slice(edu) {
+					edu_jsons.push(raw);
+				}
+			},
+			SendingEventType::Flush => {
+				// flush only; no new content
+			},
+		}
+	}
 
-		response
+	let permit = services().sending.maximum_requests.acquire().await;
+
+	let response = send::send_request(
+		server,
+		send_transaction_message::v1::Request {
+			origin: services().globals.server_name().to_owned(),
+			pdus: pdu_jsons,
+			edus: edu_jsons,
+			origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
+			transaction_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash(
+				&events
+					.iter()
+					.map(|e| match e {
+						SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b,
+						SendingEventType::Flush => &[],
+					})
+					.collect::<Vec<_>>(),
+			)))
+			.into(),
+		},
+	)
+	.await
+	.map(|response| {
+		for pdu in response.pdus {
+			if pdu.1.is_err() {
+				warn!("Failed to send to {}: {:?}", server, pdu);
+			}
+		}
+		kind.clone()
+	})
+	.map_err(|e| (kind.clone(), e));
+
+	drop(permit);
+
+	response
+}
+
+impl OutgoingKind {
+	#[tracing::instrument(skip(self))]
+	pub fn get_prefix(&self) -> Vec<u8> {
+		let mut prefix = match self {
+			OutgoingKind::Appservice(server) => {
+				let mut p = b"+".to_vec();
+				p.extend_from_slice(server.as_bytes());
+				p
+			},
+			OutgoingKind::Push(user, pushkey) => {
+				let mut p = b"$".to_vec();
+				p.extend_from_slice(user.as_bytes());
+				p.push(0xFF);
+				p.extend_from_slice(pushkey.as_bytes());
+				p
+			},
+			OutgoingKind::Normal(server) => {
+				let mut p = Vec::new();
+				p.extend_from_slice(server.as_bytes());
+				p
+			},
+		};
+		prefix.push(0xFF);
+
+		prefix
 	}
+}
 
-	/// Sends a request to an appservice
-	///
-	/// Only returns None if there is no url specified in the appservice
-	/// registration file
-	pub async fn send_appservice_request<T>(
-		&self, registration: Registration, request: T,
-	) -> Option<Result<T::IncomingResponse>>
-	where
-		T: OutgoingRequest + Debug,
-	{
-		let permit = self.maximum_requests.acquire().await;
-		let response = appservice::send_request(registration, request).await;
-		drop(permit);
-
-		response
-	}
-}