From c9c99746412155fcdce6a6430bd5ef9c567cc3fe Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz>
Date: Thu, 16 Dec 2021 14:52:19 +0100
Subject: [PATCH] fix: stack overflows when fetching auth events

---
 src/database/abstraction/rocksdb.rs |  18 ++--
 src/server_server.rs                | 145 +++++++++++++++-------------
 2 files changed, 88 insertions(+), 75 deletions(-)

diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs
index 3ff6ab861..825c02e09 100644
--- a/src/database/abstraction/rocksdb.rs
+++ b/src/database/abstraction/rocksdb.rs
@@ -22,14 +22,20 @@ impl DatabaseEngine for Engine {
     fn open(config: &Config) -> Result<Arc<Self>> {
         let mut db_opts = rocksdb::Options::default();
         db_opts.create_if_missing(true);
-        db_opts.set_max_open_files(16);
+        db_opts.set_max_open_files(512);
         db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
-        db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy);
-        db_opts.set_target_file_size_base(256 << 20);
-        db_opts.set_write_buffer_size(256 << 20);
+        db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
+        db_opts.set_target_file_size_base(2 << 22);
+        db_opts.set_max_bytes_for_level_base(2 << 24);
+        db_opts.set_max_bytes_for_level_multiplier(2.0);
+        db_opts.set_num_levels(8);
+        db_opts.set_write_buffer_size(2 << 27);
+
+        let rocksdb_cache = rocksdb::Cache::new_lru_cache((config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize).unwrap();
 
         let mut block_based_options = rocksdb::BlockBasedOptions::default();
-        block_based_options.set_block_size(512 << 10);
+        block_based_options.set_block_size(2 << 19);
+        block_based_options.set_block_cache(&rocksdb_cache);
         db_opts.set_block_based_table_factory(&block_based_options);
 
         let cfs = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
@@ -45,7 +51,6 @@ fn open(config: &Config) -> Result<Arc<Self>> {
                 let mut options = rocksdb::Options::default();
                 let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
                 options.set_prefix_extractor(prefix_extractor);
-                options.set_merge_operator_associative("increment", utils::increment_rocksdb);
 
                 rocksdb::ColumnFamilyDescriptor::new(name, options)
             }),
@@ -63,7 +68,6 @@ fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
             let mut options = rocksdb::Options::default();
             let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
             options.set_prefix_extractor(prefix_extractor);
-            options.set_merge_operator_associative("increment", utils::increment_rocksdb);
 
             let _ = self.rocks.create_cf(name, &options);
             println!("created cf");
diff --git a/src/server_server.rs b/src/server_server.rs
index 594152aed..d6bc9b91d 100644
--- a/src/server_server.rs
+++ b/src/server_server.rs
@@ -1392,12 +1392,11 @@ async fn upgrade_outlier_to_timeline_pdu(
                 let mut starting_events = Vec::with_capacity(leaf_state.len());
 
                 for (k, id) in leaf_state {
-                    let k = db
-                        .rooms
-                        .get_statekey_from_short(k)
-                        .map_err(|_| "Failed to get_statekey_from_short.".to_owned())?;
-
-                    state.insert(k, id.clone());
+                    if let Ok(k) = db.rooms.get_statekey_from_short(k) {
+                        state.insert(k, id.clone());
+                    } else {
+                        warn!("Failed to get_statekey_from_short.");
+                    }
                     starting_events.push(id);
                 }
 
@@ -1755,11 +1754,16 @@ async fn upgrade_outlier_to_timeline_pdu(
                 .into_iter()
                 .map(|map| {
                     map.into_iter()
-                        .map(|(k, id)| db.rooms.get_statekey_from_short(k).map(|k| (k, id)))
-                        .collect::<Result<StateMap<_>>>()
+                        .filter_map(|(k, id)| {
+                            db.rooms
+                                .get_statekey_from_short(k)
+                                .map(|k| (k, id))
+                                .map_err(|e| warn!("Failed to get_statekey_from_short: {}", e))
+                                .ok()
+                        })
+                        .collect::<StateMap<_>>()
                 })
-                .collect::<Result<_>>()
-                .map_err(|_| "Failed to get_statekey_from_short.".to_owned())?;
+                .collect();
 
             let state = match state_res::resolve(
                 room_version_id,
@@ -1871,73 +1875,78 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
             // a. Look in the main timeline (pduid_pdu tree)
             // b. Look at outlier pdu tree
             // (get_pdu_json checks both)
-            let local_pdu = db.rooms.get_pdu(id);
-            let pdu = match local_pdu {
-                Ok(Some(pdu)) => {
+            if let Ok(Some(local_pdu)) = db.rooms.get_pdu(id) {
+                trace!("Found {} in db", id);
+                pdus.push((local_pdu, None));
+            }
+
+            // c. Ask origin server over federation
+            // We also handle its auth chain here so we don't get a stack overflow in
+            // handle_outlier_pdu.
+            let mut todo_auth_events = vec![id];
+            let mut events_in_reverse_order = Vec::new();
+            while let Some(next_id) = todo_auth_events.pop() {
+                if let Ok(Some(_)) = db.rooms.get_pdu(next_id) {
                     trace!("Found {} in db", id);
-                    (pdu, None)
+                    continue;
                 }
-                Ok(None) => {
-                    // c. Ask origin server over federation
-                    warn!("Fetching {} over federation.", id);
-                    match db
-                        .sending
-                        .send_federation_request(
-                            &db.globals,
-                            origin,
-                            get_event::v1::Request { event_id: id },
-                        )
-                        .await
-                    {
-                        Ok(res) => {
-                            warn!("Got {} over federation", id);
-                            let (calculated_event_id, value) =
-                                match crate::pdu::gen_event_id_canonical_json(&res.pdu) {
-                                    Ok(t) => t,
-                                    Err(_) => {
-                                        back_off((**id).to_owned());
-                                        continue;
-                                    }
-                                };
 
-                            if calculated_event_id != **id {
-                                warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}",
-                                    id, calculated_event_id, &res.pdu);
-                            }
-
-                            // This will also fetch the auth chain
-                            match handle_outlier_pdu(
-                                origin,
-                                create_event,
-                                id,
-                                room_id,
-                                value.clone(),
-                                db,
-                                pub_key_map,
-                            )
-                            .await
-                            {
-                                Ok((pdu, json)) => (pdu, Some(json)),
-                                Err(e) => {
-                                    warn!("Authentication of event {} failed: {:?}", id, e);
-                                    back_off((**id).to_owned());
+                warn!("Fetching {} over federation.", next_id);
+                match db
+                    .sending
+                    .send_federation_request(
+                        &db.globals,
+                        origin,
+                        get_event::v1::Request { event_id: next_id },
+                    )
+                    .await
+                {
+                    Ok(res) => {
+                        warn!("Got {} over federation", next_id);
+                        let (calculated_event_id, value) =
+                            match crate::pdu::gen_event_id_canonical_json(&res.pdu) {
+                                Ok(t) => t,
+                                Err(_) => {
+                                    back_off((**next_id).to_owned());
                                     continue;
                                 }
-                            }
-                        }
-                        Err(_) => {
-                            warn!("Failed to fetch event: {}", id);
-                            back_off((**id).to_owned());
-                            continue;
+                            };
+
+                        if calculated_event_id != **next_id {
+                            warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}",
+                                next_id, calculated_event_id, &res.pdu);
                         }
+
+                        events_in_reverse_order.push((next_id, value));
+                    }
+                    Err(_) => {
+                        warn!("Failed to fetch event: {}", next_id);
+                        back_off((**next_id).to_owned());
                     }
                 }
-                Err(e) => {
-                    warn!("Error loading {}: {}", id, e);
-                    continue;
+            }
+
+            while let Some((next_id, value)) = events_in_reverse_order.pop() {
+                match handle_outlier_pdu(
+                    origin,
+                    create_event,
+                    next_id,
+                    room_id,
+                    value.clone(),
+                    db,
+                    pub_key_map,
+                )
+                .await
+                {
+                    Ok((pdu, json)) => {
+                        pdus.push((pdu, Some(json)));
+                    }
+                    Err(e) => {
+                        warn!("Authentication of event {} failed: {:?}", next_id, e);
+                        back_off((**next_id).to_owned());
+                    }
                 }
-            };
-            pdus.push(pdu);
+            }
         }
         pdus
     })
-- 
GitLab