Skip to content
Snippets Groups Projects
test_user.py 118 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2018 New Vector Ltd
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import hashlib
    import hmac
    import json
    
    from binascii import unhexlify
    
    from typing import List, Optional
    
    from unittest.mock import Mock
    
    
    import synapse.rest.admin
    from synapse.api.constants import UserTypes
    
    from synapse.api.errors import Codes, HttpResponseException, ResourceLimitError
    
    from synapse.api.room_versions import RoomVersions
    
    from synapse.rest.client.v1 import login, logout, profile, room
    from synapse.rest.client.v2_alpha import devices, sync
    
    from synapse.types import JsonDict, UserID
    
    from tests.server import FakeSite, make_request
    
    from tests.test_utils import make_awaitable
    
    from tests.unittest import override_config
    
    
    
    class UserRegisterTestCase(unittest.HomeserverTestCase):
    
    
        servlets = [
            synapse.rest.admin.register_servlets_for_client_rest_resource,
            profile.register_servlets,
        ]

        def make_homeserver(self, reactor, clock):
            """Create the homeserver used by every test in this case.

            Records the registration endpoint URL and a handful of stand-in
            mocks on the test case, builds the homeserver through
            ``setup_test_homeserver`` and sets its registration shared secret to
            ``"shared"`` (the key the tests below use when computing MACs).

            Args:
                reactor: not used by this method.
                clock: not used by this method.

            Returns:
                The configured homeserver, which is also kept as ``self.hs``.
            """
            # NOTE(review): the signature is assumed to match the
            # ``make_homeserver`` hook of the HomeserverTestCase base class —
            # confirm against tests/unittest.py.
            self.url = "/_synapse/admin/v1/register"

            self.registration_handler = Mock()
            self.identity_handler = Mock()
            self.login_handler = Mock()
            self.device_handler = Mock()
            self.device_handler.check_device_registered = Mock(return_value="FAKE")

            self.datastore = Mock(return_value=Mock())
            self.datastore.get_current_state_deltas = Mock(return_value=(0, []))

            self.secrets = Mock()

            self.hs = self.setup_test_homeserver()

            # Enable shared-secret registration with a known key.
            self.hs.config.registration_shared_secret = "shared"

            self.hs.get_media_repository = Mock()
            self.hs.get_deactivate_account_handler = Mock()

            return self.hs
    
        def test_disabled(self):
            """
            With the shared secret unset, a POST to the registration endpoint is
            refused with a 400 and an explanatory error message.
            """
            # Switch the feature off for this test only.
            self.hs.config.registration_shared_secret = None

            response = self.make_request("POST", self.url, b"{}")

            status = int(response.result["code"])
            self.assertEqual(400, status, msg=response.result["body"])

            reported_error = response.json_body["error"]
            self.assertEqual("Shared secret registration is not enabled", reported_error)
    
        def test_get_nonce(self):
            """
            A GET on the endpoint hands back a nonce obtained from the
            homeserver's secrets provider (stubbed here to return "abcd").
            """
            # Replace the secrets provider with one whose token_hex is fixed.
            fake_secrets = Mock(token_hex=Mock(return_value="abcd"))
            self.hs.get_secrets = Mock(return_value=fake_secrets)

            response = self.make_request("GET", self.url)

            self.assertEqual(response.json_body, {"nonce": "abcd"})
    
        def test_expired_nonce(self):
            """
            A nonce issued by GET is still accepted 59 seconds later but is
            reported as unrecognised once 61 seconds have passed.
            """
            issued = self.make_request("GET", self.url)
            payload = json.dumps({"nonce": issued.json_body["nonce"]}).encode("utf8")

            # 59 seconds in: the nonce is accepted, so validation moves on and
            # trips over the missing username instead.
            self.reactor.advance(59)
            early = self.make_request("POST", self.url, payload)

            self.assertEqual(400, int(early.result["code"]), msg=early.result["body"])
            self.assertEqual("username must be specified", early.json_body["error"])

            # 61 seconds in: the same nonce is no longer known.
            self.reactor.advance(2)
            late = self.make_request("POST", self.url, payload)

            self.assertEqual(400, int(late.result["code"]), msg=late.result["body"])
            self.assertEqual("unrecognised nonce", late.json_body["error"])
    
        def test_register_incorrect_nonce(self):
            """
            The MAC must cover the nonce that is actually sent: a MAC computed
            over a different nonce is rejected with 403.
            """
            issued = self.make_request("GET", self.url)
            real_nonce = issued.json_body["nonce"]

            # Deliberately sign a nonce other than the one we are about to send.
            forged_mac = hmac.new(
                b"shared",
                b"notthenonce\x00bob\x00abc123\x00admin",
                hashlib.sha1,
            ).hexdigest()

            payload = {
                "nonce": real_nonce,
                "username": "bob",
                "password": "abc123",
                "admin": True,
                "mac": forged_mac,
            }
            response = self.make_request(
                "POST", self.url, json.dumps(payload).encode("utf8")
            )

            self.assertEqual(
                403, int(response.result["code"]), msg=response.result["body"]
            )
            self.assertEqual("HMAC incorrect", response.json_body["error"])
    
        def test_register_correct_nonce(self):
            """
            A request signed with the shared key over the issued nonce (plus
            username, password, admin flag and user type) registers the user.
            """
            issued = self.make_request("GET", self.url)
            nonce = issued.json_body["nonce"]

            signed_fields = nonce.encode("ascii") + b"\x00bob\x00abc123\x00admin\x00support"
            mac = hmac.new(b"shared", signed_fields, hashlib.sha1).hexdigest()

            payload = {
                "nonce": nonce,
                "username": "bob",
                "password": "abc123",
                "admin": True,
                "user_type": UserTypes.SUPPORT,
                "mac": mac,
            }
            response = self.make_request(
                "POST", self.url, json.dumps(payload).encode("utf8")
            )

            self.assertEqual(
                200, int(response.result["code"]), msg=response.result["body"]
            )
            self.assertEqual("@bob:test", response.json_body["user_id"])
    
        def test_nonce_reuse(self):
            """
            A nonce works exactly once: replaying a request that already
            succeeded is rejected as an unrecognised nonce.
            """
            issued = self.make_request("GET", self.url)
            nonce = issued.json_body["nonce"]

            signed_fields = nonce.encode("ascii") + b"\x00bob\x00abc123\x00admin"
            mac = hmac.new(b"shared", signed_fields, hashlib.sha1).hexdigest()

            payload = json.dumps(
                {
                    "nonce": nonce,
                    "username": "bob",
                    "password": "abc123",
                    "admin": True,
                    "mac": mac,
                }
            ).encode("utf8")

            # First use succeeds and creates the account.
            first = self.make_request("POST", self.url, payload)

            self.assertEqual(200, int(first.result["code"]), msg=first.result["body"])
            self.assertEqual("@bob:test", first.json_body["user_id"])

            # Sending the identical request again must fail.
            replay = self.make_request("POST", self.url, payload)

            self.assertEqual(400, int(replay.result["code"]), msg=replay.result["body"])
            self.assertEqual("unrecognised nonce", replay.json_body["error"])
    
        def test_missing_parts(self):
            """
            Each malformed request is answered with a 400 and a specific error:
            nonce, username and password must be present, username and password
            must be well-formed strings of acceptable length, and a supplied
            user_type must be valid.
            """

            def fresh_nonce():
                """Ask the endpoint for a new nonce."""
                return self.make_request("GET", self.url).json_body["nonce"]

            def assert_rejected(payload, expected_error):
                """POST ``payload`` as JSON and expect a 400 with ``expected_error``."""
                response = self.make_request(
                    "POST", self.url, json.dumps(payload).encode("utf8")
                )
                self.assertEqual(
                    400, int(response.result["code"]), msg=response.result["body"]
                )
                self.assertEqual(expected_error, response.json_body["error"])

            #
            # Nonce check
            #

            # Must be present
            assert_rejected({}, "nonce must be specified")

            #
            # Username checks
            #

            # Must be present
            assert_rejected({"nonce": fresh_nonce()}, "username must be specified")

            # Must be a string
            assert_rejected({"nonce": fresh_nonce(), "username": 1234}, "Invalid username")

            # Must not have null bytes
            assert_rejected(
                {"nonce": fresh_nonce(), "username": "abcd\u0000"}, "Invalid username"
            )

            # Must not be too long
            assert_rejected(
                {"nonce": fresh_nonce(), "username": "a" * 1000}, "Invalid username"
            )

            #
            # Password checks
            #

            # Must be present
            assert_rejected(
                {"nonce": fresh_nonce(), "username": "a"}, "password must be specified"
            )

            # Must be a string
            assert_rejected(
                {"nonce": fresh_nonce(), "username": "a", "password": 1234},
                "Invalid password",
            )

            # Must not have null bytes
            assert_rejected(
                {"nonce": fresh_nonce(), "username": "a", "password": "abcd\u0000"},
                "Invalid password",
            )

            # Super long
            assert_rejected(
                {"nonce": fresh_nonce(), "username": "a", "password": "A" * 1000},
                "Invalid password",
            )

            #
            # user_type check
            #

            # Invalid user_type
            assert_rejected(
                {
                    "nonce": fresh_nonce(),
                    "username": "a",
                    "password": "1234",
                    "user_type": "invalid",
                },
                "Invalid user type",
            )
    
    
        def test_displayname(self):
            """
            Check which displayname a freshly registered user ends up with when
            the field is omitted, null, empty or explicitly set.
            """

            def register_and_fetch_displayname(localpart, **extra_fields):
                """Register ``localpart`` and return the displayname lookup response.

                ``extra_fields`` are placed in the request body between the
                username and the password; they are not covered by the MAC.
                """
                nonce = self.make_request("GET", self.url).json_body["nonce"]

                signed_fields = (
                    nonce.encode("ascii")
                    + b"\x00"
                    + localpart.encode("ascii")
                    + b"\x00abc123\x00notadmin"
                )
                mac = hmac.new(b"shared", signed_fields, hashlib.sha1).hexdigest()

                payload = {"nonce": nonce, "username": localpart}
                payload.update(extra_fields)
                payload["password"] = "abc123"
                payload["mac"] = mac

                created = self.make_request(
                    "POST", self.url, json.dumps(payload).encode("utf8")
                )
                user_id = "@%s:test" % (localpart,)

                self.assertEqual(
                    200, int(created.result["code"]), msg=created.result["body"]
                )
                self.assertEqual(user_id, created.json_body["user_id"])

                return self.make_request("GET", "/profile/%s/displayname" % (user_id,))

            # set no displayname
            looked_up = register_and_fetch_displayname("bob1")
            self.assertEqual(
                200, int(looked_up.result["code"]), msg=looked_up.result["body"]
            )
            self.assertEqual("bob1", looked_up.json_body["displayname"])

            # displayname is None
            looked_up = register_and_fetch_displayname("bob2", displayname=None)
            self.assertEqual(
                200, int(looked_up.result["code"]), msg=looked_up.result["body"]
            )
            self.assertEqual("bob2", looked_up.json_body["displayname"])

            # displayname is empty
            looked_up = register_and_fetch_displayname("bob3", displayname="")
            self.assertEqual(
                404, int(looked_up.result["code"]), msg=looked_up.result["body"]
            )

            # set displayname
            looked_up = register_and_fetch_displayname("bob4", displayname="Bob's Name")
            self.assertEqual(
                200, int(looked_up.result["code"]), msg=looked_up.result["body"]
            )
            self.assertEqual("Bob's Name", looked_up.json_body["displayname"])
    
    
        @override_config(
            {"limit_usage_by_mau": True, "max_mau_value": 2, "mau_trial_days": 0}
        )
        def test_register_mau_limit_reached(self):
            """
            Check we can register a user via the shared secret registration API
            even if the MAU limit is reached.
            """
            handler = self.hs.get_registration_handler()
            store = self.hs.get_datastore()

            # Set monthly active users to the limit by stubbing the store's count
            # to report exactly the configured maximum.
            store.get_monthly_active_count = Mock(
                return_value=make_awaitable(self.hs.config.max_mau_value)
            )

            # Check that the blocking of monthly active users is working as expected
            # The registration of a new user fails due to the limit
            self.get_failure(
                handler.register_user(localpart="local_part"), ResourceLimitError
            )

            # Register new user with admin API
            channel = self.make_request("GET", self.url)
            nonce = channel.json_body["nonce"]

            # The MAC covers nonce, username, password, admin flag and user type,
            # keyed with the registration shared secret.
            want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
            want_mac.update(
                nonce.encode("ascii") + b"\x00bob\x00abc123\x00admin\x00support"
            )
            want_mac = want_mac.hexdigest()

            body = json.dumps(
                {
                    "nonce": nonce,
                    "username": "bob",
                    "password": "abc123",
                    "admin": True,
                    "user_type": UserTypes.SUPPORT,
                    "mac": want_mac,
                }
            )
            channel = self.make_request("POST", self.url, body.encode("utf8"))

            # The admin API request succeeds despite the limit.
            self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
            self.assertEqual("@bob:test", channel.json_body["user_id"])
    
    
    
    class UsersListTestCase(unittest.HomeserverTestCase):
    
        # Servlet registration functions needed by these tests: the admin API
        # and the login API (the tests log users in to obtain access tokens).
        # Presumably consumed by the HomeserverTestCase base class — confirm.
        servlets = [
            synapse.rest.admin.register_servlets,
            login.register_servlets,
        ]
        # Endpoint under test: the v2 admin "list users" API.
        url = "/_synapse/admin/v2/users"
    
        def prepare(self, reactor, clock, hs):
            """Keep a handle on the datastore and create a logged-in admin user.

            Args:
                reactor: not used by this method.
                clock: not used by this method.
                hs: the homeserver whose datastore is stored as ``self.store``.
            """
            self.store = hs.get_datastore()

            admin_localpart, admin_password = "admin", "pass"
            self.admin_user = self.register_user(
                admin_localpart, admin_password, admin=True
            )
            self.admin_user_tok = self.login(admin_localpart, admin_password)
    
        def test_no_auth(self):
            """
            Listing users without an access token yields 401 / MISSING_TOKEN.
            """
            response = self.make_request("GET", self.url, b"{}")

            status = int(response.result["code"])
            self.assertEqual(401, status, msg=response.result["body"])

            errcode = response.json_body["errcode"]
            self.assertEqual(Codes.MISSING_TOKEN, errcode)
    
        def test_requester_is_no_admin(self):
            """
            A requester who is not a server admin gets 403 / FORBIDDEN.
            """
            # Create one ordinary user and act on their behalf.
            self._create_users(1)
            non_admin_token = self.login("user1", "pass1")

            response = self.make_request("GET", self.url, access_token=non_admin_token)

            status = int(response.result["code"])
            self.assertEqual(403, status, msg=response.result["body"])
            self.assertEqual(Codes.FORBIDDEN, response.json_body["errcode"])
    
    
        def test_all_users(self):
            """
            List all users, including deactivated users.
            """

            self._create_users(2)

            channel = self.make_request(
                "GET",
                self.url + "?deactivated=true",
                b"{}",
                access_token=self.admin_user_tok,
            )

            self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
            # Three users expected: presumably the two just created plus the
            # admin registered in prepare().
            self.assertEqual(3, len(channel.json_body["users"]))
            self.assertEqual(3, channel.json_body["total"])

            # Check that all fields are available
            self._check_fields(channel.json_body["users"])
    
    
        def test_search_term(self):
            """Check that filtering the user list by `name` or `user_id` works."""

            def _search_test(
                expected_user_id: Optional[str],
                search_term: str,
                search_field: Optional[str] = "name",
                expected_http_code: Optional[int] = 200,
            ):
                """Run one search and verify that it returns the expected user.

                Args:
                    expected_user_id: The user_id the API should return, or None
                        when the search is expected to find nothing.
                    search_term: The text to search for.
                    search_field: The query parameter to search with: `name` or
                        `user_id`.
                    expected_http_code: The HTTP status the request should get.
                """
                query_url = "%s?%s=%s" % (self.url, search_field, search_term)
                response = self.make_request(
                    "GET",
                    query_url.encode("ascii"),
                    access_token=self.admin_user_tok,
                )
                self.assertEqual(
                    expected_http_code, response.code, msg=response.json_body
                )

                # Nothing more to verify on an error response.
                if expected_http_code != 200:
                    return

                # Check that users were returned
                self.assertTrue("users" in response.json_body)
                self._check_fields(response.json_body["users"])
                found = response.json_body["users"]

                # Exactly one hit when a user is expected, otherwise none.
                wanted_count = 1 if expected_user_id else 0
                self.assertEqual(len(found), wanted_count)
                self.assertEqual(response.json_body["total"], wanted_count)

                if expected_user_id:
                    # The single returned entry must be the expected user.
                    self.assertEqual(expected_user_id, found[0]["name"])

            self._create_users(2)

            user1 = "@user1:test"
            user2 = "@user2:test"

            # Each tuple holds the positional arguments for one _search_test call,
            # in the order the searches are performed.
            searches = [
                # Matching searches, lower case
                (user1, "er1"),
                (user1, "me 1"),
                (user2, "er2"),
                (user2, "me 2"),
                (user1, "er1", "user_id"),
                (user2, "er2", "user_id"),
                # Matching searches, upper case
                (user1, "ER1"),
                (user1, "NAME 1"),
                (user2, "ER2"),
                (user2, "NAME 2"),
                (user1, "ER1", "user_id"),
                (user2, "ER2", "user_id"),
                # Searches that must find nothing
                (None, "foo"),
                (None, "bar"),
                (None, "foo", "user_id"),
                (None, "bar", "user_id"),
            ]
            for search_args in searches:
                _search_test(*search_args)
    
    
        def test_invalid_parameter(self):
            """
            Every invalid query parameter is answered with a 400 and the
            matching error code.
            """
            # (query string, expected errcode), in the order they are requested.
            invalid_queries = [
                # negative limit
                ("?limit=-5", Codes.INVALID_PARAM),
                # negative from
                ("?from=-5", Codes.INVALID_PARAM),
                # invalid guests
                ("?guests=not_bool", Codes.UNKNOWN),
                # invalid deactivated
                ("?deactivated=not_bool", Codes.UNKNOWN),
                # unknown order_by
                ("?order_by=bar", Codes.UNKNOWN),
                # invalid search order
                ("?dir=bar", Codes.UNKNOWN),
            ]

            for query, expected_errcode in invalid_queries:
                response = self.make_request(
                    "GET",
                    self.url + query,
                    access_token=self.admin_user_tok,
                )

                self.assertEqual(
                    400, int(response.result["code"]), msg=response.result["body"]
                )
                self.assertEqual(expected_errcode, response.json_body["errcode"])
    
    
        def test_limit(self):
            """
            With `limit=5` only five users come back, together with a
            `next_token` pointing at the sixth.
            """
            total_users = 20
            # The admin user already exists, so create one fewer.
            self._create_users(total_users - 1)

            response = self.make_request(
                "GET", self.url + "?limit=5", access_token=self.admin_user_tok
            )

            self.assertEqual(
                200, int(response.result["code"]), msg=response.result["body"]
            )
            self.assertEqual(response.json_body["total"], total_users)
            self.assertEqual(len(response.json_body["users"]), 5)
            self.assertEqual(response.json_body["next_token"], "5")
            self._check_fields(response.json_body["users"])
    
        def test_from(self):
            """
            With `from=5` the first five users are skipped and, as the rest fit
            in one page, no `next_token` is returned.
            """
            total_users = 20
            # The admin user already exists, so create one fewer.
            self._create_users(total_users - 1)

            response = self.make_request(
                "GET", self.url + "?from=5", access_token=self.admin_user_tok
            )

            self.assertEqual(
                200, int(response.result["code"]), msg=response.result["body"]
            )
            self.assertEqual(response.json_body["total"], total_users)
            self.assertEqual(len(response.json_body["users"]), 15)
            self.assertNotIn("next_token", response.json_body)
            self._check_fields(response.json_body["users"])
    
        def test_limit_and_from(self):
            """
            Combining `from=5` with `limit=10` returns ten users and a
            `next_token` of "15".
            """
            total_users = 20
            # The admin user already exists, so create one fewer.
            self._create_users(total_users - 1)

            response = self.make_request(
                "GET", self.url + "?from=5&limit=10", access_token=self.admin_user_tok
            )

            self.assertEqual(
                200, int(response.result["code"]), msg=response.result["body"]
            )
            self.assertEqual(response.json_body["total"], total_users)
            self.assertEqual(response.json_body["next_token"], "15")
            self.assertEqual(len(response.json_body["users"]), 10)
            self._check_fields(response.json_body["users"])
    
        def test_next_token(self):
            """
            `next_token` is present only when more users remain after the
            returned page.
            """
            total_users = 20
            # The admin user already exists, so create one fewer.
            self._create_users(total_users - 1)

            def list_users(query):
                """GET the user list with ``query``; check status and total."""
                response = self.make_request(
                    "GET", self.url + query, access_token=self.admin_user_tok
                )
                self.assertEqual(
                    200, int(response.result["code"]), msg=response.result["body"]
                )
                self.assertEqual(response.json_body["total"], total_users)
                return response

            # Limit equals the number of entries: no `next_token`.
            response = list_users("?limit=20")
            self.assertEqual(len(response.json_body["users"]), total_users)
            self.assertNotIn("next_token", response.json_body)

            # Limit exceeds the number of entries: no `next_token`.
            response = list_users("?limit=21")
            self.assertEqual(len(response.json_body["users"]), total_users)
            self.assertNotIn("next_token", response.json_body)

            # Limit is below the number of entries: `next_token` appears.
            response = list_users("?limit=19")
            self.assertEqual(len(response.json_body["users"]), 19)
            self.assertEqual(response.json_body["next_token"], "19")

            # Continue from that `next_token`: the one remaining entry is
            # returned and no further `next_token` is given.
            response = list_users("?from=19")
            self.assertEqual(len(response.json_body["users"]), 1)
            self.assertNotIn("next_token", response.json_body)
    
    
        def test_order_by(self):
            """
            Testing order list with parameter `order_by`

            Every sortable column is requested three times: without `dir`,
            with `dir=f` and with `dir=b`. Where all users share the same
            value for a column, the expected lists are in ascending name
            order for every direction.
            """

            user1 = self.register_user("user1", "pass1", admin=False, displayname="Name Z")
            user2 = self.register_user("user2", "pass2", admin=False, displayname="Name Y")

            # Modify user1 so that it differs from the other users in the
            # `deactivated` and `shadow_banned` columns
            self.get_success(self.store.set_user_deactivated_status(user1, True))
            self.get_success(self.store.set_shadow_banned(UserID.from_string(user1), True))

            # Set an avatar URL for all users, so that no user has a NULL value,
            # to avoid a different sort order between SQLite and PostgreSQL
            self.get_success(self.store.set_profile_avatar_url("user1", "mxc://url3"))
            self.get_success(self.store.set_profile_avatar_url("user2", "mxc://url2"))
            self.get_success(self.store.set_profile_avatar_url("admin", "mxc://url1"))

            # order by default (name)
            self._order_test([self.admin_user, user1, user2], None)
            self._order_test([self.admin_user, user1, user2], None, "f")
            self._order_test([user2, user1, self.admin_user], None, "b")

            # order by name
            self._order_test([self.admin_user, user1, user2], "name")
            self._order_test([self.admin_user, user1, user2], "name", "f")
            self._order_test([user2, user1, self.admin_user], "name", "b")

            # order by displayname
            self._order_test([user2, user1, self.admin_user], "displayname")
            self._order_test([user2, user1, self.admin_user], "displayname", "f")
            self._order_test([self.admin_user, user1, user2], "displayname", "b")

            # order by is_guest
            # like sort by ascending name, as no guest user here
            self._order_test([self.admin_user, user1, user2], "is_guest")
            self._order_test([self.admin_user, user1, user2], "is_guest", "f")
            self._order_test([self.admin_user, user1, user2], "is_guest", "b")

            # order by admin
            self._order_test([user1, user2, self.admin_user], "admin")
            self._order_test([user1, user2, self.admin_user], "admin", "f")
            self._order_test([self.admin_user, user1, user2], "admin", "b")

            # order by deactivated
            self._order_test([self.admin_user, user2, user1], "deactivated")
            self._order_test([self.admin_user, user2, user1], "deactivated", "f")
            self._order_test([user1, self.admin_user, user2], "deactivated", "b")

            # order by user_type
            # like sort by ascending name, as no special user type here
            self._order_test([self.admin_user, user1, user2], "user_type")
            self._order_test([self.admin_user, user1, user2], "user_type", "f")
            # This previously passed "is_guest", leaving `user_type` with
            # `dir=b` untested.
            self._order_test([self.admin_user, user1, user2], "user_type", "b")

            # order by shadow_banned
            self._order_test([self.admin_user, user2, user1], "shadow_banned")
            self._order_test([self.admin_user, user2, user1], "shadow_banned", "f")
            self._order_test([user1, self.admin_user, user2], "shadow_banned", "b")

            # order by avatar_url
            self._order_test([self.admin_user, user2, user1], "avatar_url")
            self._order_test([self.admin_user, user2, user1], "avatar_url", "f")
            self._order_test([user1, user2, self.admin_user], "avatar_url", "b")
    
        def _order_test(
            self,
            expected_user_list: List[str],
            order_by: Optional[str],
            dir: Optional[str] = None,
        ):
            """Fetch the user list with the given ordering and verify the result.

            The request always carries `deactivated=true`. `order_by` is added
            when it is not None, and `dir` only when it is "b" or "f".

            Args:
                expected_user_list: The user IDs, in the order the server is
                    expected to return them
                order_by: The column to order by, or None to omit the parameter
                dir: The ordering direction, or None to omit the parameter
            """

            # Assemble the query string piece by piece; the trailing "&" left
            # behind when `dir` is omitted is intentional (kept as-is).
            query_parts = ["?deactivated=true&"]
            if order_by is not None:
                query_parts.append("order_by=%s&" % (order_by,))
            if dir in ("b", "f"):
                query_parts.append("dir=%s" % (dir,))
            request_path = (self.url + "".join(query_parts)).encode("ascii")

            response = self.make_request(
                "GET",
                request_path,
                access_token=self.admin_user_tok,
            )
            self.assertEqual(200, response.code, msg=response.json_body)
            self.assertEqual(response.json_body["total"], len(expected_user_list))

            # Compare only the user IDs, in the order they were returned.
            names_in_response = [entry["name"] for entry in response.json_body["users"]]
            self.assertEqual(expected_user_list, names_in_response)
            self._check_fields(response.json_body["users"])
    
    
        def _check_fields(self, content: List[JsonDict]):
            """Checks that the expected user attributes are present in content

            Args:
                content: List of user entries; each entry is checked for the
                    expected attributes
            """
            # Checked in this order for every entry, so the first missing key
            # is the one reported.
            expected_keys = (
                "name",
                "is_guest",
                "admin",
                "user_type",
                "deactivated",
                "shadow_banned",
                "displayname",
                "avatar_url",
            )
            for u in content:
                for key in expected_keys:
                    self.assertIn(key, u)
    
        def _create_users(self, number_users: int):
            """
            Create a number of users

            Users are named `user1` .. `user<number_users>`, each with password
            `pass<i>`, display name `Name <i>` and no admin rights.

            Args:
                number_users: Number of users to be created
            """
            for i in range(1, number_users + 1):
                self.register_user(
                    "user%d" % i,
                    "pass%d" % i,
                    admin=False,
                    displayname="Name %d" % i,
                )

    class DeactivateAccountTestCase(unittest.HomeserverTestCase):
    
        # Servlet registration callables for this test case: the Synapse admin
        # REST API and the client login API (the tests below log users in and
        # call `/_synapse/admin/...` endpoints).
        # NOTE(review): presumably read by `unittest.HomeserverTestCase` when
        # it sets up the test server - confirm against the base class.
        servlets = [
            synapse.rest.admin.register_servlets,
            login.register_servlets,
        ]
    
        def prepare(self, reactor, clock, hs):
            """Set up the accounts, tokens and URLs shared by the tests.

            Registers and logs in an admin account and a regular account,
            stores the admin API URLs that target the regular account, and
            gives that account an avatar URL and an email threepid.

            Args:
                reactor: Unused here
                clock: Unused here
                hs: The homeserver under test; only its datastore is read
            """
            self.store = hs.get_datastore()

            # The admin account performs the requests under test.
            self.admin_user = self.register_user("admin", "pass", admin=True)
            self.admin_user_tok = self.login("admin", "pass")

            # The regular account is the one being deactivated.
            self.other_user = self.register_user("user", "pass", displayname="User1")
            self.other_user_token = self.login("user", "pass")

            # Both admin URLs embed the same percent-encoded user ID.
            quoted_user_id = urllib.parse.quote(self.other_user)
            self.url_other_user = "/_synapse/admin/v2/users/%s" % quoted_user_id
            self.url = "/_synapse/admin/v1/deactivate/%s" % quoted_user_id

            # set attributes for user
            avatar_update = self.store.set_profile_avatar_url(
                "user", "mxc://servername/mediaid"
            )
            self.get_success(avatar_update)
            threepid_update = self.store.user_add_threepid(
                "@user:test", "email", "foo@bar.com", 0, 0
            )
            self.get_success(threepid_update)
    
        def test_no_auth(self):
            """
            A deactivation request sent without an access token is expected
            to fail with 401 and the MISSING_TOKEN error code.
            """
            response = self.make_request("POST", self.url, b"{}")
            status = int(response.result["code"])

            self.assertEqual(401, status, msg=response.result["body"])
            self.assertEqual(Codes.MISSING_TOKEN, response.json_body["errcode"])
    
        def test_requester_is_not_admin(self):
            """
            If the user is not a server admin, an error is returned.
            """
            url = "/_synapse/admin/v1/deactivate/@bob:test"
    
            channel = self.make_request("POST", url, access_token=self.other_user_token)
    
            self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
            self.assertEqual("You are not a server admin", channel.json_body["error"])
    
            channel = self.make_request(
    
                "POST",
                url,
                access_token=self.other_user_token,
                content=b"{}",
    
            )
    
            self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
            self.assertEqual("You are not a server admin", channel.json_body["error"])
    
        def test_user_does_not_exist(self):
            """
            Deactivating a user ID that is not registered is expected to
            return a 404 with the NOT_FOUND error code.
            """
            unknown_user_url = "/_synapse/admin/v1/deactivate/@unknown_person:test"

            response = self.make_request(
                "POST", unknown_user_url, access_token=self.admin_user_tok
            )

            self.assertEqual(404, response.code, msg=response.json_body)
            self.assertEqual(Codes.NOT_FOUND, response.json_body["errcode"])
    
        def test_erase_is_not_bool(self):
            """
            Sending `erase` as the string "False" rather than a JSON boolean
            is expected to be rejected with 400 and the BAD_JSON error code.
            """
            payload = json.dumps({"erase": "False"}).encode(encoding="utf_8")

            response = self.make_request(
                "POST",
                self.url,
                content=payload,
                access_token=self.admin_user_tok,
            )
            status = int(response.result["code"])

            self.assertEqual(400, status, msg=response.result["body"])
            self.assertEqual(Codes.BAD_JSON, response.json_body["errcode"])
    
        def test_user_is_not_local(self):
            """