Skip to content
Snippets Groups Projects
test_oidc.py 53.1 KiB
Newer Older
  • Learn to ignore specific revisions
  •     def test_map_userinfo_to_existing_user(self) -> None:
    
            """Existing users can log in with OpenID Connect when allow_existing_users is True."""
    
            store = self.hs.get_datastores().main
    
            user = UserID.from_string("@test_user:test")
    
                store.register_user(user_id=user.to_string(), password_hash=None)
    
                "sub": "test",
                "username": "test_user",
    
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
            self.complete_sso_login.assert_called_once_with(
    
                ANY,
                None,
                new_user=False,
                auth_provider_session_id=None,
    
            # Subsequent calls should map to the same mxid.
    
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
            self.complete_sso_login.assert_called_once_with(
    
                ANY,
                None,
                new_user=False,
                auth_provider_session_id=None,
    
            # Note that a second SSO user can be mapped to the same Matrix ID. (This
            # requires a unique sub, but something that maps to the same matrix ID,
            # in this case we'll just use the same username. A more realistic example
            # would be subs which are email addresses, and mapping from the localpart
            # of the email, e.g. bob@foo.com and bob@bar.com -> @bob:test.)
            userinfo = {
                "sub": "test1",
                "username": "test_user",
            }
    
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
            self.complete_sso_login.assert_called_once_with(
    
                ANY,
                None,
                new_user=False,
                auth_provider_session_id=None,
    
            # Register some non-exact matching cases.
            user2 = UserID.from_string("@TEST_user_2:test")
            self.get_success(
                store.register_user(user_id=user2.to_string(), password_hash=None)
            )
            user2_caps = UserID.from_string("@test_USER_2:test")
            self.get_success(
                store.register_user(user_id=user2_caps.to_string(), password_hash=None)
            )
    
            # Attempting to login without matching a name exactly is an error.
            userinfo = {
                "sub": "test2",
                "username": "TEST_USER_2",
            }
    
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
            self.complete_sso_login.assert_not_called()
    
            args = self.assertRenderedError("mapping_error")
    
                    "Attempted to login as '@TEST_USER_2:test' but it matches more than one user inexactly:"
                )
            )
    
            # Logging in when matching a name exactly should work.
            user2 = UserID.from_string("@TEST_USER_2:test")
            self.get_success(
                store.register_user(user_id=user2.to_string(), password_hash=None)
            )
    
    
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
            self.complete_sso_login.assert_called_once_with(
    
                ANY,
                None,
                new_user=False,
                auth_provider_session_id=None,
    
        @override_config({"oidc_config": DEFAULT_CONFIG})
        def test_map_userinfo_to_invalid_localpart(self) -> None:
            """A username that maps to an invalid localpart must yield a mapping error."""
            # Run a complete authorization round-trip using a username that the
            # test expects to be rejected.
            claims = {"sub": "test2", "username": "föö"}
            callback_request, _ = self.start_authorization(claims)
            pending_callback = self.handler.handle_oidc_callback(callback_request)
            self.get_success(pending_callback)

            # The rejection should surface as a rendered "mapping_error" that
            # names the offending localpart.
            self.assertRenderedError("mapping_error", "localpart is invalid: föö")
    
                    "user_mapping_provider": {
                        "module": __name__ + ".TestMappingProviderFailures"
    
        def test_map_userinfo_to_user_retries(self) -> None:
    
            """The mapping provider can retry generating an MXID if the MXID is already in use."""
    
            store = self.hs.get_datastores().main
    
            self.get_success(
                store.register_user(user_id="@test_user:test", password_hash=None)
            )
            userinfo = {
                "sub": "test",
                "username": "test_user",
            }
    
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
    
            # test_user is already taken, so test_user1 gets registered instead.
    
            self.complete_sso_login.assert_called_once_with(
    
                ANY,
                None,
                new_user=True,
                auth_provider_session_id=None,
    
            # Register all of the potential mxids for a particular OIDC username.
    
            self.get_success(
                store.register_user(user_id="@tester:test", password_hash=None)
            )
            for i in range(1, 3):
                self.get_success(
                    store.register_user(user_id="@tester%d:test" % i, password_hash=None)
                )
    
            # Now attempt to map to a username, this will fail since all potential usernames are taken.
            userinfo = {
                "sub": "tester",
                "username": "tester",
            }
    
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
            self.complete_sso_login.assert_not_called()
    
            self.assertRenderedError(
                "mapping_error", "Unable to generate a Matrix ID from the SSO response"
    
        @override_config({"oidc_config": DEFAULT_CONFIG})
        def test_empty_localpart(self) -> None:
            """An empty username must not be accepted as a localpart."""
            claims = {"sub": "tester", "username": ""}
            callback_request, _ = self.start_authorization(claims)
            pending_callback = self.handler.handle_oidc_callback(callback_request)
            self.get_success(pending_callback)

            # The expected description ends right after "invalid: " with nothing
            # following it.
            self.assertRenderedError("mapping_error", "localpart is invalid: ")
    
        @override_config(
            {
                "oidc_config": {
                    # NOTE(review): the base config spread was restored by analogy
                    # with the sibling tests' decorators — confirm against the
                    # canonical file.
                    **DEFAULT_CONFIG,
                    "user_mapping_provider": {
                        "config": {"localpart_template": "{{ user.username }}"}
                    },
                }
            }
        )
        def test_null_localpart(self) -> None:
            """Mapping onto a null localpart via an empty OIDC attribute should be rejected."""
            # The template above takes the localpart straight from "username",
            # which is deliberately null here.
            userinfo = {
                "sub": "tester",
                "username": None,
            }
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))

            # Same error description as the empty-string case: nothing follows
            # "invalid: ".
            self.assertRenderedError("mapping_error", "localpart is invalid: ")
    
    
        @override_config(
            {
                "oidc_config": {
                    **DEFAULT_CONFIG,
                    "attribute_requirements": [{"attribute": "test", "value": "foobar"}],
                }
            }
        )
        def test_attribute_requirements(self) -> None:
            """Login only completes when userinfo carries the required ``test`` attribute."""
            # First attempt: the required "test": "foobar" attribute is absent,
            # so the login must not be completed.
            claims_without_attr = dict(sub="tester", username="tester")
            callback_request, _ = self.start_authorization(claims_without_attr)
            self.get_success(self.handler.handle_oidc_callback(callback_request))
            self.complete_sso_login.assert_not_called()

            # Second attempt: the attribute is present with the required value.
            claims_with_attr = dict(sub="tester", username="tester", test="foobar")
            callback_request, _ = self.start_authorization(claims_with_attr)
            self.get_success(self.handler.handle_oidc_callback(callback_request))

            # The auth handler should now have been invoked exactly once.
            # NOTE(review): only a wildcard and None are asserted positionally;
            # verify that no leading arguments (e.g. the expected user ID) are
            # missing from this call.
            expected_kwargs = {"new_user": True, "auth_provider_session_id": None}
            self.complete_sso_login.assert_called_once_with(ANY, None, **expected_kwargs)
    
        @override_config(
            {
                "oidc_config": {
                    **DEFAULT_CONFIG,
                    "attribute_requirements": [{"attribute": "test", "value": "foobar"}],
                }
            }
        )
        def test_attribute_requirements_contains(self) -> None:
            """Auth succeeds when a list-valued userinfo attribute CONTAINS the required value."""
            # "test" is a list here; "foobar" is one of its members.
            claims = dict(
                sub="tester",
                username="tester",
                test=["foobar", "foo", "bar"],
            )
            callback_request, _ = self.start_authorization(claims)
            self.get_success(self.handler.handle_oidc_callback(callback_request))

            # The auth handler should have been invoked exactly once.
            # NOTE(review): only a wildcard and None are asserted positionally;
            # verify that no leading arguments (e.g. the expected user ID) are
            # missing from this call.
            expected_kwargs = {"new_user": True, "auth_provider_session_id": None}
            self.complete_sso_login.assert_called_once_with(ANY, None, **expected_kwargs)
    
        @override_config(
            {
                "oidc_config": {
                    **DEFAULT_CONFIG,
                    "attribute_requirements": [{"attribute": "test", "value": "foobar"}],
                }
            }
        )
        def test_attribute_requirements_mismatch(self) -> None:
            """
            Test that auth fails if attributes exist but don't match,
            or are non-string values.

            Each case below sends a different value for the "test" attribute and
            checks that the login is never completed.
            """
            # userinfo with "test": "not_foobar" attribute should fail
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": "not_foobar",
            }
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
            self.complete_sso_login.assert_not_called()

            # userinfo with "test": ["foo", "bar"] attribute should fail
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": ["foo", "bar"],
            }
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
            self.complete_sso_login.assert_not_called()

            # userinfo with "test": False attribute should fail
            # this is largely just to ensure we don't crash here
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": False,
            }
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
            self.complete_sso_login.assert_not_called()

            # userinfo with "test": None attribute should fail
            # a value of None breaks the OIDC spec, but it's important to not crash here
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": None,
            }
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
            self.complete_sso_login.assert_not_called()

            # userinfo with "test": 1 attribute should fail
            # this is largely just to ensure we don't crash here
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": 1,
            }
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
            self.complete_sso_login.assert_not_called()

            # userinfo with "test": 3.14 attribute should fail
            # this is largely just to ensure we don't crash here
            userinfo = {
                "sub": "tester",
                "username": "tester",
                "test": 3.14,
            }
            request, _ = self.start_authorization(userinfo)
            self.get_success(self.handler.handle_oidc_callback(request))
            self.complete_sso_login.assert_not_called()
    
        def _generate_oidc_session_token(
            self,
            state: str,
            nonce: str,
            client_redirect_url: str,
    
            from synapse.handlers.oidc import OidcSessionData
    
            return self.handler._macaroon_generator.generate_oidc_session_token(
    
                state=state,
                session_data=OidcSessionData(
    
                    nonce=nonce,
                    client_redirect_url=client_redirect_url,
                    ui_auth_session_id=ui_auth_session_id,
    
    def _build_callback_request(
        code: str,
        state: str,
        session: str,
        ip_address: str = "10.0.0.1",
    
        """Builds a fake SynapseRequest to mock the browser callback
    
        Returns a Mock object which looks like the SynapseRequest we get from a browser
        after SSO (before we return to the client)
    
        Args:
            code: the authorization code which would have been returned by the OIDC
               provider
            state: the "state" param which would have been passed around in the
               query param. Should be the same as was embedded in the session in
               _build_oidc_session.
            session: the "session" which would have been passed around in the cookie.
            ip_address: the IP address to pretend the request came from
        """
        request = Mock(
            spec=[
                "args",
                "getCookie",
    
        request.getCookie.return_value = session
        request.args = {}
        request.args[b"code"] = [code.encode("utf-8")]
        request.args[b"state"] = [state.encode("utf-8")]
    
        request.getClientAddress.return_value.host = ip_address