Skip to content
Snippets Groups Projects
test_email.py 17.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2018 New Vector
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from typing import Dict, List, Sequence, Tuple
    
    Erik Johnston's avatar
    Erik Johnston committed
    import attr
    
    import pkg_resources
    
    from twisted.internet.defer import Deferred
    
    
    import synapse.rest.admin
    
    from synapse.api.errors import Codes, SynapseError
    
    from synapse.rest.client import login, room
    
    
    from tests.unittest import HomeserverTestCase
    
    
    
    Erik Johnston's avatar
    Erik Johnston committed
    @attr.s
    
    class _User:
    
    Erik Johnston's avatar
    Erik Johnston committed
        "Helper wrapper for user ID and access token"
        id = attr.ib()
        token = attr.ib()
    
    
    
    class EmailPusherTests(HomeserverTestCase):
    
        servlets = [
    
            synapse.rest.admin.register_servlets_for_client_rest_resource,
    
            room.register_servlets,
            login.register_servlets,
        ]
        user_id = True
        hijack_auth = False
    
        def make_homeserver(self, reactor, clock):
    
            config = self.default_config()
    
            config["email"] = {
                "enable_notifs": True,
                "template_dir": os.path.abspath(
    
    Amber Brown's avatar
    Amber Brown committed
                    pkg_resources.resource_filename("synapse", "res/templates")
    
                ),
                "expiry_template_html": "notice_expiry.html",
                "expiry_template_text": "notice_expiry.txt",
                "notif_template_html": "notif_mail.html",
                "notif_template_text": "notif_mail.txt",
                "smtp_host": "127.0.0.1",
                "smtp_port": 20,
                "require_transport_security": False,
                "smtp_user": None,
                "smtp_pass": None,
                "app_name": "Matrix",
                "notif_from": "test@example.com",
                "riot_base_url": None,
            }
    
            config["public_baseurl"] = "http://aaa"
    
            hs = self.setup_test_homeserver(config=config)
    
            # List[Tuple[Deferred, args, kwargs]]
    
            self.email_attempts: List[Tuple[Deferred, Sequence, Dict]] = []
    
                # This mocks out synapse.reactor.send_email._sendmail.
    
                d = Deferred()
                self.email_attempts.append((d, args, kwargs))
                return d
    
            hs.get_send_email_handler()._sendmail = sendmail
    
    Erik Johnston's avatar
    Erik Johnston committed
        def prepare(self, reactor, clock, hs):
    
            # Register the user who gets notified
    
    Erik Johnston's avatar
    Erik Johnston committed
            self.user_id = self.register_user("user", "pass")
            self.access_token = self.login("user", "pass")
    
            # Register other users
            self.others = [
                _User(
                    id=self.register_user("otheruser1", "pass"),
                    token=self.login("otheruser1", "pass"),
                ),
                _User(
                    id=self.register_user("otheruser2", "pass"),
                    token=self.login("otheruser2", "pass"),
                ),
            ]
    
    
            # Register the pusher
            user_tuple = self.get_success(
    
    Erik Johnston's avatar
    Erik Johnston committed
                self.hs.get_datastore().get_user_by_access_token(self.access_token)
    
            self.token_id = user_tuple.token_id
    
            # We need to add email to account before we can create a pusher.
            self.get_success(
                hs.get_datastore().user_add_threepid(
                    self.user_id, "email", "a@example.com", 0, 0
                )
            )
    
    Erik Johnston's avatar
    Erik Johnston committed
            self.pusher = self.get_success(
    
                self.hs.get_pusherpool().add_pusher(
    
    Erik Johnston's avatar
    Erik Johnston committed
                    user_id=self.user_id,
    
                    kind="email",
                    app_id="m.email",
                    app_display_name="Email Notifications",
                    device_display_name="a@example.com",
                    pushkey="a@example.com",
                    lang=None,
                    data={},
                )
            )
    
    
            self.auth_handler = hs.get_auth_handler()
    
    
        def test_need_validated_email(self):
            """Test that we can only add an email pusher if the user has validated
            their email.
            """
            with self.assertRaises(SynapseError) as cm:
                self.get_success_or_raise(
                    self.hs.get_pusherpool().add_pusher(
                        user_id=self.user_id,
                        access_token=self.token_id,
                        kind="email",
                        app_id="m.email",
                        app_display_name="Email Notifications",
                        device_display_name="b@example.com",
                        pushkey="b@example.com",
                        lang=None,
                        data={},
                    )
                )
    
            self.assertEqual(400, cm.exception.code)
            self.assertEqual(Codes.THREEPID_NOT_FOUND, cm.exception.errcode)
    
    
    Erik Johnston's avatar
    Erik Johnston committed
        def test_simple_sends_email(self):
            # Create a simple room with two users
            room = self.helper.create_room_as(self.user_id, tok=self.access_token)
            self.helper.invite(
    
    Amber Brown's avatar
    Amber Brown committed
                room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id
    
    Erik Johnston's avatar
    Erik Johnston committed
            )
            self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token)
    
            # The other user sends a single message.
    
    Erik Johnston's avatar
    Erik Johnston committed
            self.helper.send(room, body="Hi!", tok=self.others[0].token)
    
    Erik Johnston's avatar
    Erik Johnston committed
            # We should get emailed about that message
            self._check_for_mail()
    
            # The other user sends multiple messages.
            self.helper.send(room, body="Hi!", tok=self.others[0].token)
            self.helper.send(room, body="There!", tok=self.others[0].token)
    
            self._check_for_mail()
    
    
        def test_invite_sends_email(self):
            # Create a room and invite the user to it
            room = self.helper.create_room_as(self.others[0].id, tok=self.others[0].token)
            self.helper.invite(
                room=room,
                src=self.others[0].id,
                tok=self.others[0].token,
                targ=self.user_id,
            )
    
            # We should get emailed about the invite
            self._check_for_mail()
    
        def test_invite_to_empty_room_sends_email(self):
            # Create a room and invite the user to it
            room = self.helper.create_room_as(self.others[0].id, tok=self.others[0].token)
            self.helper.invite(
                room=room,
                src=self.others[0].id,
                tok=self.others[0].token,
                targ=self.user_id,
            )
    
            # Then have the original user leave
            self.helper.leave(room, self.others[0].id, tok=self.others[0].token)
    
            # We should get emailed about the invite
            self._check_for_mail()
    
    
    Erik Johnston's avatar
    Erik Johnston committed
        def test_multiple_members_email(self):
            # We want to test multiple notifications, so we pause processing of push
            # while we send messages.
            self.pusher._pause_processing()
    
            # Create a simple room with multiple other users
            room = self.helper.create_room_as(self.user_id, tok=self.access_token)
    
            for other in self.others:
                self.helper.invite(
    
    Amber Brown's avatar
    Amber Brown committed
                    room=room, src=self.user_id, tok=self.access_token, targ=other.id
    
    Erik Johnston's avatar
    Erik Johnston committed
                )
                self.helper.join(room=room, user=other.id, tok=other.token)
    
            # The other users send some messages
            self.helper.send(room, body="Hi!", tok=self.others[0].token)
            self.helper.send(room, body="There!", tok=self.others[1].token)
            self.helper.send(room, body="There!", tok=self.others[1].token)
    
            # Nothing should have happened yet, as we're paused.
            assert not self.email_attempts
    
            self.pusher._resume_processing()
    
            # We should get emailed about those messages
            self._check_for_mail()
    
    
        def test_multiple_rooms(self):
            # We want to test multiple notifications from multiple rooms, so we pause
            # processing of push while we send messages.
            self.pusher._pause_processing()
    
            # Create a simple room with multiple other users
            rooms = [
                self.helper.create_room_as(self.user_id, tok=self.access_token),
                self.helper.create_room_as(self.user_id, tok=self.access_token),
            ]
    
            for r, other in zip(rooms, self.others):
                self.helper.invite(
                    room=r, src=self.user_id, tok=self.access_token, targ=other.id
                )
                self.helper.join(room=r, user=other.id, tok=other.token)
    
            # The other users send some messages
            self.helper.send(rooms[0], body="Hi!", tok=self.others[0].token)
            self.helper.send(rooms[1], body="There!", tok=self.others[1].token)
            self.helper.send(rooms[1], body="There!", tok=self.others[1].token)
    
            # Nothing should have happened yet, as we're paused.
            assert not self.email_attempts
    
            self.pusher._resume_processing()
    
            # We should get emailed about those messages
            self._check_for_mail()
    
    
        def test_room_notifications_include_avatar(self):
            # Create a room and set its avatar.
            room = self.helper.create_room_as(self.user_id, tok=self.access_token)
            self.helper.send_state(
                room, "m.room.avatar", {"url": "mxc://DUMMY_MEDIA_ID"}, self.access_token
            )
    
            # Invite two other uses.
            for other in self.others:
                self.helper.invite(
                    room=room, src=self.user_id, tok=self.access_token, targ=other.id
                )
                self.helper.join(room=room, user=other.id, tok=other.token)
    
            # The other users send some messages.
            # TODO It seems that two messages are required to trigger an email?
            self.helper.send(room, body="Alpha", tok=self.others[0].token)
            self.helper.send(room, body="Beta", tok=self.others[1].token)
    
            # We should get emailed about those messages
            args, kwargs = self._check_for_mail()
    
            # That email should contain the room's avatar
            msg: bytes = args[5]
            # Multipart: plain text, base 64 encoded; html, base 64 encoded
            html = (
                email.message_from_bytes(msg)
                .get_payload()[1]
                .get_payload(decode=True)
                .decode()
            )
            self.assertIn("_matrix/media/v1/thumbnail/DUMMY_MEDIA_ID", html)
    
    
        def test_empty_room(self):
            """All users leaving a room shouldn't cause the pusher to break."""
            # Create a simple room with two users
            room = self.helper.create_room_as(self.user_id, tok=self.access_token)
            self.helper.invite(
                room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id
            )
            self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token)
    
            # The other user sends a single message.
            self.helper.send(room, body="Hi!", tok=self.others[0].token)
    
            # Leave the room before the message is processed.
            self.helper.leave(room, self.user_id, tok=self.access_token)
            self.helper.leave(room, self.others[0].id, tok=self.others[0].token)
    
            # We should get emailed about that message
            self._check_for_mail()
    
        def test_empty_room_multiple_messages(self):
            """All users leaving a room shouldn't cause the pusher to break."""
            # Create a simple room with two users
            room = self.helper.create_room_as(self.user_id, tok=self.access_token)
            self.helper.invite(
                room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id
            )
            self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token)
    
            # The other user sends a single message.
            self.helper.send(room, body="Hi!", tok=self.others[0].token)
            self.helper.send(room, body="There!", tok=self.others[0].token)
    
            # Leave the room before the message is processed.
            self.helper.leave(room, self.user_id, tok=self.access_token)
            self.helper.leave(room, self.others[0].id, tok=self.others[0].token)
    
            # We should get emailed about that message
            self._check_for_mail()
    
    
        def test_encrypted_message(self):
            room = self.helper.create_room_as(self.user_id, tok=self.access_token)
            self.helper.invite(
                room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id
            )
            self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token)
    
            # The other user sends some messages
            self.helper.send_event(room, "m.room.encrypted", {}, tok=self.others[0].token)
    
            # We should get emailed about that message
            self._check_for_mail()
    
    
        def test_no_email_sent_after_removed(self):
            # Create a simple room with two users
            room = self.helper.create_room_as(self.user_id, tok=self.access_token)
            self.helper.invite(
                room=room,
                src=self.user_id,
                tok=self.access_token,
                targ=self.others[0].id,
            )
            self.helper.join(
                room=room,
                user=self.others[0].id,
                tok=self.others[0].token,
            )
    
            # The other user sends a single message.
            self.helper.send(room, body="Hi!", tok=self.others[0].token)
    
            # We should get emailed about that message
            self._check_for_mail()
    
            # disassociate the user's email address
            self.get_success(
                self.auth_handler.delete_threepid(
                    user_id=self.user_id,
                    medium="email",
                    address="a@example.com",
                )
            )
    
            # check that the pusher for that email address has been deleted
            pushers = self.get_success(
                self.hs.get_datastore().get_pushers_by({"user_name": self.user_id})
            )
            pushers = list(pushers)
            self.assertEqual(len(pushers), 0)
    
    
        def test_remove_unlinked_pushers_background_job(self):
            """Checks that all existing pushers associated with unlinked email addresses are removed
            upon running the remove_deleted_email_pushers background update.
            """
            # disassociate the user's email address manually (without deleting the pusher).
            # This resembles the old behaviour, which the background update below is intended
            # to clean up.
            self.get_success(
                self.hs.get_datastore().user_delete_threepid(
                    self.user_id, "email", "a@example.com"
                )
            )
    
            # Run the "remove_deleted_email_pushers" background job
            self.get_success(
                self.hs.get_datastore().db_pool.simple_insert(
                    table="background_updates",
                    values={
                        "update_name": "remove_deleted_email_pushers",
                        "progress_json": "{}",
                        "depends_on": None,
                    },
                )
            )
    
            # ... and tell the DataStore that it hasn't finished all updates yet
            self.hs.get_datastore().db_pool.updates._all_done = False
    
            # Now let's actually drive the updates to completion
            while not self.get_success(
                self.hs.get_datastore().db_pool.updates.has_completed_background_updates()
            ):
                self.get_success(
                    self.hs.get_datastore().db_pool.updates.do_next_background_update(100),
                    by=0.1,
                )
    
            # Check that all pushers with unlinked addresses were deleted
            pushers = self.get_success(
    
                self.hs.get_datastore().get_pushers_by({"user_name": self.user_id})
            )
            pushers = list(pushers)
            self.assertEqual(len(pushers), 0)
    
    
        def _check_for_mail(self) -> Tuple[Sequence, Dict]:
            """
            Assert that synapse sent off exactly one email notification.
    
            Returns:
                args and kwargs passed to synapse.reactor.send_email._sendmail for
                that notification.
            """
    
            # Get the stream ordering before it gets sent
            pushers = self.get_success(
    
                self.hs.get_datastore().get_pushers_by({"user_name": self.user_id})
    
            pushers = list(pushers)
    
            self.assertEqual(len(pushers), 1)
    
            last_stream_ordering = pushers[0].last_stream_ordering
    
    
            # Advance time a bit, so the pusher will register something has happened
    
    
            # It hasn't succeeded yet, so the stream ordering shouldn't have moved
            pushers = self.get_success(
    
                self.hs.get_datastore().get_pushers_by({"user_name": self.user_id})
    
            pushers = list(pushers)
    
            self.assertEqual(len(pushers), 1)
    
            self.assertEqual(last_stream_ordering, pushers[0].last_stream_ordering)
    
    
            # One email was attempted to be sent
            self.assertEqual(len(self.email_attempts), 1)
    
    
            deferred, sendmail_args, sendmail_kwargs = self.email_attempts[0]
    
            # Make the email succeed
    
            self.pump()
    
            # One email was attempted to be sent
            self.assertEqual(len(self.email_attempts), 1)
    
            # The stream ordering has increased
            pushers = self.get_success(
    
                self.hs.get_datastore().get_pushers_by({"user_name": self.user_id})
    
            pushers = list(pushers)
    
            self.assertEqual(len(pushers), 1)
    
            self.assertTrue(pushers[0].last_stream_ordering > last_stream_ordering)
    
    
            # Reset the attempts.
            self.email_attempts = []
    
            return sendmail_args, sendmail_kwargs