Newer
Older
def test_map_userinfo_to_existing_user(self) -> None:
"""Existing users can log in with OpenID Connect when allow_existing_users is True."""
store = self.hs.get_datastores().main
user = UserID.from_string("@test_user:test")
store.register_user(user_id=user.to_string(), password_hash=None)
# Map a user via SSO.
"sub": "test",
"username": "test_user",
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_called_once_with(
user.to_string(),
self.provider.idp_id,
request,
ANY,
None,
new_user=False,
auth_provider_session_id=None,
self.reset_mocks()
# Subsequent calls should map to the same mxid.
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_called_once_with(
user.to_string(),
self.provider.idp_id,
request,
ANY,
None,
new_user=False,
auth_provider_session_id=None,
self.reset_mocks()
# Note that a second SSO user can be mapped to the same Matrix ID. (This
# requires a unique sub, but something that maps to the same matrix ID,
# in this case we'll just use the same username. A more realistic example
# would be subs which are email addresses, and mapping from the localpart
# of the email, e.g. bob@foo.com and bob@bar.com -> @bob:test.)
userinfo = {
"sub": "test1",
"username": "test_user",
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_called_once_with(
user.to_string(),
self.provider.idp_id,
request,
ANY,
None,
new_user=False,
auth_provider_session_id=None,
self.reset_mocks()
# Register some non-exact matching cases.
user2 = UserID.from_string("@TEST_user_2:test")
self.get_success(
store.register_user(user_id=user2.to_string(), password_hash=None)
)
user2_caps = UserID.from_string("@test_USER_2:test")
self.get_success(
store.register_user(user_id=user2_caps.to_string(), password_hash=None)
)
# Attempting to login without matching a name exactly is an error.
userinfo = {
"sub": "test2",
"username": "TEST_USER_2",
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_not_called()
args = self.assertRenderedError("mapping_error")
self.assertTrue(
args[2].startswith(
"Attempted to login as '@TEST_USER_2:test' but it matches more than one user inexactly:"
)
)
# Logging in when matching a name exactly should work.
user2 = UserID.from_string("@TEST_USER_2:test")
self.get_success(
store.register_user(user_id=user2.to_string(), password_hash=None)
)
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_called_once_with(
"@TEST_USER_2:test",
self.provider.idp_id,
request,
ANY,
None,
new_user=False,
auth_provider_session_id=None,
@override_config({"oidc_config": DEFAULT_CONFIG})
def test_map_userinfo_to_invalid_localpart(self) -> None:
"""If the mapping provider generates an invalid localpart it should be rejected."""
userinfo = {"sub": "test2", "username": "föö"}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("mapping_error", "localpart is invalid: föö")
@override_config(
{
"oidc_config": {
**DEFAULT_CONFIG,
"user_mapping_provider": {
"module": __name__ + ".TestMappingProviderFailures"
}
}
)
def test_map_userinfo_to_user_retries(self) -> None:
"""The mapping provider can retry generating an MXID if the MXID is already in use."""
store = self.hs.get_datastores().main
self.get_success(
store.register_user(user_id="@test_user:test", password_hash=None)
)
userinfo = {
"sub": "test",
"username": "test_user",
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
# test_user is already taken, so test_user1 gets registered instead.
self.complete_sso_login.assert_called_once_with(
"@test_user1:test",
self.provider.idp_id,
request,
ANY,
None,
new_user=True,
auth_provider_session_id=None,
self.reset_mocks()
# Register all of the potential mxids for a particular OIDC username.
self.get_success(
store.register_user(user_id="@tester:test", password_hash=None)
)
for i in range(1, 3):
self.get_success(
store.register_user(user_id="@tester%d:test" % i, password_hash=None)
)
# Now attempt to map to a username, this will fail since all potential usernames are taken.
userinfo = {
"sub": "tester",
"username": "tester",
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_not_called()
self.assertRenderedError(
"mapping_error", "Unable to generate a Matrix ID from the SSO response"
@override_config({"oidc_config": DEFAULT_CONFIG})
def test_empty_localpart(self) -> None:
"""Attempts to map onto an empty localpart should be rejected."""
userinfo = {
"sub": "tester",
"username": "",
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("mapping_error", "localpart is invalid: ")
@override_config(
{
"oidc_config": {
**DEFAULT_CONFIG,
"user_mapping_provider": {
"config": {"localpart_template": "{{ user.username }}"}
def test_null_localpart(self) -> None:
"""Mapping onto a null localpart via an empty OIDC attribute should be rejected"""
userinfo = {
"sub": "tester",
"username": None,
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("mapping_error", "localpart is invalid: ")
@override_config(
{
"oidc_config": {
**DEFAULT_CONFIG,
"attribute_requirements": [{"attribute": "test", "value": "foobar"}],
}
}
)
def test_attribute_requirements(self) -> None:
"""The required attributes must be met from the OIDC userinfo response."""
# userinfo lacking "test": "foobar" attribute should fail.
userinfo = {
"sub": "tester",
"username": "tester",
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_not_called()
# userinfo with "test": "foobar" attribute should succeed.
userinfo = {
"sub": "tester",
"username": "tester",
"test": "foobar",
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
# check that the auth handler got called as expected
self.complete_sso_login.assert_called_once_with(
"@tester:test",
self.provider.idp_id,
request,
ANY,
None,
new_user=True,
auth_provider_session_id=None,
)
@override_config(
{
"oidc_config": {
**DEFAULT_CONFIG,
"attribute_requirements": [{"attribute": "test", "value": "foobar"}],
}
}
)
def test_attribute_requirements_contains(self) -> None:
"""Test that auth succeeds if userinfo attribute CONTAINS required value"""
# userinfo with "test": ["foobar", "foo", "bar"] attribute should succeed.
userinfo = {
"sub": "tester",
"username": "tester",
"test": ["foobar", "foo", "bar"],
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
# check that the auth handler got called as expected
self.complete_sso_login.assert_called_once_with(
"@tester:test",
self.provider.idp_id,
request,
ANY,
None,
new_user=True,
auth_provider_session_id=None,
)
@override_config(
{
"oidc_config": {
**DEFAULT_CONFIG,
"attribute_requirements": [{"attribute": "test", "value": "foobar"}],
}
}
)
def test_attribute_requirements_mismatch(self) -> None:
"""
Test that auth fails if attributes exist but don't match,
or are non-string values.
"""
# userinfo with "test": "not_foobar" attribute should fail
userinfo: dict = {
"sub": "tester",
"username": "tester",
"test": "not_foobar",
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_not_called()
# userinfo with "test": ["foo", "bar"] attribute should fail
userinfo = {
"sub": "tester",
"username": "tester",
"test": ["foo", "bar"],
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_not_called()
# userinfo with "test": False attribute should fail
# this is largely just to ensure we don't crash here
userinfo = {
"sub": "tester",
"username": "tester",
"test": False,
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_not_called()
# userinfo with "test": None attribute should fail
# a value of None breaks the OIDC spec, but it's important to not crash here
userinfo = {
"sub": "tester",
"username": "tester",
"test": None,
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_not_called()
# userinfo with "test": 1 attribute should fail
# this is largely just to ensure we don't crash here
userinfo = {
"sub": "tester",
"username": "tester",
"test": 1,
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_not_called()
# userinfo with "test": 3.14 attribute should fail
# this is largely just to ensure we don't crash here
userinfo = {
"sub": "tester",
"username": "tester",
"test": 3.14,
}
request, _ = self.start_authorization(userinfo)
self.get_success(self.handler.handle_oidc_callback(request))
self.complete_sso_login.assert_not_called()
def _generate_oidc_session_token(
self,
state: str,
nonce: str,
client_redirect_url: str,
ui_auth_session_id: str = "",
from synapse.handlers.oidc import OidcSessionData
Quentin Gliech
committed
return self.handler._macaroon_generator.generate_oidc_session_token(
state=state,
session_data=OidcSessionData(
idp_id=self.provider.idp_id,
nonce=nonce,
client_redirect_url=client_redirect_url,
ui_auth_session_id=ui_auth_session_id,
def _build_callback_request(
code: str,
state: str,
session: str,
ip_address: str = "10.0.0.1",
"""Builds a fake SynapseRequest to mock the browser callback
Returns a Mock object which looks like the SynapseRequest we get from a browser
after SSO (before we return to the client)
Args:
code: the authorization code which would have been returned by the OIDC
provider
state: the "state" param which would have been passed around in the
query param. Should be the same as was embedded in the session in
_build_oidc_session.
session: the "session" which would have been passed around in the cookie.
ip_address: the IP address to pretend the request came from
"""
request = Mock(
spec=[
"args",
"getCookie",
"getClientAddress",
request.cookies = []
request.getCookie.return_value = session
request.args = {}
request.args[b"code"] = [code.encode("utf-8")]
request.args[b"state"] = [state.encode("utf-8")]
request.getClientAddress.return_value.host = ip_address