let is_authed = state_res::event_auth::auth_check(
&RoomVersionId::Version6,
&pdu,
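        // Pass the previous event only when there is exactly one prev_event; otherwise pass None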
if previous.len() == 1 {
previous.first().cloned()
} else {
None
},
&pdu.auth_events
.iter()
.map(|id| {
auth_cache
.get(id)
.map(|pdu| ((pdu.kind(), pdu.state_key()), pdu.clone()))
.ok_or_else(|| {
"Auth event not found, event failed recursive auth checks.".to_string()
})
})
.collect::<StdResult<BTreeMap<_, _>, _>>()?,
None, // TODO: third party invite
)
.map_err(|_e| "Auth check failed".to_string())?;
if !is_authed {
return Err("Event has failed auth check with auth events".to_string());
}
Ok((pdu, previous))
})
/// TODO: don't add as outlier if event is fetched as a result of gathering auth_events
/// `fetch_check_auth_events` checks that a complete auth chain can be fetched for the
/// event's `auth_events`. If any event in the chain is missing, the check fails.
async fn fetch_check_auth_events(
db: &Database,
origin: &ServerName,
key_map: &PublicKeyMap,
event_ids: &[EventId],
auth_cache: &mut EventMap<Arc<PduEvent>>,
) -> Result<()> {
let mut stack = event_ids.to_vec();
// DFS for auth event chain
    while let Some(ev_id) = stack.pop() {
if auth_cache.contains_key(&ev_id) {
continue;
}
// TODO: Batch these async calls so we can wait on multiple at once
let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache)
.await
.map(|mut vec| {
vec.pop()
.ok_or_else(|| Error::Conflict("Event was not found in fetch_events"))
})??;
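        // Recurse into this event's own auth events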
stack.extend(ev.auth_events());
}
Ok(())
}
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
///
/// 0. Look in the auth_cache
/// 1. Look in the main timeline (pduid_pdu tree)
/// 2. Look at outlier pdu tree
/// 3. Ask origin server over federation
/// 4. TODO: Ask other servers over federation?
///
/// If the event is unknown to the `auth_cache` it is added. This guarantees that any
/// event we need to know of will be present.
pub(crate) async fn fetch_events(
db: &Database,
origin: &ServerName,
key_map: &PublicKeyMap,
events: &[EventId],
auth_cache: &mut EventMap<Arc<PduEvent>>,
) -> Result<Vec<Arc<PduEvent>>> {
let mut pdus = vec![];
for id in events {
let pdu = match auth_cache.get(id) {
Some(pdu) => pdu.clone(),
// `get_pdu` checks the outliers tree for us
None => match db.rooms.get_pdu(&id)? {
Some(pdu) => Arc::new(pdu),
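                // Not in the cache or our DB: ask the origin server over federation (step 3)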
None => match db
.sending
.send_federation_request(
&db.globals,
origin,
get_event::v1::Request { event_id: &id },
)
.await
{
Ok(res) => {
let (event_id, value) = crate::pdu::gen_event_id_canonical_json(&res.pdu);
let (pdu, _) =
validate_event(db, value, event_id, key_map, origin, auth_cache)
.await
.map_err(|e| {
error!("{:?}", e);
Error::Conflict("Authentication of event failed")
})?;
db.rooms.append_pdu_outlier(&pdu)?;
pdu
}
Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")),
},
},
};
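        // Cache the event so later lookups during this request don't refetch it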
auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone());
pdus.push(pdu);
}
    Ok(pdus)
}
/// Search the DB for the signing keys of the given server. If we don't have them,
/// fetch them from the server and save them to our DB.
pub(crate) async fn fetch_signing_keys(
    db: &Database,
    origin: &ServerName,
    room_id: &RoomId,
) -> Result<BTreeMap<ServerSigningKeyId, VerifyKey>> {
    match db.globals.signing_keys_for(origin)? {
keys if !keys.is_empty() => Ok(keys),
_ => {
            match db
                .sending
                .send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new())
                .await
{
Ok(keys) => {
db.globals.add_signing_key(origin, &keys.server_key)?;
Ok(keys.server_key.verify_keys)
}
_ => {
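                    // The request to the origin failed, so ask the other servers in the
                    // room for a copy of the origin's signing keys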
for server in db.rooms.room_servers(room_id) {
let server = server?;
if let Ok(keys) = db
.sending
.send_federation_request(
&db.globals,
&server,
get_remote_server_keys::v2::Request::new(
&server,
SystemTime::now()
.checked_add(Duration::from_secs(3600))
.expect("SystemTime to large"),
),
)
.await
{
let keys: Vec<ServerSigningKeys> = keys.server_keys;
let key = keys.into_iter().fold(None, |mut key, next| {
if let Some(verified) = &key {
// rustc cannot elide this type for some reason
let v: &ServerSigningKeys = verified;
if v.verify_keys
.iter()
.zip(next.verify_keys.iter())
.all(|(a, b)| a.1.key == b.1.key)
{
}
} else {
key = Some(next)
}
key
});
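                        // NOTE: the folded key is not used yet; we still fall through to
                        // the error below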
}
}
Err(Error::BadServerResponse(
"Failed to find public key for server",
))
}
}
}
}
}
/// Calculate the forward extremities (leaves) of the room; their state snapshots are
/// what we need to resolve the current state of the room.
///
/// Step 11. Ensure that the state is derived from the previous current state (i.e. it was calculated
/// by doing state res where one of the inputs was a previously trusted set of state; don't just trust
/// a set of state we got from a remote server).
///
/// The state snapshot of the incoming event __needs__ to be added to the resulting list.
pub(crate) async fn calculate_forward_extremities(
db: &Database,
pdu: &PduEvent,
) -> Result<Vec<EventId>> {
let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?;
let mut is_incoming_leaf = true;
// Make sure the incoming event is not already a forward extremity
// FIXME: I think this could happen if different servers send us the same event??
if current_leaves.contains(pdu.event_id()) {
error!("The incoming event is already present in get_pdu_leaves BUG");
is_incoming_leaf = false;
// Not sure what to do here
}
// If the incoming event is already referenced by an existing event
// then do nothing - it's not a candidate to be a new extremity if
// it has been referenced.
if db.rooms.is_pdu_referenced(pdu)? {
is_incoming_leaf = false;
}
// TODO:
// [dendrite] Checks if any other leaves have been referenced and removes them
// but as long as we update the pdu leaves here and for events on our server this
// should not be possible.
    // Remove any forward extremities that are referenced by this incoming event's prev_events
for incoming_leaf in &pdu.prev_events {
if current_leaves.contains(incoming_leaf) {
if let Some(pos) = current_leaves.iter().position(|x| *x == *incoming_leaf) {
current_leaves.remove(pos);
}
}
}
    // Add the incoming event only if it is a leaf. We do this after fetching all the
    // state since we know we have already fetched the state of the incoming event, so let's
    // not do it again!
if is_incoming_leaf {
current_leaves.push(pdu.event_id().clone());
}
Ok(current_leaves)
}
/// This should always be called after the incoming event has been appended to the DB.
///
/// This guarantees that the incoming event will be in the state sets (at least our server's
/// and the sending server's).
pub(crate) async fn build_forward_extremity_snapshots(
db: &Database,
pdu: Arc<PduEvent>,
origin: &ServerName,
mut current_state: StateMap<Arc<PduEvent>>,
current_leaves: &[EventId],
pub_key_map: &PublicKeyMap,
auth_cache: &mut EventMap<Arc<PduEvent>>,
) -> Result<BTreeSet<StateMap<Arc<PduEvent>>>> {
let current_hash = db.rooms.current_state_hash(pdu.room_id())?;
let mut includes_current_state = false;
let mut fork_states = BTreeSet::new();
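    // Collect one state snapshot per forward extremity; these are the inputs to state resolution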
for id in current_leaves {
match db.rooms.get_pdu_id(id)? {
            // We can skip the incoming event here because it is handled outside of this
            // function: the current server state and the incoming event's state are built
            // to be the state after, i.e. the incoming state from the sending server.
Some(_) if id == pdu.event_id() => {}
Some(pduid) if db.rooms.get_pdu_from_id(&pduid)?.is_some() => {
let state_hash = db
.rooms
.pdu_state_hash(&pduid)?
.expect("found pdu with no statehash");
if current_hash.as_ref() == Some(&state_hash) {
includes_current_state = true;
}
let mut state_before = db
.rooms
.state_full(pdu.room_id(), &state_hash)?
.into_iter()
.map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
.collect::<StateMap<_>>();
// Now it's the state after
if let Some(pdu) = db.rooms.get_pdu_from_id(&pduid)? {
let key = (pdu.kind.clone(), pdu.state_key());
state_before.insert(key, Arc::new(pdu));
}
fork_states.insert(state_before);
            }
_ => {
error!("Missing state snapshot for {:?} - {:?}", id, pdu.kind());
let res = db
.sending
.send_federation_request(
&db.globals,
origin,
get_room_state_ids::v1::Request {
room_id: pdu.room_id(),
event_id: id,
},
)
.await?;
// TODO: This only adds events to the auth_cache, there is for sure a better way to
// do this...
fetch_events(&db, origin, pub_key_map, &res.auth_chain_ids, auth_cache).await?;
let mut state_before =
fetch_events(&db, origin, pub_key_map, &res.pdu_ids, auth_cache)
.await?
.into_iter()
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
.collect::<StateMap<_>>();
if let Some(pdu) = fetch_events(db, origin, pub_key_map, &[id.clone()], auth_cache)
.await?
.pop()
{
let key = (pdu.kind.clone(), pdu.state_key());
state_before.insert(key, pdu);
}
// Now it's the state after
fork_states.insert(state_before);
            }
        }
    }

    // This guarantees that our current room state is included
    if !includes_current_state {
        current_state.insert((pdu.kind(), pdu.state_key()), pdu);
        fork_states.insert(current_state);
    }

    Ok(fork_states)
}
pub(crate) fn update_resolved_state(
db: &Database,
room_id: &RoomId,
state: Option<StateMap<Arc<PduEvent>>>,
) -> Result<()> {
// Update the state of the room if needed
    // We can tell if we need to do this based on whether state resolution took place or not
if let Some(state) = state {
let mut new_state = HashMap::new();
for ((ev_type, state_k), pdu) in state {
match db.rooms.get_pdu_id(pdu.event_id())? {
Some(pduid) => {
new_state.insert(
(
ev_type,
state_k.ok_or_else(|| {
Error::Conflict("State contained non state event")
})?,
),
pduid.to_vec(),
);
}
None => {
error!("We are missing a state event for the current room state.");
}
}
}
db.rooms.force_state(room_id, new_state, &db.globals)?;
}
Ok(())
}
/// Append the incoming event setting the state snapshot to the state from the
/// server that sent the event.
pub(crate) fn append_incoming_pdu(
db: &Database,
pdu: &PduEvent,
new_room_leaves: &[EventId],
state: &StateMap<Arc<PduEvent>>,
) -> Result<()> {
// Update the state of the room if needed
    // We can tell if we need to do this based on whether state resolution took place or not
let mut new_state = HashMap::new();
for ((ev_type, state_k), state_pdu) in state {
match db.rooms.get_pdu_id(state_pdu.event_id())? {
Some(state_pduid) => {
new_state.insert(
(
ev_type.clone(),
state_k
.clone()
.ok_or_else(|| Error::Conflict("State contained non state event"))?,
),
state_pduid.to_vec(),
);
}
None => error!("We are missing a state event for the incoming event snapshot"),
        }
    }

db.rooms
.force_state(pdu.room_id(), new_state, &db.globals)?;
let count = db.globals.next_count()?;
let mut pdu_id = pdu.room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
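    // pdu_id layout: room id bytes, a 0xff separator, then the big-endian global count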
// We append to state before appending the pdu, so we don't have a moment in time with the
    // pdu without its state. This is okay because append_pdu can't fail.
let state_hash = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;
    db.rooms.append_pdu(
        pdu,
        utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"),
        count,
        pdu_id.clone().into(),
        &new_room_leaves,
        &db,
    )?;
db.rooms.set_room_state(pdu.room_id(), &state_hash)?;
for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) {
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
}
Ok(())
}
#[cfg_attr(
feature = "conduit_bin",
post("/_matrix/federation/v1/get_missing_events/<_>", data = "<body>")
)]
pub fn get_missing_events_route<'a>(
db: State<'a, Database>,
body: Ruma<get_missing_events::v1::Request<'_>>,
) -> ConduitResult<get_missing_events::v1::Response> {
    if !db.globals.federation_enabled() {
        return Err(Error::bad_config("Federation is disabled."));
    }
let mut queued_events = body.latest_events.clone();
let mut events = Vec::new();
let mut i = 0;
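    // Walk backwards through prev_events, starting from the latest events the requesting
    // server knows about, stopping at earliest_events or when the limit is reached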
while i < queued_events.len() && events.len() < u64::from(body.limit) as usize {
if let Some(pdu) = db.rooms.get_pdu_json(&queued_events[i])? {
if body.earliest_events.contains(
&serde_json::from_value(
pdu.get("event_id")
.cloned()
.ok_or_else(|| Error::bad_database("Event in db has no event_id field."))?,
)
.map_err(|_| Error::bad_database("Invalid event_id field in pdu in db."))?,
) {
i += 1;
continue;
}
queued_events.extend_from_slice(
&serde_json::from_value::<Vec<EventId>>(
pdu.get("prev_events").cloned().ok_or_else(|| {
Error::bad_database("Invalid prev_events field of pdu in db.")
})?,
)
.map_err(|_| Error::bad_database("Invalid prev_events content in pdu in db."))?,
);
events.push(serde_json::from_value(pdu).expect("Raw<..> is always valid"));
}
i += 1;
}
Ok(get_missing_events::v1::Response { events }.into())
}
#[cfg_attr(
feature = "conduit_bin",
get("/_matrix/federation/v1/query/profile", data = "<body>")
)]
pub fn get_profile_information_route<'a>(
db: State<'a, Database>,
body: Ruma<get_profile_information::v1::Request<'_>>,
) -> ConduitResult<get_profile_information::v1::Response> {
    if !db.globals.federation_enabled() {
        return Err(Error::bad_config("Federation is disabled."));
    }
let mut displayname = None;
let mut avatar_url = None;
match &body.field {
// TODO: what to do with custom
Some(ProfileField::_Custom(_s)) => {}
Some(ProfileField::DisplayName) => displayname = db.users.displayname(&body.user_id)?,
Some(ProfileField::AvatarUrl) => avatar_url = db.users.avatar_url(&body.user_id)?,
None => {
displayname = db.users.displayname(&body.user_id)?;
avatar_url = db.users.avatar_url(&body.user_id)?;
}
}
Ok(get_profile_information::v1::Response {
displayname,
avatar_url,
}
.into())
}
/*
#[cfg_attr(
feature = "conduit_bin",
get("/_matrix/federation/v2/invite/<_>/<_>", data = "<body>")
)]
pub fn get_user_devices_route<'a>(
db: State<'a, Database>,
body: Ruma<membership::v1::Request<'_>>,
) -> ConduitResult<get_profile_information::v1::Response> {
return Err(Error::bad_config("Federation is disabled."));
let mut displayname = None;
let mut avatar_url = None;
match body.field {
Some(ProfileField::DisplayName) => displayname = db.users.displayname(&body.user_id)?,
Some(ProfileField::AvatarUrl) => avatar_url = db.users.avatar_url(&body.user_id)?,
None => {
displayname = db.users.displayname(&body.user_id)?;
avatar_url = db.users.avatar_url(&body.user_id)?;
}
}
Ok(get_profile_information::v1::Response {
displayname,
avatar_url,
}
.into())
}
*/
#[cfg(test)]
mod tests {
use super::{add_port_to_hostname, get_ip_with_port};
#[test]
fn ips_get_default_ports() {
assert_eq!(
get_ip_with_port(String::from("1.1.1.1")),
Some(String::from("1.1.1.1:8448"))
);
assert_eq!(
get_ip_with_port(String::from("dead:beef::")),
Some(String::from("[dead:beef::]:8448"))
);
}
#[test]
fn ips_keep_custom_ports() {
assert_eq!(
get_ip_with_port(String::from("1.1.1.1:1234")),
Some(String::from("1.1.1.1:1234"))
);
assert_eq!(
get_ip_with_port(String::from("[dead::beef]:8933")),
Some(String::from("[dead::beef]:8933"))
);
}
#[test]
fn hostnames_get_default_ports() {
assert_eq!(
add_port_to_hostname(String::from("example.com")),
"example.com:8448"
)
}
#[test]
fn hostnames_keep_custom_ports() {
assert_eq!(
add_port_to_hostname(String::from("example.com:1337")),
"example.com:1337"
)
}
}