Skip to content
Snippets Groups Projects
Commit 49e453fe authored by Jason Volk's avatar Jason Volk Committed by 🥺
Browse files

cleanup/refactor sender base loop


Signed-off-by: default avatarJason Volk <jason@zemos.net>
parent d19573c7
No related branches found
No related tags found
No related merge requests found
......@@ -4,15 +4,13 @@
database::KeyValueDatabase,
service::{
self,
sending::{Destination, SendingEventType},
sending::{Destination, SendingEvent},
},
services, utils, Error, Result,
};
impl service::sending::Data for KeyValueDatabase {
fn active_requests<'a>(
&'a self,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Destination, SendingEventType)>> + 'a> {
fn active_requests<'a>(&'a self) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Destination, SendingEvent)>> + 'a> {
Box::new(
self.servercurrentevent_data
.iter()
......@@ -22,7 +20,7 @@ fn active_requests<'a>(
fn active_requests_for<'a>(
&'a self, destination: &Destination,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> {
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEvent)>> + 'a> {
let prefix = destination.get_prefix();
Box::new(
self.servercurrentevent_data
......@@ -55,17 +53,17 @@ fn delete_all_requests_for(&self, destination: &Destination) -> Result<()> {
Ok(())
}
fn queue_requests(&self, requests: &[(&Destination, SendingEventType)]) -> Result<Vec<Vec<u8>>> {
fn queue_requests(&self, requests: &[(&Destination, SendingEvent)]) -> Result<Vec<Vec<u8>>> {
let mut batch = Vec::new();
let mut keys = Vec::new();
for (destination, event) in requests {
let mut key = destination.get_prefix();
if let SendingEventType::Pdu(value) = &event {
if let SendingEvent::Pdu(value) = &event {
key.extend_from_slice(value);
} else {
key.extend_from_slice(&services().globals.next_count()?.to_be_bytes());
}
let value = if let SendingEventType::Edu(value) = &event {
let value = if let SendingEvent::Edu(value) = &event {
&**value
} else {
&[]
......@@ -80,7 +78,7 @@ fn queue_requests(&self, requests: &[(&Destination, SendingEventType)]) -> Resul
fn queued_requests<'a>(
&'a self, destination: &Destination,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> {
) -> Box<dyn Iterator<Item = Result<(SendingEvent, Vec<u8>)>> + 'a> {
let prefix = destination.get_prefix();
return Box::new(
self.servernameevent_data
......@@ -89,13 +87,13 @@ fn queued_requests<'a>(
);
}
fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()> {
fn mark_as_active(&self, events: &[(SendingEvent, Vec<u8>)]) -> Result<()> {
for (e, key) in events {
if key.is_empty() {
continue;
}
let value = if let SendingEventType::Edu(value) = &e {
let value = if let SendingEvent::Edu(value) = &e {
&**value
} else {
&[]
......@@ -122,7 +120,7 @@ fn get_latest_educount(&self, server_name: &ServerName) -> Result<u64> {
}
#[tracing::instrument(skip(key))]
fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(Destination, SendingEventType)> {
fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(Destination, SendingEvent)> {
// Appservices start with a plus
Ok::<_, Error>(if key.starts_with(b"+") {
let mut parts = key[1..].splitn(2, |&b| b == 0xFF);
......@@ -138,9 +136,9 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(Destination,
(
Destination::Appservice(server),
if value.is_empty() {
SendingEventType::Pdu(event.to_vec())
SendingEvent::Pdu(event.to_vec())
} else {
SendingEventType::Edu(value)
SendingEvent::Edu(value)
},
)
} else if key.starts_with(b"$") {
......@@ -165,10 +163,10 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(Destination,
(
Destination::Push(user_id, pushkey_string),
if value.is_empty() {
SendingEventType::Pdu(event.to_vec())
SendingEvent::Pdu(event.to_vec())
} else {
// I'm pretty sure this should never be called
SendingEventType::Edu(value)
SendingEvent::Edu(value)
},
)
} else {
......@@ -188,9 +186,9 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(Destination,
.map_err(|_| Error::bad_database("Invalid server string in server_currenttransaction"))?,
),
if value.is_empty() {
SendingEventType::Pdu(event.to_vec())
SendingEvent::Pdu(event.to_vec())
} else {
SendingEventType::Edu(value)
SendingEvent::Edu(value)
},
)
})
......
use ruma::ServerName;
use super::{Destination, SendingEventType};
use super::{Destination, SendingEvent};
use crate::Result;
type OutgoingSendingIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, Destination, SendingEventType)>> + 'a>;
type SendingEventTypeIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>;
type OutgoingSendingIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, Destination, SendingEvent)>> + 'a>;
type SendingEventIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEvent)>> + 'a>;
pub(crate) trait Data: Send + Sync {
fn active_requests(&self) -> OutgoingSendingIter<'_>;
fn active_requests_for(&self, destination: &Destination) -> SendingEventTypeIter<'_>;
fn active_requests_for(&self, destination: &Destination) -> SendingEventIter<'_>;
fn delete_active_request(&self, key: Vec<u8>) -> Result<()>;
fn delete_all_active_requests_for(&self, destination: &Destination) -> Result<()>;
/// TODO: use this?
#[allow(dead_code)]
fn delete_all_requests_for(&self, destination: &Destination) -> Result<()>;
fn queue_requests(&self, requests: &[(&Destination, SendingEventType)]) -> Result<Vec<Vec<u8>>>;
fn queue_requests(&self, requests: &[(&Destination, SendingEvent)]) -> Result<Vec<Vec<u8>>>;
fn queued_requests<'a>(
&'a self, destination: &Destination,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a>;
fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()>;
) -> Box<dyn Iterator<Item = Result<(SendingEvent, Vec<u8>)>> + 'a>;
fn mark_as_active(&self, events: &[(SendingEvent, Vec<u8>)]) -> Result<()>;
fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>;
fn get_latest_educount(&self, server_name: &ServerName) -> Result<u64>;
}
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment