Skip to content
Snippets Groups Projects
Commit b0ac5255 authored by Jason Volk's avatar Jason Volk
Browse files

move sending service impl properly back to mod root


Signed-off-by: default avatarJason Volk <jason@zemos.net>
parent 29fc5b9b
No related branches found
No related tags found
1 merge request!530de-global services
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use conduit::{err, Result, Server}; use conduit::{err, Result, Server};
use ruma::{ use ruma::{
api::{appservice::Registration, OutgoingRequest}, api::{appservice::Registration, OutgoingRequest},
...@@ -47,6 +48,32 @@ pub enum SendingEvent { ...@@ -47,6 +48,32 @@ pub enum SendingEvent {
Flush, // none Flush, // none
} }
#[async_trait]
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let (sender, receiver) = loole::unbounded();
Ok(Arc::new(Self {
db: data::Data::new(args.db.clone()),
server: args.server.clone(),
sender,
receiver: Mutex::new(receiver),
}))
}
async fn worker(self: Arc<Self>) -> Result<()> {
// trait impl can't be split between files so this just glues to mod sender
self.sender().await
}
fn interrupt(&self) {
if !self.sender.is_closed() {
self.sender.close();
}
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service { impl Service {
#[tracing::instrument(skip(self, pdu_id, user, pushkey), level = "debug")] #[tracing::instrument(skip(self, pdu_id, user, pushkey), level = "debug")]
pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
......
...@@ -2,11 +2,9 @@ ...@@ -2,11 +2,9 @@
cmp, cmp,
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
fmt::Debug, fmt::Debug,
sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use async_trait::async_trait;
use base64::{engine::general_purpose, Engine as _}; use base64::{engine::general_purpose, Engine as _};
use conduit::{debug, debug_warn, error, trace, utils::math::continue_exponential_backoff_secs, warn}; use conduit::{debug, debug_warn, error, trace, utils::math::continue_exponential_backoff_secs, warn};
use federation::transactions::send_transaction_message; use federation::transactions::send_transaction_message;
...@@ -24,9 +22,9 @@ ...@@ -24,9 +22,9 @@
ServerName, UInt, ServerName, UInt,
}; };
use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use tokio::{sync::Mutex, time::sleep_until}; use tokio::time::sleep_until;
use super::{appservice, data::Data, send, Destination, Msg, SendingEvent, Service}; use super::{appservice, send, Destination, Msg, SendingEvent, Service};
use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, Result}; use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, Result};
#[derive(Debug)] #[derive(Debug)]
...@@ -46,20 +44,9 @@ enum TransactionStatus { ...@@ -46,20 +44,9 @@ enum TransactionStatus {
const SELECT_EDU_LIMIT: usize = 16; const SELECT_EDU_LIMIT: usize = 16;
const CLEANUP_TIMEOUT_MS: u64 = 3500; const CLEANUP_TIMEOUT_MS: u64 = 3500;
#[async_trait] impl Service {
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let (sender, receiver) = loole::unbounded();
Ok(Arc::new(Self {
db: Data::new(args.db.clone()),
server: args.server.clone(),
sender,
receiver: Mutex::new(receiver),
}))
}
#[tracing::instrument(skip_all, name = "sender")] #[tracing::instrument(skip_all, name = "sender")]
async fn worker(self: Arc<Self>) -> Result<()> { pub(super) async fn sender(&self) -> Result<()> {
let receiver = self.receiver.lock().await; let receiver = self.receiver.lock().await;
let mut futures: SendingFutures<'_> = FuturesUnordered::new(); let mut futures: SendingFutures<'_> = FuturesUnordered::new();
let mut statuses: CurTransactionStatus = CurTransactionStatus::new(); let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
...@@ -82,16 +69,6 @@ async fn worker(self: Arc<Self>) -> Result<()> { ...@@ -82,16 +69,6 @@ async fn worker(self: Arc<Self>) -> Result<()> {
Ok(()) Ok(())
} }
fn interrupt(&self) {
if !self.sender.is_closed() {
self.sender.close();
}
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
fn handle_response( fn handle_response(
&self, response: SendingResult, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus, &self, response: SendingResult, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus,
) { ) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment