diff --git a/src/database/rooms.rs b/src/database/rooms.rs
index 325a2e2f2b2a6c6e6eddedcac1785dc1a60f6ac3..665e3287f71a7601cda828aa0c387046c53f4303 100644
--- a/src/database/rooms.rs
+++ b/src/database/rooms.rs
@@ -397,6 +397,24 @@ pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result<Vec<EventId>> {
         Ok(events)
     }
 
+    /// Force an update to the leaves of a room.
+    pub fn force_pdu_leaves(&self, room_id: &RoomId, event_ids: &[EventId]) -> Result<()> {
+        let mut prefix = room_id.as_bytes().to_vec();
+        prefix.push(0xff);
+
+        for key in self.roomid_pduleaves.scan_prefix(&prefix).keys() {
+            self.roomid_pduleaves.remove(key?)?;
+        }
+
+        for event_id in event_ids.iter() {
+            let mut key = prefix.to_owned();
+            key.extend_from_slice(event_id.as_bytes());
+            self.roomid_pduleaves.insert(&key, event_id.as_bytes())?;
+        }
+
+        Ok(())
+    }
+
     /// Replace the leaves of a room with a new event.
     pub fn replace_pdu_leaves(&self, room_id: &RoomId, event_id: &EventId) -> Result<()> {
         let mut prefix = room_id.as_bytes().to_vec();
diff --git a/src/pdu.rs b/src/pdu.rs
index 340ddee563e26991d190fde4bc0ed37f00ec0b02..e38410fd9b2ef59c0667d3ee481a64a9036ab6a0 100644
--- a/src/pdu.rs
+++ b/src/pdu.rs
@@ -9,7 +9,7 @@
 };
 use serde::{Deserialize, Serialize};
 use serde_json::json;
-use std::{collections::BTreeMap, convert::TryFrom, time::UNIX_EPOCH};
+use std::{cmp::Ordering, collections::BTreeMap, convert::TryFrom, time::UNIX_EPOCH};
 
 #[derive(Clone, Deserialize, Serialize, Debug)]
 pub struct PduEvent {
@@ -284,6 +284,25 @@ fn unsigned(&self) -> &BTreeMap<String, serde_json::Value> {
     }
 }
 
+// These impl's allow us to dedup state snapshots when resolving state
+// for incoming events (federation/send/{txn}).
+impl Eq for PduEvent {}
+impl PartialEq for PduEvent {
+    fn eq(&self, other: &Self) -> bool {
+        self.event_id == other.event_id
+    }
+}
+impl PartialOrd for PduEvent {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        self.event_id.partial_cmp(&other.event_id)
+    }
+}
+impl Ord for PduEvent {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.event_id.cmp(&other.event_id)
+    }
+}
+
 /// Generates a correct eventId for the incoming pdu.
 ///
 /// Returns a tuple of the new `EventId` and the PDU as a `BTreeMap<String, CanonicalJsonValue>`.
diff --git a/src/server_server.rs b/src/server_server.rs
index 0eb7d6f59df5fdd6de9d68458efc6b0328dba5b4..16a1a8e9e825eaf1a3cdca6bbc8dfc345e7ec8cf 100644
--- a/src/server_server.rs
+++ b/src/server_server.rs
@@ -5,7 +5,6 @@
 use rocket::{get, post, put, response::content::Json, State};
 use ruma::{
     api::{
-        client::r0::state,
         federation::{
             directory::{get_public_rooms, get_public_rooms_filtered},
             discovery::{
@@ -25,7 +24,7 @@
 };
 use state_res::{Event, EventMap, StateMap};
 use std::{
-    collections::{BTreeMap, BTreeSet},
+    collections::{BTreeMap, BTreeSet, HashSet},
     convert::TryFrom,
     fmt::Debug,
     future::Future,
@@ -600,31 +599,21 @@ pub async fn send_transaction_message_route<'a>(
 
         let server_name = &body.body.origin;
         let mut pub_key_map = BTreeMap::new();
-        if let Some(sig) = value.get("signatures") {
-            match sig {
-                CanonicalJsonValue::Object(entity) => {
-                    for key in entity.keys() {
-                        // TODO: save this in a DB maybe...
-                        // fetch the public signing key
-                        let origin = <&ServerName>::try_from(key.as_str()).unwrap();
-                        let keys = fetch_signing_keys(&db, origin).await?;
-
-                        pub_key_map.insert(
-                            origin.to_string(),
-                            keys.into_iter()
-                                .map(|(k, v)| (k.to_string(), v.key))
-                                .collect(),
-                        );
-                    }
-                }
-                _ => {
-                    resolved_map.insert(
-                        event_id,
-                        Err("`signatures` is not a JSON object".to_string()),
-                    );
-                    continue;
-                }
-            }
+
+        if let Some(CanonicalJsonValue::String(sender)) = value.get("sender") {
+            let sender =
+                UserId::try_from(sender.as_str()).expect("All PDUs have a valid sender field");
+            let origin = sender.server_name();
+
+            // TODO: this could fail or the server not respond...
+            let keys = fetch_signing_keys(&db, origin).await?;
+
+            pub_key_map.insert(
+                origin.to_string(),
+                keys.into_iter()
+                    .map(|(k, v)| (k.to_string(), v.key))
+                    .collect(),
+            );
         } else {
             resolved_map.insert(event_id, Err("No field `signatures` in JSON".to_string()));
             continue;
@@ -642,8 +631,9 @@ pub async fn send_transaction_message_route<'a>(
         // 4. reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
         // 5. reject "due to auth events" if the event doesn't pass auth based on the auth events
         // 7. if not timeline event: stop
-        // 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
-        let (pdu, previous) = match validate_event(
+        // TODO; 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
+        // the events found in step 8 can be authed/resolved and appended to the DB
+        let (pdu, previous): (_, Vec<Arc<PduEvent>>) = match validate_event(
             &db,
             value,
             event_id.clone(),
@@ -670,6 +660,9 @@ pub async fn send_transaction_message_route<'a>(
         // 6. persist the event as an outlier.
         db.rooms.append_pdu_outlier(pdu.event_id(), &pdu)?;
 
+        // Step 9. fetch missing state by calling /state_ids at backwards extremities doing all
+        // the checks in this list starting at 1. These are not timeline events.
+        //
         // Step 10. check the auth of the event passes based on the calculated state of the event
         let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) =
             match db
@@ -771,8 +764,12 @@ pub async fn send_transaction_message_route<'a>(
             );
         };
 
-        // Gather the forward extremities and resolve
-        let fork_states = match forward_extremities(
+        // Step 11. Ensure that the state is derived from the previous current state (i.e. we calculated by doing state res
+        // where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote)
+        //
+        // calculate_forward_extremities takes care of adding the current state if not already in the state sets
+        // it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree.
+        let (mut fork_states, fork_ids) = match calculate_forward_extremities(
             &db,
             &pdu,
             server_name,
@@ -788,6 +785,12 @@ pub async fn send_transaction_message_route<'a>(
             }
         };
 
+        // add the incoming events to the mix of state snapshots
+        // Since we are using a BTreeSet (yea this may be overkill) we guarantee unique state sets
+        fork_states.insert(state_at_event.clone());
+
+        let fork_states = fork_states.into_iter().collect::<Vec<_>>();
+
         // 13. start state-res with all previous forward extremities minus the ones that are in
         // the prev_events of this event plus the new one created by this event and use
         // the result as the new room state
@@ -901,7 +904,9 @@ pub async fn send_transaction_message_route<'a>(
                 Err("Event has been soft failed".into()),
             );
         } else {
-            append_state(&db, &pdu)?;
+            // Add the event to the DB and update the forward extremities (via roomid_pduleaves).
+            append_state(&db, &pdu, &fork_ids)?;
+
             // Event has passed all auth/stateres checks
             resolved_map.insert(pdu.event_id().clone(), Ok(()));
         }
@@ -1106,25 +1111,52 @@ async fn fetch_signing_keys(
 /// Gather all state snapshots needed to resolve the current state of the room.
 ///
 /// Step 11. ensure that the state is derived from the previous current state (i.e. we calculated by doing state res
-/// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote)
-async fn forward_extremities(
+/// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote).
+///
+/// The state snapshot of the incoming event __needs__ to be added to the resulting list.
+async fn calculate_forward_extremities(
     db: &Database,
     pdu: &PduEvent,
     origin: &ServerName,
     pub_key_map: &PublicKeyMap,
     auth_cache: &mut EventMap<Arc<PduEvent>>,
-) -> Result<Vec<StateMap<Arc<PduEvent>>>> {
+) -> Result<(BTreeSet<StateMap<Arc<PduEvent>>>, Vec<EventId>)> {
     let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?;
 
+    let mut is_incoming_leaf = true;
+    // Make sure the incoming event is not already a forward extremity
+    // FIXME: I think this could happen if different servers send us the same event??
+    if current_leaves.contains(pdu.event_id()) {
+        is_incoming_leaf = false;
+        // Not sure what to do here
+    }
+
+    // If the incoming event is already referenced by an existing event
+    // then do nothing - it's not a candidate to be a new extremity if
+    // it has been referenced.
+    if already_referenced(db, pdu)? {
+        is_incoming_leaf = false;
+        // This event has been dealt with already??
+    }
+
+    // TODO:
+    // [dendrite] Checks if any other leaves have been referenced and removes them
+    // but as long as we update the pdu leaves here and for events on our server this
+    // should not be possible.
+
+    // Remove any forward extremities that are referenced by this incoming events prev_events
     for incoming_leaf in &pdu.prev_events {
-        if !current_leaves.contains(incoming_leaf) {
-            current_leaves.push(incoming_leaf.clone());
+        if current_leaves.contains(incoming_leaf) {
+            if let Some(pos) = current_leaves.iter().position(|x| *x == *incoming_leaf) {
+                current_leaves.remove(pos);
+            }
         }
     }
 
     let current_hash = db.rooms.current_state_hash(pdu.room_id())?;
+
     let mut includes_current_state = false;
-    let mut fork_states = vec![];
+    let mut fork_states = BTreeSet::new();
     for id in &current_leaves {
         if let Some(id) = db.rooms.get_pdu_id(id)? {
             let state_hash = db
@@ -1142,8 +1174,10 @@ async fn forward_extremities(
                 .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
                 .collect();
 
-            fork_states.push(state);
+            fork_states.insert(state);
         } else {
+            error!("Forward extremity not found... {}", id);
+
             let res = db
                 .sending
                 .send_federation_request(
@@ -1166,25 +1200,37 @@ async fn forward_extremities(
                 .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
                 .collect();
 
-            fork_states.push(state);
+            fork_states.insert(state);
         }
     }
 
+    // Add the incoming event only if it is a leaf, we do this after fetching all the
+    // state since we know we have already fetched the state of the incoming event so lets
+    // not do it again!
+    if is_incoming_leaf {
+        current_leaves.push(pdu.event_id().clone());
+    }
+
     // This guarantees that our current room state is included
     if !includes_current_state && current_hash.is_some() {
-        fork_states.push(
+        fork_states.insert(
             db.rooms
                 .state_full(pdu.room_id(), current_hash.as_ref().unwrap())?
                 .into_iter()
                 .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
                 .collect(),
-        )
+        );
     }
 
-    Ok(fork_states)
+    Ok((fork_states, dbg!(current_leaves)))
+}
+
+/// TODO: we need to know if the event is a prev_event (is this event already referenced in the DAG)
+fn already_referenced(_db: &Database, _pdu: &PduEvent) -> Result<bool> {
+    Ok(false)
 }
 
-fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> {
+fn append_state(db: &Database, pdu: &PduEvent, new_room_leaves: &[EventId]) -> Result<()> {
     let count = db.globals.next_count()?;
     let mut pdu_id = pdu.room_id.as_bytes().to_vec();
     pdu_id.push(0xff);
@@ -1195,13 +1241,17 @@ fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> {
     let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;
 
     db.rooms.append_pdu(
-        &pdu,
+        pdu,
         utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"),
         count,
         pdu_id.clone().into(),
         &db,
     )?;
 
+    // If we update the room leaves after calling append_pdu it will stick since append_pdu
+    // calls replace_pdu_leaves with only the given event.
+    db.rooms.force_pdu_leaves(pdu.room_id(), new_room_leaves)?;
+
     // We set the room state after inserting the pdu, so that we never have a moment in time
     // where events in the current room state do not exist
     db.rooms.set_room_state(&pdu.room_id, &statehashid)?;