Newer
Older
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Union
from twisted.internet.defer import succeed
from synapse.api.constants import LoginType
from synapse.handlers.ui_auth.checkers import UserInteractiveAuthChecker
from synapse.http.site import SynapseRequest
from synapse.rest.client.v1 import login
from synapse.rest.client.v2_alpha import auth, devices, register
from synapse.types import JsonDict
from tests.server import FakeChannel
class DummyRecaptchaChecker(UserInteractiveAuthChecker):
def __init__(self, hs):
super().__init__(hs)
self.recaptcha_attempts = []
def check_auth(self, authdict, clientip):
self.recaptcha_attempts.append((authdict, clientip))
return succeed(True)
class DummyPasswordChecker(UserInteractiveAuthChecker):
def check_auth(self, authdict, clientip):
return succeed(authdict["identifier"]["user"])
class FallbackAuthTests(unittest.HomeserverTestCase):
servlets = [
auth.register_servlets,
register.register_servlets,
]
hijack_auth = False
def make_homeserver(self, reactor, clock):
config = self.default_config()
Amber Brown
committed
config["enable_registration_captcha"] = True
config["recaptcha_public_key"] = "brokencake"
config["registrations_require_3pid"] = []
hs = self.setup_test_homeserver(config=config)
return hs
def prepare(self, reactor, clock, hs):
self.recaptcha_checker = DummyRecaptchaChecker(hs)
auth_handler.checkers[LoginType.RECAPTCHA] = self.recaptcha_checker
def register(self, expected_response: int, body: JsonDict) -> FakeChannel:
"""Make a register request."""
request, channel = self.make_request(
"POST", "register", body
) # type: SynapseRequest, FakeChannel
self.assertEqual(request.code, expected_response)
return channel
def recaptcha(
self, session: str, expected_post_response: int, post_session: str = None
) -> None:
"""Get and respond to a fallback recaptcha. Returns the second request."""
if post_session is None:
post_session = session
request, channel = self.make_request(
"GET", "auth/m.login.recaptcha/fallback/web?session=" + session
) # type: SynapseRequest, FakeChannel
self.render(request)
self.assertEqual(request.code, 200)
request, channel = self.make_request(
"POST",
"auth/m.login.recaptcha/fallback/web?session="
+ post_session
+ "&g-recaptcha-response=a",
)
self.render(request)
self.assertEqual(request.code, expected_post_response)
# The recaptcha handler is called with the response given
attempts = self.recaptcha_checker.recaptcha_attempts
self.assertEqual(len(attempts), 1)
self.assertEqual(attempts[0][0]["response"], "a")
@unittest.INFO
def test_fallback_captcha(self):
"""Ensure that fallback auth via a captcha works."""
# Returns a 401 as per the spec
channel = self.register(
401, {"username": "user", "type": "m.login.password", "password": "bar"},
# Grab the session
session = channel.json_body["session"]
# Assert our configured public key is being given
self.assertEqual(
channel.json_body["params"]["m.login.recaptcha"]["public_key"], "brokencake"
)
# Complete the recaptcha step.
self.recaptcha(session, 200)
# also complete the dummy auth
self.register(200, {"auth": {"session": session, "type": "m.login.dummy"}})
# Now we should have fulfilled a complete auth flow, including
# the recaptcha fallback step, we can then send a
# request to the register API with the session in the authdict.
channel = self.register(200, {"auth": {"session": session}})
# We're given a registered user.
self.assertEqual(channel.json_body["user_id"], "@user:test")
def test_legacy_registration(self):
Registration allows the parameters to vary through the process.
"""
# Make the initial request to register. (Later on a different password
# will be used.)
# Returns a 401 as per the spec
channel = self.register(
401, {"username": "user", "type": "m.login.password", "password": "bar"},
)
# Grab the session
session = channel.json_body["session"]
# Assert our configured public key is being given
self.assertEqual(
channel.json_body["params"]["m.login.recaptcha"]["public_key"], "brokencake"
)
# Complete the recaptcha step.
self.recaptcha(session, 200)
# also complete the dummy auth
self.register(200, {"auth": {"session": session, "type": "m.login.dummy"}})
# Now we should have fulfilled a complete auth flow, including
# the recaptcha fallback step. Make the initial request again, but
# with a changed password. This still completes.
channel = self.register(
200,
{
"username": "user",
"type": "m.login.password",
"password": "foo", # Note that this is different.
"auth": {"session": session},
},
)
# We're given a registered user.
self.assertEqual(channel.json_body["user_id"], "@user:test")
def test_complete_operation_unknown_session(self):
"""
Attempting to mark an invalid session as complete should error.
"""
# Make the initial request to register. (Later on a different password
# will be used.)
# Returns a 401 as per the spec
channel = self.register(
401, {"username": "user", "type": "m.login.password", "password": "bar"}
)
# Grab the session
session = channel.json_body["session"]
# Assert our configured public key is being given
self.assertEqual(
channel.json_body["params"]["m.login.recaptcha"]["public_key"], "brokencake"
)
# Attempt to complete the recaptcha step with an unknown session.
# This results in an error.
self.recaptcha(session, 400, session + "unknown")
class UIAuthTests(unittest.HomeserverTestCase):
servlets = [
auth.register_servlets,
devices.register_servlets,
login.register_servlets,
synapse.rest.admin.register_servlets_for_client_rest_resource,
register.register_servlets,
]
def prepare(self, reactor, clock, hs):
auth_handler = hs.get_auth_handler()
auth_handler.checkers[LoginType.PASSWORD] = DummyPasswordChecker(hs)
self.user_pass = "pass"
self.user = self.register_user("test", self.user_pass)
self.user_tok = self.login("test", self.user_pass)
def get_device_ids(self) -> List[str]:
# Get the list of devices so one can be deleted.
request, channel = self.make_request(
"GET", "devices", access_token=self.user_tok,
) # type: SynapseRequest, FakeChannel
self.render(request)
# Get the ID of the device.
self.assertEqual(request.code, 200)
return [d["device_id"] for d in channel.json_body["devices"]]
def delete_device(
self, device: str, expected_response: int, body: Union[bytes, JsonDict] = b""
) -> FakeChannel:
"""Delete an individual device."""
request, channel = self.make_request(
"DELETE", "devices/" + device, body, access_token=self.user_tok
) # type: SynapseRequest, FakeChannel
self.render(request)
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# Ensure the response is sane.
self.assertEqual(request.code, expected_response)
return channel
def delete_devices(self, expected_response: int, body: JsonDict) -> FakeChannel:
"""Delete 1 or more devices."""
# Note that this uses the delete_devices endpoint so that we can modify
# the payload half-way through some tests.
request, channel = self.make_request(
"POST", "delete_devices", body, access_token=self.user_tok,
) # type: SynapseRequest, FakeChannel
self.render(request)
# Ensure the response is sane.
self.assertEqual(request.code, expected_response)
return channel
def test_ui_auth(self):
"""
Test user interactive authentication outside of registration.
"""
device_id = self.get_device_ids()[0]
# Attempt to delete this device.
# Returns a 401 as per the spec
channel = self.delete_device(device_id, 401)
# Grab the session
session = channel.json_body["session"]
# Ensure that flows are what is expected.
self.assertIn({"stages": ["m.login.password"]}, channel.json_body["flows"])
# Make another request providing the UI auth flow.
self.delete_device(
device_id,
200,
{
"auth": {
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": self.user},
"password": self.user_pass,
"session": session,
},
},
)
def test_cannot_change_body(self):
"""
The initial requested client dict cannot be modified during the user interactive authentication session.
"""
# Create a second login.
self.login("test", self.user_pass)
device_ids = self.get_device_ids()
self.assertEqual(len(device_ids), 2)
# Attempt to delete the first device.
# Returns a 401 as per the spec
channel = self.delete_devices(401, {"devices": [device_ids[0]]})
# Grab the session
session = channel.json_body["session"]
# Ensure that flows are what is expected.
self.assertIn({"stages": ["m.login.password"]}, channel.json_body["flows"])
# Make another request providing the UI auth flow, but try to delete the
# second device. This results in an error.
self.delete_devices(
403,
{
"devices": [device_ids[1]],
"auth": {
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": self.user},
"password": self.user_pass,
"session": session,
},
},
)
def test_cannot_change_uri(self):
"""
The initial requested URI cannot be modified during the user interactive authentication session.
"""
# Create a second login.
self.login("test", self.user_pass)
device_ids = self.get_device_ids()
self.assertEqual(len(device_ids), 2)
# Attempt to delete the first device.
# Returns a 401 as per the spec
channel = self.delete_device(device_ids[0], 401)
# Grab the session
session = channel.json_body["session"]
# Ensure that flows are what is expected.
self.assertIn({"stages": ["m.login.password"]}, channel.json_body["flows"])
# Make another request providing the UI auth flow, but try to delete the
# second device. This results in an error.
self.delete_device(
device_ids[1],
403,
{
"auth": {
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": self.user},
"password": self.user_pass,
"session": session,
},
},
)