diff --git a/Cargo.lock b/Cargo.lock index a330d22f2805a555724a2aa334941b8af42e6807..18f9633c4765ed737a16c04c69c412a1498bebeb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -355,6 +355,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" +[[package]] +name = "chrono" +version = "0.4.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" +dependencies = [ + "num-traits", +] + [[package]] name = "clang-sys" version = "1.7.0" @@ -421,6 +430,7 @@ dependencies = [ "axum-server-dual-protocol", "base64 0.22.0", "bytes", + "chrono", "clap", "cyborgtime", "either", diff --git a/Cargo.toml b/Cargo.toml index c2987d7333463cd1029bbf7ab7da370ba5c24293..eb9976827043465c90717a9d838407a901a3b4a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,12 @@ cyborgtime = "2.1.1" bytes = "1.5.0" http = "0.2.12" +# standard date and time tools +[dependencies.chrono] +version = "0.4.35" +features = ["alloc"] +default-features = false + # Web framework [dependencies.axum] version = "0.6.20" diff --git a/src/config/mod.rs b/src/config/mod.rs index 3aae2b26f6faaa78f73a258de21d4ca550383fd2..51a7d07471d27765cb58390e465a313fa4a22fec 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -42,6 +42,9 @@ pub struct Config { #[serde(default = "default_database_backend")] pub database_backend: String, pub database_path: String, + pub database_backup_path: Option<String>, + #[serde(default = "default_database_backups_to_keep")] + pub database_backups_to_keep: i16, #[serde(default = "default_db_cache_capacity_mb")] pub db_cache_capacity_mb: f64, #[serde(default = "default_new_user_displayname_suffix")] @@ -252,6 +255,14 @@ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { ("Server name", self.server_name.host()), ("Database backend", &self.database_backend), ("Database path", &self.database_path), + ( + "Database backup path", + match self.database_backup_path.as_ref() { + Some(path) => path, + None => "", + }, + ), + ("Database backups to keep", &self.database_backups_to_keep.to_string()), ("Database cache capacity (MB)", &self.db_cache_capacity_mb.to_string()), ("Cache capacity modifier", &self.conduit_cache_capacity_modifier.to_string()), ("PDU cache capacity", &self.pdu_cache_capacity.to_string()), @@ -446,6 +457,8 @@ fn default_port() -> ListeningPort { fn default_unix_socket_perms() -> u32 { 660 } +fn default_database_backups_to_keep() -> i16 { 1 } + fn default_database_backend() -> String { "rocksdb".to_owned() } fn default_db_cache_capacity_mb() -> f64 { 256.0 } diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index cdc308eebe38590a99e730e75773055e1ad8e2d9..5f6e6000b2c1696b2edf27184458f6f08fb55e2f 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -1,4 +1,4 @@ -use std::{future::Future, pin::Pin, sync::Arc}; +use std::{error::Error, future::Future, pin::Pin, sync::Arc}; use super::Config; use crate::Result; @@ -26,6 +26,14 @@ fn memory_usage(&self) -> Result<String> { #[allow(dead_code)] fn clear_caches(&self) {} + + fn backup(&self) -> Result<(), Box<dyn Error>> { + unimplemented!() + } + + fn backup_list(&self) -> Result<String> { + Ok(String::new()) + } } pub(crate) trait KvTree: Send + Sync { diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index 34af27f3d4665ca85c5a6f3a2bb27b7033c1399d..5487cc4224ca1f3dd6e3b2cac2e683f2d1664b5c 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -3,12 +3,17 @@ pin::Pin, sync::{Arc, RwLock}, }; +use chrono::{ + DateTime, + Utc, +}; use rust_rocksdb::{ + backup::{BackupEngine, BackupEngineOptions}, LogLevel::{Debug, Error, Fatal, Info, Warn}, WriteBatchWithTransaction, }; -use tracing::{debug, info}; +use tracing::{debug, error, info}; use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree}; use crate::{utils, Result}; @@ -221,6 +226,68 @@ fn cleanup(&self) -> Result<()> { Ok(()) } + fn backup(&self) -> Result<(), Box<dyn std::error::Error>> { + let path = self.config.database_backup_path.as_ref(); + if path.is_none() || path.is_some_and(String::is_empty) { + return Ok(()); + } + + let options = BackupEngineOptions::new(&path.unwrap())?; + let mut engine = BackupEngine::open(&options, &self.env)?; + let ret = if self.config.database_backups_to_keep > 0 { + match engine.create_new_backup_flush(&self.rocks, true) { + Err(e) => return Err(Box::new(e)), + Ok(_) => { + let _info = engine.get_backup_info(); + let info = &_info.last().unwrap(); + info!( + "Created database backup #{} using {} bytes in {} files", + info.backup_id, + info.size, + info.num_files, + ); + Ok(()) + }, + } + } else { + Ok(()) + }; + + if self.config.database_backups_to_keep >= 0 { + let keep = u32::try_from(self.config.database_backups_to_keep)?; + if let Err(e) = engine.purge_old_backups(keep.try_into()?) { + error!("Failed to purge old backup: {:?}", e.to_string()) + } + } + + ret + } + + fn backup_list(&self) -> Result<String> { + let path = self.config.database_backup_path.as_ref(); + if path.is_none() || path.is_some_and(String::is_empty) { + return Ok("Configure database_backup_path to enable backups".to_owned()); + } + + let mut res = String::new(); + let options = BackupEngineOptions::new(&path.unwrap())?; + let engine = BackupEngine::open(&options, &self.env)?; + for info in engine.get_backup_info() { + std::fmt::write(&mut res, format_args!( + "#{} {}: {} bytes, {} files\n", + info.backup_id, + DateTime::<Utc>::from_timestamp(info.timestamp, 0) + .unwrap() + .to_rfc2822(), + info.size, + info.num_files, + )) + .unwrap(); + } + + Ok(res) + } + // TODO: figure out if this is needed for rocksdb #[allow(dead_code)] fn clear_caches(&self) {} diff --git a/src/database/key_value/globals.rs b/src/database/key_value/globals.rs index 48e55578e4bd086e96ae82c3380c976e33949ec2..273442facc6115b0846d88c5d05675ed6fcf8421 100644 --- a/src/database/key_value/globals.rs +++ b/src/database/key_value/globals.rs @@ -272,4 +272,12 @@ fn bump_database_version(&self, new_version: u64) -> Result<()> { self.global.insert(b"version", &new_version.to_be_bytes())?; Ok(()) } + + fn backup(&self) -> Result<(), Box<dyn std::error::Error>> { + self.db.backup() + } + + fn backup_list(&self) -> Result<String> { + self.db.backup_list() + } } diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index deca697c629dd68b5515a7cc35c4bc6235973e13..11fc58c1551940290f62ef67a55ca0c2ce5e8032 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -436,6 +436,12 @@ enum ServerCommand { ClearServiceCaches { amount: u32, }, + + /// - Backup the database + BackupDatabase, + + /// - List database backups + ListBackups, } #[derive(Debug)] @@ -1866,6 +1872,27 @@ async fn process_admin_command(&self, command: AdminCommand, body: Vec<&str>) -> RoomMessageEventContent::text_plain("Done.") }, + ServerCommand::ListBackups => { + let result = services().globals.db.backup_list()?; + + RoomMessageEventContent::text_plain(result) + }, + ServerCommand::BackupDatabase => { + let mut result = tokio::task::spawn_blocking(move || { + match services().globals.db.backup() { + Ok(_) => String::new(), + Err(e) => (*e).to_string(), + } + }) + .await + .unwrap(); + + if result.is_empty() { + result = services().globals.db.backup_list()?; + } + + RoomMessageEventContent::text_plain(&result) + }, }, AdminCommand::Debug(command) => match command { DebugCommand::GetAuthChain { diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs index afcd56a69733fb789dcd589f81921bbc3f88924e..a9cda9e12d188da61ed4b542c00cde29b2118408 100644 --- a/src/service/globals/data.rs +++ b/src/service/globals/data.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::error::Error; use async_trait::async_trait; use ruma::{ @@ -32,4 +33,6 @@ fn add_signing_key( fn signing_keys_for(&self, origin: &ServerName) -> Result<BTreeMap<OwnedServerSigningKeyId, VerifyKey>>; fn database_version(&self) -> Result<u64>; fn bump_database_version(&self, new_version: u64) -> Result<()>; + fn backup(&self) -> Result<(), Box<dyn Error>> { unimplemented!() } + fn backup_list(&self) -> Result<String> { Ok(String::new()) } }