From 15126ee1b2ee41014a84dae8d41a019ecdb80c5f Mon Sep 17 00:00:00 2001 From: Jason Volk <jason@zemos.net> Date: Sun, 28 Jul 2024 09:03:17 +0000 Subject: [PATCH] additional weak references where applicable Signed-off-by: Jason Volk <jason@zemos.net> --- src/service/admin/mod.rs | 19 ++++++++---- src/service/manager.rs | 3 +- src/service/service.rs | 63 +++++++++++++++++++++++----------------- src/service/services.rs | 29 +++++++++++------- 4 files changed, 70 insertions(+), 44 deletions(-) diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 71f3b73e6..00d6597ed 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -5,7 +5,7 @@ use std::{ future::Future, pin::Pin, - sync::{Arc, RwLock as StdRwLock}, + sync::{Arc, RwLock as StdRwLock, Weak}, }; use async_trait::async_trait; @@ -41,7 +41,7 @@ struct Services { timeline: Dep<rooms::timeline::Service>, state: Dep<rooms::state::Service>, state_cache: Dep<rooms::state_cache::Service>, - services: StdRwLock<Option<Arc<crate::Services>>>, + services: StdRwLock<Option<Weak<crate::Services>>>, } #[derive(Debug)] @@ -174,7 +174,14 @@ async fn handle_command(&self, command: CommandInput) { } async fn process_command(&self, command: CommandInput) -> CommandResult { - let Some(services) = self.services.services.read().expect("locked").clone() else { + let Some(services) = self + .services + .services + .read() + .expect("locked") + .as_ref() + .and_then(Weak::upgrade) + else { return Err!("Services self-reference not initialized."); }; @@ -365,7 +372,9 @@ async fn console_auto_stop(&self) { /// Sets the self-reference to crate::Services which will provide context to /// the admin commands. - pub(super) fn set_services(&self, services: Option<Arc<crate::Services>>) { - *self.services.services.write().expect("locked for writing") = services; + pub(super) fn set_services(&self, services: &Option<Arc<crate::Services>>) { + let receiver = &mut *self.services.services.write().expect("locked for writing"); + let weak = services.as_ref().map(Arc::downgrade); + *receiver = weak; } } diff --git a/src/service/manager.rs b/src/service/manager.rs index 087fd3fac..42260bb30 100644 --- a/src/service/manager.rs +++ b/src/service/manager.rs @@ -60,7 +60,8 @@ pub(super) async fn start(self: Arc<Self>) -> Result<()> { .read() .expect("locked for reading") .values() - .map(|v| v.0.clone()) + .map(|val| val.0.upgrade()) + .map(|arc| arc.expect("services available for manager startup")) .collect(); debug!("Starting service workers..."); diff --git a/src/service/service.rs b/src/service/service.rs index d0d8d9402..0b9bc76c7 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -57,8 +57,10 @@ pub(crate) struct Dep<T> { name: &'static str, } -pub(crate) type Map = RwLock<BTreeMap<String, MapVal>>; -pub(crate) type MapVal = (Arc<dyn Service>, Arc<dyn Any + Send + Sync>); +pub(crate) type Map = RwLock<MapType>; +pub(crate) type MapType = BTreeMap<MapKey, MapVal>; +pub(crate) type MapVal = (Weak<dyn Service>, Weak<dyn Any + Send + Sync>); +pub(crate) type MapKey = String; impl<T: Send + Sync + 'static> Deref for Dep<T> { type Target = Arc<T>; @@ -76,9 +78,9 @@ fn deref(&self) -> &Self::Target { } } -impl Args<'_> { +impl<'a> Args<'a> { /// Create a lazy-reference to a service when constructing another Service. - pub(crate) fn depend<T: Send + Sync + 'static>(&self, name: &'static str) -> Dep<T> { + pub(crate) fn depend<T: Send + Sync + 'a + 'static>(&'a self, name: &'static str) -> Dep<T> { Dep::<T> { dep: OnceLock::new(), service: Arc::downgrade(self.service), @@ -88,48 +90,55 @@ pub(crate) fn depend<T: Send + Sync + 'static>(&self, name: &'static str) -> Dep /// Create a reference immediately to a service when constructing another /// Service. The other service must be constructed. - pub(crate) fn require<T: Send + Sync + 'static>(&self, name: &str) -> Arc<T> { require::<T>(self.service, name) } + pub(crate) fn require<T: Send + Sync + 'a + 'static>(&'a self, name: &'static str) -> Arc<T> { + require::<T>(self.service, name) + } } /// Reference a Service by name. Panics if the Service does not exist or was /// incorrectly cast. -pub(crate) fn require<T: Send + Sync + 'static>(map: &Map, name: &str) -> Arc<T> { +pub(crate) fn require<'a, 'b, T: Send + Sync + 'a + 'b + 'static>(map: &'b Map, name: &'a str) -> Arc<T> { try_get::<T>(map, name) .inspect_err(inspect_log) .expect("Failure to reference service required by another service.") } -/// Reference a Service by name. Returns Err if the Service does not exist or -/// was incorrectly cast. -pub(crate) fn try_get<T: Send + Sync + 'static>(map: &Map, name: &str) -> Result<Arc<T>> { - map.read() - .expect("locked for reading") - .get(name) - .map_or_else( - || Err!("Service {name:?} does not exist or has not been built yet."), - |(_, s)| { - s.clone() - .downcast::<T>() - .map_err(|_| err!("Service {name:?} must be correctly downcast.")) - }, - ) -} - /// Reference a Service by name. Returns None if the Service does not exist, but /// panics if incorrectly cast. /// /// # Panics /// Incorrect type is not a silent failure (None) as the type never has a reason /// to be incorrect. -pub(crate) fn get<T: Send + Sync + 'static>(map: &Map, name: &str) -> Option<Arc<T>> { +pub(crate) fn get<'a, 'b, T: Send + Sync + 'a + 'b + 'static>(map: &'b Map, name: &'a str) -> Option<Arc<T>> { map.read() .expect("locked for reading") .get(name) .map(|(_, s)| { - s.clone() - .downcast::<T>() - .expect("Service must be correctly downcast.") - }) + s.upgrade().map(|s| { + s.downcast::<T>() + .expect("Service must be correctly downcast.") + }) + })? +} + +/// Reference a Service by name. Returns Err if the Service does not exist or +/// was incorrectly cast. +pub(crate) fn try_get<'a, 'b, T: Send + Sync + 'a + 'b + 'static>(map: &'b Map, name: &'a str) -> Result<Arc<T>> { + map.read() + .expect("locked for reading") + .get(name) + .map_or_else( + || Err!("Service {name:?} does not exist or has not been built yet."), + |(_, s)| { + s.upgrade().map_or_else( + || Err!("Service {name:?} no longer exists."), + |s| { + s.downcast::<T>() + .map_err(|_| err!("Service {name:?} must be correctly downcast.")) + }, + ) + }, + ) } /// Utility for service implementations; see Service::name() in the trait. diff --git a/src/service/services.rs b/src/service/services.rs index b283db6cf..74e11d0b4 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -109,7 +109,7 @@ macro_rules! build { pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> { debug_info!("Starting services..."); - self.admin.set_services(Some(Arc::clone(self))); + self.admin.set_services(&Some(Arc::clone(self))); globals::migrations::migrations(self).await?; self.manager .lock() @@ -131,7 +131,7 @@ pub async fn stop(&self) { manager.stop().await; } - self.admin.set_services(None); + self.admin.set_services(&None); debug_info!("Services shutdown complete."); } @@ -146,7 +146,9 @@ pub async fn poll(&self) -> Result<()> { pub async fn clear_cache(&self) { for (service, ..) in self.service.read().expect("locked for reading").values() { - service.clear_cache(); + if let Some(service) = service.upgrade() { + service.clear_cache(); + } } //TODO @@ -161,7 +163,9 @@ pub async fn clear_cache(&self) { pub async fn memory_usage(&self) -> Result<String> { let mut out = String::new(); for (service, ..) in self.service.read().expect("locked for reading").values() { - service.memory_usage(&mut out)?; + if let Some(service) = service.upgrade() { + service.memory_usage(&mut out)?; + } } //TODO @@ -179,27 +183,30 @@ pub async fn memory_usage(&self) -> Result<String> { fn interrupt(&self) { debug!("Interrupting services..."); - for (name, (service, ..)) in self.service.read().expect("locked for reading").iter() { - trace!("Interrupting {name}"); - service.interrupt(); + if let Some(service) = service.upgrade() { + trace!("Interrupting {name}"); + service.interrupt(); + } } } - pub fn try_get<T: Send + Sync + 'static>(&self, name: &str) -> Result<Arc<T>> { + pub fn try_get<'a, 'b, T: Send + Sync + 'a + 'b + 'static>(&'b self, name: &'a str) -> Result<Arc<T>> { service::try_get::<T>(&self.service, name) } - pub fn get<T: Send + Sync + 'static>(&self, name: &str) -> Option<Arc<T>> { service::get::<T>(&self.service, name) } + pub fn get<'a, 'b, T: Send + Sync + 'a + 'b + 'static>(&'b self, name: &'a str) -> Option<Arc<T>> { + service::get::<T>(&self.service, name) + } } +#[allow(clippy::needless_pass_by_value)] fn add_service(map: &Arc<Map>, s: Arc<dyn Service>, a: Arc<dyn Any + Send + Sync>) { let name = s.name(); let len = map.read().expect("locked for reading").len(); trace!("built service #{len}: {name:?}"); - map.write() .expect("locked for writing") - .insert(name.to_owned(), (s, a)); + .insert(name.to_owned(), (Arc::downgrade(&s), Arc::downgrade(&a))); } -- GitLab