Skip to content
Snippets Groups Projects
Unverified Commit 9439f2c1 authored by Timo Kösters's avatar Timo Kösters
Browse files

feat: send logs into admin room

Log entries will automatically be deduplicated, so a message won't be
sent if the same line has already been sent in the last 30 mins
parent ecea0d4a
No related branches found
No related tags found
No related merge requests found
...@@ -39,7 +39,7 @@ impl Database { ...@@ -39,7 +39,7 @@ impl Database {
/// Tries to remove the old database but ignores all errors. /// Tries to remove the old database but ignores all errors.
pub fn try_remove(server_name: &str) -> Result<()> { pub fn try_remove(server_name: &str) -> Result<()> {
let mut path = ProjectDirs::from("xyz", "koesters", "conduit") let mut path = ProjectDirs::from("xyz", "koesters", "conduit")
.ok_or(Error::BadConfig( .ok_or_else(|| Error::bad_config(
"The OS didn't return a valid home directory path.", "The OS didn't return a valid home directory path.",
))? ))?
.data_dir() .data_dir()
...@@ -59,7 +59,7 @@ pub fn load_or_create(config: &Config) -> Result<Self> { ...@@ -59,7 +59,7 @@ pub fn load_or_create(config: &Config) -> Result<Self> {
.map(|x| Ok::<_, Error>(x.to_owned())) .map(|x| Ok::<_, Error>(x.to_owned()))
.unwrap_or_else(|_| { .unwrap_or_else(|_| {
let path = ProjectDirs::from("xyz", "koesters", "conduit") let path = ProjectDirs::from("xyz", "koesters", "conduit")
.ok_or(Error::BadConfig( .ok_or_else(|| Error::bad_config(
"The OS didn't return a valid home directory path.", "The OS didn't return a valid home directory path.",
))? ))?
.data_dir() .data_dir()
...@@ -67,7 +67,7 @@ pub fn load_or_create(config: &Config) -> Result<Self> { ...@@ -67,7 +67,7 @@ pub fn load_or_create(config: &Config) -> Result<Self> {
Ok(path Ok(path
.to_str() .to_str()
.ok_or(Error::BadConfig("Database path contains invalid unicode."))? .ok_or_else(|| Error::bad_config("Database path contains invalid unicode."))?
.to_owned()) .to_owned())
})?; })?;
...@@ -79,7 +79,7 @@ pub fn load_or_create(config: &Config) -> Result<Self> { ...@@ -79,7 +79,7 @@ pub fn load_or_create(config: &Config) -> Result<Self> {
.get_int("cache_capacity") .get_int("cache_capacity")
.unwrap_or(1024 * 1024 * 1024), .unwrap_or(1024 * 1024 * 1024),
) )
.map_err(|_| Error::BadConfig("Cache capacity needs to be a u64."))?, .map_err(|_| Error::bad_config("Cache capacity needs to be a u64."))?,
) )
.print_profile_on_drop(false) .print_profile_on_drop(false)
.open()?; .open()?;
......
...@@ -62,12 +62,12 @@ pub fn load(globals: sled::Tree, config: &rocket::Config) -> Result<Self> { ...@@ -62,12 +62,12 @@ pub fn load(globals: sled::Tree, config: &rocket::Config) -> Result<Self> {
.unwrap_or("localhost") .unwrap_or("localhost")
.to_string() .to_string()
.try_into() .try_into()
.map_err(|_| Error::BadConfig("Invalid server_name."))?, .map_err(|_| Error::bad_config("Invalid server_name."))?,
max_request_size: config max_request_size: config
.get_int("max_request_size") .get_int("max_request_size")
.unwrap_or(20 * 1024 * 1024) // Default to 20 MB .unwrap_or(20 * 1024 * 1024) // Default to 20 MB
.try_into() .try_into()
.map_err(|_| Error::BadConfig("Invalid max_request_size."))?, .map_err(|_| Error::bad_config("Invalid max_request_size."))?,
registration_disabled: config.get_bool("registration_disabled").unwrap_or(false), registration_disabled: config.get_bool("registration_disabled").unwrap_or(false),
encryption_disabled: config.get_bool("encryption_disabled").unwrap_or(false), encryption_disabled: config.get_bool("encryption_disabled").unwrap_or(false),
federation_enabled: config.get_bool("federation_enabled").unwrap_or(false), federation_enabled: config.get_bool("federation_enabled").unwrap_or(false),
......
use std::{time::Duration, collections::HashMap, sync::RwLock, time::Instant};
use log::error; use log::error;
use ruma::api::client::{error::ErrorKind, r0::uiaa::UiaaInfo}; use ruma::{
api::client::{error::ErrorKind, r0::uiaa::UiaaInfo},
events::room::message,
};
use thiserror::Error; use thiserror::Error;
use crate::{database::admin::AdminCommand, Database};
#[cfg(feature = "conduit_bin")] #[cfg(feature = "conduit_bin")]
use { use {
crate::RumaResponse, crate::RumaResponse,
...@@ -53,6 +60,11 @@ pub fn bad_database(message: &'static str) -> Self { ...@@ -53,6 +60,11 @@ pub fn bad_database(message: &'static str) -> Self {
error!("BadDatabase: {}", message); error!("BadDatabase: {}", message);
Self::BadDatabase(message) Self::BadDatabase(message)
} }
pub fn bad_config(message: &'static str) -> Self {
error!("BadConfig: {}", message);
Self::BadConfig(message)
}
} }
#[cfg(feature = "conduit_bin")] #[cfg(feature = "conduit_bin")]
...@@ -95,3 +107,50 @@ fn respond_to(self, r: &'r Request<'_>) -> response::Result<'o> { ...@@ -95,3 +107,50 @@ fn respond_to(self, r: &'r Request<'_>) -> response::Result<'o> {
.respond_to(r) .respond_to(r)
} }
} }
pub struct ConduitLogger {
pub db: Database,
pub last_logs: RwLock<HashMap<String, Instant>>,
}
impl log::Log for ConduitLogger {
fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
true
}
fn log(&self, record: &log::Record<'_>) {
let output = format!("{} - {}", record.level(), record.args());
println!("{}", output);
if self.enabled(record.metadata())
&& record
.module_path()
.map_or(false, |path| path.starts_with("conduit::"))
{
if self
.last_logs
.read()
.unwrap()
.get(&output)
.map_or(false, |i| i.elapsed() < Duration::from_secs(60 * 30))
{
return;
}
if let Ok(mut_last_logs) = &mut self.last_logs.try_write() {
mut_last_logs.insert(output.clone(), Instant::now());
}
self.db.admin.send(AdminCommand::SendTextMessage(
message::TextMessageEventContent {
body: output,
formatted: None,
relates_to: None,
},
));
}
}
fn flush(&self) {}
}
...@@ -11,7 +11,8 @@ ...@@ -11,7 +11,8 @@
mod utils; mod utils;
pub use database::Database; pub use database::Database;
pub use error::{Error, Result}; pub use error::{ConduitLogger, Error, Result};
use log::LevelFilter;
pub use pdu::PduEvent; pub use pdu::PduEvent;
pub use rocket::State; pub use rocket::State;
pub use ruma_wrapper::{ConduitResult, Ruma, RumaResponse}; pub use ruma_wrapper::{ConduitResult, Ruma, RumaResponse};
...@@ -19,6 +20,9 @@ ...@@ -19,6 +20,9 @@
use rocket::{fairing::AdHoc, routes}; use rocket::{fairing::AdHoc, routes};
fn setup_rocket() -> rocket::Rocket { fn setup_rocket() -> rocket::Rocket {
// Force log level off, so we can use our own logger
std::env::set_var("ROCKET_LOG", "off");
rocket::ignite() rocket::ignite()
.mount( .mount(
"/", "/",
...@@ -133,6 +137,12 @@ fn setup_rocket() -> rocket::Rocket { ...@@ -133,6 +137,12 @@ fn setup_rocket() -> rocket::Rocket {
let data = Database::load_or_create(rocket.config().await).expect("valid config"); let data = Database::load_or_create(rocket.config().await).expect("valid config");
data.sending.start_handler(&data.globals, &data.rooms); data.sending.start_handler(&data.globals, &data.rooms);
log::set_boxed_logger(Box::new(ConduitLogger {
db: data.clone(),
last_logs: Default::default(),
}))
.unwrap();
log::set_max_level(LevelFilter::Info);
Ok(rocket.manage(data)) Ok(rocket.manage(data))
})) }))
...@@ -140,10 +150,5 @@ fn setup_rocket() -> rocket::Rocket { ...@@ -140,10 +150,5 @@ fn setup_rocket() -> rocket::Rocket {
#[rocket::main] #[rocket::main]
async fn main() { async fn main() {
// Default log level
if std::env::var("ROCKET_LOG").is_err() {
std::env::set_var("ROCKET_LOG", "critical");
}
setup_rocket().launch().await.unwrap(); setup_rocket().launch().await.unwrap();
} }
...@@ -97,7 +97,7 @@ fn from_data( ...@@ -97,7 +97,7 @@ fn from_data(
handle.read_to_end(&mut body).await.unwrap(); handle.read_to_end(&mut body).await.unwrap();
let http_request = http_request.body(body.clone()).unwrap(); let http_request = http_request.body(body.clone()).unwrap();
log::info!("{:?}", http_request); log::debug!("{:?}", http_request);
match <T as Outgoing>::Incoming::try_from(http_request) { match <T as Outgoing>::Incoming::try_from(http_request) {
Ok(t) => Success(Ruma { Ok(t) => Success(Ruma {
......
...@@ -58,12 +58,12 @@ pub async fn send_request<T: OutgoingRequest>( ...@@ -58,12 +58,12 @@ pub async fn send_request<T: OutgoingRequest>(
T: Debug, T: Debug,
{ {
if !globals.federation_enabled() { if !globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled.")); return Err(Error::bad_config("Federation is disabled."));
} }
let resolver = AsyncResolver::tokio_from_system_conf() let resolver = AsyncResolver::tokio_from_system_conf()
.await .await
.map_err(|_| Error::BadConfig("Failed to set up trust dns resolver with system config."))?; .map_err(|_| Error::bad_config("Failed to set up trust dns resolver with system config."))?;
let mut host = None; let mut host = None;
...@@ -213,7 +213,7 @@ pub async fn send_request<T: OutgoingRequest>( ...@@ -213,7 +213,7 @@ pub async fn send_request<T: OutgoingRequest>(
#[cfg_attr(feature = "conduit_bin", get("/_matrix/federation/v1/version"))] #[cfg_attr(feature = "conduit_bin", get("/_matrix/federation/v1/version"))]
pub fn get_server_version(db: State<'_, Database>) -> ConduitResult<get_server_version::Response> { pub fn get_server_version(db: State<'_, Database>) -> ConduitResult<get_server_version::Response> {
if !db.globals.federation_enabled() { if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled.")); return Err(Error::bad_config("Federation is disabled."));
} }
Ok(get_server_version::Response { Ok(get_server_version::Response {
...@@ -276,7 +276,7 @@ pub async fn get_public_rooms_filtered_route( ...@@ -276,7 +276,7 @@ pub async fn get_public_rooms_filtered_route(
body: Ruma<get_public_rooms_filtered::v1::Request<'_>>, body: Ruma<get_public_rooms_filtered::v1::Request<'_>>,
) -> ConduitResult<get_public_rooms_filtered::v1::Response> { ) -> ConduitResult<get_public_rooms_filtered::v1::Response> {
if !db.globals.federation_enabled() { if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled.")); return Err(Error::bad_config("Federation is disabled."));
} }
let response = client_server::get_public_rooms_filtered_helper( let response = client_server::get_public_rooms_filtered_helper(
...@@ -323,7 +323,7 @@ pub async fn get_public_rooms_route( ...@@ -323,7 +323,7 @@ pub async fn get_public_rooms_route(
body: Ruma<get_public_rooms::v1::Request<'_>>, body: Ruma<get_public_rooms::v1::Request<'_>>,
) -> ConduitResult<get_public_rooms::v1::Response> { ) -> ConduitResult<get_public_rooms::v1::Response> {
if !db.globals.federation_enabled() { if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled.")); return Err(Error::bad_config("Federation is disabled."));
} }
let response = client_server::get_public_rooms_filtered_helper( let response = client_server::get_public_rooms_filtered_helper(
...@@ -370,7 +370,7 @@ pub fn send_transaction_message_route<'a>( ...@@ -370,7 +370,7 @@ pub fn send_transaction_message_route<'a>(
body: Ruma<send_transaction_message::v1::Request<'_>>, body: Ruma<send_transaction_message::v1::Request<'_>>,
) -> ConduitResult<send_transaction_message::v1::Response> { ) -> ConduitResult<send_transaction_message::v1::Response> {
if !db.globals.federation_enabled() { if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled.")); return Err(Error::bad_config("Federation is disabled."));
} }
//dbg!(&*body); //dbg!(&*body);
...@@ -423,7 +423,7 @@ pub fn get_missing_events_route<'a>( ...@@ -423,7 +423,7 @@ pub fn get_missing_events_route<'a>(
body: Ruma<get_missing_events::v1::Request<'_>>, body: Ruma<get_missing_events::v1::Request<'_>>,
) -> ConduitResult<get_missing_events::v1::Response> { ) -> ConduitResult<get_missing_events::v1::Response> {
if !db.globals.federation_enabled() { if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled.")); return Err(Error::bad_config("Federation is disabled."));
} }
let mut queued_events = body.latest_events.clone(); let mut queued_events = body.latest_events.clone();
...@@ -468,7 +468,7 @@ pub fn get_profile_information_route<'a>( ...@@ -468,7 +468,7 @@ pub fn get_profile_information_route<'a>(
body: Ruma<get_profile_information::v1::Request<'_>>, body: Ruma<get_profile_information::v1::Request<'_>>,
) -> ConduitResult<get_profile_information::v1::Response> { ) -> ConduitResult<get_profile_information::v1::Response> {
if !db.globals.federation_enabled() { if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled.")); return Err(Error::bad_config("Federation is disabled."));
} }
let mut displayname = None; let mut displayname = None;
...@@ -500,7 +500,7 @@ pub fn get_user_devices_route<'a>( ...@@ -500,7 +500,7 @@ pub fn get_user_devices_route<'a>(
body: Ruma<membership::v1::Request<'_>>, body: Ruma<membership::v1::Request<'_>>,
) -> ConduitResult<get_profile_information::v1::Response> { ) -> ConduitResult<get_profile_information::v1::Response> {
if !db.globals.federation_enabled() { if !db.globals.federation_enabled() {
return Err(Error::BadConfig("Federation is disabled.")); return Err(Error::bad_config("Federation is disabled."));
} }
let mut displayname = None; let mut displayname = None;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment