Skip to content
Snippets Groups Projects
Unverified Commit ac978ab3 authored by Andrew Morgan's avatar Andrew Morgan Committed by GitHub
Browse files

Default PL100 to enable encryption in a room (#7230)

parent b85d7652
No related branches found
No related tags found
No related merge requests found
Require admin privileges to enable room encryption by default. This does not affect existing rooms.
...@@ -806,6 +806,7 @@ class RoomCreationHandler(BaseHandler): ...@@ -806,6 +806,7 @@ class RoomCreationHandler(BaseHandler):
EventTypes.RoomAvatar: 50, EventTypes.RoomAvatar: 50,
EventTypes.Tombstone: 100, EventTypes.Tombstone: 100,
EventTypes.ServerACL: 100, EventTypes.ServerACL: 100,
EventTypes.RoomEncryption: 100,
}, },
"events_default": 0, "events_default": 0,
"state_default": 50, "state_default": 50,
......
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.rest.client.v2_alpha import sync
from tests.unittest import HomeserverTestCase
class PowerLevelsTestCase(HomeserverTestCase):
"""Tests that power levels are enforced in various situations"""
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
sync.register_servlets,
]
def make_homeserver(self, reactor, clock):
config = self.default_config()
return self.setup_test_homeserver(config=config)
def prepare(self, reactor, clock, hs):
# register a room admin, moderator and regular user
self.admin_user_id = self.register_user("admin", "pass")
self.admin_access_token = self.login("admin", "pass")
self.mod_user_id = self.register_user("mod", "pass")
self.mod_access_token = self.login("mod", "pass")
self.user_user_id = self.register_user("user", "pass")
self.user_access_token = self.login("user", "pass")
# Create a room
self.room_id = self.helper.create_room_as(
self.admin_user_id, tok=self.admin_access_token
)
# Invite the other users
self.helper.invite(
room=self.room_id,
src=self.admin_user_id,
tok=self.admin_access_token,
targ=self.mod_user_id,
)
self.helper.invite(
room=self.room_id,
src=self.admin_user_id,
tok=self.admin_access_token,
targ=self.user_user_id,
)
# Make the other users join the room
self.helper.join(
room=self.room_id, user=self.mod_user_id, tok=self.mod_access_token
)
self.helper.join(
room=self.room_id, user=self.user_user_id, tok=self.user_access_token
)
# Mod the mod
room_power_levels = self.helper.get_state(
self.room_id, "m.room.power_levels", tok=self.admin_access_token,
)
# Update existing power levels with mod at PL50
room_power_levels["users"].update({self.mod_user_id: 50})
self.helper.send_state(
self.room_id,
"m.room.power_levels",
room_power_levels,
tok=self.admin_access_token,
)
def test_non_admins_cannot_enable_room_encryption(self):
# have the mod try to enable room encryption
self.helper.send_state(
self.room_id,
"m.room.encryption",
{"algorithm": "m.megolm.v1.aes-sha2"},
tok=self.mod_access_token,
expect_code=403, # expect failure
)
# have the user try to enable room encryption
self.helper.send_state(
self.room_id,
"m.room.encryption",
{"algorithm": "m.megolm.v1.aes-sha2"},
tok=self.user_access_token,
expect_code=403, # expect failure
)
def test_non_admins_cannot_send_server_acl(self):
# have the mod try to send a server ACL
self.helper.send_state(
self.room_id,
"m.room.server_acl",
{
"allow": ["*"],
"allow_ip_literals": False,
"deny": ["*.evil.com", "evil.com"],
},
tok=self.mod_access_token,
expect_code=403, # expect failure
)
# have the user try to send a server ACL
self.helper.send_state(
self.room_id,
"m.room.server_acl",
{
"allow": ["*"],
"allow_ip_literals": False,
"deny": ["*.evil.com", "evil.com"],
},
tok=self.user_access_token,
expect_code=403, # expect failure
)
def test_non_admins_cannot_tombstone_room(self):
# Create another room that will serve as our "upgraded room"
self.upgraded_room_id = self.helper.create_room_as(
self.admin_user_id, tok=self.admin_access_token
)
# have the mod try to send a tombstone event
self.helper.send_state(
self.room_id,
"m.room.tombstone",
{
"body": "This room has been replaced",
"replacement_room": self.upgraded_room_id,
},
tok=self.mod_access_token,
expect_code=403, # expect failure
)
# have the user try to send a tombstone event
self.helper.send_state(
self.room_id,
"m.room.tombstone",
{
"body": "This room has been replaced",
"replacement_room": self.upgraded_room_id,
},
tok=self.user_access_token,
expect_code=403, # expect failure
)
def test_admins_can_enable_room_encryption(self):
# have the admin try to enable room encryption
self.helper.send_state(
self.room_id,
"m.room.encryption",
{"algorithm": "m.megolm.v1.aes-sha2"},
tok=self.admin_access_token,
expect_code=200, # expect success
)
def test_admins_can_send_server_acl(self):
# have the admin try to send a server ACL
self.helper.send_state(
self.room_id,
"m.room.server_acl",
{
"allow": ["*"],
"allow_ip_literals": False,
"deny": ["*.evil.com", "evil.com"],
},
tok=self.admin_access_token,
expect_code=200, # expect success
)
def test_admins_can_tombstone_room(self):
# Create another room that will serve as our "upgraded room"
self.upgraded_room_id = self.helper.create_room_as(
self.admin_user_id, tok=self.admin_access_token
)
# have the admin try to send a tombstone event
self.helper.send_state(
self.room_id,
"m.room.tombstone",
{
"body": "This room has been replaced",
"replacement_room": self.upgraded_room_id,
},
tok=self.admin_access_token,
expect_code=200, # expect success
)
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
import json import json
import time import time
from typing import Any, Dict, Optional
import attr import attr
...@@ -142,7 +143,34 @@ class RestHelper(object): ...@@ -142,7 +143,34 @@ class RestHelper(object):
return channel.json_body return channel.json_body
def send_state(self, room_id, event_type, body, tok, expect_code=200, state_key=""): def _read_write_state(
self,
room_id: str,
event_type: str,
body: Optional[Dict[str, Any]],
tok: str,
expect_code: int = 200,
state_key: str = "",
method: str = "GET",
) -> Dict:
"""Read or write some state from a given room
Args:
room_id:
event_type: The type of state event
body: Body that is sent when making the request. The content of the state event.
If None, the request to the server will have an empty body
tok: The access token to use
expect_code: The HTTP code to expect in the response
state_key:
method: "GET" or "PUT" for reading or writing state, respectively
Returns:
The response body from the server
Raises:
AssertionError: if expect_code doesn't match the HTTP code we received
"""
path = "/_matrix/client/r0/rooms/%s/state/%s/%s" % ( path = "/_matrix/client/r0/rooms/%s/state/%s/%s" % (
room_id, room_id,
event_type, event_type,
...@@ -151,9 +179,13 @@ class RestHelper(object): ...@@ -151,9 +179,13 @@ class RestHelper(object):
if tok: if tok:
path = path + "?access_token=%s" % tok path = path + "?access_token=%s" % tok
request, channel = make_request( # Set request body if provided
self.hs.get_reactor(), "PUT", path, json.dumps(body).encode("utf8") content = b""
) if body is not None:
content = json.dumps(body).encode("utf8")
request, channel = make_request(self.hs.get_reactor(), method, path, content)
render(request, self.resource, self.hs.get_reactor()) render(request, self.resource, self.hs.get_reactor())
assert int(channel.result["code"]) == expect_code, ( assert int(channel.result["code"]) == expect_code, (
...@@ -163,6 +195,62 @@ class RestHelper(object): ...@@ -163,6 +195,62 @@ class RestHelper(object):
return channel.json_body return channel.json_body
def get_state(
self,
room_id: str,
event_type: str,
tok: str,
expect_code: int = 200,
state_key: str = "",
):
"""Gets some state from a room
Args:
room_id:
event_type: The type of state event
tok: The access token to use
expect_code: The HTTP code to expect in the response
state_key:
Returns:
The response body from the server
Raises:
AssertionError: if expect_code doesn't match the HTTP code we received
"""
return self._read_write_state(
room_id, event_type, None, tok, expect_code, state_key, method="GET"
)
def send_state(
self,
room_id: str,
event_type: str,
body: Dict[str, Any],
tok: str,
expect_code: int = 200,
state_key: str = "",
):
"""Set some state in a room
Args:
room_id:
event_type: The type of state event
body: Body that is sent when making the request. The content of the state event.
tok: The access token to use
expect_code: The HTTP code to expect in the response
state_key:
Returns:
The response body from the server
Raises:
AssertionError: if expect_code doesn't match the HTTP code we received
"""
return self._read_write_state(
room_id, event_type, body, tok, expect_code, state_key, method="PUT"
)
def upload_media( def upload_media(
self, self,
resource: Resource, resource: Resource,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment