Newer
Older
server_server, utils, Database, Error, Result, Ruma,
api::{
client::{
error::ErrorKind,
ban_user, forget_room, get_member_events, invite_user, join_room_by_id,
join_room_by_id_or_alias, joined_members, joined_rooms, kick_user, leave_room,
unban_user, IncomingThirdPartySigned,
federation::{self, membership::create_invite},
},
events::{
room::{
create::RoomCreateEventContent,
member::{MembershipState, RoomMemberEventContent},
},
serde::{to_canonical_value, Base64, CanonicalJsonObject, CanonicalJsonValue},
state_res::{self, RoomVersion},
uint, EventId, RoomId, RoomVersionId, ServerName, UserId,
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
time::{Duration, Instant},
};
use tracing::{debug, error, warn};
/// # `POST /_matrix/client/r0/rooms/{roomId}/join`
///
/// Tries to join the sender user into a room.
///
/// - If the server knowns about this room: creates the join event and does auth rules locally
/// - If the server does not know about the room: asks other servers over federation
pub async fn join_room_by_id_route(
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let mut servers = Vec::new(); // There is no body.server_name for /roomId/join
servers.extend(
db.rooms
.invite_state(sender_user, &body.room_id)?
.unwrap_or_default()
.iter()
.filter_map(|event| serde_json::from_str(event.json().get()).ok())
.filter_map(|event: serde_json::Value| event.get("sender").cloned())
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
.filter_map(|sender| UserId::parse(sender).ok())
.map(|user| user.server_name().to_owned()),
);
servers.push(body.room_id.server_name().to_owned());
&body.room_id,
&servers,
body.third_party_signed.as_ref(),
)
/// # `POST /_matrix/client/r0/join/{roomIdOrAlias}`
///
/// Tries to join the sender user into a room.
///
/// - If the server knowns about this room: creates the join event and does auth rules locally
/// - If the server does not know about the room: asks other servers over federation
pub async fn join_room_by_id_or_alias_route(
let sender_user = body.sender_user.as_deref().expect("user is authenticated");
let body = body.body;
let (servers, room_id) = match Box::<RoomId>::try_from(body.room_id_or_alias) {
Ok(room_id) => {
let mut servers = body.server_name.clone();
servers.extend(
db.rooms
.invite_state(sender_user, &room_id)?
.unwrap_or_default()
.iter()
.filter_map(|event| serde_json::from_str(event.json().get()).ok())
.filter_map(|event: serde_json::Value| event.get("sender").cloned())
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
.filter_map(|sender| UserId::parse(sender).ok())
.map(|user| user.server_name().to_owned()),
);
servers.push(room_id.server_name().to_owned());
(servers, room_id)
}
let response = client_server::get_alias_helper(&db, &room_alias).await?;
(response.servers.into_iter().collect(), response.room_id)
let join_room_response = join_room_by_id_helper(
&db,
&room_id,
&servers,
body.third_party_signed.as_ref(),
)
.await?;
room_id: join_room_response.room_id,
})
/// # `POST /_matrix/client/r0/rooms/{roomId}/leave`
///
/// Tries to leave the sender user from a room.
///
/// - This should always work if the user is currently joined.
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
db.rooms.leave_room(sender_user, &body.room_id, &db).await?;
/// # `POST /_matrix/client/r0/rooms/{roomId}/invite`
///
/// Tries to send an invite event into the room.
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if let invite_user::v3::IncomingInvitationRecipient::UserId { user_id } = &body.recipient {
invite_helper(sender_user, user_id, &body.room_id, &db, false).await?;
} else {
Err(Error::BadRequest(ErrorKind::NotFound, "User not found."))
}
}
/// # `POST /_matrix/client/r0/rooms/{roomId}/kick`
///
/// Tries to send a kick event into the room.
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let mut event: RoomMemberEventContent = serde_json::from_str(
db.rooms
.room_state_get(
&body.room_id,
&body.user_id.to_string(),
)?
.ok_or(Error::BadRequest(
ErrorKind::BadState,
"Cannot kick member that's not in the room.",
))?
)
.map_err(|_| Error::bad_database("Invalid member event in database."))?;
.write()
.unwrap()
.entry(body.room_id.clone())
.or_default(),
);
content: to_raw_value(&event).expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(body.user_id.to_string()),
redacts: None,
},
/// # `POST /_matrix/client/r0/rooms/{roomId}/ban`
///
/// Tries to send a ban event into the room.
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
// TODO: reason
let event = db
.rooms
.room_state_get(
&body.room_id,
&body.user_id.to_string(),
)?
.map_or(
displayname: db.users.displayname(&body.user_id)?,
avatar_url: db.users.avatar_url(&body.user_id)?,
is_direct: None,
third_party_invite: None,
blurhash: db.users.blurhash(&body.user_id)?,
serde_json::from_str(event.content.get())
.map(|event: RoomMemberEventContent| RoomMemberEventContent {
membership: MembershipState::Ban,
..event
})
.map_err(|_| Error::bad_database("Invalid member event in database."))
.write()
.unwrap()
.entry(body.room_id.clone())
.or_default(),
);
content: to_raw_value(&event).expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(body.user_id.to_string()),
redacts: None,
},
/// # `POST /_matrix/client/r0/rooms/{roomId}/unban`
///
/// Tries to send an unban event into the room.
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let mut event: RoomMemberEventContent = serde_json::from_str(
db.rooms
.room_state_get(
&body.room_id,
&body.user_id.to_string(),
)?
.ok_or(Error::BadRequest(
ErrorKind::BadState,
"Cannot unban a user who is not banned.",
))?
)
.map_err(|_| Error::bad_database("Invalid member event in database."))?;
.write()
.unwrap()
.entry(body.room_id.clone())
.or_default(),
);
content: to_raw_value(&event).expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(body.user_id.to_string()),
redacts: None,
},
/// # `POST /_matrix/client/r0/rooms/{roomId}/forget`
///
/// Forgets about a room.
///
/// - If the sender user currently left the room: Stops sender user from receiving information about the room
///
/// Note: Other devices of the user have no way of knowing the room was forgotten, so this has to
/// be called from every device
pub async fn forget_room_route(
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
db.rooms.forget(&body.room_id, sender_user)?;
/// # `POST /_matrix/client/r0/joined_rooms`
///
/// Lists all rooms the user has joined.
pub async fn joined_rooms_route(
body: Ruma<joined_rooms::v3::Request>,
) -> Result<joined_rooms::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
.rooms_joined(sender_user)
.filter_map(|r| r.ok())
.collect(),
/// # `POST /_matrix/client/r0/rooms/{roomId}/members`
///
/// Lists all joined users in a room (TODO: at a specific point in time, with a specific membership).
///
/// - Only works if the user is currently joined
pub async fn get_member_events_route(
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
// TODO: check history visibility?
if !db.rooms.is_joined(sender_user, &body.room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"You don't have permission to view this room.",
));
}
.room_state_full(&body.room_id)
.await?
.filter(|(key, _)| key.0 == StateEventType::RoomMember)
.map(|(_, pdu)| pdu.to_member_event().into())
/// # `POST /_matrix/client/r0/rooms/{roomId}/joined_members`
///
/// Lists all members of a room.
///
/// - The sender user must be in the room
/// - TODO: An appservice just needs a puppet joined
pub async fn joined_members_route(
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if !db.rooms.is_joined(sender_user, &body.room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"You aren't a member of the room.",
));
}
let mut joined = BTreeMap::new();
for user_id in db.rooms.room_members(&body.room_id).filter_map(|r| r.ok()) {
let display_name = db.users.displayname(&user_id)?;
let avatar_url = db.users.avatar_url(&user_id)?;
joined.insert(
user_id,
display_name,
avatar_url,
},
);
}
#[tracing::instrument(skip(db))]
async fn join_room_by_id_helper(
db: &Database,
room_id: &RoomId,
_third_party_signed: Option<&IncomingThirdPartySigned>,
let sender_user = sender_user.expect("user is authenticated");
// Ask a remote server if we don't have this room
let mut make_join_response_and_server = Err(Error::BadServerResponse(
"No server available to assist in joining.",
));
for remote_server in servers {
let make_join_response = db
.sending
.send_federation_request(
&db.globals,
federation::membership::prepare_join_event::v1::Request {
ver: &db.globals.supported_room_versions(),
make_join_response_and_server = make_join_response.map(|r| (r, remote_server));
if make_join_response_and_server.is_ok() {
break;
}
}
let (make_join_response, remote_server) = make_join_response_and_server?;
let room_version = match make_join_response.room_version {
Some(room_version) if db.rooms.is_supported_version(&db, &room_version) => room_version,
_ => return Err(Error::BadServerResponse("Room version is not supported")),
};
let mut join_event_stub: CanonicalJsonObject =
serde_json::from_str(make_join_response.event.get()).map_err(|_| {
Error::BadServerResponse("Invalid make_join event json received from server.")
})?;
join_event_stub.insert(
"origin".to_owned(),
CanonicalJsonValue::String(db.globals.server_name().as_str().to_owned()),
);
join_event_stub.insert(
"origin_server_ts".to_owned(),
CanonicalJsonValue::Integer(
utils::millis_since_unix_epoch()
.try_into()
.expect("Timestamp is valid js_int value"),
),
join_event_stub.insert(
"content".to_owned(),
to_canonical_value(RoomMemberEventContent {
membership: MembershipState::Join,
displayname: db.users.displayname(sender_user)?,
avatar_url: db.users.avatar_url(sender_user)?,
is_direct: None,
third_party_invite: None,
blurhash: db.users.blurhash(sender_user)?,
})
.expect("event is valid, we just created it"),
);
// We don't leave the event id in the pdu because that's only allowed in v1 or v2 rooms
join_event_stub.remove("event_id");
// In order to create a compatible ref hash (EventID) the `hashes` field needs to be present
ruma::signatures::hash_and_sign_event(
db.globals.server_name().as_str(),
db.globals.keypair(),
&mut join_event_stub,
)
.expect("event is valid, we just created it");
// Generate event id
ruma::signatures::reference_hash(&join_event_stub, &room_version)
.expect("ruma can calculate reference hashes")
);
let event_id = <&EventId>::try_from(event_id.as_str())
.expect("ruma's reference hashes are valid event ids");
join_event_stub.insert(
"event_id".to_owned(),
CanonicalJsonValue::String(event_id.as_str().to_owned()),
// It has enough fields to be called a proper event now
let join_event = join_event_stub;
let send_join_response = db
.sending
.send_federation_request(
&db.globals,
federation::membership::create_join_event::v2::Request {
room_id,
pdu: &PduEvent::convert_to_outgoing_federation_event(join_event.clone()),
db.rooms.get_or_create_shortroomid(room_id, &db.globals)?;
let parsed_pdu = PduEvent::from_id_val(event_id, join_event.clone())
.map_err(|_| Error::BadServerResponse("Invalid join event PDU."))?;
server_server::fetch_join_signing_keys(
&send_join_response,
&room_version,
&pub_key_map,
.await?;
for result in send_join_response
.room_state
.state
.iter()
.map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map, db))
let (event_id, value) = match result {
Ok(t) => t,
Err(_) => continue,
let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| {
warn!("{:?}: {}", value, e);
Error::BadServerResponse("Invalid PDU in send_join response.")
})?;
db.rooms.add_pdu_outlier(&event_id, &value)?;
if let Some(state_key) = &pdu.state_key {
let shortstatekey = db.rooms.get_or_create_shortstatekey(
&pdu.kind.to_string().into(),
state_key,
&db.globals,
)?;
state.insert(shortstatekey, pdu.event_id.clone());
let incoming_shortstatekey = db.rooms.get_or_create_shortstatekey(
parsed_pdu
.state_key
.as_ref()
.expect("Pdu is a membership state event"),
&db.globals,
)?;
state.insert(incoming_shortstatekey, parsed_pdu.event_id.clone());
let create_shortstatekey = db
.rooms
.expect("Room exists");
if state.get(&create_shortstatekey).is_none() {
return Err(Error::BadServerResponse("State contained no create event."));
}
db.rooms.force_state(
room_id,
state
.into_iter()
.map(|(k, id)| db.rooms.compress_state_event(k, &id, &db.globals))
for result in send_join_response
.room_state
.auth_chain
.iter()
.map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map, db))
{
let (event_id, value) = match result {
Ok(t) => t,
Err(_) => continue,
};
db.rooms.add_pdu_outlier(&event_id, &value)?;
// We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail.
let statehashid = db.rooms.append_to_state(&parsed_pdu, &db.globals)?;
&parsed_pdu,
join_event,
iter::once(&*parsed_pdu.event_id),
// We set the room state after inserting the pdu, so that we never have a moment in time
// where events in the current room state do not exist
db.rooms.set_room_state(room_id, statehashid)?;
let event = RoomMemberEventContent {
membership: MembershipState::Join,
displayname: db.users.displayname(sender_user)?,
avatar_url: db.users.avatar_url(sender_user)?,
is_direct: None,
third_party_invite: None,
blurhash: db.users.blurhash(sender_user)?,
content: to_raw_value(&event).expect("event is valid, we just created it"),
state_key: Some(sender_user.to_string()),
sender_user,
room_id,
db,
Ok(join_room_by_id::v3::Response::new(room_id.to_owned()))
fn validate_and_add_event_id(
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
let mut value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
error!("Invalid PDU in server response: {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response")
})?;
ruma::signatures::reference_hash(&value, room_version)
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
.expect("ruma can calculate reference hashes")
))
.expect("ruma's reference hashes are valid event ids");
let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) {
Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
}
Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1),
};
if let Some((time, tries)) = db
.globals
.bad_event_ratelimiter
.read()
.unwrap()
.get(&event_id)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
}
if time.elapsed() < min_elapsed_duration {
debug!("Backing off from {}", event_id);
return Err(Error::BadServerResponse("bad event, still backing off"));
}
}
if let Err(e) = ruma::signatures::verify_event(
&*pub_key_map
.read()
.map_err(|_| Error::bad_database("RwLock is poisoned."))?,
&value,
room_version,
) {
warn!("Event {} failed verification {:?} {}", event_id, pdu, e);
back_off(event_id);
return Err(Error::BadServerResponse("Event failed verification."));
}
value.insert(
"event_id".to_owned(),
CanonicalJsonValue::String(event_id.as_str().to_owned()),
);
Ok((event_id, value))
}
pub(crate) async fn invite_helper<'a>(
sender_user: &UserId,
user_id: &UserId,
room_id: &RoomId,
db: &Database,
is_direct: bool,
) -> Result<()> {
if user_id.server_name() != db.globals.server_name() {
let (room_version_id, pdu_json, invite_room_state) = {
.write()
.unwrap()
.rooms
.get_pdu_leaves(room_id)?
.into_iter()
.take(20)
let create_event = db
.rooms
let create_event_content: Option<RoomCreateEventContent> = create_event
.as_ref()
.map(|create_event| {
serde_json::from_str(create_event.content.get()).map_err(|e| {
warn!("Invalid create event: {}", e);
Error::bad_database("Invalid create event in db.")
})
// If there was no create event yet, assume we are creating a room with the default
// version right now
let room_version_id = create_event_content
.map_or(db.globals.default_room_version(), |create_event| {
create_event.room_version
});
let room_version =
RoomVersion::new(&room_version_id).expect("room version is supported");
avatar_url: None,
displayname: None,
is_direct: Some(is_direct),
membership: MembershipState::Invite,
third_party_invite: None,
blurhash: None,
})
.expect("member event is valid value");
let state_key = user_id.to_string();
let auth_events = db.rooms.get_auth_events(
room_id,
Some(&state_key),
&content,
)?;
// Our depth is the maximum depth of prev_events + 1
let depth = prev_events
.filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth))
.max()
.unwrap_or_else(|| uint!(0))
+ uint!(1);
let mut unsigned = BTreeMap::new();
if let Some(prev_pdu) = db.rooms.room_state_get(room_id, &kind, &state_key)? {
unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone());
unsigned.insert(
"prev_sender".to_owned(),
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
room_id: room_id.to_owned(),
sender: sender_user.to_owned(),
origin_server_ts: utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid"),
content,
state_key: Some(state_key),
prev_events,
depth,
auth_events: auth_events
.iter()
.map(|(_, pdu)| pdu.event_id.clone())
.collect(),
redacts: None,
unsigned: if unsigned.is_empty() {
None
} else {
Some(to_raw_value(&unsigned).expect("to_raw_value always works"))
},
hashes: EventHash {
sha256: "aaa".to_owned(),
},
let auth_check = state_res::auth_check(
&room_version,
&pdu,
None::<PduEvent>, // TODO: third_party_invite
|k, s| auth_events.get(&(k.clone(), s.to_owned())),
)
.map_err(|e| {
error!("{:?}", e);
Error::bad_database("Auth check failed.")
})?;
if !auth_check {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Event is not authorized.",
));
}
// Hash and sign
let mut pdu_json =
utils::to_canonical_object(&pdu).expect("event is valid, we just created it");
pdu_json.remove("event_id");
// Add origin because synapse likes that (and it's required in the spec)
pdu_json.insert(
"origin".to_owned(),
to_canonical_value(db.globals.server_name())
.expect("server name is a valid CanonicalJsonValue"),
);
ruma::signatures::hash_and_sign_event(
db.globals.server_name().as_str(),
db.globals.keypair(),
&mut pdu_json,
&room_version_id,
)
.expect("event is valid, we just created it");
let invite_room_state = db.rooms.calculate_invite_state(&pdu)?;
(room_version_id, pdu_json, invite_room_state)
};
"${}",
ruma::signatures::reference_hash(&pdu_json, &room_version_id)
.expect("ruma can calculate reference hashes")
);
let expected_event_id = <&EventId>::try_from(expected_event_id.as_str())
.expect("ruma's reference hashes are valid event ids");
let response = db
.sending
.send_federation_request(
&db.globals,
user_id.server_name(),
create_invite::v2::Request {
room_version: &room_version_id,
event: &PduEvent::convert_to_outgoing_federation_event(pdu_json.clone()),
invite_room_state: &invite_room_state,
},
)
.await?;
let pub_key_map = RwLock::new(BTreeMap::new());
// We do not add the event_id field to the pdu here because of signature and hashes checks
let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(&response.event, &db)
{
Ok(t) => t,
Err(_) => {
// Event could not be converted to canonical json
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not convert event to canonical json.",
));
}
};
if expected_event_id != event_id {
warn!("Server {} changed invite event, that's not allowed in the spec: ours: {:?}, theirs: {:?}", user_id.server_name(), pdu_json, value);
}
let origin: Box<ServerName> = serde_json::from_value(
serde_json::to_value(value.get("origin").ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Event needs an origin field.",
))?)
.expect("CanonicalJson is valid json value"),
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?;
let pdu_id = server_server::handle_incoming_pdu(
&origin,
&event_id,
value,
true,
&pub_key_map,
)
.await
.map_err(|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"Error while handling incoming PDU.",
)
})?
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not accept incoming PDU as timeline event.",
))?;
.rooms
.room_servers(room_id)
.filter_map(|r| r.ok())
.filter(|server| &**server != db.globals.server_name());
db.sending.send_pdu(servers, &pdu_id)?;
if !db.rooms.is_joined(sender_user, &room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"You don't have permission to view this room.",
));
}
.write()
.unwrap()
.or_default(),
);
db.rooms.build_and_append_pdu(
PduBuilder {
content: to_raw_value(&RoomMemberEventContent {
membership: MembershipState::Invite,
displayname: db.users.displayname(user_id)?,
avatar_url: db.users.avatar_url(user_id)?,
blurhash: db.users.blurhash(user_id)?,
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(user_id.to_string()),
redacts: None,
},
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
// Make a user leave all their joined rooms
#[tracing::instrument(skip(self, db))]
pub async fn leave_all_rooms(&self, user_id: &UserId, db: &Database) -> Result<()> {
let all_rooms = db
.rooms
.rooms_joined(user_id)
.chain(db.rooms.rooms_invited(user_id).map(|t| t.map(|(r, _)| r)))
.collect::<Vec<_>>();
for room_id in all_rooms {
let room_id = match room_id {
Ok(room_id) => room_id,
Err(_) => continue,
};
let _ = self.leave_room(user_id, &room_id, db).await;
}
Ok(())
}
#[tracing::instrument(skip(self, db))]
pub async fn leave_room(
&self,
user_id: &UserId,
room_id: &RoomId,
db: &Database,
) -> Result<()> {
// Ask a remote server if we don't have this room
if !self.exists(room_id)? && room_id.server_name() != db.globals.server_name() {
if let Err(e) = self.remote_leave_room(user_id, room_id, db).await {
warn!("Failed to leave room {} remotely: {}", user_id, e);
// Don't tell the client about this error
}
let last_state = self
.invite_state(user_id, room_id)?
.map_or_else(|| self.left_state(user_id, room_id), |s| Ok(Some(s)))?;
// We always drop the invite, we can't rely on other servers
self.update_membership(
room_id,
user_id,
MembershipState::Leave,
user_id,
last_state,
db,
true,
)?;
} else {
let mutex_state = Arc::clone(
db.globals
.roomid_mutex_state
.write()
.unwrap()
.entry(room_id.to_owned())
.or_default(),
);
let state_lock = mutex_state.lock().await;
let mut event: RoomMemberEventContent = serde_json::from_str(
self.room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())?
.ok_or(Error::BadRequest(
ErrorKind::BadState,
"Cannot leave a room you are not a member of.",
))?
.content
.get(),
)
.map_err(|_| Error::bad_database("Invalid member event in database."))?;
event.membership = MembershipState::Leave;
self.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomMember,
content: to_raw_value(&event).expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(user_id.to_string()),
redacts: None,
},
user_id,
room_id,
db,
&state_lock,
)?;
}
Ok(())
}
#[tracing::instrument(skip(self, db))]
async fn remote_leave_room(
&self,
user_id: &UserId,
room_id: &RoomId,
db: &Database,
) -> Result<()> {
let mut make_leave_response_and_server = Err(Error::BadServerResponse(
"No server available to assist in leaving.",
));
let invite_state = db
.rooms
.invite_state(user_id, room_id)?
.ok_or(Error::BadRequest(
ErrorKind::BadState,
"User is not invited.",
))?;
let servers: HashSet<_> = invite_state
.iter()
.filter_map(|event| serde_json::from_str(event.json().get()).ok())
.filter_map(|event: serde_json::Value| event.get("sender").cloned())
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
.filter_map(|sender| UserId::parse(sender).ok())
.map(|user| user.server_name().to_owned())
.collect();
for remote_server in servers {
let make_leave_response = db
.sending
.send_federation_request(
&db.globals,
&remote_server,
federation::membership::prepare_leave_event::v1::Request { room_id, user_id },
)
.await;
make_leave_response_and_server = make_leave_response.map(|r| (r, remote_server));
if make_leave_response_and_server.is_ok() {
break;
}
}
let (make_leave_response, remote_server) = make_leave_response_and_server?;
let room_version_id = match make_leave_response.room_version {
Some(version) if self.is_supported_version(&db, &version) => version,
_ => return Err(Error::BadServerResponse("Room version is not supported")),
};
let mut leave_event_stub =
serde_json::from_str::<CanonicalJsonObject>(make_leave_response.event.get()).map_err(
|_| Error::BadServerResponse("Invalid make_leave event json received from server."),
)?;
// TODO: Is origin needed?
leave_event_stub.insert(
"origin".to_owned(),
CanonicalJsonValue::String(db.globals.server_name().as_str().to_owned()),
);
leave_event_stub.insert(
"origin_server_ts".to_owned(),
CanonicalJsonValue::Integer(
utils::millis_since_unix_epoch()
.try_into()
.expect("Timestamp is valid js_int value"),
),
);
// We don't leave the event id in the pdu because that's only allowed in v1 or v2 rooms
leave_event_stub.remove("event_id");
// In order to create a compatible ref hash (EventID) the `hashes` field needs to be present
ruma::signatures::hash_and_sign_event(
db.globals.server_name().as_str(),
db.globals.keypair(),
&mut leave_event_stub,
&room_version_id,
)
.expect("event is valid, we just created it");
// Generate event id
let event_id = EventId::parse(format!(
"${}",
ruma::signatures::reference_hash(&leave_event_stub, &room_version_id)
.expect("ruma can calculate reference hashes")
))
.expect("ruma's reference hashes are valid event ids");
// Add event_id back
leave_event_stub.insert(
"event_id".to_owned(),
CanonicalJsonValue::String(event_id.as_str().to_owned()),
);
// It has enough fields to be called a proper event now
let leave_event = leave_event_stub;
db.sending
.send_federation_request(
&db.globals,
&remote_server,
federation::membership::create_leave_event::v2::Request {
room_id,
event_id: &event_id,
pdu: &PduEvent::convert_to_outgoing_federation_event(leave_event.clone()),
},
)
.await?;
Ok(())
}