Newer
Older
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
import time
from typing import Dict, Iterable
from parameterized import parameterized
from signedjson import key as key, sign as sign
from twisted.test.proto_helpers import MemoryReactor
Erik Johnston
committed
from synapse.api.constants import RoomEncryptionAlgorithms
from synapse.api.errors import Codes, SynapseError
from synapse.appservice import ApplicationService
from synapse.handlers.device import DeviceHandler
from synapse.server import HomeServer
from synapse.storage.databases.main.appservice import _make_exclusive_regex
Shay
committed
from synapse.types import JsonDict, UserID
from synapse.util import Clock
from tests import unittest
from tests.unittest import override_config
class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
    """Create the homeserver under test with a mocked application service API.

    The mock is stored on ``self.appservice_api`` and handed to
    ``setup_test_homeserver`` as ``application_service_api``.
    """
    # Imported here because no module-level import of ``mock`` is visible in
    # this file; without it the next line raises NameError.
    from unittest import mock

    self.appservice_api = mock.AsyncMock()
    return self.setup_test_homeserver(application_service_api=self.appservice_api)
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
    """Store the objects shared by the tests in this class.

    Sets ``self.handler`` (the e2e keys handler), ``self.store`` (the main
    datastore) and ``self.requester`` (a local ``UserID`` passed as the
    requester when claiming keys).
    """
    self.handler = hs.get_e2e_keys_handler()
    self.store = self.hs.get_datastores().main
    self.requester = UserID.from_string(f"@test_requester:{self.hs.hostname}")
def test_query_local_devices_no_devices(self) -> None:
    """Querying a user who has no devices yields an empty mapping for that user."""
    user_id = "@boris:" + self.hs.hostname
    device_query = {user_id: None}

    devices = self.get_success(self.handler.query_local_devices(device_query))

    expected = {user_id: {}}
    self.assertDictEqual(devices, expected)
def test_reupload_one_time_keys(self) -> None:
    """we should be able to re-upload the same keys"""
    local_user = "@boris:" + self.hs.hostname
    device_id = "xyz"
    keys: JsonDict = {
        "alg1:k1": "key1",
        "alg2:k2": {"key": "key2", "signatures": {"k1": "sig1"}},
        "alg2:k3": {"key": "key3"},
    }

    # Note that "signed_curve25519" is always returned in key count responses. This is necessary until
    # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.

    res = self.get_success(
        self.handler.upload_keys_for_user(
            local_user, device_id, {"one_time_keys": keys}
        )
    )
    self.assertDictEqual(
        res, {"one_time_key_counts": {"alg1": 1, "alg2": 2, "signed_curve25519": 0}}
    )

    # we should be able to change the signature without a problem
    keys["alg2:k2"]["signatures"]["k1"] = "sig2"
    res = self.get_success(
        self.handler.upload_keys_for_user(
            local_user, device_id, {"one_time_keys": keys}
        )
    )
    # The counts are unchanged: the same key IDs were uploaded again.
    self.assertDictEqual(
        res, {"one_time_key_counts": {"alg1": 1, "alg2": 2, "signed_curve25519": 0}}
    )
def test_change_one_time_keys(self) -> None:
    """attempts to change one-time-keys should be rejected"""
    local_user = "@boris:" + self.hs.hostname
    device_id = "xyz"
    keys = {
        "alg1:k1": "key1",
        "alg2:k2": {"key": "key2", "signatures": {"k1": "sig1"}},
        "alg2:k3": {"key": "key3"},
    }

    res = self.get_success(
        self.handler.upload_keys_for_user(
            local_user, device_id, {"one_time_keys": keys}
        )
    )
    self.assertDictEqual(
        res, {"one_time_key_counts": {"alg1": 1, "alg2": 2, "signed_curve25519": 0}}
    )

    # Error when changing string key
    self.get_failure(
        self.handler.upload_keys_for_user(
            local_user, device_id, {"one_time_keys": {"alg1:k1": "key2"}}
        ),
        SynapseError,
    )

    # Error when replacing dict key with string
    self.get_failure(
        self.handler.upload_keys_for_user(
            local_user, device_id, {"one_time_keys": {"alg2:k3": "key2"}}
        ),
        SynapseError,
    )

    # Error when replacing string key with dict
    self.get_failure(
        self.handler.upload_keys_for_user(
            local_user,
            device_id,
            {"one_time_keys": {"alg1:k1": {"key": "key"}}},
        ),
        SynapseError,
    )

    # Error when replacing dict key
    self.get_failure(
        self.handler.upload_keys_for_user(
            local_user,
            device_id,
            {
                "one_time_keys": {
                    "alg2:k2": {"key": "key3", "signatures": {"k1": "sig1"}}
                }
            },
        ),
        SynapseError,
    )
