Skip to content
Snippets Groups Projects
sync.py 83.7 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2015, 2016 OpenMarket Ltd
    
    # Copyright 2018, 2019 New Vector Ltd
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    Amber Brown's avatar
    Amber Brown committed
    import itertools
    import logging
    
    from typing import (
        TYPE_CHECKING,
        Any,
        Collection,
        Dict,
        FrozenSet,
        List,
        Optional,
        Set,
        Tuple,
    )
    
    Amber Brown's avatar
    Amber Brown committed
    
    
    import attr
    
    from prometheus_client import Counter
    
    
    from synapse.api.constants import AccountDataTypes, EventTypes, Membership
    
    from synapse.api.filtering import FilterCollection
    from synapse.events import EventBase
    
    from synapse.logging.context import current_context
    
    from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
    
    Amber Brown's avatar
    Amber Brown committed
    from synapse.push.clientformat import format_push_rules_for_user
    
    from synapse.storage.roommember import MemberSummary
    
    from synapse.storage.state import StateFilter
    
    from synapse.types import (
        JsonDict,
    
        RoomStreamToken,
        StateMap,
        StreamToken,
        UserID,
    )
    
    from synapse.util.async_helpers import concurrently_execute
    
    from synapse.util.caches.expiringcache import ExpiringCache
    from synapse.util.caches.lrucache import LruCache
    
    Amber Brown's avatar
    Amber Brown committed
    from synapse.util.caches.response_cache import ResponseCache
    
    from synapse.util.metrics import Measure, measure_func
    
    from synapse.visibility import filter_events_for_client
    
    if TYPE_CHECKING:
        from synapse.server import HomeServer
    
    
    logger = logging.getLogger(__name__)
    
    # Debug logger for https://github.com/matrix-org/synapse/issues/4422
    issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
    
    
    # Counts the number of times we returned a non-empty sync. `type` is one of
    # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
    # "true" or "false" depending on if the request asked for lazy loaded members or
    # not.
    non_empty_sync_counter = Counter(
    
        "synapse_handlers_sync_nonempty_total",
        "Count of non empty sync responses. type is initial_sync/full_state_sync"
        "/incremental_sync. lazy_loaded indicates if lazy loaded members were "
        "enabled for that request.",
        ["type", "lazy_loaded"],
    
    # Store the cache that tracks which lazy-loaded members have been sent to a given
    # client for no more than 30 minutes.
    LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
    
    # Remember the last 100 members we sent to a client for the purposes of
    # avoiding redundantly sending the same lazy-loaded members to the client
    LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
    
    
    @attr.s(slots=True, frozen=True)
    class SyncConfig:
        user = attr.ib(type=UserID)
        filter_collection = attr.ib(type=FilterCollection)
        is_guest = attr.ib(type=bool)
        request_key = attr.ib(type=Tuple[Any, ...])
    
        device_id = attr.ib(type=Optional[str])
    
    @attr.s(slots=True, frozen=True)
    class TimelineBatch:
        prev_batch = attr.ib(type=StreamToken)
        events = attr.ib(type=List[EventBase])
    
        def __bool__(self) -> bool:
    
            """Make the result appear empty if there are no updates. This is used
            to tell if room needs to be part of the sync result.
            """
            return bool(self.events)
    
    Amber Brown's avatar
    Amber Brown committed
    
    
    # We can't freeze this class, because we need to update it after it's instantiated to
    # update its unread count. This is because we calculate the unread count for a room only
    # if there are updates for it, which we check after the instance has been created.
    # This should not be a big deal because we update the notification counts afterwards as
    # well anyway.
    @attr.s(slots=True)
    
    class JoinedSyncResult:
        room_id = attr.ib(type=str)
        timeline = attr.ib(type=TimelineBatch)
        state = attr.ib(type=StateMap[EventBase])
        ephemeral = attr.ib(type=List[JsonDict])
        account_data = attr.ib(type=List[JsonDict])
        unread_notifications = attr.ib(type=JsonDict)
        summary = attr.ib(type=Optional[JsonDict])
    
        unread_count = attr.ib(type=int)
    
        def __bool__(self) -> bool:
    
            """Make the result appear empty if there are no updates. This is used
            to tell if room needs to be part of the sync result.
            """
    
            return bool(
                self.timeline
                or self.state
                or self.ephemeral
    
                or self.account_data
    
                # nb the notification count does not, er, count: if there's nothing
                # else in the result, we don't need to send it.
    
    Amber Brown's avatar
    Amber Brown committed
    
    
    @attr.s(slots=True, frozen=True)
    class ArchivedSyncResult:
        room_id = attr.ib(type=str)
        timeline = attr.ib(type=TimelineBatch)
        state = attr.ib(type=StateMap[EventBase])
        account_data = attr.ib(type=List[JsonDict])
    
    
        def __bool__(self) -> bool:
    
            """Make the result appear empty if there are no updates. This is used
            to tell if room needs to be part of the sync result.
            """
    
    Amber Brown's avatar
    Amber Brown committed
            return bool(self.timeline or self.state or self.account_data)
    
    
    @attr.s(slots=True, frozen=True)
    class InvitedSyncResult:
        room_id = attr.ib(type=str)
        invite = attr.ib(type=EventBase)
    
        def __bool__(self) -> bool:
    
            """Invited rooms should always be reported to the client"""
            return True
    
    Amber Brown's avatar
    Amber Brown committed
    
    
    @attr.s(slots=True, frozen=True)
    class GroupsSyncResult:
        join = attr.ib(type=JsonDict)
        invite = attr.ib(type=JsonDict)
        leave = attr.ib(type=JsonDict)
    
        def __bool__(self) -> bool:
    
    Erik Johnston's avatar
    Erik Johnston committed
            return bool(self.join or self.invite or self.leave)
    
    Amber Brown's avatar
    Amber Brown committed
    
    
    @attr.s(slots=True, frozen=True)
    class DeviceLists:
        """
        Attributes:
            changed: List of user_ids whose devices may have changed
            left: List of user_ids whose devices we no longer track
        """
    
        changed = attr.ib(type=Collection[str])
        left = attr.ib(type=Collection[str])
    
        def __bool__(self) -> bool:
    
            return bool(self.changed or self.left)
    
    Amber Brown's avatar
    Amber Brown committed
    
    
    class _RoomChanges:
        """The set of room entries to include in the sync, plus the set of joined
        and left room IDs since last sync.
        """
    
        room_entries = attr.ib(type=List["RoomSyncResultBuilder"])
        invited = attr.ib(type=List[InvitedSyncResult])
        newly_joined_rooms = attr.ib(type=List[str])
        newly_left_rooms = attr.ib(type=List[str])
    
    
    @attr.s(slots=True, frozen=True)
    class SyncResult:
        """
        Attributes:
            next_batch: Token for the next sync
            presence: List of presence events for the user.
            account_data: List of account_data events for the user.
            joined: JoinedSyncResult for each joined room.
            invited: InvitedSyncResult for each invited room.
            archived: ArchivedSyncResult for each archived room.
            to_device: List of direct messages for the device.
            device_lists: List of user_ids whose devices have changed
            device_one_time_keys_count: Dict of algorithm to count for one time keys
                for this device
    
            device_unused_fallback_key_types: List of key types that have an unused fallback
                key
    
            groups: Group updates, if any
        """
    
        next_batch = attr.ib(type=StreamToken)
        presence = attr.ib(type=List[JsonDict])
        account_data = attr.ib(type=List[JsonDict])
        joined = attr.ib(type=List[JoinedSyncResult])
        invited = attr.ib(type=List[InvitedSyncResult])
        archived = attr.ib(type=List[ArchivedSyncResult])
        to_device = attr.ib(type=List[JsonDict])
        device_lists = attr.ib(type=DeviceLists)
        device_one_time_keys_count = attr.ib(type=JsonDict)
    
        device_unused_fallback_key_types = attr.ib(type=List[str])
    
        groups = attr.ib(type=Optional[GroupsSyncResult])
    
    
        def __bool__(self) -> bool:
    
            """Make the result appear empty if there are no updates. This is used
            to tell if the notifier needs to wait for more events when polling for
            events.
            """
    
    Amber Brown's avatar
    Amber Brown committed
                self.presence
                or self.joined
                or self.invited
                or self.archived
                or self.account_data
                or self.to_device
                or self.device_lists
                or self.groups
    
    Amber Brown's avatar
    Amber Brown committed
    
    
    class SyncHandler:
    
        def __init__(self, hs: "HomeServer"):
    
            self.hs_config = hs.config
    
            self.store = hs.get_datastore()
            self.notifier = hs.get_notifier()
            self.presence_handler = hs.get_presence_handler()
    
    Mark Haines's avatar
    Mark Haines committed
            self.event_sources = hs.get_event_sources()
    
            self.response_cache = ResponseCache(
    
                hs.get_clock(), "sync"
    
            )  # type: ResponseCache[Tuple[Any, ...]]
    
            self.state = hs.get_state_handler()
    
    Neil Johnson's avatar
    Neil Johnson committed
            self.auth = hs.get_auth()
    
            self.storage = hs.get_storage()
            self.state_store = self.storage.state
    
            # ExpiringCache((User, Device)) -> LruCache(user_id => event_id)
    
            self.lazy_loaded_members_cache = ExpiringCache(
    
    Amber Brown's avatar
    Amber Brown committed
                "lazy_loaded_members_cache",
                self.clock,
                max_len=0,
                expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
    
            )  # type: ExpiringCache[Tuple[str, Optional[str]], LruCache[str, str]]
    
        async def wait_for_sync_for_user(
    
            requester: Requester,
    
            sync_config: SyncConfig,
            since_token: Optional[StreamToken] = None,
            timeout: int = 0,
            full_state: bool = False,
        ) -> SyncResult:
    
            """Get the sync for a client if we have new data for it now. Otherwise
            wait for new data to arrive on the server. If the timeout expires, then
            return an empty sync result.
            """
    
            # If the user is not part of the mau group, then check that limits have
            # not been exceeded (if not part of the group by this point, almost certain
            # auth_blocking will occur)
            user_id = sync_config.user.to_string()
    
            await self.auth.check_auth_blocking(requester=requester)
    
            res = await self.response_cache.wrap(
    
                sync_config.request_key,
                self._wait_for_sync_for_user,
    
    Amber Brown's avatar
    Amber Brown committed
                sync_config,
                since_token,
                timeout,
                full_state,
    
            logger.debug("Returning sync response for %s", user_id)
    
        async def _wait_for_sync_for_user(
    
            self,
            sync_config: SyncConfig,
            since_token: Optional[StreamToken] = None,
            timeout: int = 0,
            full_state: bool = False,
        ) -> SyncResult:
    
            if since_token is None:
                sync_type = "initial_sync"
            elif full_state:
                sync_type = "full_state_sync"
            else:
                sync_type = "incremental_sync"
    
    
            context = current_context()
    
            if context:
    
                context.tag = sync_type
    
    
            if timeout == 0 or since_token is None or full_state:
                # we are going to return immediately, so don't bother calling
                # notifier.wait_for_events.
    
                result = await self.current_sync_for_user(
    
    Amber Brown's avatar
    Amber Brown committed
                    sync_config, since_token, full_state=full_state
    
    Mark Haines's avatar
    Mark Haines committed
            else:
    
    Amber Brown's avatar
    Amber Brown committed
    
    
                def current_sync_callback(before_token, after_token):
    
                    return self.current_sync_for_user(sync_config, since_token)
    
    
                result = await self.notifier.wait_for_events(
    
    Amber Brown's avatar
    Amber Brown committed
                    sync_config.user.to_string(),
                    timeout,
                    current_sync_callback,
    
                    from_token=since_token,
    
    
            if result:
                if sync_config.filter_collection.lazy_load_members():
                    lazy_loaded = "true"
                else:
                    lazy_loaded = "false"
                non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
    
    
            return result
    
        async def current_sync_for_user(
            self,
            sync_config: SyncConfig,
            since_token: Optional[StreamToken] = None,
            full_state: bool = False,
        ) -> SyncResult:
    
            """Get the sync for client needed to match what the server has now."""
    
            with start_active_span("current_sync_for_user"):
                log_kv({"since_token": since_token})
                sync_result = await self.generate_sync_result(
                    sync_config, since_token, full_state
                )
    
                set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
                return sync_result
    
        async def push_rules_for_user(self, user: UserID) -> JsonDict:
    
            user_id = user.to_string()
    
            rules = await self.store.get_push_rules_for_user(user_id)
    
            rules = format_push_rules_for_user(user, rules)
    
            return rules
    
        async def ephemeral_by_room(
            self,
            sync_result_builder: "SyncResultBuilder",
            now_token: StreamToken,
            since_token: Optional[StreamToken] = None,
        ) -> Tuple[StreamToken, Dict[str, List[JsonDict]]]:
    
            """Get the ephemeral events for each room the user is in
    
                sync_result_builder
                now_token: Where the server is currently up to.
                since_token: Where the server was when the client
    
                    last synced.
            Returns:
                A tuple of the now StreamToken, updated to reflect the which typing
                events are included, and a dict mapping from room_id to a list of
                typing events for that room.
            """
    
    
            sync_config = sync_result_builder.sync_config
    
    
            with Measure(self.clock, "ephemeral_by_room"):
    
                typing_key = since_token.typing_key if since_token else 0
    
                room_ids = sync_result_builder.joined_room_ids
    
                typing_source = self.event_sources.sources["typing"]
    
                typing, typing_key = await typing_source.get_new_events(
    
                    user=sync_config.user,
                    from_key=typing_key,
                    limit=sync_config.filter_collection.ephemeral_limit(),
                    room_ids=room_ids,
                    is_guest=sync_config.is_guest,
                )
                now_token = now_token.copy_and_replace("typing_key", typing_key)
    
    
                ephemeral_by_room = {}  # type: JsonDict
    
    
                for event in typing:
                    # we want to exclude the room_id from the event, but modifying the
                    # result returned by the event source is poor form (it might cache
                    # the object)
                    room_id = event["room_id"]
    
                    event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
    
                    ephemeral_by_room.setdefault(room_id, []).append(event_copy)
    
    
                receipt_key = since_token.receipt_key if since_token else 0
    
    
                receipt_source = self.event_sources.sources["receipt"]
    
                receipts, receipt_key = await receipt_source.get_new_events(
    
                    user=sync_config.user,
                    from_key=receipt_key,
                    limit=sync_config.filter_collection.ephemeral_limit(),
                    room_ids=room_ids,
                    is_guest=sync_config.is_guest,
                )
                now_token = now_token.copy_and_replace("receipt_key", receipt_key)
    
                for event in receipts:
                    room_id = event["room_id"]
                    # exclude room id, as above
    
                    event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
    
                    ephemeral_by_room.setdefault(room_id, []).append(event_copy)
    
            return now_token, ephemeral_by_room
    
        async def _load_filtered_recents(
    
    Amber Brown's avatar
    Amber Brown committed
            self,
    
            room_id: str,
            sync_config: SyncConfig,
            now_token: StreamToken,
            since_token: Optional[StreamToken] = None,
            potential_recents: Optional[List[EventBase]] = None,
            newly_joined_room: bool = False,
        ) -> TimelineBatch:
    
            with Measure(self.clock, "load_filtered_recents"):
                timeline_limit = sync_config.filter_collection.timeline_limit()
    
    Amber Brown's avatar
    Amber Brown committed
                block_all_timeline = (
                    sync_config.filter_collection.blocks_all_room_timeline()
                )
    
                if (
                    potential_recents is None
                    or newly_joined_room
                    or timeline_limit < len(potential_recents)
                ):
    
    Erik Johnston's avatar
    Erik Johnston committed
                    limited = True
                else:
                    limited = False
    
                if potential_recents:
                    recents = sync_config.filter_collection.filter_room_timeline(
                        potential_recents
                    )
    
    
                    # We check if there are any state events, if there are then we pass
                    # all current state events to the filter_events function. This is to
                    # ensure that we always include current state in the timeline
    
                    current_state_ids = frozenset()  # type: FrozenSet[str]
    
                    if any(e.is_state() for e in recents):
    
                        current_state_ids_map = await self.store.get_current_state_ids(
    
                        current_state_ids = frozenset(current_state_ids_map.values())
    
                    recents = await filter_events_for_client(
    
                        self.storage,
    
                        sync_config.user.to_string(),
                        recents,
    
                        always_include_ids=current_state_ids,
    
                if not limited or block_all_timeline:
    
                    prev_batch_token = now_token
                    if recents:
                        room_key = recents[0].internal_metadata.before
                        prev_batch_token = now_token.copy_and_replace("room_key", room_key)
    
    
                    return TimelineBatch(
    
                        events=recents, prev_batch=prev_batch_token, limited=False
    
    Amber Brown's avatar
    Amber Brown committed
                    )
    
    
                filtering_factor = 2
                load_limit = max(timeline_limit * filtering_factor, 10)
                max_repeat = 5  # Only try a few times per room, otherwise
                room_key = now_token.room_key
                end_key = room_key
    
    
                since_key = None
                if since_token and not newly_joined_room:
                    since_key = since_token.room_key
    
                while limited and len(recents) < timeline_limit and max_repeat:
    
                    # If we have a since_key then we are trying to get any events
                    # that have happened since `since_key` up to `end_key`, so we
                    # can just use `get_room_events_stream_for_room`.
                    # Otherwise, we want to return the last N events in the room
                    # in toplogical ordering.
    
                        events, end_key = await self.store.get_room_events_stream_for_room(
    
                            room_id,
                            limit=load_limit + 1,
                            from_key=since_key,
                            to_key=end_key,
                        )
                    else:
    
                        events, end_key = await self.store.get_recent_events_for_room(
    
    Amber Brown's avatar
    Amber Brown committed
                            room_id, limit=load_limit + 1, end_token=end_key
    
                    loaded_recents = sync_config.filter_collection.filter_room_timeline(
                        events
                    )
    
    
                    # We check if there are any state events, if there are then we pass
                    # all current state events to the filter_events function. This is to
                    # ensure that we always include current state in the timeline
                    current_state_ids = frozenset()
                    if any(e.is_state() for e in loaded_recents):
    
                        current_state_ids_map = await self.store.get_current_state_ids(
    
                        current_state_ids = frozenset(current_state_ids_map.values())
    
                    loaded_recents = await filter_events_for_client(
    
                        self.storage,
    
                        sync_config.user.to_string(),
                        loaded_recents,
    
                        always_include_ids=current_state_ids,
    
                    )
                    loaded_recents.extend(recents)
                    recents = loaded_recents
    
                    if len(events) <= load_limit:
                        limited = False
                        break
                    max_repeat -= 1
    
                if len(recents) > timeline_limit:
                    limited = True
                    recents = recents[-timeline_limit:]
    
                    room_key = recents[0].internal_metadata.before
    
    Amber Brown's avatar
    Amber Brown committed
                prev_batch_token = now_token.copy_and_replace("room_key", room_key)
    
            return TimelineBatch(
                events=recents,
                prev_batch=prev_batch_token,
                limited=limited or newly_joined_room,
    
    Amber Brown's avatar
    Amber Brown committed
            )
    
        async def get_state_after_event(
    
            self, event: EventBase, state_filter: Optional[StateFilter] = None
    
        ) -> StateMap[str]:
    
    Erik Johnston's avatar
    Erik Johnston committed
            """
            Get the room state after the given event
    
                event: event of interest
                state_filter: The state filter used to fetch state from the database.
    
            state_ids = await self.state_store.get_state_ids_for_event(
    
                event.event_id, state_filter=state_filter or StateFilter.all()
    
                state_ids = dict(state_ids)
    
                state_ids[(event.type, event.state_key)] = event.event_id
    
            return state_ids
    
        async def get_state_at(
    
            self,
            room_id: str,
            stream_position: StreamToken,
    
            state_filter: Optional[StateFilter] = None,
    
        ) -> StateMap[str]:
    
            """Get the room state at a particular stream position
    
                room_id: room for which to get state
                stream_position: point at which to get state
                state_filter: The state filter used to fetch state from the database.
    
            # FIXME this claims to get the state at a stream position, but
            # get_recent_events_for_room operates by topo ordering. This therefore
            # does not reliably give you the state at the given stream position.
            # (https://github.com/matrix-org/synapse/issues/3305)
    
            last_events, _ = await self.store.get_recent_events_for_room(
    
    Amber Brown's avatar
    Amber Brown committed
                room_id, end_token=stream_position.room_key, limit=1
    
                last_event = last_events[-1]
    
                state = await self.get_state_after_event(
    
                    last_event, state_filter=state_filter or StateFilter.all()
    
                # no events in this room - so presumably no state
    
            return state
    
        async def compute_summary(
            self,
            room_id: str,
            sync_config: SyncConfig,
            batch: TimelineBatch,
    
            state: MutableStateMap[EventBase],
    
            now_token: StreamToken,
        ) -> Optional[JsonDict]:
    
            """Works out a room summary block for this room, summarising the number
    
            of joined members in the room, and providing the 'hero' members if the
            room has no name so clients can consistently name rooms.  Also adds
            state events to 'state' if needed to describe the heroes.
    
    
            Args
                room_id
                sync_config
                batch: The timeline batch for the room that will be sent to the user.
                state: State as returned by compute_state_delta
                now_token: Token of the end of the current batch.
    
            # FIXME: we could/should get this from room_stats when matthew/stats lands
    
    
            # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
    
            last_events, _ = await self.store.get_recent_event_ids_for_room(
    
    Amber Brown's avatar
    Amber Brown committed
                room_id, end_token=now_token.room_key, limit=1
    
    
            last_event = last_events[-1]
    
            state_ids = await self.state_store.get_state_ids_for_event(
    
    Amber Brown's avatar
    Amber Brown committed
                state_filter=StateFilter.from_types(
                    [(EventTypes.Name, ""), (EventTypes.CanonicalAlias, "")]
                ),
    
            # this is heavily cached, thus: fast.
    
            details = await self.store.get_room_summary(room_id)
    
    Amber Brown's avatar
    Amber Brown committed
            name_id = state_ids.get((EventTypes.Name, ""))
            canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ""))
    
            empty_ms = MemberSummary([], 0)
    
    
            # TODO: only send these when they change.
    
    Amber Brown's avatar
    Amber Brown committed
            summary["m.joined_member_count"] = details.get(Membership.JOIN, empty_ms).count
            summary["m.invited_member_count"] = details.get(
                Membership.INVITE, empty_ms
            ).count
    
            # if the room has a name or canonical_alias set, we can skip
    
            # calculating heroes. Empty strings are falsey, so we check
            # for the "name" value and default to an empty string.
    
                name = await self.store.get_event(name_id, allow_none=True)
    
    Brendan Abolivier's avatar
    Brendan Abolivier committed
                if name and name.content.get("name"):
    
                    return summary
    
                canonical_alias = await self.store.get_event(
    
    Amber Brown's avatar
    Amber Brown committed
                    canonical_alias_id, allow_none=True
    
    Brendan Abolivier's avatar
    Brendan Abolivier committed
                if canonical_alias and canonical_alias.content.get("alias"):
    
                    return summary
    
            joined_user_ids = [
    
    Amber Brown's avatar
    Amber Brown committed
                r[0] for r in details.get(Membership.JOIN, empty_ms).members if r[0] != me
    
            ]
            invited_user_ids = [
    
    Amber Brown's avatar
    Amber Brown committed
                r[0] for r in details.get(Membership.INVITE, empty_ms).members if r[0] != me
    
    Amber Brown's avatar
    Amber Brown committed
            gone_user_ids = [
                r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me
            ] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me]
    
    
            # FIXME: only build up a member_ids list for our heroes
            member_ids = {}
            for membership in (
                Membership.JOIN,
                Membership.INVITE,
                Membership.LEAVE,
    
    Amber Brown's avatar
    Amber Brown committed
                Membership.BAN,
    
            ):
                for user_id, event_id in details.get(membership, empty_ms).members:
                    member_ids[user_id] = event_id
    
            # FIXME: order by stream ordering rather than as returned by SQL
    
    Amber Brown's avatar
    Amber Brown committed
            if joined_user_ids or invited_user_ids:
    
                summary["m.heroes"] = sorted(joined_user_ids + invited_user_ids)[0:5]
    
                summary["m.heroes"] = sorted(gone_user_ids)[0:5]
    
    
            if not sync_config.filter_collection.lazy_load_members():
    
                return summary
    
    
            # ensure we send membership events for heroes if needed
            cache_key = (sync_config.user.to_string(), sync_config.device_id)
            cache = self.get_lazy_loaded_members_cache(cache_key)
    
            # track which members the client should already know about via LL:
            # Ones which are already in state...
    
    Amber Brown's avatar
    Amber Brown committed
                user_id for (typ, user_id) in state.keys() if typ == EventTypes.Member
    
    
            # ...or ones which are in the timeline...
            for ev in batch.events:
                if ev.type == EventTypes.Member:
                    existing_members.add(ev.state_key)
    
            # ...and then ensure any missing ones get included in state.
            missing_hero_event_ids = [
                member_ids[hero_id]
    
    Amber Brown's avatar
    Amber Brown committed
                for hero_id in summary["m.heroes"]
    
    Amber Brown's avatar
    Amber Brown committed
                    cache.get(hero_id) != member_ids[hero_id]
                    and hero_id not in existing_members
    
            missing_hero_state = await self.store.get_events(missing_hero_event_ids)
    
            for s in missing_hero_state.values():
    
                cache.set(s.state_key, s.event_id)
                state[(EventTypes.Member, s.state_key)] = s
    
    
            return summary
    
        def get_lazy_loaded_members_cache(
            self, cache_key: Tuple[str, Optional[str]]
    
        ) -> LruCache[str, str]:
            cache = self.lazy_loaded_members_cache.get(
                cache_key
            )  # type: Optional[LruCache[str, str]]
    
            if cache is None:
                logger.debug("creating LruCache for %r", cache_key)
                cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
                self.lazy_loaded_members_cache[cache_key] = cache
            else:
                logger.debug("found LruCache for %r", cache_key)
            return cache
    
    
        async def compute_state_delta(
    
            self,
            room_id: str,
            batch: TimelineBatch,
            sync_config: SyncConfig,
            since_token: Optional[StreamToken],
            now_token: StreamToken,
            full_state: bool,
    
            """Works out the difference in state between the start of the timeline
    
            and the previous sync.
    
    
                room_id:
                batch: The timeline batch for the room that will be sent to the user.
                sync_config:
                since_token: Token of the end of the previous batch. May be None.
                now_token: Token of the end of the current batch.
                full_state: Whether to force returning the full state.
    
            """
            # TODO(mjark) Check if the state events were received by the server
            # after the previous sync, since we need to include those state
    
            # updates even if they occurred logically before the previous event.
    
            # TODO(mjark) Check for new redactions in the state events.
    
            with Measure(self.clock, "compute_state_delta"):
    
                members_to_fetch = None
    
                lazy_load_members = sync_config.filter_collection.lazy_load_members()
                include_redundant_members = (
                    sync_config.filter_collection.include_redundant_members()
                )
    
    
                if lazy_load_members:
    
                    # We only request state for the members needed to display the
                    # timeline:
    
                        event.sender  # FIXME: we also care about invite targets etc.
                        for event in batch.events
    
                    if full_state:
                        # always make sure we LL ourselves so we know we're in the room
                        # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
                        # We only need apply this on full state syncs given we disabled
                        # LL for incr syncs in #3840.
                        members_to_fetch.add(sync_config.user.to_string())
    
                    state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
                else:
                    state_filter = StateFilter.all()
    
                timeline_state = {
                    (event.type, event.state_key): event.event_id
    
    Amber Brown's avatar
    Amber Brown committed
                    for event in batch.events
                    if event.is_state()
    
                if full_state:
                    if batch:
    
                        current_state_ids = await self.state_store.get_state_ids_for_event(
    
    Amber Brown's avatar
    Amber Brown committed
                            batch.events[-1].event_id, state_filter=state_filter
    
                        state_ids = await self.state_store.get_state_ids_for_event(
    
    Amber Brown's avatar
    Amber Brown committed
                            batch.events[0].event_id, state_filter=state_filter
    
                        current_state_ids = await self.get_state_at(
    
    Amber Brown's avatar
    Amber Brown committed
                            room_id, stream_position=now_token, state_filter=state_filter
    
                        state_ids = current_state_ids
    
                    state_ids = _calculate_state(
    
                        timeline_contains=timeline_state,
    
                        timeline_start=state_ids,
    
                        previous={},
    
                        current=current_state_ids,
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                        lazy_load_members=lazy_load_members,
    
                    )
                elif batch.limited:
    
                        state_at_timeline_start = (
                            await self.state_store.get_state_ids_for_event(
                                batch.events[0].event_id, state_filter=state_filter
                            )
    
                        # We can get here if the user has ignored the senders of all
                        # the recent events.
    
                        state_at_timeline_start = await self.get_state_at(
    
                            room_id, stream_position=now_token, state_filter=state_filter
                        )
    
    
                    # for now, we disable LL for gappy syncs - see
                    # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
                    # N.B. this slows down incr syncs as we are now processing way
                    # more state in the server than if we were LLing.
                    #
                    # We still have to filter timeline_start to LL entries (above) in order
                    # for _calculate_state's LL logic to work, as we have to include LL
                    # members for timeline senders in case they weren't loaded in the initial
                    # sync.  We do this by (counterintuitively) by filtering timeline_start
                    # members to just be ones which were timeline senders, which then ensures
                    # all of the rest get included in the state block (if we need to know
                    # about them).
    
                    state_filter = StateFilter.all()
    
                    # If this is an initial sync then full_state should be set, and
                    # that case is handled above. We assert here to ensure that this
                    # is indeed the case.
                    assert since_token is not None
    
                    state_at_previous_sync = await self.get_state_at(
    
    Amber Brown's avatar
    Amber Brown committed
                        room_id, stream_position=since_token, state_filter=state_filter
    
                        current_state_ids = await self.state_store.get_state_ids_for_event(
    
                            batch.events[-1].event_id, state_filter=state_filter
                        )
                    else:
                        # Its not clear how we get here, but empirically we do
                        # (#5407). Logging has been added elsewhere to try and
                        # figure out where this state comes from.
    
                        current_state_ids = await self.get_state_at(
    
                            room_id, stream_position=now_token, state_filter=state_filter
                        )
    
                    state_ids = _calculate_state(
    
                        timeline_contains=timeline_state,
                        timeline_start=state_at_timeline_start,
                        previous=state_at_previous_sync,
    
                        current=current_state_ids,
    
                        # we have to include LL members in case LL initial sync missed them
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                        lazy_load_members=lazy_load_members,
    
                    if lazy_load_members:
    
                        if members_to_fetch and batch.events:
    
                            # We're returning an incremental sync, with no
                            # "gap" since the previous sync, so normally there would be
                            # no state to return.
    
                            # But we're lazy-loading, so the client might need some more
                            # member events to understand the events in this timeline.
                            # So we fish out all the member events corresponding to the
                            # timeline here, and then dedupe any redundant ones below.
    
    
                            state_ids = await self.state_store.get_state_ids_for_event(
    
                                batch.events[0].event_id,
                                # we only want members!
                                state_filter=StateFilter.from_types(
                                    (EventTypes.Member, member)
                                    for member in members_to_fetch
                                ),
    
                if lazy_load_members and not include_redundant_members:
                    cache_key = (sync_config.user.to_string(), sync_config.device_id)
    
                    cache = self.get_lazy_loaded_members_cache(cache_key)
    
    
                    # if it's a new sync sequence, then assume the client has had
                    # amnesia and doesn't want any recent lazy-loaded members
                    # de-duplicated.
                    if since_token is None:
                        logger.debug("clearing LruCache for %r", cache_key)
                        cache.clear()
                    else:
                        # only send members which aren't in our LruCache (either
                        # because they're new to this client or have been pushed out
                        # of the cache)
                        logger.debug("filtering state from %r...", state_ids)
                        state_ids = {
                            t: event_id
    
                            for t, event_id in state_ids.items()
    
                            if cache.get(t[1]) != event_id
                        }
                        logger.debug("...to %r", state_ids)
    
                    # add any member IDs we are about to send into our LruCache
                    for t, event_id in itertools.chain(
    
    Amber Brown's avatar
    Amber Brown committed
                        state_ids.items(), timeline_state.items()
    
                    ):
                        if t[0] == EventTypes.Member:
                            cache.set(t[1], event_id)
    
    
            state = {}  # type: Dict[str, EventBase]
    
            if state_ids:
    
                state = await self.store.get_events(list(state_ids.values()))
    
            return {
                (e.type, e.state_key): e
                for e in sync_config.filter_collection.filter_room_state(
                    list(state.values())
                )
    
                if e.type != EventTypes.Aliases  # until MSC2261 or alternative solution
    
        async def unread_notifs_for_room_id(
            self, room_id: str, sync_config: SyncConfig
    
        ) -> Dict[str, int]:
    
            with Measure(self.clock, "unread_notifs_for_room_id"):
    
                last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
    
                    user_id=sync_config.user.to_string(),
                    room_id=room_id,
    
    Amber Brown's avatar
    Amber Brown committed
                    receipt_type="m.read",
    
                notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
                    room_id, sync_config.user.to_string(), last_unread_event_id
                )
                return notifs
    
        async def generate_sync_result(
    
            self,
            sync_config: SyncConfig,
            since_token: Optional[StreamToken] = None,
            full_state: bool = False,
        ) -> SyncResult:
    
            """Generates a sync result."""
    
    Erik Johnston's avatar
    Erik Johnston committed
            # NB: The now_token gets changed by some of the generate_sync_* methods,
            # this is due to some of the underlying streams not supporting the ability
            # to query up to a given point.
            # Always use the `now_token` in `SyncResultBuilder`
    
            now_token = self.event_sources.get_current_token()
    
            log_kv({"now_token": now_token})
    
                "Calculating sync response for %r between %s and %s",
    
    Amber Brown's avatar
    Amber Brown committed
                sync_config.user,
                since_token,
                now_token,
    
            user_id = sync_config.user.to_string()
            app_service = self.store.get_app_service_by_user_id(user_id)
            if app_service:
                # We no longer support AS users using /sync directly.
                # See https://github.com/matrix-org/matrix-doc/issues/1144
                raise NotImplementedError()
            else: