Skip to content
Snippets Groups Projects
test_auth.py 42.4 KiB
Newer Older
  • Learn to ignore specific revisions
  •         self.assertEqual(login_response.code, HTTPStatus.OK, login_response.result)
    
    
            # This first refresh should work properly
            first_refresh_response = self.make_request(
                "POST",
    
                {"refresh_token": login_response.json_body["refresh_token"]},
            )
            self.assertEqual(
    
                first_refresh_response.code, HTTPStatus.OK, first_refresh_response.result
    
            )
    
            # This one as well, since the token in the first one was never used
            second_refresh_response = self.make_request(
                "POST",
    
                {"refresh_token": login_response.json_body["refresh_token"]},
            )
            self.assertEqual(
    
                second_refresh_response.code, HTTPStatus.OK, second_refresh_response.result
    
            )
    
            # This one should not, since the token from the first refresh is not valid anymore
            third_refresh_response = self.make_request(
                "POST",
    
                {"refresh_token": first_refresh_response.json_body["refresh_token"]},
            )
            self.assertEqual(
    
                third_refresh_response.code,
                HTTPStatus.UNAUTHORIZED,
                third_refresh_response.result,
    
            )
    
            # The associated access token should also be invalid
            whoami_response = self.make_request(
                "GET",
                "/_matrix/client/r0/account/whoami",
                access_token=first_refresh_response.json_body["access_token"],
            )
    
            self.assertEqual(
                whoami_response.code, HTTPStatus.UNAUTHORIZED, whoami_response.result
            )
    
    
            # But all other tokens should work (they will expire after some time)
            for access_token in [
                second_refresh_response.json_body["access_token"],
                login_response.json_body["access_token"],
            ]:
                whoami_response = self.make_request(
                    "GET", "/_matrix/client/r0/account/whoami", access_token=access_token
                )
    
                self.assertEqual(
                    whoami_response.code, HTTPStatus.OK, whoami_response.result
                )
    
    
            # Now that the access token from the last valid refresh was used once, refreshing with the N-1 token should fail
            fourth_refresh_response = self.make_request(
                "POST",
    
                {"refresh_token": login_response.json_body["refresh_token"]},
            )
            self.assertEqual(
    
                fourth_refresh_response.code,
                HTTPStatus.FORBIDDEN,
                fourth_refresh_response.result,
    
            )
    
            # But refreshing from the last valid refresh token still works
            fifth_refresh_response = self.make_request(
                "POST",
    
                {"refresh_token": second_refresh_response.json_body["refresh_token"]},
            )
            self.assertEqual(
    
                fifth_refresh_response.code, HTTPStatus.OK, fifth_refresh_response.result
    
        def test_many_token_refresh(self) -> None:
    
            """
            If a refresh is performed many times during a session, there shouldn't be
            extra 'cruft' built up over time.
    
            This test was written specifically to troubleshoot a case where logout
            was very slow if a lot of refreshes had been performed for the session.
            """
    
            def _refresh(refresh_token: str) -> Tuple[str, str]:
                """
                Performs one refresh, returning the next refresh token and access token.
                """
                refresh_response = self.use_refresh_token(refresh_token)
                self.assertEqual(
                    refresh_response.code, HTTPStatus.OK, refresh_response.result
                )
                return (
                    refresh_response.json_body["refresh_token"],
                    refresh_response.json_body["access_token"],
                )
    
            def _table_length(table_name: str) -> int:
                """
                Helper to get the size of a table, in rows.
                For testing only; trivially vulnerable to SQL injection.
                """
    
                def _txn(txn: LoggingTransaction) -> int:
                    txn.execute(f"SELECT COUNT(1) FROM {table_name}")
                    row = txn.fetchone()
                    # Query is infallible
                    assert row is not None
                    return row[0]
    
                return self.get_success(
                    self.hs.get_datastores().main.db_pool.runInteraction(
                        "_table_length", _txn
                    )
                )
    
            # Before we log in, there are no access tokens.
            self.assertEqual(_table_length("access_tokens"), 0)
            self.assertEqual(_table_length("refresh_tokens"), 0)
    
            body = {
                "type": "m.login.password",
                "user": "test",
                "password": self.user_pass,
                "refresh_token": True,
            }
            login_response = self.make_request(
                "POST",
                "/_matrix/client/v3/login",
                body,
            )
            self.assertEqual(login_response.code, HTTPStatus.OK, login_response.result)
    
            access_token = login_response.json_body["access_token"]
            refresh_token = login_response.json_body["refresh_token"]
    
            # Now that we have logged in, there should be one access token and one
            # refresh token
            self.assertEqual(_table_length("access_tokens"), 1)
            self.assertEqual(_table_length("refresh_tokens"), 1)
    
            for _ in range(5):
                refresh_token, access_token = _refresh(refresh_token)
    
            # After 5 sequential refreshes, there should only be the latest two
            # refresh/access token pairs.
            # (The last one is preserved because it's in use!
            # The one before that is preserved because it can still be used to
            # replace the last token pair, in case of e.g. a network interruption.)
            self.assertEqual(_table_length("access_tokens"), 2)
            self.assertEqual(_table_length("refresh_tokens"), 2)
    
            logout_response = self.make_request(
                "POST", "/_matrix/client/v3/logout", {}, access_token=access_token
            )
            self.assertEqual(logout_response.code, HTTPStatus.OK, logout_response.result)
    
            # Now that we have logged in, there should be no access token
            # and no refresh token
            self.assertEqual(_table_length("access_tokens"), 0)
            self.assertEqual(_table_length("refresh_tokens"), 0)