def test_claim_one_time_key(self) -> None:
    """Claimed one-time keys come back one at a time, in upload order."""
    local_user = "@boris:" + self.hs.hostname
    device_id = "xyz"

    res = self.get_success(
        self.handler.upload_keys_for_user(
            local_user, device_id, {"one_time_keys": {"alg1:k1": "key1"}}
        )
    )
    self.assertDictEqual(
        res, {"one_time_key_counts": {"alg1": 1, "signed_curve25519": 0}}
    )

    # Keys should be returned in the order they were uploaded. To test, advance time
    # a little, then upload a second key with an earlier key ID; it should get
    # returned second.
    self.reactor.advance(1)
    res = self.get_success(
        self.handler.upload_keys_for_user(
            local_user, device_id, {"one_time_keys": {"alg1:k0": "key0"}}
        )
    )
    self.assertDictEqual(
        res, {"one_time_key_counts": {"alg1": 2, "signed_curve25519": 0}}
    )

    # now claim both keys back. They should be in the same order
    res = self.get_success(
        self.handler.claim_one_time_keys(
            {local_user: {device_id: {"alg1": 1}}},
            self.requester,
            timeout=None,
            always_include_fallback_keys=False,
        )
    )
    self.assertEqual(
        res,
        {
            "failures": {},
            "one_time_keys": {local_user: {device_id: {"alg1:k1": "key1"}}},
        },
    )

    res = self.get_success(
        self.handler.claim_one_time_keys(
            {local_user: {device_id: {"alg1": 1}}},
            self.requester,
            timeout=None,
            always_include_fallback_keys=False,
        )
    )
    self.assertEqual(
        res,
        {
            "failures": {},
            "one_time_keys": {local_user: {device_id: {"alg1:k0": "key0"}}},
        },
    )
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def test_claim_one_time_key_bulk(self) -> None:
    """Claim keys for many users, devices and algorithms in one handler call."""
    # This test is verbose on purpose: asserting on deeply nested dictionaries
    # is awkward to do tersely.

    # Three users, two devices each, two algorithms per device, two keys per
    # algorithm.
    alice = f"@alice:{self.hs.hostname}"
    brian = f"@brian:{self.hs.hostname}"
    chris = f"@chris:{self.hs.hostname}"

    one_time_keys = {
        alice: {
            "alice_dev_1": {
                "alg1:k1": {"dummy_id": 1},
                "alg1:k2": {"dummy_id": 2},
                "alg2:k3": {"dummy_id": 3},
                "alg2:k4": {"dummy_id": 4},
            },
            "alice_dev_2": {
                "alg1:k5": {"dummy_id": 5},
                "alg1:k6": {"dummy_id": 6},
                "alg2:k7": {"dummy_id": 7},
                "alg2:k8": {"dummy_id": 8},
            },
        },
        brian: {
            "brian_dev_1": {
                "alg1:k9": {"dummy_id": 9},
                "alg1:k10": {"dummy_id": 10},
                "alg2:k11": {"dummy_id": 11},
                "alg2:k12": {"dummy_id": 12},
            },
            "brian_dev_2": {
                "alg1:k13": {"dummy_id": 13},
                "alg1:k14": {"dummy_id": 14},
                "alg2:k15": {"dummy_id": 15},
                "alg2:k16": {"dummy_id": 16},
            },
        },
        chris: {
            "chris_dev_1": {
                "alg1:k17": {"dummy_id": 17},
                "alg1:k18": {"dummy_id": 18},
                "alg2:k19": {"dummy_id": 19},
                "alg2:k20": {"dummy_id": 20},
            },
            "chris_dev_2": {
                "alg1:k21": {"dummy_id": 21},
                "alg1:k22": {"dummy_id": 22},
                "alg2:k23": {"dummy_id": 23},
                "alg2:k24": {"dummy_id": 24},
            },
        },
    }

    # Every upload should report 2 keys per algorithm.
    # See count_e2e_one_time_keys for why "signed_curve25519" is hardcoded.
    expected_upload_counts = {
        "one_time_key_counts": {"signed_curve25519": 0, "alg1": 2, "alg2": 2},
    }
    for uploader, uploader_devices in one_time_keys.items():
        for uploader_device, uploaded_keys in uploader_devices.items():
            upload_res = self.get_success(
                self.handler.upload_keys_for_user(
                    uploader,
                    uploader_device,
                    {"one_time_keys": uploaded_keys},
                )
            )
            self.assertEqual(upload_res, expected_upload_counts)

    # The claims, in a flat form that is easy to assert against.
    claims_to_make = {
        (alice, "alice_dev_1", "alg1"): 1,
        (alice, "alice_dev_1", "alg2"): 2,
        (alice, "alice_dev_2", "alg2"): 1,
        (brian, "brian_dev_1", "alg1"): 2,
        (brian, "brian_dev_2", "alg2"): 9001,
        (chris, "chris_dev_2", "alg2"): 1,
    }

    # Re-nest the flat claims into the shape the handler takes.
    query: Dict[str, Dict[str, Dict[str, int]]] = {}
    for (claim_user, claim_device, claim_alg), wanted in claims_to_make.items():
        per_user = query.setdefault(claim_user, {})
        per_device = per_user.setdefault(claim_device, {})
        per_device[claim_alg] = wanted

    claim_res = self.get_success(
        self.handler.claim_one_time_keys(
            query,
            self.requester,
            timeout=None,
            always_include_fallback_keys=False,
        )
    )

    # Nothing should have failed.
    self.assertEqual(claim_res["failures"], {})

    # The (user, device, algorithm) triples returned must be exactly those requested.
    got_otks = claim_res["one_time_keys"]
    claimed_triples = set()
    for got_user, got_devices in got_otks.items():
        for got_device, got_keys in got_devices.items():
            for alg_key_id in got_keys:
                algorithm = alg_key_id.partition(":")[0]
                claimed_triples.add((got_user, got_device, algorithm))
    self.assertEqual(claimed_triples, set(claims_to_make.keys()))

    # Then verify the key contents.
    def assertExactlyOneOtk(
        user_id: str, device_id: str, *alg_key_pairs: str
    ) -> None:
        """Exactly one of the candidate keys was returned, with the uploaded JSON."""
        returned = got_otks[user_id][device_id]
        present = [candidate for candidate in alg_key_pairs if candidate in returned]
        for candidate in present:
            self.assertEqual(
                returned[candidate], one_time_keys[user_id][device_id][candidate]
            )
        self.assertEqual(len(present), 1)

    def assertAllOtks(user_id: str, device_id: str, *alg_key_pairs: str) -> None:
        """Every listed key was returned, with the uploaded JSON."""
        returned = got_otks[user_id][device_id]
        uploaded = one_time_keys[user_id][device_id]
        for wanted_key in alg_key_pairs:
            self.assertEqual(returned[wanted_key], uploaded[wanted_key])

    # Single-key claims: one arbitrary key out of the two uploaded.
    assertExactlyOneOtk(alice, "alice_dev_1", "alg1:k1", "alg1:k2")
    assertExactlyOneOtk(alice, "alice_dev_2", "alg2:k7", "alg2:k8")
    assertExactlyOneOtk(chris, "chris_dev_2", "alg2:k23", "alg2:k24")

    # Claims for two or more keys: both uploaded keys are returned.
    assertAllOtks(alice, "alice_dev_1", "alg2:k3", "alg2:k4")
    assertAllOtks(brian, "brian_dev_1", "alg1:k9", "alg1:k10")
    assertAllOtks(brian, "brian_dev_2", "alg2:k15", "alg2:k16")

    # Finally, check how many keys remain unused on each device.
    for owner, owner_devices in one_time_keys.items():
        for owner_device in owner_devices:
            counts_by_alg = self.get_success(
                self.store.count_e2e_one_time_keys(owner, owner_device)
            )
            # Two keys were uploaded per algorithm; an algorithm with nothing
            # left is absent from the expected counts.
            expected_counts_by_alg = {"signed_curve25519": 0}
            for alg in ("alg1", "alg2"):
                claimed_n = claims_to_make.get((owner, owner_device, alg), 0)
                if claimed_n < 2:
                    expected_counts_by_alg[alg] = 2 - claimed_n
            self.assertEqual(
                counts_by_alg, expected_counts_by_alg, f"{owner}:{owner_device}"
            )
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
def test_claim_one_time_key_bulk_ordering(self) -> None:
    """Keys returned by the bulk claim call should be returned in the correct order"""
    # Alice has lots of keys, uploaded in a specific order
    alice = f"@alice:{self.hs.hostname}"
    alice_dev = "alice_dev_1"

    self.get_success(
        self.handler.upload_keys_for_user(
            alice,
            alice_dev,
            {"one_time_keys": {"alg1:k20": 20, "alg1:k21": 21, "alg1:k22": 22}},
        )
    )
    # Advance time by 1s, to ensure that there is a difference in upload time.
    self.reactor.advance(1)
    self.get_success(
        self.handler.upload_keys_for_user(
            alice,
            alice_dev,
            {"one_time_keys": {"alg1:k10": 10, "alg1:k11": 11, "alg1:k12": 12}},
        )
    )

    # Now claim some, and check we get the right ones.
    claim_res = self.get_success(
        self.handler.claim_one_time_keys(
            {alice: {alice_dev: {"alg1": 2}}},
            self.requester,
            timeout=None,
            always_include_fallback_keys=False,
        )
    )
    # We should get the first-uploaded keys, even though they have later key ids.
    # We should get a random set of two of k20, k21, k22.
    self.assertEqual(claim_res["failures"], {})
    # Look the result up with the same identifiers used for the upload, rather
    # than a hard-coded user ID that assumes a particular server hostname.
    claimed_keys = claim_res["one_time_keys"][alice][alice_dev]
    self.assertEqual(len(claimed_keys), 2)
    for key_id in claimed_keys:
        self.assertIn(key_id, ["alg1:k20", "alg1:k21", "alg1:k22"])
