Skip to content
Snippets Groups Projects
sync.py 77 KiB
Newer Older
  • Learn to ignore specific revisions
  •                 user_id, now_token.groups_key,
                )
    
            invited = {}
            joined = {}
            left = {}
            for result in results:
                membership = result["membership"]
                group_id = result["group_id"]
                gtype = result["type"]
                content = result["content"]
    
                if membership == "join":
                    if gtype == "membership":
    
                        content.pop("membership", None)
    
                        joined[group_id] = content["content"]
    
                    else:
                        joined.setdefault(group_id, {})[gtype] = content
                elif membership == "invite":
                    if gtype == "membership":
                        content.pop("membership", None)
                        invited[group_id] = content["content"]
                else:
                    if gtype == "membership":
                        left[group_id] = content["content"]
    
            sync_result_builder.groups = GroupsSyncResult(
                join=joined,
                invite=invited,
                leave=left,
            )
    
    
        @measure_func("_generate_sync_entry_for_device_list")
    
        @defer.inlineCallbacks
    
        def _generate_sync_entry_for_device_list(self, sync_result_builder,
                                                 newly_joined_rooms, newly_joined_users,
                                                 newly_left_rooms, newly_left_users):
    
            user_id = sync_result_builder.sync_config.user.to_string()
            since_token = sync_result_builder.since_token
    
            if since_token and since_token.device_list_key:
                changed = yield self.store.get_user_whose_devices_changed(
                    since_token.device_list_key
                )
    
    
                # TODO: Be more clever than this, i.e. remove users who we already
                # share a room with?
                for room_id in newly_joined_rooms:
    
                    joined_users = yield self.state.get_current_users_in_room(room_id)
    
                    newly_joined_users.update(joined_users)
    
                for room_id in newly_left_rooms:
    
                    left_users = yield self.state.get_current_users_in_room(room_id)
    
                    newly_left_users.update(left_users)
    
    
                # TODO: Check that these users are actually new, i.e. either they
                # weren't in the previous sync *or* they left and rejoined.
                changed.update(newly_joined_users)
    
                if not changed and not newly_left_users:
                    defer.returnValue(DeviceLists(
                        changed=[],
                        left=newly_left_users,
                    ))
    
    
                users_who_share_room = yield self.store.get_users_who_share_room_with_user(
                    user_id
                )
    
                defer.returnValue(DeviceLists(
                    changed=users_who_share_room & changed,
                    left=set(newly_left_users) - users_who_share_room,
                ))
    
                defer.returnValue(DeviceLists(
                    changed=[],
                    left=[],
                ))
    
        @defer.inlineCallbacks
        def _generate_sync_entry_for_to_device(self, sync_result_builder):
            """Generates the portion of the sync response. Populates
            `sync_result_builder` with the result.
    
            Args:
                sync_result_builder(SyncResultBuilder)
    
            Returns:
                Deferred(dict): A dictionary containing the per room account data.
            """
            user_id = sync_result_builder.sync_config.user.to_string()
            device_id = sync_result_builder.sync_config.device_id
            now_token = sync_result_builder.now_token
            since_stream_id = 0
            if sync_result_builder.since_token is not None:
                since_stream_id = int(sync_result_builder.since_token.to_device_key)
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            if since_stream_id != int(now_token.to_device_key):
    
                # We only delete messages when a new message comes in, but that's
                # fine so long as we delete them at some point.
    
    
                deleted = yield self.store.delete_messages_for_device(
    
                    user_id, device_id, since_stream_id
                )
    
                logger.debug("Deleted %d to-device messages up to %d",
                             deleted, since_stream_id)
    
                messages, stream_id = yield self.store.get_new_messages_for_device(
    
                    user_id, device_id, since_stream_id, now_token.to_device_key
    
                    "Returning %d to-device messages between %d and %d (current token: %d)",
                    len(messages), since_stream_id, stream_id, now_token.to_device_key
                )
    
                sync_result_builder.now_token = now_token.copy_and_replace(
                    "to_device_key", stream_id
                )
                sync_result_builder.to_device = messages
            else:
                sync_result_builder.to_device = []
    
    Erik Johnston's avatar
    Erik Johnston committed
        @defer.inlineCallbacks
    
    Erik Johnston's avatar
    Erik Johnston committed
        def _generate_sync_entry_for_account_data(self, sync_result_builder):
    
    Erik Johnston's avatar
    Erik Johnston committed
            """Generates the account data portion of the sync response. Populates
    
    Erik Johnston's avatar
    Erik Johnston committed
            `sync_result_builder` with the result.
    
    Erik Johnston's avatar
    Erik Johnston committed
    
            Args:
    
    Erik Johnston's avatar
    Erik Johnston committed
                sync_result_builder(SyncResultBuilder)
    
    Erik Johnston's avatar
    Erik Johnston committed
    
            Returns:
                Deferred(dict): A dictionary containing the per room account data.
            """
    
    Erik Johnston's avatar
    Erik Johnston committed
            sync_config = sync_result_builder.sync_config
            user_id = sync_result_builder.sync_config.user.to_string()
            since_token = sync_result_builder.since_token
    
    Erik Johnston's avatar
    Erik Johnston committed
            if since_token and not sync_result_builder.full_state:
    
    Erik Johnston's avatar
    Erik Johnston committed
                account_data, account_data_by_room = (
                    yield self.store.get_updated_account_data_for_user(
                        user_id,
                        since_token.account_data_key,
                    )
                )
    
                push_rules_changed = yield self.store.have_push_rules_changed_for_user(
                    user_id, int(since_token.push_rules_key)
                )
    
                if push_rules_changed:
                    account_data["m.push_rules"] = yield self.push_rules_for_user(
                        sync_config.user
                    )
            else:
                account_data, account_data_by_room = (
                    yield self.store.get_account_data_for_user(
                        sync_config.user.to_string()
                    )
                )
    
                account_data['m.push_rules'] = yield self.push_rules_for_user(
                    sync_config.user
                )
    
    
            account_data_for_user = sync_config.filter_collection.filter_account_data([
                {"type": account_data_type, "content": content}
                for account_data_type, content in account_data.items()
            ])
    
    Erik Johnston's avatar
    Erik Johnston committed
            sync_result_builder.account_data = account_data_for_user
    
    Erik Johnston's avatar
    Erik Johnston committed
    
            defer.returnValue(account_data_by_room)
    
        @defer.inlineCallbacks
    
    Erik Johnston's avatar
    Erik Johnston committed
        def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms,
    
    Erik Johnston's avatar
    Erik Johnston committed
                                              newly_joined_users):
            """Generates the presence portion of the sync response. Populates the
    
    Erik Johnston's avatar
    Erik Johnston committed
            `sync_result_builder` with the result.
    
    Erik Johnston's avatar
    Erik Johnston committed
    
            Args:
    
    Erik Johnston's avatar
    Erik Johnston committed
                sync_result_builder(SyncResultBuilder)
    
    Erik Johnston's avatar
    Erik Johnston committed
                newly_joined_rooms(list): List of rooms that the user has joined
                    since the last sync (or empty if an initial sync)
                newly_joined_users(list): List of users that have joined rooms
                    since the last sync (or empty if an initial sync)
            """
    
    Erik Johnston's avatar
    Erik Johnston committed
            now_token = sync_result_builder.now_token
            sync_config = sync_result_builder.sync_config
            user = sync_result_builder.sync_config.user
    
    Erik Johnston's avatar
    Erik Johnston committed
    
            presence_source = self.event_sources.sources["presence"]
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            since_token = sync_result_builder.since_token
            if since_token and not sync_result_builder.full_state:
    
    Erik Johnston's avatar
    Erik Johnston committed
                presence_key = since_token.presence_key
    
                include_offline = True
    
    Erik Johnston's avatar
    Erik Johnston committed
            else:
                presence_key = None
    
                include_offline = False
    
    Erik Johnston's avatar
    Erik Johnston committed
    
            presence, presence_key = yield presence_source.get_new_events(
                user=user,
                from_key=presence_key,
                is_guest=sync_config.is_guest,
    
                include_offline=include_offline,
    
    Erik Johnston's avatar
    Erik Johnston committed
            sync_result_builder.now_token = now_token.copy_and_replace(
    
    Erik Johnston's avatar
    Erik Johnston committed
                "presence_key", presence_key
            )
    
            extra_users_ids = set(newly_joined_users)
            for room_id in newly_joined_rooms:
    
                users = yield self.state.get_current_users_in_room(room_id)
    
    Erik Johnston's avatar
    Erik Johnston committed
                extra_users_ids.update(users)
            extra_users_ids.discard(user.to_string())
    
    
            if extra_users_ids:
                states = yield self.presence_handler.get_states(
                    extra_users_ids,
                )
                presence.extend(states)
    
                # Deduplicate the presence entries so that there's at most one per user
    
                presence = list({p.user_id: p for p in presence}.values())
    
    Erik Johnston's avatar
    Erik Johnston committed
            presence = sync_config.filter_collection.filter_presence(
                presence
            )
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            sync_result_builder.presence = presence
    
    Erik Johnston's avatar
    Erik Johnston committed
    
        @defer.inlineCallbacks
    
    Erik Johnston's avatar
    Erik Johnston committed
        def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
    
    Erik Johnston's avatar
    Erik Johnston committed
            """Generates the rooms portion of the sync response. Populates the
    
    Erik Johnston's avatar
    Erik Johnston committed
            `sync_result_builder` with the result.
    
    Erik Johnston's avatar
    Erik Johnston committed
    
            Args:
    
    Erik Johnston's avatar
    Erik Johnston committed
                sync_result_builder(SyncResultBuilder)
    
    Erik Johnston's avatar
    Erik Johnston committed
                account_data_by_room(dict): Dictionary of per room account data
    
            Returns:
    
                Deferred(tuple): Returns a 4-tuple of
                `(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
    
    Erik Johnston's avatar
    Erik Johnston committed
            """
    
    Erik Johnston's avatar
    Erik Johnston committed
            user_id = sync_result_builder.sync_config.user.to_string()
    
            block_all_room_ephemeral = (
                sync_result_builder.since_token is None and
                sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
    
    
            if block_all_room_ephemeral:
                ephemeral_by_room = {}
            else:
                now_token, ephemeral_by_room = yield self.ephemeral_by_room(
    
                    sync_result_builder,
    
                    now_token=sync_result_builder.now_token,
                    since_token=sync_result_builder.since_token,
                )
                sync_result_builder.now_token = now_token
    
    Erik Johnston's avatar
    Erik Johnston committed
            # We check up front if anything has changed, if it hasn't then there is
            # no point in going futher.
    
            since_token = sync_result_builder.since_token
            if not sync_result_builder.full_state:
                if since_token and not ephemeral_by_room and not account_data_by_room:
                    have_changed = yield self._have_rooms_changed(sync_result_builder)
                    if not have_changed:
                        tags_by_room = yield self.store.get_updated_tags(
                            user_id,
                            since_token.account_data_key,
                        )
                        if not tags_by_room:
    
                            logger.debug("no-oping sync")
    
                            defer.returnValue(([], [], [], []))
    
    Erik Johnston's avatar
    Erik Johnston committed
            ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
                "m.ignored_user_list", user_id=user_id,
            )
    
            if ignored_account_data:
                ignored_users = ignored_account_data.get("ignored_users", {}).keys()
            else:
                ignored_users = frozenset()
    
    
            if since_token:
    
    Erik Johnston's avatar
    Erik Johnston committed
                res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
    
    Erik Johnston's avatar
    Erik Johnston committed
                room_entries, invited, newly_joined_rooms, newly_left_rooms = res
    
    Erik Johnston's avatar
    Erik Johnston committed
    
                tags_by_room = yield self.store.get_updated_tags(
    
                    user_id, since_token.account_data_key,
    
    Erik Johnston's avatar
    Erik Johnston committed
                )
            else:
    
    Erik Johnston's avatar
    Erik Johnston committed
                res = yield self._get_all_rooms(sync_result_builder, ignored_users)
    
    Erik Johnston's avatar
    Erik Johnston committed
                room_entries, invited, newly_joined_rooms = res
    
    Erik Johnston's avatar
    Erik Johnston committed
                newly_left_rooms = []
    
    Erik Johnston's avatar
    Erik Johnston committed
    
                tags_by_room = yield self.store.get_tags_for_user(user_id)
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            def handle_room_entries(room_entry):
    
                return self._generate_room_entry(
    
    Erik Johnston's avatar
    Erik Johnston committed
                    sync_result_builder,
    
    Erik Johnston's avatar
    Erik Johnston committed
                    ignored_users,
                    room_entry,
                    ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
                    tags=tags_by_room.get(room_entry.room_id),
                    account_data=account_data_by_room.get(room_entry.room_id, {}),
    
    Erik Johnston's avatar
    Erik Johnston committed
                    always_include=sync_result_builder.full_state,
    
    Erik Johnston's avatar
    Erik Johnston committed
            yield concurrently_execute(handle_room_entries, room_entries, 10)
    
    Erik Johnston's avatar
    Erik Johnston committed
            sync_result_builder.invited.extend(invited)
    
    Erik Johnston's avatar
    Erik Johnston committed
    
            # Now we want to get any newly joined users
            newly_joined_users = set()
    
            if since_token:
    
                for joined_sync in sync_result_builder.joined:
                    it = itertools.chain(
    
                        joined_sync.timeline.events, itervalues(joined_sync.state)
    
                    )
                    for event in it:
                        if event.type == EventTypes.Member:
                            if event.membership == Membership.JOIN:
                                newly_joined_users.add(event.state_key)
    
                            else:
                                prev_content = event.unsigned.get("prev_content", {})
                                prev_membership = prev_content.get("membership", None)
                                if prev_membership == Membership.JOIN:
                                    newly_left_users.add(event.state_key)
    
            newly_left_users -= newly_joined_users
    
    Erik Johnston's avatar
    Erik Johnston committed
    
            defer.returnValue((
                newly_joined_rooms,
                newly_joined_users,
                newly_left_rooms,
                newly_left_users,
            ))
    
        @defer.inlineCallbacks
        def _have_rooms_changed(self, sync_result_builder):
    
    Erik Johnston's avatar
    Erik Johnston committed
            """Returns whether there may be any new events that should be sent down
            the sync. Returns True if there are.
    
    Erik Johnston's avatar
    Erik Johnston committed
            """
    
            user_id = sync_result_builder.sync_config.user.to_string()
            since_token = sync_result_builder.since_token
            now_token = sync_result_builder.now_token
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            assert since_token
    
    
            # Get a list of membership change events that have happened.
            rooms_changed = yield self.store.get_membership_changes_for_user(
                user_id, since_token.room_key, now_token.room_key
            )
    
            if rooms_changed:
                defer.returnValue(True)
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
    
            for room_id in sync_result_builder.joined_room_ids:
    
    Erik Johnston's avatar
    Erik Johnston committed
                if self.store.has_room_changed_since(room_id, stream_id):
    
                    defer.returnValue(True)
            defer.returnValue(False)
    
    
    Erik Johnston's avatar
    Erik Johnston committed
        @defer.inlineCallbacks
    
    Erik Johnston's avatar
    Erik Johnston committed
        def _get_rooms_changed(self, sync_result_builder, ignored_users):
    
    Erik Johnston's avatar
    Erik Johnston committed
            """Gets the the changes that have happened since the last sync.
    
            Args:
    
    Erik Johnston's avatar
    Erik Johnston committed
                sync_result_builder(SyncResultBuilder)
    
    Erik Johnston's avatar
    Erik Johnston committed
                ignored_users(set(str)): Set of users ignored by user.
    
            Returns:
                Deferred(tuple): Returns a tuple of the form:
    
                `(room_entries, invited_rooms, newly_joined_rooms, newly_left_rooms)`
    
                where:
                    room_entries is a list [RoomSyncResultBuilder]
                    invited_rooms is a list [InvitedSyncResult]
                    newly_joined rooms is a list[str] of room ids
                    newly_left_rooms is a list[str] of room ids
    
    Erik Johnston's avatar
    Erik Johnston committed
            """
    
    Erik Johnston's avatar
    Erik Johnston committed
            user_id = sync_result_builder.sync_config.user.to_string()
            since_token = sync_result_builder.since_token
            now_token = sync_result_builder.now_token
            sync_config = sync_result_builder.sync_config
    
    Erik Johnston's avatar
    Erik Johnston committed
    
            assert since_token
    
            # Get a list of membership change events that have happened.
            rooms_changed = yield self.store.get_membership_changes_for_user(
                user_id, since_token.room_key, now_token.room_key
            )
    
            mem_change_events_by_room_id = {}
            for event in rooms_changed:
                mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
    
            newly_joined_rooms = []
    
    Erik Johnston's avatar
    Erik Johnston committed
            newly_left_rooms = []
    
    Erik Johnston's avatar
    Erik Johnston committed
            room_entries = []
    
    Erik Johnston's avatar
    Erik Johnston committed
            invited = []
    
            for room_id, events in iteritems(mem_change_events_by_room_id):
    
                logger.info(
                    "Membership changes in %s: [%s]",
                    room_id,
                    ", ".join(("%s (%s)" % (e.event_id, e.membership) for e in events)),
                )
    
    
    Erik Johnston's avatar
    Erik Johnston committed
                non_joins = [e for e in events if e.membership != Membership.JOIN]
                has_join = len(non_joins) != len(events)
    
                # We want to figure out if we joined the room at some point since
                # the last sync (even if we have since left). This is to make sure
                # we do send down the room, and with full state, where necessary
    
    Erik Johnston's avatar
    Erik Johnston committed
                old_state_ids = None
    
                if room_id in sync_result_builder.joined_room_ids and non_joins:
    
                    # Always include if the user (re)joined the room, especially
                    # important so that device list changes are calculated correctly.
                    # If there are non join member events, but we are still in the room,
                    # then the user must have left and joined
                    newly_joined_rooms.append(room_id)
    
                    # User is in the room so we don't need to do the invite/leave checks
                    continue
    
    
                if room_id in sync_result_builder.joined_room_ids or has_join:
    
                    old_state_ids = yield self.get_state_at(room_id, since_token)
                    old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
                    old_mem_ev = None
                    if old_mem_ev_id:
                        old_mem_ev = yield self.store.get_event(
                            old_mem_ev_id, allow_none=True
                        )
    
    
                    # debug for #4422
                    if has_join:
                        prev_membership = None
                        if old_mem_ev:
                            prev_membership = old_mem_ev.membership
                        issue4422_logger.debug(
                            "Previous membership for room %s with join: %s (event %s)",
                            room_id, prev_membership, old_mem_ev_id,
                        )
    
    
    Erik Johnston's avatar
    Erik Johnston committed
                    if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
                        newly_joined_rooms.append(room_id)
    
    
                # If user is in the room then we don't need to do the invite/leave checks
    
                if room_id in sync_result_builder.joined_room_ids:
    
    Erik Johnston's avatar
    Erik Johnston committed
    
                if not non_joins:
                    continue
    
    
    Erik Johnston's avatar
    Erik Johnston committed
                # Check if we have left the room. This can either be because we were
                # joined before *or* that we since joined and then left.
                if events[-1].membership != Membership.JOIN:
                    if has_join:
                        newly_left_rooms.append(room_id)
                    else:
                        if not old_state_ids:
                            old_state_ids = yield self.get_state_at(room_id, since_token)
                            old_mem_ev_id = old_state_ids.get(
                                (EventTypes.Member, user_id),
                                None,
                            )
                            old_mem_ev = None
                            if old_mem_ev_id:
                                old_mem_ev = yield self.store.get_event(
                                    old_mem_ev_id, allow_none=True
                                )
                        if old_mem_ev and old_mem_ev.membership == Membership.JOIN:
                            newly_left_rooms.append(room_id)
    
    
    Erik Johnston's avatar
    Erik Johnston committed
                # Only bother if we're still currently invited
                should_invite = non_joins[-1].membership == Membership.INVITE
                if should_invite:
                    if event.sender not in ignored_users:
                        room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
                        if room_sync:
                            invited.append(room_sync)
    
                # Always include leave/ban events. Just take the last one.
                # TODO: How do we handle ban -> leave in same batch?
                leave_events = [
                    e for e in non_joins
                    if e.membership in (Membership.LEAVE, Membership.BAN)
                ]
    
                if leave_events:
                    leave_event = leave_events[-1]
                    leave_stream_token = yield self.store.get_stream_token_for_event(
                        leave_event.event_id
                    )
                    leave_token = since_token.copy_and_replace(
                        "room_key", leave_stream_token
                    )
    
                    if since_token and since_token.is_after(leave_token):
                        continue
    
    
                    # If this is an out of band message, like a remote invite
                    # rejection, we include it in the recents batch. Otherwise, we
                    # let _load_filtered_recents handle fetching the correct
                    # batches.
                    #
                    # This is all screaming out for a refactor, as the logic here is
                    # subtle and the moving parts numerous.
                    if leave_event.internal_metadata.is_out_of_band_membership():
                        batch_events = [leave_event]
                    else:
                        batch_events = None
    
    
    Erik Johnston's avatar
    Erik Johnston committed
                    room_entries.append(RoomSyncResultBuilder(
    
    Erik Johnston's avatar
    Erik Johnston committed
                        room_id=room_id,
    
    Erik Johnston's avatar
    Erik Johnston committed
                        rtype="archived",
    
    Erik Johnston's avatar
    Erik Johnston committed
                        newly_joined=room_id in newly_joined_rooms,
                        full_state=False,
                        since_token=since_token,
                        upto_token=leave_token,
                    ))
    
            timeline_limit = sync_config.filter_collection.timeline_limit()
    
            # Get all events for rooms we're currently joined to.
            room_to_events = yield self.store.get_room_events_stream_for_rooms(
    
                room_ids=sync_result_builder.joined_room_ids,
    
    Erik Johnston's avatar
    Erik Johnston committed
                from_key=since_token.room_key,
                to_key=now_token.room_key,
                limit=timeline_limit + 1,
            )
    
            # We loop through all room ids, even if there are no new events, in case
            # there are non room events taht we need to notify about.
    
            for room_id in sync_result_builder.joined_room_ids:
    
    Erik Johnston's avatar
    Erik Johnston committed
                room_entry = room_to_events.get(room_id, None)
    
    
                newly_joined = room_id in newly_joined_rooms
    
    Erik Johnston's avatar
    Erik Johnston committed
                if room_entry:
                    events, start_key = room_entry
    
                    prev_batch_token = now_token.copy_and_replace("room_key", start_key)
    
    
                    entry = RoomSyncResultBuilder(
    
    Erik Johnston's avatar
    Erik Johnston committed
                        room_id=room_id,
    
    Erik Johnston's avatar
    Erik Johnston committed
                        rtype="joined",
    
    Erik Johnston's avatar
    Erik Johnston committed
                        events=events,
    
                        newly_joined=newly_joined,
    
    Erik Johnston's avatar
    Erik Johnston committed
                        full_state=False,
    
                        since_token=None if newly_joined else since_token,
    
    Erik Johnston's avatar
    Erik Johnston committed
                        upto_token=prev_batch_token,
    
    Erik Johnston's avatar
    Erik Johnston committed
                else:
    
                    entry = RoomSyncResultBuilder(
    
    Erik Johnston's avatar
    Erik Johnston committed
                        room_id=room_id,
    
    Erik Johnston's avatar
    Erik Johnston committed
                        rtype="joined",
    
    Erik Johnston's avatar
    Erik Johnston committed
                        events=[],
    
                        newly_joined=newly_joined,
    
    Erik Johnston's avatar
    Erik Johnston committed
                        full_state=False,
                        since_token=since_token,
                        upto_token=since_token,
    
                    )
    
                if newly_joined:
                    # debugging for https://github.com/matrix-org/synapse/issues/4422
                    issue4422_logger.debug(
                        "RoomSyncResultBuilder events for newly joined room %s: %r",
                        room_id, entry.events,
                    )
                room_entries.append(entry)
    
    Erik Johnston's avatar
    Erik Johnston committed
            defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
    
    Erik Johnston's avatar
    Erik Johnston committed
    
        @defer.inlineCallbacks
    
    Erik Johnston's avatar
    Erik Johnston committed
        def _get_all_rooms(self, sync_result_builder, ignored_users):
    
    Erik Johnston's avatar
    Erik Johnston committed
            """Returns entries for all rooms for the user.
    
            Args:
    
    Erik Johnston's avatar
    Erik Johnston committed
                sync_result_builder(SyncResultBuilder)
    
    Erik Johnston's avatar
    Erik Johnston committed
                ignored_users(set(str)): Set of users ignored by user.
    
            Returns:
                Deferred(tuple): Returns a tuple of the form:
                `([RoomSyncResultBuilder], [InvitedSyncResult], [])`
            """
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            user_id = sync_result_builder.sync_config.user.to_string()
            since_token = sync_result_builder.since_token
            now_token = sync_result_builder.now_token
            sync_config = sync_result_builder.sync_config
    
    Erik Johnston's avatar
    Erik Johnston committed
    
            membership_list = (
                Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
            )
    
            room_list = yield self.store.get_rooms_for_user_where_membership_is(
                user_id=user_id,
                membership_list=membership_list
            )
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            room_entries = []
    
    Erik Johnston's avatar
    Erik Johnston committed
            invited = []
    
            for event in room_list:
                if event.membership == Membership.JOIN:
    
    Erik Johnston's avatar
    Erik Johnston committed
                    room_entries.append(RoomSyncResultBuilder(
    
    Erik Johnston's avatar
    Erik Johnston committed
                        room_id=event.room_id,
    
    Erik Johnston's avatar
    Erik Johnston committed
                        rtype="joined",
    
    Erik Johnston's avatar
    Erik Johnston committed
                        events=None,
                        newly_joined=False,
                        full_state=True,
                        since_token=since_token,
                        upto_token=now_token,
                    ))
                elif event.membership == Membership.INVITE:
                    if event.sender in ignored_users:
                        continue
                    invite = yield self.store.get_event(event.event_id)
                    invited.append(InvitedSyncResult(
                        room_id=event.room_id,
                        invite=invite,
                    ))
                elif event.membership in (Membership.LEAVE, Membership.BAN):
                    # Always send down rooms we were banned or kicked from.
                    if not sync_config.filter_collection.include_leave:
                        if event.membership == Membership.LEAVE:
                            if user_id == event.sender:
                                continue
    
                    leave_token = now_token.copy_and_replace(
                        "room_key", "s%d" % (event.stream_ordering,)
                    )
    
    Erik Johnston's avatar
    Erik Johnston committed
                    room_entries.append(RoomSyncResultBuilder(
    
    Erik Johnston's avatar
    Erik Johnston committed
                        room_id=event.room_id,
    
    Erik Johnston's avatar
    Erik Johnston committed
                        rtype="archived",
    
    Erik Johnston's avatar
    Erik Johnston committed
                        events=None,
                        newly_joined=False,
                        full_state=True,
                        since_token=since_token,
                        upto_token=leave_token,
                    ))
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            defer.returnValue((room_entries, invited, []))
    
    Erik Johnston's avatar
    Erik Johnston committed
    
        @defer.inlineCallbacks
    
    Erik Johnston's avatar
    Erik Johnston committed
        def _generate_room_entry(self, sync_result_builder, ignored_users,
    
    Erik Johnston's avatar
    Erik Johnston committed
                                 room_builder, ephemeral, tags, account_data,
                                 always_include=False):
    
    Erik Johnston's avatar
    Erik Johnston committed
            """Populates the `joined` and `archived` section of `sync_result_builder`
    
    Erik Johnston's avatar
    Erik Johnston committed
            based on the `room_builder`.
    
            Args:
    
    Erik Johnston's avatar
    Erik Johnston committed
                sync_result_builder(SyncResultBuilder)
    
    Erik Johnston's avatar
    Erik Johnston committed
                ignored_users(set(str)): Set of users ignored by user.
                room_builder(RoomSyncResultBuilder)
                ephemeral(list): List of new ephemeral events for room
                tags(list): List of *all* tags for room, or None if there has been
                    no change.
                account_data(list): List of new account data for room
                always_include(bool): Always include this room in the sync response,
                    even if empty.
            """
    
    Erik Johnston's avatar
    Erik Johnston committed
            newly_joined = room_builder.newly_joined
            full_state = (
                room_builder.full_state
                or newly_joined
    
    Erik Johnston's avatar
    Erik Johnston committed
                or sync_result_builder.full_state
    
            events = room_builder.events
    
            # We want to shortcut out as early as possible.
            if not (always_include or account_data or ephemeral or full_state):
                if events == [] and tags is None:
                    return
    
            now_token = sync_result_builder.now_token
            sync_config = sync_result_builder.sync_config
    
            room_id = room_builder.room_id
    
    Erik Johnston's avatar
    Erik Johnston committed
            since_token = room_builder.since_token
            upto_token = room_builder.upto_token
    
    
            batch = yield self._load_filtered_recents(
    
    Erik Johnston's avatar
    Erik Johnston committed
                room_id, sync_config,
                now_token=upto_token,
                since_token=since_token,
                recents=events,
    
    Erik Johnston's avatar
    Erik Johnston committed
                newly_joined_room=newly_joined,
    
            if newly_joined:
                # debug for https://github.com/matrix-org/synapse/issues/4422
                issue4422_logger.debug(
                    "Timeline events after filtering in newly-joined room %s: %r",
                    room_id, batch,
                )
    
    
            # When we join the room (or the client requests full_state), we should
            # send down any existing tags. Usually the user won't have tags in a
            # newly joined room, unless either a) they've joined before or b) the
            # tag was added by synapse e.g. for server notice rooms.
            if full_state:
                user_id = sync_result_builder.sync_config.user.to_string()
                tags = yield self.store.get_tags_for_room(user_id, room_id)
    
    
                # If there aren't any tags, don't send the empty tags list down
                # sync
                if not tags:
                    tags = None
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            account_data_events = []
            if tags is not None:
                account_data_events.append({
                    "type": "m.tag",
                    "content": {"tags": tags},
                })
    
            for account_data_type, content in account_data.items():
                account_data_events.append({
                    "type": account_data_type,
                    "content": content,
                })
    
    
            account_data_events = sync_config.filter_collection.filter_room_account_data(
    
    Erik Johnston's avatar
    Erik Johnston committed
                account_data_events
            )
    
            ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
    
    
    Nathan Pennie's avatar
    Nathan Pennie committed
            if not (always_include
                    or batch
                    or account_data_events
                    or ephemeral
                    or full_state):
    
    Erik Johnston's avatar
    Erik Johnston committed
                return
    
            state = yield self.compute_state_delta(
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                room_id, batch, sync_config, since_token, now_token,
                full_state=full_state
    
    
            # we include a summary in room responses when we're lazy loading
            # members (as the client otherwise doesn't have enough info to form
            # the name itself).
    
            if (
                sync_config.filter_collection.lazy_load_members() and
                (
    
                    # we recalulate the summary:
                    #   if there are membership changes in the timeline, or
                    #   if membership has changed during a gappy sync, or
                    #   if this is an initial sync.
    
                    any(ev.type == EventTypes.Member for ev in batch.events) or
    
                    (
                        # XXX: this may include false positives in the form of LL
                        # members which have snuck into state
                        batch.limited and
                        any(t == EventTypes.Member for (t, k) in state)
                    ) or
    
                    since_token is None
                )
            ):
                summary = yield self.compute_summary(
                    room_id, sync_config, batch, state, now_token
                )
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            if room_builder.rtype == "joined":
    
    Erik Johnston's avatar
    Erik Johnston committed
                unread_notifications = {}
                room_sync = JoinedSyncResult(
                    room_id=room_id,
                    timeline=batch,
                    state=state,
                    ephemeral=ephemeral,
                    account_data=account_data_events,
                    unread_notifications=unread_notifications,
    
    Erik Johnston's avatar
    Erik Johnston committed
                )
    
                if room_sync or always_include:
                    notifs = yield self.unread_notifs_for_room_id(
                        room_id, sync_config
                    )
    
                    if notifs is not None:
                        unread_notifications["notification_count"] = notifs["notify_count"]
                        unread_notifications["highlight_count"] = notifs["highlight_count"]
    
    
    Erik Johnston's avatar
    Erik Johnston committed
                    sync_result_builder.joined.append(room_sync)
    
                if batch.limited and since_token:
    
                    user_id = sync_result_builder.sync_config.user.to_string()
                    logger.info(
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                        "Incremental gappy sync of %s for user %s with %d state events" % (
    
                            room_id,
                            user_id,
                            len(state),
                        )
                    )
    
    Erik Johnston's avatar
    Erik Johnston committed
            elif room_builder.rtype == "archived":
    
    Erik Johnston's avatar
    Erik Johnston committed
                room_sync = ArchivedSyncResult(
                    room_id=room_id,
                    timeline=batch,
                    state=state,
    
                    account_data=account_data_events,
    
    Erik Johnston's avatar
    Erik Johnston committed
                )
                if room_sync or always_include:
    
    Erik Johnston's avatar
    Erik Johnston committed
                    sync_result_builder.archived.append(room_sync)
    
    Erik Johnston's avatar
    Erik Johnston committed
            else:
                raise Exception("Unrecognized rtype: %r", room_builder.rtype)
    
        @defer.inlineCallbacks
        def get_rooms_for_user_at(self, user_id, stream_ordering):
            """Get set of joined rooms for a user at the given stream ordering.
    
            The stream ordering *must* be recent, otherwise this may throw an
            exception if older than a month. (This function is called with the
            current token, which should be perfectly fine).
    
            Args:
                user_id (str)
                stream_ordering (int)
    
            ReturnValue:
                Deferred[frozenset[str]]: Set of room_ids the user is in at given
                stream_ordering.
            """
            joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(
                user_id,
            )
    
            joined_room_ids = set()
    
            # We need to check that the stream ordering of the join for each room
            # is before the stream_ordering asked for. This might not be the case
            # if the user joins a room between us getting the current token and
            # calling `get_rooms_for_user_with_stream_ordering`.
            # If the membership's stream ordering is after the given stream
            # ordering, we need to go and work out if the user was in the room
            # before.
    
    Erik Johnston's avatar
    Erik Johnston committed
            for room_id, membership_stream_ordering in joined_rooms:
                if membership_stream_ordering <= stream_ordering:
    
                    joined_room_ids.add(room_id)
                    continue
    
    
    Erik Johnston's avatar
    Erik Johnston committed
                logger.info("User joined room after current token: %s", room_id)
    
    
                extrems = yield self.store.get_forward_extremeties_for_room(
                    room_id, stream_ordering,
                )
    
                users_in_room = yield self.state.get_current_users_in_room(
    
                    room_id, extrems,
                )
                if user_id in users_in_room:
                    joined_room_ids.add(room_id)
    
            joined_room_ids = frozenset(joined_room_ids)
            defer.returnValue(joined_room_ids)
    
    
    
    def _action_has_highlight(actions):
        for action in actions:
            try:
                if action.get("set_tweak", None) == "highlight":
                    return action.get("value", True)
            except AttributeError:
                pass
    
        return False
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
    def _calculate_state(
        timeline_contains, timeline_start, previous, current, lazy_load_members,
    ):
    
        """Works out what state to include in a sync response.
    
        Args:
            timeline_contains (dict): state in the timeline
            timeline_start (dict): state at the start of the timeline
            previous (dict): state at the end of the previous sync (or empty dict
    
    Erik Johnston's avatar
    Erik Johnston committed
                if this is an initial sync)
    
            current (dict): state at the end of the timeline
    
            lazy_load_members (bool): whether to return members from timeline_start
                or not.  assumes that timeline_start has already been filtered to
                include only the members the client needs to know about.
    
        event_id_to_key = {
            e: key
            for key, e in itertools.chain(
    
                iteritems(timeline_contains),
                iteritems(previous),
                iteritems(timeline_start),
                iteritems(current),
    
        c_ids = set(e for e in itervalues(current))
        ts_ids = set(e for e in itervalues(timeline_start))
        p_ids = set(e for e in itervalues(previous))
        tc_ids = set(e for e in itervalues(timeline_contains))
    
        # If we are lazyloading room members, we explicitly add the membership events
        # for the senders in the timeline into the state block returned by /sync,
        # as we may not have sent them to the client before.  We find these membership
        # events by filtering them out of timeline_start, which has already been filtered
        # to only include membership events for the senders in the timeline.
    
        # In practice, we can do this by removing them from the p_ids list,
        # which is the list of relevant state we know we have already sent to the client.
    
        # see https://github.com/matrix-org/synapse/pull/2970
        #            /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
        if lazy_load_members:
    
            p_ids.difference_update(
    
                e for t, e in iteritems(timeline_start)
    
                if t[0] == EventTypes.Member
    
        state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
    
            event_id_to_key[e]: e for e in state_ids
    
    Erik Johnston's avatar
    Erik Johnston committed
    
    
    class SyncResultBuilder(object):
    
        """Used to help build up a new SyncResult for a user
    
        Attributes:
            sync_config (SyncConfig)
            full_state (bool)
            since_token (StreamToken)
            now_token (StreamToken)
            joined_room_ids (list[str])
    
            # The following mirror the fields in a sync response
            presence (list)
            account_data (list)
            joined (list[JoinedSyncResult])
            invited (list[InvitedSyncResult])
            archived (list[ArchivedSyncResult])
            device (list)
            groups (GroupsSyncResult|None)
            to_device (list)
        """
    
        def __init__(self, sync_config, full_state, since_token, now_token,
                     joined_room_ids):
    
    Erik Johnston's avatar
    Erik Johnston committed
            """
            Args:
    
                sync_config (SyncConfig)
                full_state (bool): The full_state flag as specified by user
                since_token (StreamToken): The token supplied by user, or None.
                now_token (StreamToken): The token to sync up to.
                joined_room_ids (list[str]): List of rooms the user is joined to
    
    Erik Johnston's avatar
    Erik Johnston committed
            """
    
    Erik Johnston's avatar
    Erik Johnston committed
            self.sync_config = sync_config
            self.full_state = full_state
            self.since_token = since_token
            self.now_token = now_token
    
            self.joined_room_ids = joined_room_ids
    
    Erik Johnston's avatar
    Erik Johnston committed
    
            self.presence = []
            self.account_data = []
            self.joined = []
            self.invited = []
            self.archived = []
    
            self.groups = None
    
    Erik Johnston's avatar
    Erik Johnston committed
    
    
    class RoomSyncResultBuilder(object):
    
    Erik Johnston's avatar
    Erik Johnston committed
        """Stores information needed to create either a `JoinedSyncResult` or
        `ArchivedSyncResult`.
        """
    
    Erik Johnston's avatar
    Erik Johnston committed
        def __init__(self, room_id, rtype, events, newly_joined, full_state,
                     since_token, upto_token):
    
    Erik Johnston's avatar
    Erik Johnston committed
            """
            Args:
                room_id(str)
                rtype(str): One of `"joined"` or `"archived"`
    
                events(list[FrozenEvent]): List of events to include in the room
                    (more events may be added when generating result).
    
    Erik Johnston's avatar
    Erik Johnston committed
                newly_joined(bool): If the user has newly joined the room
                full_state(bool): Whether the full state should be sent in result
                since_token(StreamToken): Earliest point to return events from, or None
                upto_token(StreamToken): Latest point to return events from.
            """
    
    Erik Johnston's avatar
    Erik Johnston committed
            self.room_id = room_id
    
    Erik Johnston's avatar
    Erik Johnston committed
            self.rtype = rtype
    
    Erik Johnston's avatar
    Erik Johnston committed
            self.events = events
            self.newly_joined = newly_joined
            self.full_state = full_state