Skip to content
Snippets Groups Projects
test_user.py 127 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2018-2021 The Matrix.org Foundation C.I.C.
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import hashlib
    import hmac
    
    from binascii import unhexlify
    
    from typing import List, Optional
    
    from unittest.mock import Mock, patch
    
    from parameterized import parameterized, parameterized_class
    
    import synapse.rest.admin
    from synapse.api.constants import UserTypes
    
    from synapse.api.errors import Codes, HttpResponseException, ResourceLimitError
    
    from synapse.api.room_versions import RoomVersions
    
    from synapse.rest.client import devices, login, logout, profile, room, sync
    
    from synapse.rest.media.v1.filepath import MediaFilePaths
    
    from synapse.types import JsonDict, UserID
    
    from tests.server import FakeSite, make_request
    
    from tests.test_utils import SMALL_PNG, make_awaitable
    
    from tests.unittest import override_config
    
    
    
    class UserRegisterTestCase(unittest.HomeserverTestCase):
    
    
        servlets = [
            synapse.rest.admin.register_servlets_for_client_rest_resource,
            profile.register_servlets,
        ]

        def make_homeserver(self, reactor, clock):
            """Create the homeserver used by the shared-secret registration tests.

            Args:
                reactor: The test reactor (unused here).
                clock: The test clock (unused here).

            Returns:
                The homeserver, configured with the registration shared secret
                ``"shared"`` that the tests use as their HMAC key.
            """
            self.url = "/_synapse/admin/v1/register"

            # Mock collaborators kept on the test case.
            self.registration_handler = Mock()
            self.identity_handler = Mock()
            self.login_handler = Mock()
            self.device_handler = Mock()
            self.device_handler.check_device_registered = Mock(return_value="FAKE")

            self.datastore = Mock(return_value=Mock())
            self.datastore.get_current_state_deltas = Mock(return_value=(0, []))

            self.hs = self.setup_test_homeserver()

            # Every MAC in these tests is keyed with b"shared".
            self.hs.config.registration.registration_shared_secret = "shared"

            self.hs.get_media_repository = Mock()
            self.hs.get_deactivate_account_handler = Mock()

            return self.hs
    
        def test_disabled(self):
            """
            With no shared secret configured, the endpoint refuses to register
            anybody.
            """
            # Clear the shared secret before hitting the endpoint.
            self.hs.config.registration.registration_shared_secret = None

            response = self.make_request("POST", self.url, b"{}")

            self.assertEqual(400, response.code, msg=response.json_body)
            error = response.json_body["error"]
            self.assertEqual("Shared secret registration is not enabled", error)
    
        def test_get_nonce(self):
            """
            A GET on the endpoint hands back a nonce produced by
            `secrets.token_hex`.
            """
            # Pin secrets.token_hex to a known value while the request is made.
            with patch("secrets.token_hex", return_value="abcd"):
                response = self.make_request("GET", self.url)

                self.assertEqual(response.json_body, {"nonce": "abcd"})
    
    
        def test_expired_nonce(self):
            """
            A nonce obtained via GET stops being recognised once SALT_TIMEOUT
            (60s) has passed.
            """
            nonce = self.make_request("GET", self.url).json_body["nonce"]
            payload = {"nonce": nonce}

            # 59 seconds: the nonce is still accepted, so the request fails on
            # the next missing field instead.
            self.reactor.advance(59)
            response = self.make_request("POST", self.url, payload)
            self.assertEqual(400, response.code, msg=response.json_body)
            self.assertEqual("username must be specified", response.json_body["error"])

            # 61 seconds: the very same request is now rejected for its nonce.
            self.reactor.advance(2)
            response = self.make_request("POST", self.url, payload)
            self.assertEqual(400, response.code, msg=response.json_body)
            self.assertEqual("unrecognised nonce", response.json_body["error"])
    
        def test_register_incorrect_nonce(self):
            """
            The nonce is part of the MAC, so a MAC computed over a different
            nonce is rejected.
            """
            nonce = self.make_request("GET", self.url).json_body["nonce"]

            # Deliberately sign a nonce other than the one we were issued.
            bad_mac = hmac.new(
                key=b"shared",
                msg=b"notthenonce\x00bob\x00abc123\x00admin",
                digestmod=hashlib.sha1,
            ).hexdigest()

            payload = {
                "nonce": nonce,
                "username": "bob",
                "password": "abc123",
                "admin": True,
                "mac": bad_mac,
            }
            response = self.make_request("POST", self.url, payload)

            self.assertEqual(403, response.code, msg=response.json_body)
            self.assertEqual("HMAC incorrect", response.json_body["error"])
    
        def test_register_correct_nonce(self):
            """
            A request signed with the shared key over the issued nonce registers
            the user.
            """
            nonce = self.make_request("GET", self.url).json_body["nonce"]

            signed = nonce.encode("ascii") + b"\x00bob\x00abc123\x00admin\x00support"
            mac = hmac.new(
                key=b"shared", msg=signed, digestmod=hashlib.sha1
            ).hexdigest()

            payload = {
                "nonce": nonce,
                "username": "bob",
                "password": "abc123",
                "admin": True,
                "user_type": UserTypes.SUPPORT,
                "mac": mac,
            }
            response = self.make_request("POST", self.url, payload)

            self.assertEqual(200, response.code, msg=response.json_body)
            self.assertEqual("@bob:test", response.json_body["user_id"])
    
        def test_nonce_reuse(self):
            """
            A nonce that has already completed a registration is no longer
            recognised.
            """
            nonce = self.make_request("GET", self.url).json_body["nonce"]

            signed = nonce.encode("ascii") + b"\x00bob\x00abc123\x00admin"
            mac = hmac.new(
                key=b"shared", msg=signed, digestmod=hashlib.sha1
            ).hexdigest()

            payload = {
                "nonce": nonce,
                "username": "bob",
                "password": "abc123",
                "admin": True,
                "mac": mac,
            }

            # First use succeeds.
            first = self.make_request("POST", self.url, payload)
            self.assertEqual(200, first.code, msg=first.json_body)
            self.assertEqual("@bob:test", first.json_body["user_id"])

            # Now, try and reuse it
            second = self.make_request("POST", self.url, payload)
            self.assertEqual(400, second.code, msg=second.json_body)
            self.assertEqual("unrecognised nonce", second.json_body["error"])
    
        def test_missing_parts(self):
            """
            Synapse will complain if you don't give nonce, username, password, and
            mac.  Admin and user_types are optional.  Additional checks are done for length
            and type.
            """

            def nonce():
                """Fetch a fresh nonce from the registration endpoint."""
                channel = self.make_request("GET", self.url)
                return channel.json_body["nonce"]

            def assert_rejected(body, expected_error):
                """POST `body` and check it is refused with a 400 and `expected_error`."""
                channel = self.make_request("POST", self.url, body)
                self.assertEqual(400, channel.code, msg=channel.json_body)
                self.assertEqual(expected_error, channel.json_body["error"])

            # An empty body is refused for lacking a nonce
            assert_rejected({}, "nonce must be specified")

            #
            # Username checks
            #

            # Must be present
            assert_rejected({"nonce": nonce()}, "username must be specified")

            # Must be a string
            assert_rejected({"nonce": nonce(), "username": 1234}, "Invalid username")

            # Must not have null bytes
            assert_rejected(
                {"nonce": nonce(), "username": "abcd\u0000"}, "Invalid username"
            )

            # Must not be too long
            assert_rejected(
                {"nonce": nonce(), "username": "a" * 1000}, "Invalid username"
            )

            #
            # Password checks
            #

            # Must be present
            assert_rejected(
                {"nonce": nonce(), "username": "a"}, "password must be specified"
            )

            # Must be a string
            assert_rejected(
                {"nonce": nonce(), "username": "a", "password": 1234},
                "Invalid password",
            )

            # Must not have null bytes
            assert_rejected(
                {"nonce": nonce(), "username": "a", "password": "abcd\u0000"},
                "Invalid password",
            )

            # Super long
            assert_rejected(
                {"nonce": nonce(), "username": "a", "password": "A" * 1000},
                "Invalid password",
            )

            #
            # user_type check
            #

            # Invalid user_type
            assert_rejected(
                {
                    "nonce": nonce(),
                    "username": "a",
                    "password": "1234",
                    "user_type": "invalid",
                },
                "Invalid user type",
            )
    
    
        def test_displayname(self):
            """
            Test that displayname of new user is set
            """
            # Sentinel so "displayname omitted" and "displayname: None" stay
            # distinguishable.
            omitted = object()

            def register(localpart, displayname=omitted):
                """Register non-admin user `localpart` (password "abc123") and
                check the request succeeded."""
                channel = self.make_request("GET", self.url)
                nonce = channel.json_body["nonce"]

                want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
                want_mac.update(
                    nonce.encode("ascii")
                    + b"\x00"
                    + localpart.encode("ascii")
                    + b"\x00abc123\x00notadmin"
                )

                body = {"nonce": nonce, "username": localpart}
                if displayname is not omitted:
                    body["displayname"] = displayname
                body["password"] = "abc123"
                body["mac"] = want_mac.hexdigest()

                channel = self.make_request("POST", self.url, body)
                self.assertEqual(200, channel.code, msg=channel.json_body)
                self.assertEqual("@%s:test" % localpart, channel.json_body["user_id"])

            def fetch_displayname(localpart):
                """Request the profile displayname of `localpart`."""
                return self.make_request(
                    "GET", "/profile/@%s:test/displayname" % localpart
                )

            # set no displayname: the localpart is expected instead
            register("bob1")
            channel = fetch_displayname("bob1")
            self.assertEqual(200, channel.code, msg=channel.json_body)
            self.assertEqual("bob1", channel.json_body["displayname"])

            # displayname is None: same outcome as leaving it out
            register("bob2", displayname=None)
            channel = fetch_displayname("bob2")
            self.assertEqual(200, channel.code, msg=channel.json_body)
            self.assertEqual("bob2", channel.json_body["displayname"])

            # displayname is empty: no displayname can be fetched
            register("bob3", displayname="")
            channel = fetch_displayname("bob3")
            self.assertEqual(404, channel.code, msg=channel.json_body)

            # displayname is set explicitly
            register("bob4", displayname="Bob's Name")
            channel = fetch_displayname("bob4")
            self.assertEqual(200, channel.code, msg=channel.json_body)
            self.assertEqual("Bob's Name", channel.json_body["displayname"])
    
    
        @override_config(
            {"limit_usage_by_mau": True, "max_mau_value": 2, "mau_trial_days": 0}
        )
        def test_register_mau_limit_reached(self):
            """
            Check we can register a user via the shared secret registration API
            even if the MAU limit is reached.
            """
            handler = self.hs.get_registration_handler()
            store = self.hs.get_datastore()

            # Set monthly active users to the limit
            store.get_monthly_active_count = Mock(
                return_value=make_awaitable(self.hs.config.server.max_mau_value)
            )

            # Check that the blocking of monthly active users is working as expected
            # The registration of a new user fails due to the limit
            self.get_failure(
                handler.register_user(localpart="local_part"), ResourceLimitError
            )

            # Register new user with admin API
            channel = self.make_request("GET", self.url)
            nonce = channel.json_body["nonce"]

            want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
            want_mac.update(
                nonce.encode("ascii") + b"\x00bob\x00abc123\x00admin\x00support"
            )
            want_mac = want_mac.hexdigest()

            body = {
                "nonce": nonce,
                "username": "bob",
                "password": "abc123",
                "admin": True,
                "user_type": UserTypes.SUPPORT,
                "mac": want_mac,
            }
            channel = self.make_request("POST", self.url, body)

            self.assertEqual(200, channel.code, msg=channel.json_body)
            self.assertEqual("@bob:test", channel.json_body["user_id"])
    
    
    
    class UsersListTestCase(unittest.HomeserverTestCase):
    
        servlets = [
            synapse.rest.admin.register_servlets,
            login.register_servlets,
        ]
        url = "/_synapse/admin/v2/users"
    
        def prepare(self, reactor, clock, hs):
            """Keep a handle on the datastore and create a logged-in admin user."""
            self.store = hs.get_datastore()

            # Admin account whose token the listing requests use.
            admin_id = self.register_user("admin", "pass", admin=True)
            self.admin_user = admin_id
            self.admin_user_tok = self.login("admin", "pass")
    
        def test_no_auth(self):
            """
            Listing users without an access token is rejected.
            """
            response = self.make_request("GET", self.url, b"{}")

            self.assertEqual(401, response.code, msg=response.json_body)
            errcode = response.json_body["errcode"]
            self.assertEqual(Codes.MISSING_TOKEN, errcode)
    
        def test_requester_is_no_admin(self):
            """
            A requester who is not a server admin gets an error back.
            """
            # One ordinary user, then log in as them.
            self._create_users(1)
            non_admin_token = self.login("user1", "pass1")

            response = self.make_request("GET", self.url, access_token=non_admin_token)

            self.assertEqual(403, response.code, msg=response.json_body)
            errcode = response.json_body["errcode"]
            self.assertEqual(Codes.FORBIDDEN, errcode)
    
    
        def test_all_users(self):
            """
            List all users, including deactivated users.
            """
            self._create_users(2)

            # Ask for deactivated users too, as the docstring requires.
            channel = self.make_request(
                "GET",
                self.url + "?deactivated=true",
                access_token=self.admin_user_tok,
            )

            self.assertEqual(200, channel.code, msg=channel.json_body)
            # The two users created above plus the admin account.
            self.assertEqual(3, len(channel.json_body["users"]))
            self.assertEqual(3, channel.json_body["total"])

            self._check_fields(channel.json_body["users"])
    
    
        def test_search_term(self):
            """Test that searching for a user works correctly"""

            def _search_test(
                expected_user_id: Optional[str],
                search_term: str,
                search_field: Optional[str] = "name",
                expected_http_code: Optional[int] = 200,
            ):
                """Search for a user and check that the returned user's id is a match

                Args:
                    expected_user_id: The user_id expected to be returned by the API. Set
                        to None to expect zero results for the search
                    search_term: The term to search for user names with
                    search_field: Field which is to request: `name` or `user_id`
                    expected_http_code: The expected http code for the request
                """
                url = self.url + "?%s=%s" % (
                    search_field,
                    search_term,
                )
                channel = self.make_request(
                    "GET",
                    url,
                    access_token=self.admin_user_tok,
                )
                self.assertEqual(expected_http_code, channel.code, msg=channel.json_body)

                # Error responses carry no user list to inspect.
                if expected_http_code != 200:
                    return

                # Check that users were returned
                self.assertIn("users", channel.json_body)
                self._check_fields(channel.json_body["users"])
                users = channel.json_body["users"]

                # Check that the expected number of users were returned
                expected_user_count = 1 if expected_user_id else 0
                self.assertEqual(len(users), expected_user_count)
                self.assertEqual(channel.json_body["total"], expected_user_count)

                if expected_user_id:
                    # Check that the first returned user id is correct
                    u = users[0]
                    self.assertEqual(expected_user_id, u["name"])

            self._create_users(2)

            user1 = "@user1:test"
            user2 = "@user2:test"

            # Lower-case terms, searching the default `name` field
            _search_test(user1, "er1")
            _search_test(user1, "me 1")

            _search_test(user2, "er2")
            _search_test(user2, "me 2")

            # Lower-case terms, searching `user_id`
            _search_test(user1, "er1", "user_id")
            _search_test(user2, "er2", "user_id")

            # Upper-case terms must match too
            _search_test(user1, "ER1")
            _search_test(user1, "NAME 1")

            _search_test(user2, "ER2")
            _search_test(user2, "NAME 2")

            _search_test(user1, "ER1", "user_id")
            _search_test(user2, "ER2", "user_id")

            # Terms matching nobody return no results
            _search_test(None, "foo")
            _search_test(None, "bar")

            _search_test(None, "foo", "user_id")
            _search_test(None, "bar", "user_id")
    
    
        def test_invalid_parameter(self):
            """
            If parameters are invalid, an error is returned.
            """
            # (query string, expected errcode); every case must yield a 400.
            cases = [
                # negative limit
                ("?limit=-5", Codes.INVALID_PARAM),
                # negative from
                ("?from=-5", Codes.INVALID_PARAM),
                # invalid guests
                ("?guests=not_bool", Codes.UNKNOWN),
                # invalid deactivated
                ("?deactivated=not_bool", Codes.UNKNOWN),
                # unknown order_by
                ("?order_by=bar", Codes.UNKNOWN),
                # invalid search order
                ("?dir=bar", Codes.UNKNOWN),
            ]

            for query, expected_errcode in cases:
                channel = self.make_request(
                    "GET",
                    self.url + query,
                    access_token=self.admin_user_tok,
                )

                self.assertEqual(400, channel.code, msg=channel.json_body)
                self.assertEqual(expected_errcode, channel.json_body["errcode"])
    
    
        def test_limit(self):
            """
            Testing list of users with limit
            """
            number_users = 20
            # Create one less user (since there's already an admin user).
            self._create_users(number_users - 1)

            channel = self.make_request(
                "GET",
                self.url + "?limit=5",
                access_token=self.admin_user_tok,
            )

            self.assertEqual(200, channel.code, msg=channel.json_body)
            # `total` counts every user, while only one page of five is returned.
            self.assertEqual(channel.json_body["total"], number_users)
            self.assertEqual(len(channel.json_body["users"]), 5)
            self.assertEqual(channel.json_body["next_token"], "5")
            self._check_fields(channel.json_body["users"])
    
        def test_from(self):
            """
            Testing list of users with a defined starting point (from)
            """
            number_users = 20
            # Create one less user (since there's already an admin user).
            self._create_users(number_users - 1)

            channel = self.make_request(
                "GET",
                self.url + "?from=5",
                access_token=self.admin_user_tok,
            )

            self.assertEqual(200, channel.code, msg=channel.json_body)
            self.assertEqual(channel.json_body["total"], number_users)
            # Skipping the first 5 of 20 leaves 15, and no further page.
            self.assertEqual(len(channel.json_body["users"]), 15)
            self.assertNotIn("next_token", channel.json_body)
            self._check_fields(channel.json_body["users"])
    
        def test_limit_and_from(self):
            """
            Testing list of users with a defined starting point and limit
            """
            number_users = 20
            # Create one less user (since there's already an admin user).
            self._create_users(number_users - 1)

            channel = self.make_request(
                "GET",
                self.url + "?from=5&limit=10",
                access_token=self.admin_user_tok,
            )

            self.assertEqual(200, channel.code, msg=channel.json_body)
            self.assertEqual(channel.json_body["total"], number_users)
            # Ten entries starting at offset 5, so the next page starts at 15.
            self.assertEqual(channel.json_body["next_token"], "15")
            self.assertEqual(len(channel.json_body["users"]), 10)
            self._check_fields(channel.json_body["users"])
    
        def test_next_token(self):
            """
            Testing that `next_token` appears at the right place
            """
            number_users = 20
            # Create one less user (since there's already an admin user).
            self._create_users(number_users - 1)

            #  `next_token` does not appear
            # Number of results is the number of entries
            channel = self.make_request(
                "GET",
                self.url + "?limit=20",
                access_token=self.admin_user_tok,
            )

            self.assertEqual(200, channel.code, msg=channel.json_body)
            self.assertEqual(channel.json_body["total"], number_users)
            self.assertEqual(len(channel.json_body["users"]), number_users)
            self.assertNotIn("next_token", channel.json_body)

            #  `next_token` does not appear
            # Number of max results is larger than the number of entries
            channel = self.make_request(
                "GET",
                self.url + "?limit=21",
                access_token=self.admin_user_tok,
            )

            self.assertEqual(200, channel.code, msg=channel.json_body)
            self.assertEqual(channel.json_body["total"], number_users)
            self.assertEqual(len(channel.json_body["users"]), number_users)
            self.assertNotIn("next_token", channel.json_body)

            #  `next_token` does appear
            # Number of max results is smaller than the number of entries
            channel = self.make_request(
                "GET",
                self.url + "?limit=19",
                access_token=self.admin_user_tok,
            )

            self.assertEqual(200, channel.code, msg=channel.json_body)
            self.assertEqual(channel.json_body["total"], number_users)
            self.assertEqual(len(channel.json_body["users"]), 19)
            self.assertEqual(channel.json_body["next_token"], "19")

            # Check
            # Set `from` to value of `next_token` for request remaining entries
            #  `next_token` does not appear
            channel = self.make_request(
                "GET",
                self.url + "?from=19",
                access_token=self.admin_user_tok,
            )

            self.assertEqual(200, channel.code, msg=channel.json_body)
            self.assertEqual(channel.json_body["total"], number_users)
            self.assertEqual(len(channel.json_body["users"]), 1)
            self.assertNotIn("next_token", channel.json_body)
    
    
        def test_order_by(self):
            """
            Testing order list with parameter `order_by`
            """
            # make sure that the users do not have the same timestamps
            self.reactor.advance(10)
            user1 = self.register_user("user1", "pass1", admin=False, displayname="Name Z")
            self.reactor.advance(10)
            user2 = self.register_user("user2", "pass2", admin=False, displayname="Name Y")

            # Modify user1 so the `deactivated` and `shadow_banned` orderings
            # have something to distinguish.
            self.get_success(self.store.set_user_deactivated_status(user1, True))
            self.get_success(self.store.set_shadow_banned(UserID.from_string(user1), True))

            # Set an avatar URL on all users so that no user has a NULL value, to
            # avoid a different sort order between SQLite and PostgreSQL
            self.get_success(self.store.set_profile_avatar_url("user1", "mxc://url3"))
            self.get_success(self.store.set_profile_avatar_url("user2", "mxc://url2"))
            self.get_success(self.store.set_profile_avatar_url("admin", "mxc://url1"))

            # order by default (name)
            self._order_test([self.admin_user, user1, user2], None)
            self._order_test([self.admin_user, user1, user2], None, "f")
            self._order_test([user2, user1, self.admin_user], None, "b")

            # order by name
            self._order_test([self.admin_user, user1, user2], "name")
            self._order_test([self.admin_user, user1, user2], "name", "f")
            self._order_test([user2, user1, self.admin_user], "name", "b")

            # order by displayname
            self._order_test([user2, user1, self.admin_user], "displayname")
            self._order_test([user2, user1, self.admin_user], "displayname", "f")
            self._order_test([self.admin_user, user1, user2], "displayname", "b")

            # order by is_guest
            # like sort by ascending name, as no guest user here
            self._order_test([self.admin_user, user1, user2], "is_guest")
            self._order_test([self.admin_user, user1, user2], "is_guest", "f")
            self._order_test([self.admin_user, user1, user2], "is_guest", "b")

            # order by admin
            self._order_test([user1, user2, self.admin_user], "admin")
            self._order_test([user1, user2, self.admin_user], "admin", "f")
            self._order_test([self.admin_user, user1, user2], "admin", "b")

            # order by deactivated
            self._order_test([self.admin_user, user2, user1], "deactivated")
            self._order_test([self.admin_user, user2, user1], "deactivated", "f")
            self._order_test([user1, self.admin_user, user2], "deactivated", "b")

            # order by user_type
            # like sort by ascending name, as no special user type here
            self._order_test([self.admin_user, user1, user2], "user_type")
            self._order_test([self.admin_user, user1, user2], "user_type", "f")
            self._order_test([self.admin_user, user1, user2], "user_type", "b")

            # order by shadow_banned
            self._order_test([self.admin_user, user2, user1], "shadow_banned")
            self._order_test([self.admin_user, user2, user1], "shadow_banned", "f")
            self._order_test([user1, self.admin_user, user2], "shadow_banned", "b")

            # order by avatar_url
            self._order_test([self.admin_user, user2, user1], "avatar_url")
            self._order_test([self.admin_user, user2, user1], "avatar_url", "f")
            self._order_test([user1, user2, self.admin_user], "avatar_url", "b")

            # order by creation_ts
            self._order_test([self.admin_user, user1, user2], "creation_ts")
            self._order_test([self.admin_user, user1, user2], "creation_ts", "f")
            self._order_test([user2, user1, self.admin_user], "creation_ts", "b")
    
    
        def _order_test(
            self,
            expected_user_list: List[str],
            order_by: Optional[str],
            dir: Optional[str] = None,
        ) -> None:
            """Request the list of users in a certain order. Assert that order is what
            we expect

            Args:
                expected_user_list: The list of user_id in the order we expect to get
                    back from the server
                order_by: The type of ordering to give the server. Left out of the
                    query string when None.
                dir: The direction of ordering to give the server. Only "f" and "b"
                    are forwarded; any other value is left out of the query string.
            """

            # Every request carries "deactivated=true"; the optional parameters
            # are appended after it.
            url = self.url + "?deactivated=true&"
            if order_by is not None:
                url += "order_by=%s&" % (order_by,)
            if dir in ("b", "f"):
                url += "dir=%s" % (dir,)

            # The URL built above must actually be sent, otherwise none of the
            # ordering parameters reach the server.
            channel = self.make_request(
                "GET",
                url,
                access_token=self.admin_user_tok,
            )
            self.assertEqual(200, channel.code, msg=channel.json_body)
            self.assertEqual(channel.json_body["total"], len(expected_user_list))

            returned_order = [row["name"] for row in channel.json_body["users"]]
            self.assertEqual(expected_user_list, returned_order)
            self._check_fields(channel.json_body["users"])
    
    
        def _check_fields(self, content: List[JsonDict]) -> None:
            """Checks that the expected user attributes are present in content

            Args:
                content: List of user dicts; every entry is checked for each of the
                    expected attribute names
            """
            expected_fields = (
                "name",
                "is_guest",
                "admin",
                "user_type",
                "deactivated",
                "shadow_banned",
                "displayname",
                "avatar_url",
                "creation_ts",
            )
            for u in content:
                for field in expected_fields:
                    self.assertIn(field, u)
    
    
        def _create_users(self, number_users: int) -> None:
            """
            Create a number of users

            Users are registered as "user1" .. "user<number_users>" with the
            password "pass<i>", the display name "Name <i>" and without admin
            rights.

            Args:
                number_users: Number of users to be created
            """
            for i in range(1, number_users + 1):
                self.register_user(
                    "user%d" % i,
                    "pass%d" % i,
                    admin=False,
                    displayname="Name %d" % i,
                )

    class DeactivateAccountTestCase(unittest.HomeserverTestCase):
    
        servlets = [
            synapse.rest.admin.register_servlets,
            login.register_servlets,
        ]
    
        def prepare(self, reactor, clock, hs):
            """Create the accounts, URLs and profile data shared by the tests.

            Registers and logs in an admin and a regular user, derives the admin
            API URLs that target the regular user, and gives that user an avatar
            URL and an email threepid.
            """
            self.store = hs.get_datastore()

            # Account that performs the privileged requests.
            self.admin_user = self.register_user("admin", "pass", admin=True)
            self.admin_user_tok = self.login("admin", "pass")

            # Account that the requests are aimed at.
            self.other_user = self.register_user("user", "pass", displayname="User1")
            self.other_user_token = self.login("user", "pass")

            # Both endpoints embed the same URL-quoted user ID.
            quoted_user_id = urllib.parse.quote(self.other_user)
            self.url_other_user = "/_synapse/admin/v2/users/%s" % quoted_user_id
            self.url = "/_synapse/admin/v1/deactivate/%s" % quoted_user_id

            # set attributes for user
            avatar_update = self.store.set_profile_avatar_url(
                "user", "mxc://servername/mediaid"
            )
            self.get_success(avatar_update)

            threepid_update = self.store.user_add_threepid(
                "@user:test", "email", "foo@bar.com", 0, 0
            )
            self.get_success(threepid_update)
    
        def test_no_auth(self):
            """
            A deactivation request sent without an access token is rejected
            with 401 and the MISSING_TOKEN error code.
            """
            response = self.make_request("POST", self.url, b"{}")
            body = response.json_body

            self.assertEqual(401, response.code, msg=body)
            self.assertEqual(Codes.MISSING_TOKEN, body["errcode"])
    
        def test_requester_is_not_admin(self):
            """
            If the user is not a server admin, an error is returned.

            The request is issued twice with the non-admin user's token: once
            without a body and once with an empty JSON object as body.
            """
            url = "/_synapse/admin/v1/deactivate/@bob:test"

            # Without a request body
            channel = self.make_request("POST", url, access_token=self.other_user_token)

            self.assertEqual(403, channel.code, msg=channel.json_body)
            self.assertEqual("You are not a server admin", channel.json_body["error"])

            # With an empty JSON body
            channel = self.make_request(
                "POST",
                url,
                access_token=self.other_user_token,
                content=b"{}",
            )

            self.assertEqual(403, channel.code, msg=channel.json_body)
            self.assertEqual("You are not a server admin", channel.json_body["error"])
    
        def test_user_does_not_exist(self):
            """
            Deactivating a user ID that is not registered on the server yields
            a 404 with the NOT_FOUND error code.
            """
            unknown_user_url = "/_synapse/admin/v1/deactivate/@unknown_person:test"

            response = self.make_request(
                "POST", unknown_user_url, access_token=self.admin_user_tok
            )
            body = response.json_body

            self.assertEqual(404, response.code, msg=body)
            self.assertEqual(Codes.NOT_FOUND, body["errcode"])
    
        def test_erase_is_not_bool(self):
            """
            If parameter `erase` is not boolean, return an error
            """

            # Authenticate as the server admin: without a token the endpoint
            # answers 401 (see test_no_auth) instead of the 400 asserted below.
            # `erase` is deliberately sent as the string "False", not a boolean.
            channel = self.make_request(
                "POST",
                self.url,
                access_token=self.admin_user_tok,
                content={"erase": "False"},
            )

            self.assertEqual(400, channel.code, msg=channel.json_body)
            self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"])
    
        def test_user_is_not_local(self):
            """
            Tests that deactivation for a user that is not a local returns a 400
            """
            url = "/_synapse/admin/v1/deactivate/@unknown_person:unknown_domain"
    
            channel = self.make_request("POST", url, access_token=self.admin_user_tok)
    
            self.assertEqual(400, channel.code, msg=channel.json_body)