def test_fallback_key(self) -> None:
    """Exercise uploading, claiming and replacing a device's fallback key.

    After each step the test checks the claim result and/or the unused
    fallback key types reported by the datastore.
    """
    local_user = "@boris:" + self.hs.hostname
    device_id = "xyz"
    fallback_key = {"alg1:k1": "fallback_key1"}
    fallback_key2 = {"alg1:k2": "fallback_key2"}
    fallback_key3 = {"alg1:k2": "fallback_key3"}
    otk = {"alg1:k2": "key2"}

    def claim_alg1_key() -> JsonDict:
        """Claim one "alg1" key for the device, not forcing fallback keys."""
        return self.get_success(
            self.handler.claim_one_time_keys(
                {local_user: {device_id: {"alg1": 1}}},
                self.requester,
                timeout=None,
                always_include_fallback_keys=False,
            )
        )

    def unused_fallback_key_types() -> list:
        """Fetch the device's unused fallback key types from the datastore."""
        return self.get_success(
            self.store.get_e2e_unused_fallback_key_types(local_user, device_id)
        )

    def upload(body: JsonDict) -> None:
        """Upload the given keys body for the device."""
        self.get_success(
            self.handler.upload_keys_for_user(local_user, device_id, body)
        )

    # we shouldn't have any unused fallback keys yet
    self.assertEqual(unused_fallback_key_types(), [])

    upload({"fallback_keys": fallback_key})

    # we should now have an unused alg1 key
    self.assertEqual(unused_fallback_key_types(), ["alg1"])

    # claiming an OTK when no OTKs are available should return the fallback
    # key
    self.assertEqual(
        claim_alg1_key(),
        {"failures": {}, "one_time_keys": {local_user: {device_id: fallback_key}}},
    )

    # we shouldn't have any unused fallback keys again
    self.assertEqual(unused_fallback_key_types(), [])

    # claiming an OTK again should return the same fallback key
    self.assertEqual(
        claim_alg1_key(),
        {"failures": {}, "one_time_keys": {local_user: {device_id: fallback_key}}},
    )

    # re-uploading the same fallback key should still result in no unused fallback
    # keys
    upload({"fallback_keys": fallback_key})
    self.assertEqual(unused_fallback_key_types(), [])

    # uploading a new fallback key should result in an unused fallback key
    upload({"fallback_keys": fallback_key2})
    self.assertEqual(unused_fallback_key_types(), ["alg1"])

    # if the user uploads a one-time key, the next claim should fetch the
    # one-time key, and then go back to the fallback
    upload({"one_time_keys": otk})

    self.assertEqual(
        claim_alg1_key(),
        {"failures": {}, "one_time_keys": {local_user: {device_id: otk}}},
    )

    self.assertEqual(
        claim_alg1_key(),
        {"failures": {}, "one_time_keys": {local_user: {device_id: fallback_key2}}},
    )

    # using the unstable prefix should also set the fallback key
    upload({"org.matrix.msc2732.fallback_keys": fallback_key3})

    self.assertEqual(
        claim_alg1_key(),
        {"failures": {}, "one_time_keys": {local_user: {device_id: fallback_key3}}},
    )
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
def test_fallback_key_bulk(self) -> None:
    """Claim fallback keys for several users in a single handler call."""
    hostname = self.hs.hostname
    alice = f"@alice:{hostname}"
    brian = f"@brian:{hostname}"
    chris = f"@chris:{hostname}"

    # Three users, each uploading one fallback key on each of two devices.
    # Device "<name>_dev_N" uploads a key for algorithm "algN".
    fallback_keys = {
        alice: {
            "alice_dev_1": {"alg1:k1": "fallback_key1"},
            "alice_dev_2": {"alg2:k2": "fallback_key2"},
        },
        brian: {
            "brian_dev_1": {"alg1:k3": "fallback_key3"},
            "brian_dev_2": {"alg2:k4": "fallback_key4"},
        },
        chris: {
            "chris_dev_1": {"alg1:k5": "fallback_key5"},
            "chris_dev_2": {"alg2:k6": "fallback_key6"},
        },
    }

    for owner, owner_devices in fallback_keys.items():
        for dev, dev_fallback in owner_devices.items():
            upload = self.handler.upload_keys_for_user(
                owner,
                dev,
                {"fallback_keys": dev_fallback},
            )
            self.get_success(upload)

    # After the uploads every device has one unused fallback key.
    for owner, owner_devices in fallback_keys.items():
        for dev in owner_devices:
            unused_types = self.get_success(
                self.store.get_e2e_unused_fallback_key_types(owner, dev)
            )
            self.assertEqual(unused_types, [f"alg{dev[-1]}"])

    # Claim the fallback key of one device for each user.
    claim_query = {
        alice: {"alice_dev_1": {"alg1": 1}},
        brian: {"brian_dev_2": {"alg2": 1}},
        chris: {"chris_dev_2": {"alg2": 1}},
    }
    claim_res = self.get_success(
        self.handler.claim_one_time_keys(
            claim_query,
            self.requester,
            timeout=None,
            always_include_fallback_keys=False,
        )
    )
    expected_claims = {
        alice: {"alice_dev_1": {"alg1:k1": "fallback_key1"}},
        brian: {"brian_dev_2": {"alg2:k4": "fallback_key4"}},
        chris: {"chris_dev_2": {"alg2:k6": "fallback_key6"}},
    }
    self.assertEqual(
        claim_res,
        {"failures": {}, "one_time_keys": expected_claims},
    )

    for owner, owner_devices in fallback_keys.items():
        for dev in owner_devices:
            unused_types = self.get_success(
                self.store.get_e2e_unused_fallback_key_types(owner, dev)
            )
            # A claimed fallback key is no longer reported as unused; an
            # unclaimed one still is.
            was_claimed = dev in expected_claims[owner]
            expected_types = [] if was_claimed else [f"alg{dev[-1]}"]
            self.assertEqual(unused_types, expected_types)
Patrick Cloke
committed
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
def test_fallback_key_always_returned(self) -> None:
    """Claim keys with ``always_include_fallback_keys=True``.

    The first claim is expected to return both the one-time key and the
    fallback key; the second claim only the fallback key.
    """
    local_user = "@boris:" + self.hs.hostname
    device_id = "xyz"
    fallback_key = {"alg1:k1": "fallback_key1"}
    otk = {"alg1:k2": "key2"}

    # we shouldn't have any unused fallback keys yet
    res = self.get_success(
        self.store.get_e2e_unused_fallback_key_types(local_user, device_id)
    )
    self.assertEqual(res, [])

    # Upload a OTK & fallback key.
    self.get_success(
        self.handler.upload_keys_for_user(
            local_user,
            device_id,
            {"one_time_keys": otk, "fallback_keys": fallback_key},
        )
    )

    # we should now have an unused alg1 key
    fallback_res = self.get_success(
        self.store.get_e2e_unused_fallback_key_types(local_user, device_id)
    )
    self.assertEqual(fallback_res, ["alg1"])

    # Claiming an OTK and requesting to always return the fallback key should
    # return both.
    claim_res = self.get_success(
        self.handler.claim_one_time_keys(
            {local_user: {device_id: {"alg1": 1}}},
            self.requester,
            timeout=None,
            always_include_fallback_keys=True,
        )
    )
    self.assertEqual(
        claim_res,
        {
            "failures": {},
            "one_time_keys": {local_user: {device_id: {**fallback_key, **otk}}},
        },
    )

    # This should not mark the key as used.
    fallback_res = self.get_success(
        self.store.get_e2e_unused_fallback_key_types(local_user, device_id)
    )
    self.assertEqual(fallback_res, ["alg1"])

    # Claiming an OTK again should return only the fallback key.
    claim_res = self.get_success(
        self.handler.claim_one_time_keys(
            {local_user: {device_id: {"alg1": 1}}},
            self.requester,
            timeout=None,
            always_include_fallback_keys=True,
        )
    )
    self.assertEqual(
        claim_res,
        {"failures": {}, "one_time_keys": {local_user: {device_id: fallback_key}}},
    )

    # And mark it as used.
    fallback_res = self.get_success(
        self.store.get_e2e_unused_fallback_key_types(local_user, device_id)
    )
    self.assertEqual(fallback_res, [])
