Skip to content
Snippets Groups Projects
sync.py 77 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2015, 2016 OpenMarket Ltd
    # Copyright 2018 New Vector Ltd
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    Amber Brown's avatar
    Amber Brown committed
    import collections
    import itertools
    import logging
    
    from six import iteritems, itervalues
    
    
    from prometheus_client import Counter
    
    
    Amber Brown's avatar
    Amber Brown committed
    from twisted.internet import defer
    
    from synapse.api.constants import EventTypes, Membership
    from synapse.push.clientformat import format_push_rules_for_user
    
    from synapse.storage.roommember import MemberSummary
    
    from synapse.storage.state import StateFilter
    
    Amber Brown's avatar
    Amber Brown committed
    from synapse.types import RoomStreamToken
    
    from synapse.util.async_helpers import concurrently_execute
    
    from synapse.util.caches.expiringcache import ExpiringCache
    from synapse.util.caches.lrucache import LruCache
    
    Amber Brown's avatar
    Amber Brown committed
    from synapse.util.caches.response_cache import ResponseCache
    
    from synapse.util.logcontext import LoggingContext
    
    from synapse.util.metrics import Measure, measure_func
    
    from synapse.visibility import filter_events_for_client
    
    logger = logging.getLogger(__name__)
    
    # Debug logger for https://github.com/matrix-org/synapse/issues/4422
    issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
    
    
    # Counts the number of times we returned a non-empty sync. `type` is one of
    # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
    # "true" or "false" depending on if the request asked for lazy loaded members or
    # not.
    non_empty_sync_counter = Counter(
    
        "synapse_handlers_sync_nonempty_total",
        "Count of non empty sync responses. type is initial_sync/full_state_sync"
        "/incremental_sync. lazy_loaded indicates if lazy loaded members were "
        "enabled for that request.",
        ["type", "lazy_loaded"],
    
    # Store the cache that tracks which lazy-loaded members have been sent to a given
    # client for no more than 30 minutes.
    LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
    
    # Remember the last 100 members we sent to a client for the purposes of
    # avoiding redundantly sending the same lazy-loaded members to the client
    LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
    
    
    Mark Haines's avatar
    Mark Haines committed
    
    SyncConfig = collections.namedtuple("SyncConfig", [
        "user",
    
        "filter_collection",
    
        "request_key",
    
    class TimelineBatch(collections.namedtuple("TimelineBatch", [
        "prev_batch",
    
        "limited",
    ])):
        __slots__ = []
    
        def __nonzero__(self):
            """Make the result appear empty if there are no updates. This is used
            to tell if room needs to be part of the sync result.
            """
            return bool(self.events)
    
        __bool__ = __nonzero__  # python3
    
    class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
    
        "room_id",           # str
        "timeline",          # TimelineBatch
    
        "state",             # dict[(str, str), FrozenEvent]
    
        "account_data",
    
    ])):
        __slots__ = []
    
        def __nonzero__(self):
    
            """Make the result appear empty if there are no updates. This is used
            to tell if room needs to be part of the sync result.
            """
    
            return bool(
                self.timeline
                or self.state
                or self.ephemeral
    
                or self.account_data
    
                # nb the notification count does not, er, count: if there's nothing
                # else in the result, we don't need to send it.
    
        __bool__ = __nonzero__  # python3
    
    class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
    
        "room_id",            # str
        "timeline",           # TimelineBatch
    
        "state",              # dict[(str, str), FrozenEvent]
    
        "account_data",
    
    ])):
        __slots__ = []
    
        def __nonzero__(self):
            """Make the result appear empty if there are no updates. This is used
            to tell if room needs to be part of the sync result.
            """
    
            return bool(
                self.timeline
                or self.state
    
                or self.account_data
    
        __bool__ = __nonzero__  # python3
    
    class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
    
        "room_id",   # str
        "invite",    # FrozenEvent: the invite event
    
        def __nonzero__(self):
            """Invited rooms should always be reported to the client"""
            return True
    
        __bool__ = __nonzero__  # python3
    
    class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
        "join",
        "invite",
        "leave",
    ])):
        __slots__ = []
    
        def __nonzero__(self):
    
    Erik Johnston's avatar
    Erik Johnston committed
            return bool(self.join or self.invite or self.leave)
    
        __bool__ = __nonzero__  # python3
    
    class DeviceLists(collections.namedtuple("DeviceLists", [
        "changed",   # list of user_ids whose devices may have changed
        "left",      # list of user_ids whose devices we no longer track
    ])):
        __slots__ = []
    
        def __nonzero__(self):
            return bool(self.changed or self.left)
    
        __bool__ = __nonzero__  # python3
    
    Mark Haines's avatar
    Mark Haines committed
    class SyncResult(collections.namedtuple("SyncResult", [
    
    Mark Haines's avatar
    Mark Haines committed
        "next_batch",  # Token for the next sync
    
        "presence",  # List of presence events for the user.
    
        "account_data",  # List of account_data events for the user.
    
        "joined",  # JoinedSyncResult for each joined room.
        "invited",  # InvitedSyncResult for each invited room.
    
        "archived",  # ArchivedSyncResult for each archived room.
    
        "to_device",  # List of direct messages for the device.
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
        "device_lists",  # List of user_ids whose devices have changed
    
        "device_one_time_keys_count",  # Dict of algorithm to count for one time keys
                                       # for this device
    
        "groups",
    
    Mark Haines's avatar
    Mark Haines committed
    ])):
        __slots__ = []
    
        def __nonzero__(self):
    
            """Make the result appear empty if there are no updates. This is used
            to tell if the notifier needs to wait for more events when polling for
            events.
            """
    
                self.presence or
                self.joined or
                self.invited or
                self.archived or
    
                self.account_data or
    
                self.to_device or
    
                self.device_lists or
                self.groups
    
        __bool__ = __nonzero__  # python3
    
    class SyncHandler(object):
    
    Mark Haines's avatar
    Mark Haines committed
    
        def __init__(self, hs):
    
            self.hs_config = hs.config
    
            self.store = hs.get_datastore()
            self.notifier = hs.get_notifier()
            self.presence_handler = hs.get_presence_handler()
    
    Mark Haines's avatar
    Mark Haines committed
            self.event_sources = hs.get_event_sources()
    
            self.response_cache = ResponseCache(hs, "sync")
    
            self.state = hs.get_state_handler()
    
    Neil Johnson's avatar
    Neil Johnson committed
            self.auth = hs.get_auth()
    
            # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
            self.lazy_loaded_members_cache = ExpiringCache(
                "lazy_loaded_members_cache", self.clock,
                max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
            )
    
    
    Neil Johnson's avatar
    Neil Johnson committed
        @defer.inlineCallbacks
    
        def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                   full_state=False):
    
            """Get the sync for a client if we have new data for it now. Otherwise
            wait for new data to arrive on the server. If the timeout expires, then
            return an empty sync result.
            Returns:
    
    Neil Johnson's avatar
    Neil Johnson committed
                Deferred[SyncResult]
    
            # If the user is not part of the mau group, then check that limits have
            # not been exceeded (if not part of the group by this point, almost certain
            # auth_blocking will occur)
            user_id = sync_config.user.to_string()
            yield self.auth.check_auth_blocking(user_id)
    
    
            res = yield self.response_cache.wrap(
    
                sync_config.request_key,
                self._wait_for_sync_for_user,
                sync_config, since_token, timeout, full_state,
            )
    
            defer.returnValue(res)
    
        @defer.inlineCallbacks
        def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
                                    full_state):
    
            if since_token is None:
                sync_type = "initial_sync"
            elif full_state:
                sync_type = "full_state_sync"
            else:
                sync_type = "incremental_sync"
    
    
            context = LoggingContext.current_context()
            if context:
    
                context.tag = sync_type
    
    
            if timeout == 0 or since_token is None or full_state:
                # we are going to return immediately, so don't bother calling
                # notifier.wait_for_events.
    
                result = yield self.current_sync_for_user(
                    sync_config, since_token, full_state=full_state,
                )
    
    Mark Haines's avatar
    Mark Haines committed
            else:
    
                def current_sync_callback(before_token, after_token):
    
                    return self.current_sync_for_user(sync_config, since_token)
    
                result = yield self.notifier.wait_for_events(
    
                    sync_config.user.to_string(), timeout, current_sync_callback,
    
                    from_token=since_token,
    
    
            if result:
                if sync_config.filter_collection.lazy_load_members():
                    lazy_loaded = "true"
                else:
                    lazy_loaded = "false"
                non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
    
            defer.returnValue(result)
    
        def current_sync_for_user(self, sync_config, since_token=None,
                                  full_state=False):
    
            """Get the sync for client needed to match what the server has now.
            Returns:
                A Deferred SyncResult.
            """
    
    Erik Johnston's avatar
    Erik Johnston committed
            return self.generate_sync_result(sync_config, since_token, full_state)
    
        @defer.inlineCallbacks
        def push_rules_for_user(self, user):
            user_id = user.to_string()
    
            rules = yield self.store.get_push_rules_for_user(user_id)
            rules = format_push_rules_for_user(user, rules)
    
        @defer.inlineCallbacks
    
        def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None):
    
            """Get the ephemeral events for each room the user is in
    
                sync_result_builder(SyncResultBuilder)
    
                now_token (StreamToken): Where the server is currently up to.
                since_token (StreamToken): Where the server was when the client
                    last synced.
            Returns:
                A tuple of the now StreamToken, updated to reflect the which typing
                events are included, and a dict mapping from room_id to a list of
                typing events for that room.
            """
    
    
            sync_config = sync_result_builder.sync_config
    
    
            with Measure(self.clock, "ephemeral_by_room"):
                typing_key = since_token.typing_key if since_token else "0"
    
                room_ids = sync_result_builder.joined_room_ids
    
                typing_source = self.event_sources.sources["typing"]
                typing, typing_key = yield typing_source.get_new_events(
                    user=sync_config.user,
                    from_key=typing_key,
                    limit=sync_config.filter_collection.ephemeral_limit(),
                    room_ids=room_ids,
                    is_guest=sync_config.is_guest,
                )
                now_token = now_token.copy_and_replace("typing_key", typing_key)
    
                ephemeral_by_room = {}
    
                for event in typing:
                    # we want to exclude the room_id from the event, but modifying the
                    # result returned by the event source is poor form (it might cache
                    # the object)
                    room_id = event["room_id"]
    
                    event_copy = {k: v for (k, v) in iteritems(event)
    
                                  if k != "room_id"}
                    ephemeral_by_room.setdefault(room_id, []).append(event_copy)
    
                receipt_key = since_token.receipt_key if since_token else "0"
    
                receipt_source = self.event_sources.sources["receipt"]
                receipts, receipt_key = yield receipt_source.get_new_events(
                    user=sync_config.user,
                    from_key=receipt_key,
                    limit=sync_config.filter_collection.ephemeral_limit(),
                    room_ids=room_ids,
                    is_guest=sync_config.is_guest,
                )
                now_token = now_token.copy_and_replace("receipt_key", receipt_key)
    
                for event in receipts:
                    room_id = event["room_id"]
                    # exclude room id, as above
    
                    event_copy = {k: v for (k, v) in iteritems(event)
    
                                  if k != "room_id"}
                    ephemeral_by_room.setdefault(room_id, []).append(event_copy)
    
            defer.returnValue((now_token, ephemeral_by_room))
    
        def _load_filtered_recents(self, room_id, sync_config, now_token,
                                   since_token=None, recents=None, newly_joined_room=False):
    
            Returns:
                a Deferred TimelineBatch
    
            with Measure(self.clock, "load_filtered_recents"):
                timeline_limit = sync_config.filter_collection.timeline_limit()
    
                block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline()
    
    Erik Johnston's avatar
    Erik Johnston committed
                if recents is None or newly_joined_room or timeline_limit < len(recents):
                    limited = True
                else:
                    limited = False
    
                if recents:
    
                    recents = sync_config.filter_collection.filter_room_timeline(recents)
    
    
                    # We check if there are any state events, if there are then we pass
                    # all current state events to the filter_events function. This is to
                    # ensure that we always include current state in the timeline
                    current_state_ids = frozenset()
                    if any(e.is_state() for e in recents):
                        current_state_ids = yield self.state.get_current_state_ids(room_id)
    
                        current_state_ids = frozenset(itervalues(current_state_ids))
    
                    recents = yield filter_events_for_client(
                        self.store,
    
                        sync_config.user.to_string(),
                        recents,
    
                        always_include_ids=current_state_ids,
    
                if not limited or block_all_timeline:
    
                    defer.returnValue(TimelineBatch(
                        events=recents,
                        prev_batch=now_token,
                        limited=False
                    ))
    
                filtering_factor = 2
                load_limit = max(timeline_limit * filtering_factor, 10)
                max_repeat = 5  # Only try a few times per room, otherwise
                room_key = now_token.room_key
                end_key = room_key
    
    
                since_key = None
                if since_token and not newly_joined_room:
                    since_key = since_token.room_key
    
                while limited and len(recents) < timeline_limit and max_repeat:
    
                    # If we have a since_key then we are trying to get any events
                    # that have happened since `since_key` up to `end_key`, so we
                    # can just use `get_room_events_stream_for_room`.
                    # Otherwise, we want to return the last N events in the room
                    # in toplogical ordering.
    
                    if since_key:
                        events, end_key = yield self.store.get_room_events_stream_for_room(
                            room_id,
                            limit=load_limit + 1,
                            from_key=since_key,
                            to_key=end_key,
                        )
                    else:
                        events, end_key = yield self.store.get_recent_events_for_room(
                            room_id,
                            limit=load_limit + 1,
                            end_token=end_key,
                        )
    
                    loaded_recents = sync_config.filter_collection.filter_room_timeline(
                        events
                    )
    
    
                    # We check if there are any state events, if there are then we pass
                    # all current state events to the filter_events function. This is to
                    # ensure that we always include current state in the timeline
                    current_state_ids = frozenset()
                    if any(e.is_state() for e in loaded_recents):
                        current_state_ids = yield self.state.get_current_state_ids(room_id)
    
                        current_state_ids = frozenset(itervalues(current_state_ids))
    
                    loaded_recents = yield filter_events_for_client(
                        self.store,
    
                        sync_config.user.to_string(),
                        loaded_recents,
    
                        always_include_ids=current_state_ids,
    
                    )
                    loaded_recents.extend(recents)
                    recents = loaded_recents
    
                    if len(events) <= load_limit:
                        limited = False
                        break
                    max_repeat -= 1
    
                if len(recents) > timeline_limit:
                    limited = True
                    recents = recents[-timeline_limit:]
                    room_key = recents[0].internal_metadata.before
    
                prev_batch_token = now_token.copy_and_replace(
                    "room_key", room_key
                )
    
            defer.returnValue(TimelineBatch(
    
                events=recents,
                prev_batch=prev_batch_token,
                limited=limited or newly_joined_room
    
    Mark Haines's avatar
    Mark Haines committed
        @defer.inlineCallbacks
    
        def get_state_after_event(self, event, state_filter=StateFilter.all()):
    
    Erik Johnston's avatar
    Erik Johnston committed
            """
            Get the room state after the given event
    
            Args:
                event(synapse.events.EventBase): event of interest
    
                state_filter (StateFilter): The state filter used to fetch state
                    from the database.
    
            Returns:
                A Deferred map from ((type, state_key)->Event)
    
            state_ids = yield self.store.get_state_ids_for_event(
    
                event.event_id, state_filter=state_filter,
    
                state_ids = state_ids.copy()
                state_ids[(event.type, event.state_key)] = event.event_id
            defer.returnValue(state_ids)
    
        def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()):
    
            """ Get the room state at a particular stream position
    
    
            Args:
                room_id(str): room for which to get state
                stream_position(StreamToken): point at which to get state
    
                state_filter (StateFilter): The state filter used to fetch state
                    from the database.
    
    
            Returns:
                A Deferred map from ((type, state_key)->Event)
    
            # FIXME this claims to get the state at a stream position, but
            # get_recent_events_for_room operates by topo ordering. This therefore
            # does not reliably give you the state at the given stream position.
            # (https://github.com/matrix-org/synapse/issues/3305)
    
            last_events, _ = yield self.store.get_recent_events_for_room(
    
                room_id, end_token=stream_position.room_key, limit=1,
    
                last_event = last_events[-1]
    
                state = yield self.get_state_after_event(
    
                    last_event, state_filter=state_filter,
    
                # no events in this room - so presumably no state
    
            defer.returnValue(state)
    
    
        @defer.inlineCallbacks
        def compute_summary(self, room_id, sync_config, batch, state, now_token):
            """ Works out a room summary block for this room, summarising the number
            of joined members in the room, and providing the 'hero' members if the
            room has no name so clients can consistently name rooms.  Also adds
            state events to 'state' if needed to describe the heroes.
    
            Args:
                room_id(str):
                sync_config(synapse.handlers.sync.SyncConfig):
                batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
                    the room that will be sent to the user.
                state(dict): dict of (type, state_key) -> Event as returned by
                    compute_state_delta
                now_token(str): Token of the end of the current batch.
    
            Returns:
                 A deferred dict describing the room summary
            """
    
    
            # FIXME: we could/should get this from room_stats when matthew/stats lands
    
    
            # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
            last_events, _ = yield self.store.get_recent_event_ids_for_room(
                room_id, end_token=now_token.room_key, limit=1,
            )
    
            if not last_events:
                defer.returnValue(None)
                return
    
            last_event = last_events[-1]
            state_ids = yield self.store.get_state_ids_for_event(
    
                last_event.event_id,
                state_filter=StateFilter.from_types([
    
                    (EventTypes.Name, ''),
                    (EventTypes.CanonicalAlias, ''),
    
            # this is heavily cached, thus: fast.
            details = yield self.store.get_room_summary(room_id)
    
    
            name_id = state_ids.get((EventTypes.Name, ''))
            canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
    
            summary = {}
    
            empty_ms = MemberSummary([], 0)
    
    
            # TODO: only send these when they change.
    
            summary["m.joined_member_count"] = (
                details.get(Membership.JOIN, empty_ms).count
            )
            summary["m.invited_member_count"] = (
                details.get(Membership.INVITE, empty_ms).count
            )
    
            # if the room has a name or canonical_alias set, we can skip
    
                name = yield self.store.get_event(name_id, allow_none=True)
    
                if name and name.content and name.content.name:
    
                    defer.returnValue(summary)
    
            if canonical_alias_id:
                canonical_alias = yield self.store.get_event(
    
                    canonical_alias_id, allow_none=True,
    
                if canonical_alias and canonical_alias.content and canonical_alias.content.alias:
    
            joined_user_ids = [
                r[0] for r in details.get(Membership.JOIN, empty_ms).members
            ]
            invited_user_ids = [
                r[0] for r in details.get(Membership.INVITE, empty_ms).members
            ]
            gone_user_ids = (
                [r[0] for r in details.get(Membership.LEAVE, empty_ms).members] +
                [r[0] for r in details.get(Membership.BAN, empty_ms).members]
            )
    
            # FIXME: only build up a member_ids list for our heroes
            member_ids = {}
            for membership in (
                Membership.JOIN,
                Membership.INVITE,
                Membership.LEAVE,
                Membership.BAN
            ):
                for user_id, event_id in details.get(membership, empty_ms).members:
                    member_ids[user_id] = event_id
    
            # FIXME: order by stream ordering rather than as returned by SQL
    
            me = sync_config.user.to_string()
            if (joined_user_ids or invited_user_ids):
                summary['m.heroes'] = sorted(
                    [
                        user_id
                        for user_id in (joined_user_ids + invited_user_ids)
                        if user_id != me
                    ]
                )[0:5]
            else:
                summary['m.heroes'] = sorted(
    
                    [
                        user_id
                        for user_id in gone_user_ids
                        if user_id != me
                    ]
    
                )[0:5]
    
            if not sync_config.filter_collection.lazy_load_members():
                defer.returnValue(summary)
    
            # ensure we send membership events for heroes if needed
            cache_key = (sync_config.user.to_string(), sync_config.device_id)
            cache = self.get_lazy_loaded_members_cache(cache_key)
    
            # track which members the client should already know about via LL:
            # Ones which are already in state...
            existing_members = set(
                user_id for (typ, user_id) in state.keys()
                if typ == EventTypes.Member
            )
    
            # ...or ones which are in the timeline...
            for ev in batch.events:
                if ev.type == EventTypes.Member:
                    existing_members.add(ev.state_key)
    
            # ...and then ensure any missing ones get included in state.
            missing_hero_event_ids = [
                member_ids[hero_id]
                for hero_id in summary['m.heroes']
                if (
                    cache.get(hero_id) != member_ids[hero_id] and
                    hero_id not in existing_members
                )
            ]
    
            missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
            missing_hero_state = missing_hero_state.values()
    
            for s in missing_hero_state:
                cache.set(s.state_key, s.event_id)
                state[(EventTypes.Member, s.state_key)] = s
    
            defer.returnValue(summary)
    
        def get_lazy_loaded_members_cache(self, cache_key):
            cache = self.lazy_loaded_members_cache.get(cache_key)
            if cache is None:
                logger.debug("creating LruCache for %r", cache_key)
                cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
                self.lazy_loaded_members_cache[cache_key] = cache
            else:
                logger.debug("found LruCache for %r", cache_key)
            return cache
    
    
        @defer.inlineCallbacks
        def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
    
            """ Works out the difference in state between the start of the timeline
    
            and the previous sync.
    
    
            Args:
                room_id(str):
                batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
                    the room that will be sent to the user.
                sync_config(synapse.handlers.sync.SyncConfig):
                since_token(str|None): Token of the end of the previous batch. May
                    be None.
                now_token(str): Token of the end of the current batch.
                full_state(bool): Whether to force returning the full state.
    
            Returns:
    
                 A deferred dict of (type, state_key) -> Event
    
            """
            # TODO(mjark) Check if the state events were received by the server
            # after the previous sync, since we need to include those state
            # updates even if they occured logically before the previous event.
            # TODO(mjark) Check for new redactions in the state events.
    
            with Measure(self.clock, "compute_state_delta"):
    
                members_to_fetch = None
    
                lazy_load_members = sync_config.filter_collection.lazy_load_members()
                include_redundant_members = (
                    sync_config.filter_collection.include_redundant_members()
                )
    
    
                if lazy_load_members:
    
                    # We only request state for the members needed to display the
                    # timeline:
    
                    members_to_fetch = set(
                        event.sender  # FIXME: we also care about invite targets etc.
                        for event in batch.events
                    )
    
                    if full_state:
                        # always make sure we LL ourselves so we know we're in the room
                        # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
                        # We only need apply this on full state syncs given we disabled
                        # LL for incr syncs in #3840.
                        members_to_fetch.add(sync_config.user.to_string())
    
                    state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
                else:
                    state_filter = StateFilter.all()
    
                timeline_state = {
                    (event.type, event.state_key): event.event_id
                    for event in batch.events if event.is_state()
                }
    
    
                if full_state:
                    if batch:
    
                        current_state_ids = yield self.store.get_state_ids_for_event(
    
                            batch.events[-1].event_id, state_filter=state_filter,
    
                        state_ids = yield self.store.get_state_ids_for_event(
    
                            batch.events[0].event_id, state_filter=state_filter,
    
                        current_state_ids = yield self.get_state_at(
    
                            room_id, stream_position=now_token,
                            state_filter=state_filter,
    
                        state_ids = current_state_ids
    
                    state_ids = _calculate_state(
    
                        timeline_contains=timeline_state,
    
                        timeline_start=state_ids,
    
                        previous={},
    
                        current=current_state_ids,
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                        lazy_load_members=lazy_load_members,
    
                    )
                elif batch.limited:
    
                    state_at_timeline_start = yield self.store.get_state_ids_for_event(
    
                        batch.events[0].event_id, state_filter=state_filter,
    
                    )
    
                    # for now, we disable LL for gappy syncs - see
                    # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
                    # N.B. this slows down incr syncs as we are now processing way
                    # more state in the server than if we were LLing.
                    #
                    # We still have to filter timeline_start to LL entries (above) in order
                    # for _calculate_state's LL logic to work, as we have to include LL
                    # members for timeline senders in case they weren't loaded in the initial
                    # sync.  We do this by (counterintuitively) by filtering timeline_start
                    # members to just be ones which were timeline senders, which then ensures
                    # all of the rest get included in the state block (if we need to know
                    # about them).
    
                    state_filter = StateFilter.all()
    
                    state_at_previous_sync = yield self.get_state_at(
    
                        room_id, stream_position=since_token,
                        state_filter=state_filter,
    
                    current_state_ids = yield self.store.get_state_ids_for_event(
    
                        batch.events[-1].event_id, state_filter=state_filter,
    
                    state_ids = _calculate_state(
    
                        timeline_contains=timeline_state,
                        timeline_start=state_at_timeline_start,
                        previous=state_at_previous_sync,
    
                        current=current_state_ids,
    
                        # we have to include LL members in case LL initial sync missed them
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                        lazy_load_members=lazy_load_members,
    
                    if lazy_load_members:
    
                        if members_to_fetch and batch.events:
    
                            # We're returning an incremental sync, with no
                            # "gap" since the previous sync, so normally there would be
                            # no state to return.
    
                            # But we're lazy-loading, so the client might need some more
                            # member events to understand the events in this timeline.
                            # So we fish out all the member events corresponding to the
                            # timeline here, and then dedupe any redundant ones below.
    
    
                            state_ids = yield self.store.get_state_ids_for_event(
    
                                batch.events[0].event_id,
                                # we only want members!
                                state_filter=StateFilter.from_types(
                                    (EventTypes.Member, member)
                                    for member in members_to_fetch
                                ),
    
                if lazy_load_members and not include_redundant_members:
                    cache_key = (sync_config.user.to_string(), sync_config.device_id)
    
                    cache = self.get_lazy_loaded_members_cache(cache_key)
    
    
                    # if it's a new sync sequence, then assume the client has had
                    # amnesia and doesn't want any recent lazy-loaded members
                    # de-duplicated.
                    if since_token is None:
                        logger.debug("clearing LruCache for %r", cache_key)
                        cache.clear()
                    else:
                        # only send members which aren't in our LruCache (either
                        # because they're new to this client or have been pushed out
                        # of the cache)
                        logger.debug("filtering state from %r...", state_ids)
                        state_ids = {
                            t: event_id
    
                            for t, event_id in iteritems(state_ids)
    
                            if cache.get(t[1]) != event_id
                        }
                        logger.debug("...to %r", state_ids)
    
                    # add any member IDs we are about to send into our LruCache
                    for t, event_id in itertools.chain(
                        state_ids.items(),
                        timeline_state.items(),
                    ):
                        if t[0] == EventTypes.Member:
                            cache.set(t[1], event_id)
    
    
            state = {}
            if state_ids:
    
                state = yield self.store.get_events(list(state_ids.values()))
    
            defer.returnValue({
                (e.type, e.state_key): e
    
                for e in sync_config.filter_collection.filter_room_state(list(state.values()))
    
        def unread_notifs_for_room_id(self, room_id, sync_config):
    
            with Measure(self.clock, "unread_notifs_for_room_id"):
    
                last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
                    user_id=sync_config.user.to_string(),
                    room_id=room_id,
                    receipt_type="m.read"
    
                notifs = []
                if last_unread_event_id:
                    notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
                        room_id, sync_config.user.to_string(), last_unread_event_id
                    )
                    defer.returnValue(notifs)
    
    
            # There is no new information in this period, so your notification
            # count is whatever it was last time.
            defer.returnValue(None)
    
    Erik Johnston's avatar
    Erik Johnston committed
        @defer.inlineCallbacks
        def generate_sync_result(self, sync_config, since_token=None, full_state=False):
    
    Erik Johnston's avatar
    Erik Johnston committed
            """Generates a sync result.
    
            Args:
                sync_config (SyncConfig)
                since_token (StreamToken)
                full_state (bool)
    
            Returns:
                Deferred(SyncResult)
            """
    
    Erik Johnston's avatar
    Erik Johnston committed
            # NB: The now_token gets changed by some of the generate_sync_* methods,
            # this is due to some of the underlying streams not supporting the ability
            # to query up to a given point.
            # Always use the `now_token` in `SyncResultBuilder`
    
    Erik Johnston's avatar
    Erik Johnston committed
            now_token = yield self.event_sources.get_current_token()
    
    
            logger.info(
                "Calculating sync response for %r between %s and %s",
                sync_config.user, since_token, now_token,
            )
    
    
            user_id = sync_config.user.to_string()
            app_service = self.store.get_app_service_by_user_id(user_id)
            if app_service:
                # We no longer support AS users using /sync directly.
                # See https://github.com/matrix-org/matrix-doc/issues/1144
                raise NotImplementedError()
            else:
                joined_room_ids = yield self.get_rooms_for_user_at(
                    user_id, now_token.room_stream_id,
                )
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            sync_result_builder = SyncResultBuilder(
    
    Erik Johnston's avatar
    Erik Johnston committed
                sync_config, full_state,
                since_token=since_token,
                now_token=now_token,
    
                joined_room_ids=joined_room_ids,
    
    Erik Johnston's avatar
    Erik Johnston committed
            account_data_by_room = yield self._generate_sync_entry_for_account_data(
    
    Erik Johnston's avatar
    Erik Johnston committed
                sync_result_builder
    
    Erik Johnston's avatar
    Erik Johnston committed
            res = yield self._generate_sync_entry_for_rooms(
    
    Erik Johnston's avatar
    Erik Johnston committed
                sync_result_builder, account_data_by_room
    
    Erik Johnston's avatar
    Erik Johnston committed
            )
    
            newly_joined_rooms, newly_joined_users, _, _ = res
            _, _, newly_left_rooms, newly_left_users = res
    
            block_all_presence_data = (
                since_token is None and
                sync_config.filter_collection.blocks_all_presence()
    
    Erik Johnston's avatar
    Erik Johnston committed
            )
    
            if self.hs_config.use_presence and not block_all_presence_data:
    
                yield self._generate_sync_entry_for_presence(
                    sync_result_builder, newly_joined_rooms, newly_joined_users
                )
    
            yield self._generate_sync_entry_for_to_device(sync_result_builder)
    
    
            device_lists = yield self._generate_sync_entry_for_device_list(
    
                sync_result_builder,
                newly_joined_rooms=newly_joined_rooms,
                newly_joined_users=newly_joined_users,
    
                newly_left_rooms=newly_left_rooms,
    
                newly_left_users=newly_left_users,
    
            device_id = sync_config.device_id
            one_time_key_counts = {}
            if device_id:
                one_time_key_counts = yield self.store.count_e2e_one_time_keys(
                    user_id, device_id
                )
    
    
            yield self._generate_sync_entry_for_groups(sync_result_builder)
    
    
            # debug for https://github.com/matrix-org/synapse/issues/4422
            for joined_room in sync_result_builder.joined:
                room_id = joined_room.room_id
                if room_id in newly_joined_rooms:
                    issue4422_logger.debug(
                        "Sync result for newly joined room %s: %r",
                        room_id, joined_room,
                    )
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            defer.returnValue(SyncResult(
    
    Erik Johnston's avatar
    Erik Johnston committed
                presence=sync_result_builder.presence,
                account_data=sync_result_builder.account_data,
                joined=sync_result_builder.joined,
                invited=sync_result_builder.invited,
                archived=sync_result_builder.archived,
    
                to_device=sync_result_builder.to_device,
    
                device_lists=device_lists,
    
                groups=sync_result_builder.groups,
    
                device_one_time_keys_count=one_time_key_counts,
    
    Erik Johnston's avatar
    Erik Johnston committed
                next_batch=sync_result_builder.now_token,
    
        @measure_func("_generate_sync_entry_for_groups")
        @defer.inlineCallbacks
        def _generate_sync_entry_for_groups(self, sync_result_builder):
            user_id = sync_result_builder.sync_config.user.to_string()
            since_token = sync_result_builder.since_token
            now_token = sync_result_builder.now_token
    
            if since_token and since_token.groups_key:
                results = yield self.store.get_groups_changes_for_user(
                    user_id, since_token.groups_key, now_token.groups_key,
                )
            else:
                results = yield self.store.get_all_groups_for_user(