diff --git a/Cargo.lock b/Cargo.lock index 2ca3f3add150753258b9d5bb903267ec9ecb4812..e95a75116ed9a0064364413423c405a535182a18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -433,6 +433,7 @@ dependencies = [ "hmac", "http", "hyper", + "hyperlocal", "image", "jsonwebtoken", "lazy_static", @@ -1058,6 +1059,12 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "hmac" version = "0.12.1" @@ -1156,6 +1163,17 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyperlocal" +version = "0.8.0" +source = "git+https://github.com/softprops/hyperlocal?rev=2ee4d149644600d326559af0d2b235c945b05c04#2ee4d149644600d326559af0d2b235c945b05c04" +dependencies = [ + "hex", + "hyper", + "pin-project-lite", + "tokio", +] + [[package]] name = "idna" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index a9f0d5d5c248f08adca5670719187f4e6eb3d59b..4169fd4b533db835610ec3d2c00a046104af7f4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,10 @@ ruma = { git = "https://github.com/ruma/ruma", rev = "b4853aa8fa5e3a24e3689fc880 #ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-msc2448", "unstable-msc3575", "unstable-exhaustive-types", "ring-compat", "unstable-unspecified" ] } # Async runtime and utilities +hyperlocal = { git = "https://github.com/softprops/hyperlocal", rev = "2ee4d149644600d326559af0d2b235c945b05c04", features = [ + "server", +] } +hyper = { version = "0.14", features = ["server", "http1"] } tokio = { version = "1.28.1", features = ["fs", "macros", "signal", "sync"] } # Used for storing data permanently #sled = { version = "0.34.7", features = ["compression", "no_metrics"], optional = true } @@ -52,8 +56,6 @@ serde = { version = "1.0.163", features = ["rc"] } rand = "0.8.5" # Used to hash passwords rust-argon2 = "2.0.0" -# Used to send requests -hyper = "0.14.26" reqwest = { version = "0.11.18", default-features = false, features = ["rustls-tls-native-roots", "socks"] } # Used for conduit::Error type thiserror = "1.0.40" diff --git a/DEPLOY.md b/DEPLOY.md index cb318eeeae9d8048c01f6d75814ddf416116999f..74d4bbaa61225bebb8dcde6d43a297a18c938372 100644 --- a/DEPLOY.md +++ b/DEPLOY.md @@ -118,6 +118,8 @@ ## Setting up a systemd service Environment="CONDUIT_CONFIG=/etc/matrix-conduit/conduit.toml" User=conduit Group=conduit +RuntimeDirectory=conduit +RuntimeDirectoryMode=0750 Restart=always ExecStart=/usr/local/bin/matrix-conduit @@ -223,9 +225,15 @@ # paste into httpd-ssl.conf or httpd.conf ServerName your.server.name # EDIT THIS AllowEncodedSlashes NoDecode + +# TCP ProxyPass /_matrix/ http://127.0.0.1:6167/_matrix/ timeout=300 nocanon ProxyPassReverse /_matrix/ http://127.0.0.1:6167/_matrix/ +# UNIX socket +#ProxyPass /_matrix/ unix:/run/conduit/conduit.sock|http://127.0.0.1:6167/_matrix/ nocanon +#ProxyPassReverse /_matrix/ unix:/run/conduit/conduit.sock|http://127.0.0.1:6167/_matrix/ + </VirtualHost> ``` @@ -245,7 +253,11 @@ ### Caddy ```caddy your.server.name, your.server.name:8448 { + # TCP reverse_proxy /_matrix/* 127.0.0.1:6167 + + # UNIX socket + #reverse_proxy /_matrix/* unix//run/conduit/conduit.sock } ``` @@ -272,8 +284,18 @@ ### Nginx # Increase this to allow posting large files such as videos client_max_body_size 20M; + # UNIX socket + #upstream backend { + # server unix:/run/conduit/conduit.sock; + #} + location /_matrix/ { + # TCP proxy_pass http://127.0.0.1:6167$request_uri; + + # UNIX socket + #proxy_pass http://backend; + proxy_set_header Host $http_host; proxy_buffering off; proxy_read_timeout 5m; diff --git a/conduit-example.toml b/conduit-example.toml index e8d4b08de17c5bf65269bdcb92109bc3e954130e..787d683e8b53e460bf5aecf42db05862c5737c08 100644 --- a/conduit-example.toml +++ b/conduit-example.toml @@ -68,3 +68,10 @@ allow_public_room_directory_without_auth = false # Set this to true to allow federating device display names / allow external users to see your device display name. # If federation is disabled entirely (`allow_federation`), this is inherently false. allow_device_name_federation = false + +# Uncomment unix_socket_path to listen on a UNIX socket at the specified path. +# If listening on a UNIX socket, you must remove the 'address' key if defined and add your +# reverse proxy (nginx/Caddy/Apache/etc) to the 'conduit' group, unless world RW +# permissions are specified with unix_socket_perms (666 minimum). +#unix_socket_path = "/run/conduit/conduit.sock" +#unix_socket_perms = 660 diff --git a/src/config/mod.rs b/src/config/mod.rs index 48efd638f72226bdbf2c2319f85a721f6805cb54..63e9bba3f275af9c8674abf96bb5b3abbf831799 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -2,11 +2,13 @@ collections::BTreeMap, fmt, net::{IpAddr, Ipv4Addr}, + path::PathBuf, }; +use figment::Figment; use ruma::{OwnedServerName, RoomVersionId}; use serde::{de::IgnoredAny, Deserialize}; -use tracing::warn; +use tracing::{error, warn}; mod proxy; @@ -19,7 +21,9 @@ pub struct Config { #[serde(default = "default_port")] pub port: u16, pub tls: Option<TlsConfig>, - + pub unix_socket_path: Option<PathBuf>, + #[serde(default = "default_unix_socket_perms")] + pub unix_socket_perms: u32, pub server_name: OwnedServerName, #[serde(default = "default_database_backend")] pub database_backend: String, @@ -108,7 +112,7 @@ pub fn warn_deprecated(&self) { .keys() .filter(|key| DEPRECATED_KEYS.iter().any(|s| s == key)) { - warn!("Config parameter {} is deprecated", key); + warn!("Config parameter \"{}\" is deprecated.", key); was_deprecated = true; } @@ -116,6 +120,19 @@ pub fn warn_deprecated(&self) { warn!("Read conduit documentation and check your configuration if any new configuration parameters should be adjusted"); } } + + /// Checks the presence of the `address` and `unix_socket_path` keys in the raw_config, exiting the process if both keys were detected. + pub fn error_dual_listening(&self, raw_config: Figment) -> Result<(), ()> { + let check_address = raw_config.find_value("address"); + let check_unix_socket = raw_config.find_value("unix_socket_path"); + + if check_address.is_ok() && check_unix_socket.is_ok() { + error!("TOML keys \"address\" and \"unix_socket_path\" were both defined. Please specify only one option."); + return Err(()); + } + + Ok(()) + } } impl fmt::Display for Config { @@ -241,6 +258,10 @@ fn default_port() -> u16 { 8000 } +fn default_unix_socket_perms() -> u32 { + 660 +} + fn default_database_backend() -> String { "sqlite".to_owned() } diff --git a/src/database/mod.rs b/src/database/mod.rs index e247d9f0c53584d07920d2d871d7f9357e6a4a4c..ebe944c323c4a040797eb707b8c83189875c0068 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1071,33 +1071,46 @@ pub async fn start_cleanup_task() { let timer_interval = Duration::from_secs(services().globals.config.cleanup_second_interval as u64); + fn perform_cleanup() { + let start = Instant::now(); + if let Err(e) = services().globals.cleanup() { + error!(target: "database-cleanup", "Ran into an error during cleanup: {}", e); + } else { + debug!(target: "database-cleanup", "Finished cleanup in {:#?}.", start.elapsed()); + } + } + tokio::spawn(async move { let mut i = interval(timer_interval); #[cfg(unix)] - let mut s = signal(SignalKind::hangup()).unwrap(); + let mut hangup = signal(SignalKind::hangup()).unwrap(); + let mut ctrl_c = signal(SignalKind::interrupt()).unwrap(); + let mut terminate = signal(SignalKind::terminate()).unwrap(); loop { #[cfg(unix)] tokio::select! { _ = i.tick() => { - debug!("cleanup: Timer ticked"); + debug!(target: "database-cleanup", "Timer ticked"); + } + _ = hangup.recv() => { + debug!(target: "database-cleanup","Received SIGHUP"); } - _ = s.recv() => { - debug!("cleanup: Received SIGHUP"); + _ = ctrl_c.recv() => { + debug!(target: "database-cleanup", "Received Ctrl+C, performing last cleanup"); + perform_cleanup(); + } + _ = terminate.recv() => { + debug!(target: "database-cleanup","Received SIGTERM, performing last cleanup"); + perform_cleanup(); } }; #[cfg(not(unix))] { i.tick().await; - debug!("cleanup: Timer ticked") - } - - let start = Instant::now(); - if let Err(e) = services().globals.cleanup() { - error!("cleanup: Errored: {}", e); - } else { - debug!("cleanup: Finished in {:?}", start.elapsed()); + debug!(target: "database-cleanup", "Timer ticked") } + perform_cleanup(); } }); } diff --git a/src/main.rs b/src/main.rs index c74d6ddb959de5e5d3460377f0ce2365e08cd470..06a5a3cafa1014c3b4b77270bdbe3decbeb53b49 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,10 @@ #![allow(clippy::suspicious_else_formatting)] #![deny(clippy::dbg_macro)] -use std::{future::Future, io, net::SocketAddr, sync::atomic, time::Duration}; +use std::{ + fs::Permissions, future::Future, io, net::SocketAddr, os::unix::fs::PermissionsExt, + sync::atomic, time::Duration, +}; use axum::{ extract::{DefaultBodyLimit, FromRequestParts, MatchedPath}, @@ -26,6 +29,8 @@ header::{self, HeaderName}, Method, StatusCode, Uri, }; +use hyper::Server; +use hyperlocal::SocketIncoming; use ruma::api::{ client::{ error::{Error as RumaError, ErrorBody, ErrorKind}, @@ -33,7 +38,7 @@ }, IncomingRequest, }; -use tokio::signal; +use tokio::{net::UnixListener, signal, sync::oneshot}; use tower::ServiceBuilder; use tower_http::{ cors::{self, CorsLayer}, @@ -43,6 +48,8 @@ use tracing::{debug, error, info, warn}; use tracing_subscriber::{prelude::*, EnvFilter}; +use tokio::sync::oneshot::Sender; + pub use conduit::*; // Re-export everything from the library crate #[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] @@ -69,12 +76,10 @@ async fn main() { Ok(s) => s, Err(e) => { eprintln!("It looks like your config is invalid. The following error occurred: {e}"); - std::process::exit(1); + return; } }; - config.warn_deprecated(); - let log = format!("{},ruma_state_res=error,_=off,sled=off", config.log); if config.allow_jaeger { @@ -135,11 +140,15 @@ async fn main() { #[cfg(unix)] maximize_fd_limit().expect("should be able to increase the soft limit to the hard limit"); + config.warn_deprecated(); + if let Err(_) = config.error_dual_listening(raw_config) { + return; + }; + info!("Loading database"); if let Err(error) = KeyValueDatabase::load_or_create(config).await { error!(?error, "The database couldn't be loaded or created"); - - std::process::exit(1); + return; }; let config = &services().globals.config; @@ -200,26 +209,57 @@ async fn run_server() -> io::Result<()> { let app = routes().layer(middlewares).into_make_service(); let handle = ServerHandle::new(); + let (tx, rx) = oneshot::channel::<()>(); - tokio::spawn(shutdown_signal(handle.clone())); + tokio::spawn(shutdown_signal(handle.clone(), tx)); - match &config.tls { - Some(tls) => { - let conf = RustlsConfig::from_pem_file(&tls.certs, &tls.key).await?; - let server = bind_rustls(addr, conf).handle(handle).serve(app); + if let Some(path) = &config.unix_socket_path { + if path.exists() { + warn!( + "UNIX socket path {:#?} already exists (unclean shutdown?), attempting to remove it.", + path.display() + ); + tokio::fs::remove_file(&path).await?; + } - #[cfg(feature = "systemd")] - let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); + tokio::fs::create_dir_all(path.parent().unwrap()).await?; - server.await? - } - None => { - let server = bind(addr).handle(handle).serve(app); + let socket_perms = config.unix_socket_perms.to_string(); + let octal_perms = u32::from_str_radix(&socket_perms, 8).unwrap(); - #[cfg(feature = "systemd")] - let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); + let listener = UnixListener::bind(path.clone()).unwrap(); + tokio::fs::set_permissions(path, Permissions::from_mode(octal_perms)) + .await + .unwrap(); + let socket = SocketIncoming::from_listener(listener); + + #[cfg(feature = "systemd")] + let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); + let server = Server::builder(socket).serve(app); + let graceful = server.with_graceful_shutdown(async { + rx.await.ok(); + }); + + if let Err(e) = graceful.await { + error!("Server error: {:?}", e); + } + } else { + match &config.tls { + Some(tls) => { + let conf = RustlsConfig::from_pem_file(&tls.certs, &tls.key).await?; + let server = bind_rustls(addr, conf).handle(handle).serve(app); + + #[cfg(feature = "systemd")] + let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); + server.await? + } + None => { + let server = bind(addr).handle(handle).serve(app); - server.await? + #[cfg(feature = "systemd")] + let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); + server.await? + } } } @@ -439,7 +479,7 @@ fn routes() -> Router { .fallback(not_found) } -async fn shutdown_signal(handle: ServerHandle) { +async fn shutdown_signal(handle: ServerHandle, tx: Sender<()>) -> Result<()> { let ctrl_c = async { signal::ctrl_c() .await @@ -471,6 +511,9 @@ async fn shutdown_signal(handle: ServerHandle) { #[cfg(feature = "systemd")] let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]); + tx.send(()).unwrap(); + + Ok(()) } async fn not_found(uri: Uri) -> impl IntoResponse { diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 9178971121c7d83423b1f00b4b60e1ad2c543272..4a84d08a55b6c8e46c2926fd0ed8cca33ee3dcf4 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -435,9 +435,26 @@ pub fn well_known_client(&self) -> &Option<String> { &self.config.well_known_client } + pub fn unix_socket_path(&self) -> &Option<PathBuf> { + &self.config.unix_socket_path + } + pub fn shutdown(&self) { self.shutdown.store(true, atomic::Ordering::Relaxed); // On shutdown + + if self.unix_socket_path().is_some() { + match &self.unix_socket_path() { + Some(path) => { + std::fs::remove_file(path.to_owned()).unwrap(); + } + None => error!( + "Unable to remove socket file at {:?} during shutdown.", + &self.unix_socket_path() + ), + }; + }; + info!(target: "shutdown-sync", "Received shutdown notification, notifying sync helpers..."); services().globals.rotate.fire(); }