def test_replace_master_key(self) -> None:
    """uploading a new signing key should make the old signing key unavailable"""
    local_user = "@boris:" + self.hs.hostname
    keys1 = {
        "master_key": {
            # private key: 2lonYOM6xYKdEsO+6KrC766xBcHnYnim1x/4LFGF8B0
            "user_id": local_user,
            "usage": ["master"],
            "keys": {
                "ed25519:nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk": "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk"
            },
        }
    }
    self.get_success(self.handler.upload_signing_keys_for_user(local_user, keys1))

    keys2 = {
        "master_key": {
            # private key: 4TL4AjRYwDVwD3pqQzcor+ez/euOB1/q78aTJ+czDNs
            "user_id": local_user,
            "usage": ["master"],
            "keys": {
                "ed25519:Hq6gL+utB4ET+UvD5ci0kgAwsX6qP/zvf8v6OInU5iw": "Hq6gL+utB4ET+UvD5ci0kgAwsX6qP/zvf8v6OInU5iw"
            },
        }
    }
    self.get_success(self.handler.upload_signing_keys_for_user(local_user, keys2))

    # Only the second (replacement) master key should be returned by the query.
    devices = self.get_success(
        self.handler.query_devices(
            {"device_keys": {local_user: []}}, 0, local_user, "device123"
        )
    )
    self.assertDictEqual(devices["master_keys"], {local_user: keys2["master_key"]})
