diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 06e371e0ad356afd0080a0eba244898cd9f5d06e..d5924b1c8d33868a9f5926cd171dbb009a2b31c5 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -4,14 +4,14 @@ use rusqlite::{Connection, DatabaseName::Main, OptionalExtension}; use std::{ cell::RefCell, - collections::HashMap, + collections::{hash_map, HashMap}, future::Future, path::{Path, PathBuf}, pin::Pin, sync::Arc, }; use thread_local::ThreadLocal; -use tokio::sync::oneshot::Sender; +use tokio::sync::watch; use tracing::debug; thread_local! { @@ -126,7 +126,7 @@ fn flush(self: &Arc<Self>) -> Result<()> { pub struct SqliteTable { engine: Arc<Engine>, name: String, - watchers: RwLock<HashMap<Vec<u8>, Vec<Sender<()>>>>, + watchers: RwLock<HashMap<Vec<u8>, (watch::Sender<()>, watch::Receiver<()>)>>, } type TupleOfBytes = (Vec<u8>, Vec<u8>); @@ -215,10 +215,8 @@ fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { if !triggered.is_empty() { let mut watchers = self.watchers.write(); for prefix in triggered { - if let Some(txs) = watchers.remove(prefix) { - for tx in txs { - let _ = tx.send(()); - } + if let Some(tx) = watchers.remove(prefix) { + let _ = tx.0.send(()); } } }; @@ -367,17 +365,18 @@ fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = TupleOf #[tracing::instrument(skip(self, prefix))] fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.watchers - .write() - .entry(prefix.to_vec()) - .or_default() - .push(tx); + let mut rx = match self.watchers.write().entry(prefix.to_vec()) { + hash_map::Entry::Occupied(o) => o.get().1.clone(), + hash_map::Entry::Vacant(v) => { + let (tx, rx) = tokio::sync::watch::channel(()); + v.insert((tx, rx.clone())); + rx + } + }; Box::pin(async move { // Tx is never destroyed - rx.await.unwrap(); + rx.changed().await.unwrap(); }) }