Skip to content
Snippets Groups Projects
test_oidc.py 46 KiB
Newer Older
  • Learn to ignore specific revisions
  •         """The required attributes must be met from the OIDC userinfo response."""
            auth_handler = self.hs.get_auth_handler()
            auth_handler.complete_sso_login = simple_async_mock()
    
            # userinfo lacking "test": "foobar" attribute should fail.
            userinfo = {
                "sub": "tester",
                "username": "tester",
            }
            self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
            auth_handler.complete_sso_login.assert_not_called()
    
            # userinfo with "test": "foobar" attribute should succeed.
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": "foobar",
            }
            self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
    
            # check that the auth handler got called as expected
            auth_handler.complete_sso_login.assert_called_once_with(
                "@tester:test", "oidc", ANY, ANY, None, new_user=True
            )
    
        @override_config(
            {
                "oidc_config": {
                    **DEFAULT_CONFIG,
                    "attribute_requirements": [{"attribute": "test", "value": "foobar"}],
                }
            }
        )
        def test_attribute_requirements_contains(self):
            """Test that auth succeeds if userinfo attribute CONTAINS required value"""
            auth_handler = self.hs.get_auth_handler()
            auth_handler.complete_sso_login = simple_async_mock()
            # userinfo with "test": ["foobar", "foo", "bar"] attribute should succeed.
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": ["foobar", "foo", "bar"],
            }
            self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
    
            # check that the auth handler got called as expected
            auth_handler.complete_sso_login.assert_called_once_with(
                "@tester:test", "oidc", ANY, ANY, None, new_user=True
            )
    
        @override_config(
            {
                "oidc_config": {
                    **DEFAULT_CONFIG,
                    "attribute_requirements": [{"attribute": "test", "value": "foobar"}],
                }
            }
        )
        def test_attribute_requirements_mismatch(self):
            """
            Test that auth fails if attributes exist but don't match,
            or are non-string values.
            """
            auth_handler = self.hs.get_auth_handler()
            auth_handler.complete_sso_login = simple_async_mock()
            # userinfo with "test": "not_foobar" attribute should fail
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": "not_foobar",
            }
            self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
            auth_handler.complete_sso_login.assert_not_called()
    
            # userinfo with "test": ["foo", "bar"] attribute should fail
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": ["foo", "bar"],
            }
            self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
            auth_handler.complete_sso_login.assert_not_called()
    
            # userinfo with "test": False attribute should fail
            # this is largely just to ensure we don't crash here
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": False,
            }
            self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
            auth_handler.complete_sso_login.assert_not_called()
    
            # userinfo with "test": None attribute should fail
            # a value of None breaks the OIDC spec, but it's important to not crash here
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": None,
            }
            self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
            auth_handler.complete_sso_login.assert_not_called()
    
            # userinfo with "test": 1 attribute should fail
            # this is largely just to ensure we don't crash here
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": 1,
            }
            self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
            auth_handler.complete_sso_login.assert_not_called()
    
            # userinfo with "test": 3.14 attribute should fail
            # this is largely just to ensure we don't crash here
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": 3.14,
            }
            self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
            auth_handler.complete_sso_login.assert_not_called()
    
    
        def _generate_oidc_session_token(
            self,
            state: str,
            nonce: str,
            client_redirect_url: str,
    
        ) -> str:
            from synapse.handlers.oidc_handler import OidcSessionData
    
            return self.handler._token_generator.generate_oidc_session_token(
                state=state,
                session_data=OidcSessionData(
    
                    nonce=nonce,
                    client_redirect_url=client_redirect_url,
                    ui_auth_session_id=ui_auth_session_id,
                ),
            )
    
    
    async def _make_callback_with_userinfo(
        hs: HomeServer, userinfo: dict, client_redirect_url: str = "http://client/redirect"
    ) -> None:
        """Mock up an OIDC callback with the given userinfo dict
    
        We'll pull out the OIDC handler from the homeserver, stub out a couple of methods,
        and poke in the userinfo dict as if it were the response to an OIDC userinfo call.
    
        Args:
            hs: the HomeServer impl to send the callback to.
            userinfo: the OIDC userinfo dict
            client_redirect_url: the URL to redirect to on success.
        """
    
        from synapse.handlers.oidc_handler import OidcSessionData
    
    
        handler = hs.get_oidc_handler()
    
        provider = handler._providers["oidc"]
    
        provider._exchange_code = simple_async_mock(return_value={})
        provider._parse_id_token = simple_async_mock(return_value=userinfo)
        provider._fetch_userinfo = simple_async_mock(return_value=userinfo)
    
        session = handler._token_generator.generate_oidc_session_token(
    
            session_data=OidcSessionData(
    
                idp_id="oidc",
                nonce="nonce",
                client_redirect_url=client_redirect_url,
    
        )
        request = _build_callback_request("code", state, session)
    
        await handler.handle_oidc_callback(request)
    
    
    def _build_callback_request(
        code: str,
        state: str,
        session: str,
        user_agent: str = "Browser",
        ip_address: str = "10.0.0.1",
    ):
        """Builds a fake SynapseRequest to mock the browser callback
    
        Returns a Mock object which looks like the SynapseRequest we get from a browser
        after SSO (before we return to the client)
    
        Args:
            code: the authorization code which would have been returned by the OIDC
               provider
            state: the "state" param which would have been passed around in the
               query param. Should be the same as was embedded in the session in
               _build_oidc_session.
            session: the "session" which would have been passed around in the cookie.
            user_agent: the user-agent to present
            ip_address: the IP address to pretend the request came from
        """
        request = Mock(
            spec=[
                "args",
                "getCookie",
    
                "requestHeaders",
                "getClientIP",
    
        request.getCookie.return_value = session
        request.args = {}
        request.args[b"code"] = [code.encode("utf-8")]
        request.args[b"state"] = [state.encode("utf-8")]
        request.getClientIP.return_value = ip_address
        return request