def test_reupload_signatures(self) -> None:
    """re-uploading a signature should not fail"""
    local_user = "@boris:" + self.hs.hostname
    keys1 = {
        "master_key": {
            # private key: HvQBbU+hc2Zr+JP1sE0XwBe1pfZZEYtJNPJLZJtS+F8
            "user_id": local_user,
            "usage": ["master"],
            "keys": {
                "ed25519:EmkqvokUn8p+vQAGZitOk4PWjp7Ukp3txV2TbMPEiBQ": "EmkqvokUn8p+vQAGZitOk4PWjp7Ukp3txV2TbMPEiBQ"
            },
        },
        "self_signing_key": {
            # private key: 2lonYOM6xYKdEsO+6KrC766xBcHnYnim1x/4LFGF8B0
            "user_id": local_user,
            "usage": ["self_signing"],
            "keys": {
                "ed25519:nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk": "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk"
            },
        },
    }
    # The self-signing key is signed with the master key before upload.
    master_signing_key = key.decode_signing_key_base64(
        "ed25519",
        "EmkqvokUn8p+vQAGZitOk4PWjp7Ukp3txV2TbMPEiBQ",
        "HvQBbU+hc2Zr+JP1sE0XwBe1pfZZEYtJNPJLZJtS+F8",
    )
    sign.sign_json(keys1["self_signing_key"], local_user, master_signing_key)
    # Signing key matching the self-signing key above; used on the device keys below.
    signing_key = key.decode_signing_key_base64(
        "ed25519",
        "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk",
        "2lonYOM6xYKdEsO+6KrC766xBcHnYnim1x/4LFGF8B0",
    )
    self.get_success(self.handler.upload_signing_keys_for_user(local_user, keys1))

    # upload two device keys, which will be signed later by the self-signing key
    device_key_1: JsonDict = {
        "algorithms": [
            "m.olm.curve25519-aes-sha2",
            RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2,
        ],
        "keys": {
            "ed25519:abc": "base64+ed25519+key",
            "curve25519:abc": "base64+curve25519+key",
        },
        "signatures": {local_user: {"ed25519:abc": "base64+signature"}},
    }
    device_key_2: JsonDict = {
        "algorithms": [
            "m.olm.curve25519-aes-sha2",
            RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2,
        ],
        "keys": {
            "ed25519:def": "base64+ed25519+key",
            "curve25519:def": "base64+curve25519+key",
        },
        "signatures": {local_user: {"ed25519:def": "base64+signature"}},
    }

    self.get_success(
        self.handler.upload_keys_for_user(
            local_user, "abc", {"device_keys": device_key_1}
        )
    )
    self.get_success(
        self.handler.upload_keys_for_user(
            local_user, "def", {"device_keys": device_key_2}
        )
    )

    # sign the first device key and upload it
    del device_key_1["signatures"]
    sign.sign_json(device_key_1, local_user, signing_key)
    self.get_success(
        self.handler.upload_signatures_for_device_keys(
            local_user, {local_user: {"abc": device_key_1}}
        )
    )

    # sign the second device key and upload both device keys. The server
    # should ignore the first device key since it already has a valid
    # signature for it
    del device_key_2["signatures"]
    sign.sign_json(device_key_2, local_user, signing_key)
    self.get_success(
        self.handler.upload_signatures_for_device_keys(
            local_user, {local_user: {"abc": device_key_1, "def": device_key_2}}
        )
    )

    # Put the devices' own signatures back on the local copies before
    # comparing them with what the server returns.
    device_key_1["signatures"][local_user]["ed25519:abc"] = "base64+signature"
    device_key_2["signatures"][local_user]["ed25519:def"] = "base64+signature"
    devices = self.get_success(
        self.handler.query_devices(
            {"device_keys": {local_user: []}}, 0, local_user, "device123"
        )
    )
    # "unsigned" is not part of the uploaded keys, so drop it before comparing.
    del devices["device_keys"][local_user]["abc"]["unsigned"]
    del devices["device_keys"][local_user]["def"]["unsigned"]
    self.assertDictEqual(devices["device_keys"][local_user]["abc"], device_key_1)
    self.assertDictEqual(devices["device_keys"][local_user]["def"], device_key_2)
