Skip to content
Snippets Groups Projects
test_auth.py 38.7 KiB
Newer Older
  • Learn to ignore specific revisions
  •             Performs one refresh, returning the next refresh token and access token.
                """
                refresh_response = self.use_refresh_token(refresh_token)
                self.assertEqual(
                    refresh_response.code, HTTPStatus.OK, refresh_response.result
                )
                return (
                    refresh_response.json_body["refresh_token"],
                    refresh_response.json_body["access_token"],
                )
    
            def _table_length(table_name: str) -> int:
                """
                Count the rows currently present in `table_name`.

                Test-only helper: the table name is interpolated directly into
                the SQL text, so this must never be fed untrusted input.
                """

                def _count_rows_txn(txn: LoggingTransaction) -> int:
                    txn.execute(f"SELECT COUNT(1) FROM {table_name}")
                    result = txn.fetchone()
                    # The COUNT query is expected to always produce a row.
                    assert result is not None
                    count = result[0]
                    return count

                # Run the counting transaction against the main datastore's
                # pool and resolve the resulting awaitable.
                db_pool = self.hs.get_datastores().main.db_pool
                interaction = db_pool.runInteraction("_table_length", _count_rows_txn)
                return self.get_success(interaction)
    
            # Before we log in, there are no access tokens.
            self.assertEqual(_table_length("access_tokens"), 0)
            self.assertEqual(_table_length("refresh_tokens"), 0)
    
            body = {
                "type": "m.login.password",
                "user": "test",
                "password": self.user_pass,
                "refresh_token": True,
            }
            login_response = self.make_request(
                "POST",
                "/_matrix/client/v3/login",
                body,
            )
            self.assertEqual(login_response.code, HTTPStatus.OK, login_response.result)
    
            access_token = login_response.json_body["access_token"]
            refresh_token = login_response.json_body["refresh_token"]
    
            # Now that we have logged in, there should be one access token and one
            # refresh token
            self.assertEqual(_table_length("access_tokens"), 1)
            self.assertEqual(_table_length("refresh_tokens"), 1)
    
            for _ in range(5):
                refresh_token, access_token = _refresh(refresh_token)
    
            # After 5 sequential refreshes, there should only be the latest two
            # refresh/access token pairs.
            # (The last one is preserved because it's in use!
            # The one before that is preserved because it can still be used to
            # replace the last token pair, in case of e.g. a network interruption.)
            self.assertEqual(_table_length("access_tokens"), 2)
            self.assertEqual(_table_length("refresh_tokens"), 2)
    
            logout_response = self.make_request(
                "POST", "/_matrix/client/v3/logout", {}, access_token=access_token
            )
            self.assertEqual(logout_response.code, HTTPStatus.OK, logout_response.result)
    
            # Now that we have logged out, there should be no access tokens
            # and no refresh tokens
            self.assertEqual(_table_length("access_tokens"), 0)
            self.assertEqual(_table_length("refresh_tokens"), 0)