Skip to content
Snippets Groups Projects
oidc.py 11.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022 The Matrix.org Foundation C.I.C.
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import json
    from typing import Any, Dict, List, Optional, Tuple
    from unittest.mock import Mock, patch
    from urllib.parse import parse_qs
    
    import attr
    
    from twisted.web.http_headers import Headers
    from twisted.web.iweb import IResponse
    
    from synapse.server import HomeServer
    from synapse.util import Clock
    from synapse.util.stringutils import random_string
    
    from tests.test_utils import FakeResponse
    
    
    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class FakeAuthorizationGrant:
        userinfo: dict
        client_id: str
        redirect_uri: str
        scope: str
        nonce: Optional[str]
        sid: Optional[str]
    
    
    class FakeOidcServer:
        """A fake OpenID Connect Provider."""
    
        # All methods here are mocks, so we can track when they are called, and override
        # their values
        request: Mock
        get_jwks_handler: Mock
        get_metadata_handler: Mock
        get_userinfo_handler: Mock
        post_token_handler: Mock
    
        def __init__(self, clock: Clock, issuer: str):
            from authlib.jose import ECKey, KeySet
    
            self._clock = clock
            self.issuer = issuer
    
            self.request = Mock(side_effect=self._request)
            self.get_jwks_handler = Mock(side_effect=self._get_jwks_handler)
            self.get_metadata_handler = Mock(side_effect=self._get_metadata_handler)
            self.get_userinfo_handler = Mock(side_effect=self._get_userinfo_handler)
            self.post_token_handler = Mock(side_effect=self._post_token_handler)
    
            # A code -> grant mapping
            self._authorization_grants: Dict[str, FakeAuthorizationGrant] = {}
            # An access token -> grant mapping
            self._sessions: Dict[str, FakeAuthorizationGrant] = {}
    
            # We generate here an ECDSA key with the P-256 curve (ES256 algorithm) used for
            # signing JWTs. ECDSA keys are really quick to generate compared to RSA.
            self._key = ECKey.generate_key(crv="P-256", is_private=True)
            self._jwks = KeySet([ECKey.import_key(self._key.as_pem(is_private=False))])
    
            self._id_token_overrides: Dict[str, Any] = {}
    
        def reset_mocks(self):
            self.request.reset_mock()
            self.get_jwks_handler.reset_mock()
            self.get_metadata_handler.reset_mock()
            self.get_userinfo_handler.reset_mock()
            self.post_token_handler.reset_mock()
    
        def patch_homeserver(self, hs: HomeServer):
            """Patch the ``HomeServer`` HTTP client to handle requests through the ``FakeOidcServer``.
    
            This patch should be used whenever the HS is expected to perform request to the
            OIDC provider, e.g.::
    
                fake_oidc_server = self.helper.fake_oidc_server()
                with fake_oidc_server.patch_homeserver(hs):
                    self.make_request("GET", "/_matrix/client/r0/login/sso/redirect")
            """
            return patch.object(hs.get_proxied_http_client(), "request", self.request)
    
        @property
        def authorization_endpoint(self) -> str:
            return self.issuer + "authorize"
    
        @property
        def token_endpoint(self) -> str:
            return self.issuer + "token"
    
        @property
        def userinfo_endpoint(self) -> str:
            return self.issuer + "userinfo"
    
        @property
        def metadata_endpoint(self) -> str:
            return self.issuer + ".well-known/openid-configuration"
    
        @property
        def jwks_uri(self) -> str:
            return self.issuer + "jwks"
    
        def get_metadata(self) -> dict:
            return {
                "issuer": self.issuer,
                "authorization_endpoint": self.authorization_endpoint,
                "token_endpoint": self.token_endpoint,
                "jwks_uri": self.jwks_uri,
                "userinfo_endpoint": self.userinfo_endpoint,
                "response_types_supported": ["code"],
                "subject_types_supported": ["public"],
                "id_token_signing_alg_values_supported": ["ES256"],
            }
    
        def get_jwks(self) -> dict:
            return self._jwks.as_dict()
    
        def get_userinfo(self, access_token: str) -> Optional[dict]:
            """Given an access token, get the userinfo of the associated session."""
            session = self._sessions.get(access_token, None)
            if session is None:
                return None
            return session.userinfo
    
        def _sign(self, payload: dict) -> str:
            from authlib.jose import JsonWebSignature
    
            jws = JsonWebSignature()
            kid = self.get_jwks()["keys"][0]["kid"]
            protected = {"alg": "ES256", "kid": kid}
            json_payload = json.dumps(payload)
            return jws.serialize_compact(protected, json_payload, self._key).decode("utf-8")
    
        def generate_id_token(self, grant: FakeAuthorizationGrant) -> str:
            now = self._clock.time()
            id_token = {
                **grant.userinfo,
                "iss": self.issuer,
                "aud": grant.client_id,
                "iat": now,
                "nbf": now,
                "exp": now + 600,
            }
    
            if grant.nonce is not None:
                id_token["nonce"] = grant.nonce
    
            if grant.sid is not None:
                id_token["sid"] = grant.sid
    
            id_token.update(self._id_token_overrides)
    
            return self._sign(id_token)
    
        def id_token_override(self, overrides: dict):
            """Temporarily patch the ID token generated by the token endpoint."""
            return patch.object(self, "_id_token_overrides", overrides)
    
        def start_authorization(
            self,
            client_id: str,
            scope: str,
            redirect_uri: str,
            userinfo: dict,
            nonce: Optional[str] = None,
            with_sid: bool = False,
        ) -> Tuple[str, FakeAuthorizationGrant]:
            """Start an authorization request, and get back the code to use on the authorization endpoint."""
            code = random_string(10)
            sid = None
            if with_sid:
                sid = random_string(10)
    
            grant = FakeAuthorizationGrant(
                userinfo=userinfo,
                scope=scope,
                redirect_uri=redirect_uri,
                nonce=nonce,
                client_id=client_id,
                sid=sid,
            )
            self._authorization_grants[code] = grant
    
            return code, grant
    
        def exchange_code(self, code: str) -> Optional[Dict[str, Any]]:
            grant = self._authorization_grants.pop(code, None)
            if grant is None:
                return None
    
            access_token = random_string(10)
            self._sessions[access_token] = grant
    
            token = {
                "token_type": "Bearer",
                "access_token": access_token,
                "expires_in": 3600,
                "scope": grant.scope,
            }
    
            if "openid" in grant.scope:
                token["id_token"] = self.generate_id_token(grant)
    
            return dict(token)
    
        def buggy_endpoint(
            self,
            *,
            jwks: bool = False,
            metadata: bool = False,
            token: bool = False,
            userinfo: bool = False,
        ):
            """A context which makes a set of endpoints return a 500 error.
    
            Args:
                jwks: If True, makes the JWKS endpoint return a 500 error.
                metadata: If True, makes the OIDC Discovery endpoint return a 500 error.
                token: If True, makes the token endpoint return a 500 error.
                userinfo: If True, makes the userinfo endpoint return a 500 error.
            """
            buggy = FakeResponse(code=500, body=b"Internal server error")
    
            patches = {}
            if jwks:
                patches["get_jwks_handler"] = Mock(return_value=buggy)
            if metadata:
                patches["get_metadata_handler"] = Mock(return_value=buggy)
            if token:
                patches["post_token_handler"] = Mock(return_value=buggy)
            if userinfo:
                patches["get_userinfo_handler"] = Mock(return_value=buggy)
    
            return patch.multiple(self, **patches)
    
        async def _request(
            self,
            method: str,
            uri: str,
            data: Optional[bytes] = None,
            headers: Optional[Headers] = None,
        ) -> IResponse:
            """The override of the SimpleHttpClient#request() method"""
            access_token: Optional[str] = None
    
            if headers is None:
                headers = Headers()
    
            # Try to find the access token in the headers if any
            auth_headers = headers.getRawHeaders(b"Authorization")
            if auth_headers:
                parts = auth_headers[0].split(b" ")
                if parts[0] == b"Bearer" and len(parts) == 2:
                    access_token = parts[1].decode("ascii")
    
            if method == "POST":
                # If the method is POST, assume it has an url-encoded body
                if data is None or headers.getRawHeaders(b"Content-Type") != [
                    b"application/x-www-form-urlencoded"
                ]:
                    return FakeResponse.json(code=400, payload={"error": "invalid_request"})
    
                params = parse_qs(data.decode("utf-8"))
    
                if uri == self.token_endpoint:
                    # Even though this endpoint should be protected, this does not check
                    # for client authentication. We're not checking it for simplicity,
                    # and because client authentication is tested in other standalone tests.
                    return self.post_token_handler(params)
    
            elif method == "GET":
                if uri == self.jwks_uri:
                    return self.get_jwks_handler()
                elif uri == self.metadata_endpoint:
                    return self.get_metadata_handler()
                elif uri == self.userinfo_endpoint:
                    return self.get_userinfo_handler(access_token=access_token)
    
            return FakeResponse(code=404, body=b"404 not found")
    
        # Request handlers
        def _get_jwks_handler(self) -> IResponse:
            """Handles requests to the JWKS URI."""
            return FakeResponse.json(payload=self.get_jwks())
    
        def _get_metadata_handler(self) -> IResponse:
            """Handles requests to the OIDC well-known document."""
            return FakeResponse.json(payload=self.get_metadata())
    
        def _get_userinfo_handler(self, access_token: Optional[str]) -> IResponse:
            """Handles requests to the userinfo endpoint."""
            if access_token is None:
                return FakeResponse(code=401)
            user_info = self.get_userinfo(access_token)
            if user_info is None:
                return FakeResponse(code=401)
    
            return FakeResponse.json(payload=user_info)
    
        def _post_token_handler(self, params: Dict[str, List[str]]) -> IResponse:
            """Handles requests to the token endpoint."""
            code = params.get("code", [])
    
            if len(code) != 1:
                return FakeResponse.json(code=400, payload={"error": "invalid_request"})
    
            grant = self.exchange_code(code=code[0])
            if grant is None:
                return FakeResponse.json(code=400, payload={"error": "invalid_grant"})
    
            return FakeResponse.json(payload=grant)