def test_self_signing_key_doesnt_show_up_as_device(self) -> None:
    """signing keys should be hidden when fetching a user's devices"""
    local_user = "@boris:" + self.hs.hostname
    keys1 = {
        "master_key": {
            # private key: 2lonYOM6xYKdEsO+6KrC766xBcHnYnim1x/4LFGF8B0
            "user_id": local_user,
            "usage": ["master"],
            "keys": {
                "ed25519:nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk": "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk"
            },
        }
    }
    self.get_success(self.handler.upload_signing_keys_for_user(local_user, keys1))

    device_handler = self.hs.get_device_handler()
    assert isinstance(device_handler, DeviceHandler)
    # Registering a device whose ID is the master key's public key must fail.
    e = self.get_failure(
        device_handler.check_device_registered(
            user_id=local_user,
            device_id="nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk",
            initial_device_display_name="new display name",
        ),
        SynapseError,
    )
    res = e.value.code
    self.assertEqual(res, 400)

    # The signing key must not be listed among the user's devices.
    query_res = self.get_success(
        self.handler.query_local_devices({local_user: None})
    )
    self.assertDictEqual(query_res, {local_user: {}})
def test_upload_signatures(self) -> None:
"""should check signatures that are uploaded"""
# set up a user with cross-signing keys and a device. This user will
# try uploading signatures
local_user = "@boris:" + self.hs.hostname
device_id = "xyz"
# private key: OMkooTr76ega06xNvXIGPbgvvxAOzmQncN8VObS7aBA
device_pubkey = "NnHhnqiMFQkq969szYkooLaBAXW244ZOxgukCvm2ZeY"
device_key: JsonDict = {
"user_id": local_user,
"device_id": device_id,
"algorithms": [
"m.olm.curve25519-aes-sha2",
RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2,
],
"keys": {"curve25519:xyz": "curve25519+key", "ed25519:xyz": device_pubkey},
"signatures": {local_user: {"ed25519:xyz": "something"}},
}
device_signing_key = key.decode_signing_key_base64(
"ed25519", "xyz", "OMkooTr76ega06xNvXIGPbgvvxAOzmQncN8VObS7aBA"
self.get_success(
self.handler.upload_keys_for_user(
local_user, device_id, {"device_keys": device_key}
)
)
# private key: 2lonYOM6xYKdEsO+6KrC766xBcHnYnim1x/4LFGF8B0
master_pubkey = "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk"
master_key: JsonDict = {
"user_id": local_user,
"usage": ["master"],
}
master_signing_key = key.decode_signing_key_base64(
"ed25519", master_pubkey, "2lonYOM6xYKdEsO+6KrC766xBcHnYnim1x/4LFGF8B0"
)
usersigning_pubkey = "Hq6gL+utB4ET+UvD5ci0kgAwsX6qP/zvf8v6OInU5iw"
usersigning_key = {
# private key: 4TL4AjRYwDVwD3pqQzcor+ez/euOB1/q78aTJ+czDNs
"user_id": local_user,
"usage": ["user_signing"],
"keys": {"ed25519:" + usersigning_pubkey: usersigning_pubkey},
}
usersigning_signing_key = key.decode_signing_key_base64(
"ed25519", usersigning_pubkey, "4TL4AjRYwDVwD3pqQzcor+ez/euOB1/q78aTJ+czDNs"
)
sign.sign_json(usersigning_key, local_user, master_signing_key)
# private key: HvQBbU+hc2Zr+JP1sE0XwBe1pfZZEYtJNPJLZJtS+F8
selfsigning_pubkey = "EmkqvokUn8p+vQAGZitOk4PWjp7Ukp3txV2TbMPEiBQ"
selfsigning_key = {
"user_id": local_user,
"usage": ["self_signing"],
"keys": {"ed25519:" + selfsigning_pubkey: selfsigning_pubkey},
}
selfsigning_signing_key = key.decode_signing_key_base64(
"ed25519", selfsigning_pubkey, "HvQBbU+hc2Zr+JP1sE0XwBe1pfZZEYtJNPJLZJtS+F8"
)
sign.sign_json(selfsigning_key, local_user, master_signing_key)
cross_signing_keys = {
"master_key": master_key,
"user_signing_key": usersigning_key,
"self_signing_key": selfsigning_key,
}
self.get_success(
self.handler.upload_signing_keys_for_user(local_user, cross_signing_keys)
)
# set up another user with a master key. This user will be signed by
# the first user
other_user = "@otherboris:" + self.hs.hostname
other_master_pubkey = "fHZ3NPiKxoLQm5OoZbKa99SYxprOjNs4TwJUKP+twCM"
other_master_key: JsonDict = {
# private key: oyw2ZUx0O4GifbfFYM0nQvj9CL0b8B7cyN4FprtK8OI
"user_id": other_user,
"usage": ["master"],
"keys": {"ed25519:" + other_master_pubkey: other_master_pubkey},
self.get_success(
self.handler.upload_signing_keys_for_user(
other_user, {"master_key": other_master_key}
)
# test various signature failures (see below)
ret = self.get_success(
self.handler.upload_signatures_for_device_keys(
local_user,
{
local_user: {
# fails because the signature is invalid
# should fail with INVALID_SIGNATURE
device_id: {
"user_id": local_user,
"device_id": device_id,
"algorithms": [
"m.olm.curve25519-aes-sha2",
RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2,
],
"keys": {
"curve25519:xyz": "curve25519+key",
# private key: OMkooTr76ega06xNvXIGPbgvvxAOzmQncN8VObS7aBA
"ed25519:xyz": device_pubkey,
},
"signatures": {
local_user: {
"ed25519:" + selfsigning_pubkey: "something"
}
},
# fails because device is unknown
# should fail with NOT_FOUND
"unknown": {
"user_id": local_user,
"device_id": "unknown",
"signatures": {
local_user: {
"ed25519:" + selfsigning_pubkey: "something"
}
},
# fails because the signature is invalid
# should fail with INVALID_SIGNATURE
master_pubkey: {