Skip to content
Snippets Groups Projects
sync.py 69.8 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2015, 2016 OpenMarket Ltd
    # Copyright 2018 New Vector Ltd
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    Amber Brown's avatar
    Amber Brown committed
    import collections
    import itertools
    import logging
    
    from six import iteritems, itervalues
    
    from twisted.internet import defer
    
    from synapse.api.constants import EventTypes, Membership
    from synapse.push.clientformat import format_push_rules_for_user
    from synapse.types import RoomStreamToken
    
    from synapse.util.async_helpers import concurrently_execute
    
    from synapse.util.caches.expiringcache import ExpiringCache
    from synapse.util.caches.lrucache import LruCache
    
    Amber Brown's avatar
    Amber Brown committed
    from synapse.util.caches.response_cache import ResponseCache
    
    from synapse.util.logcontext import LoggingContext
    
    from synapse.util.metrics import Measure, measure_func
    
    from synapse.visibility import filter_events_for_client
    
    logger = logging.getLogger(__name__)
    
    # Store the cache that tracks which lazy-loaded members have been sent to a given
    # client for no more than 30 minutes.
    LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
    
    # Remember the last 100 members we sent to a client for the purposes of
    # avoiding redundantly sending the same lazy-loaded members to the client
    LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
    
    
    Mark Haines's avatar
    Mark Haines committed
    
    SyncConfig = collections.namedtuple("SyncConfig", [
        "user",
    
        "filter_collection",
    
        "request_key",
    
    class TimelineBatch(collections.namedtuple("TimelineBatch", [
        "prev_batch",
    
        "limited",
    ])):
        __slots__ = []
    
        def __nonzero__(self):
            """Make the result appear empty if there are no updates. This is used
            to tell if room needs to be part of the sync result.
            """
            return bool(self.events)
    
        __bool__ = __nonzero__  # python3
    
    class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
    
        "room_id",           # str
        "timeline",          # TimelineBatch
    
        "state",             # dict[(str, str), FrozenEvent]
    
        "account_data",
    
    ])):
        __slots__ = []
    
        def __nonzero__(self):
    
            """Make the result appear empty if there are no updates. This is used
            to tell if room needs to be part of the sync result.
            """
    
            return bool(
                self.timeline
                or self.state
                or self.ephemeral
    
                or self.account_data
    
                # nb the notification count does not, er, count: if there's nothing
                # else in the result, we don't need to send it.
    
        __bool__ = __nonzero__  # python3
    
    class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
    
        "room_id",            # str
        "timeline",           # TimelineBatch
    
        "state",              # dict[(str, str), FrozenEvent]
    
        "account_data",
    
    ])):
        __slots__ = []
    
        def __nonzero__(self):
            """Make the result appear empty if there are no updates. This is used
            to tell if room needs to be part of the sync result.
            """
    
            return bool(
                self.timeline
                or self.state
    
                or self.account_data
    
        __bool__ = __nonzero__  # python3
    
    class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
    
        "room_id",   # str
        "invite",    # FrozenEvent: the invite event
    
        def __nonzero__(self):
            """Invited rooms should always be reported to the client"""
            return True
    
        __bool__ = __nonzero__  # python3
    
    class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
        "join",
        "invite",
        "leave",
    ])):
        __slots__ = []
    
        def __nonzero__(self):
    
    Erik Johnston's avatar
    Erik Johnston committed
            return bool(self.join or self.invite or self.leave)
    
        __bool__ = __nonzero__  # python3
    
    class DeviceLists(collections.namedtuple("DeviceLists", [
        "changed",   # list of user_ids whose devices may have changed
        "left",      # list of user_ids whose devices we no longer track
    ])):
        __slots__ = []
    
        def __nonzero__(self):
            return bool(self.changed or self.left)
    
        __bool__ = __nonzero__  # python3
    
    Mark Haines's avatar
    Mark Haines committed
    class SyncResult(collections.namedtuple("SyncResult", [
    
    Mark Haines's avatar
    Mark Haines committed
        "next_batch",  # Token for the next sync
    
        "presence",  # List of presence events for the user.
    
        "account_data",  # List of account_data events for the user.
    
        "joined",  # JoinedSyncResult for each joined room.
        "invited",  # InvitedSyncResult for each invited room.
    
        "archived",  # ArchivedSyncResult for each archived room.
    
        "to_device",  # List of direct messages for the device.
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
        "device_lists",  # List of user_ids whose devices have changed
    
        "device_one_time_keys_count",  # Dict of algorithm to count for one time keys
                                       # for this device
    
        "groups",
    
    Mark Haines's avatar
    Mark Haines committed
    ])):
        __slots__ = []
    
        def __nonzero__(self):
    
            """Make the result appear empty if there are no updates. This is used
            to tell if the notifier needs to wait for more events when polling for
            events.
            """
    
                self.presence or
                self.joined or
                self.invited or
                self.archived or
    
                self.account_data or
    
                self.to_device or
    
                self.device_lists or
                self.groups
    
        __bool__ = __nonzero__  # python3
    
    class SyncHandler(object):
    
    Mark Haines's avatar
    Mark Haines committed
    
        def __init__(self, hs):
    
            self.hs_config = hs.config
    
            self.store = hs.get_datastore()
            self.notifier = hs.get_notifier()
            self.presence_handler = hs.get_presence_handler()
    
    Mark Haines's avatar
    Mark Haines committed
            self.event_sources = hs.get_event_sources()
    
            self.response_cache = ResponseCache(hs, "sync")
    
            self.state = hs.get_state_handler()
    
    Neil Johnson's avatar
    Neil Johnson committed
            self.auth = hs.get_auth()
    
            # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
            self.lazy_loaded_members_cache = ExpiringCache(
                "lazy_loaded_members_cache", self.clock,
                max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
            )
    
    
    Neil Johnson's avatar
    Neil Johnson committed
        @defer.inlineCallbacks
    
        def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                   full_state=False):
    
            """Get the sync for a client if we have new data for it now. Otherwise
            wait for new data to arrive on the server. If the timeout expires, then
            return an empty sync result.
            Returns:
    
    Neil Johnson's avatar
    Neil Johnson committed
                Deferred[SyncResult]
    
            # If the user is not part of the mau group, then check that limits have
            # not been exceeded (if not part of the group by this point, almost certain
            # auth_blocking will occur)
            user_id = sync_config.user.to_string()
            yield self.auth.check_auth_blocking(user_id)
    
    
            res = yield self.response_cache.wrap(
    
                sync_config.request_key,
                self._wait_for_sync_for_user,
                sync_config, since_token, timeout, full_state,
            )
    
            defer.returnValue(res)
    
        @defer.inlineCallbacks
        def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
                                    full_state):
    
            context = LoggingContext.current_context()
            if context:
                if since_token is None:
                    context.tag = "initial_sync"
                elif full_state:
                    context.tag = "full_state_sync"
                else:
                    context.tag = "incremental_sync"
    
    
            if timeout == 0 or since_token is None or full_state:
                # we are going to return immediately, so don't bother calling
                # notifier.wait_for_events.
    
                result = yield self.current_sync_for_user(
                    sync_config, since_token, full_state=full_state,
                )
    
    Mark Haines's avatar
    Mark Haines committed
            else:
    
                def current_sync_callback(before_token, after_token):
    
                    return self.current_sync_for_user(sync_config, since_token)
    
                result = yield self.notifier.wait_for_events(
    
                    sync_config.user.to_string(), timeout, current_sync_callback,
    
                    from_token=since_token,
    
        def current_sync_for_user(self, sync_config, since_token=None,
                                  full_state=False):
    
            """Get the sync for client needed to match what the server has now.
            Returns:
                A Deferred SyncResult.
            """
    
    Erik Johnston's avatar
    Erik Johnston committed
            return self.generate_sync_result(sync_config, since_token, full_state)
    
        @defer.inlineCallbacks
        def push_rules_for_user(self, user):
            user_id = user.to_string()
    
            rules = yield self.store.get_push_rules_for_user(user_id)
            rules = format_push_rules_for_user(user, rules)
    
        @defer.inlineCallbacks
    
        def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None):
    
            """Get the ephemeral events for each room the user is in
    
                sync_result_builder(SyncResultBuilder)
    
                now_token (StreamToken): Where the server is currently up to.
                since_token (StreamToken): Where the server was when the client
                    last synced.
            Returns:
                A tuple of the now StreamToken, updated to reflect the which typing
                events are included, and a dict mapping from room_id to a list of
                typing events for that room.
            """
    
    
            sync_config = sync_result_builder.sync_config
    
    
            with Measure(self.clock, "ephemeral_by_room"):
                typing_key = since_token.typing_key if since_token else "0"
    
                room_ids = sync_result_builder.joined_room_ids
    
                typing_source = self.event_sources.sources["typing"]
                typing, typing_key = yield typing_source.get_new_events(
                    user=sync_config.user,
                    from_key=typing_key,
                    limit=sync_config.filter_collection.ephemeral_limit(),
                    room_ids=room_ids,
                    is_guest=sync_config.is_guest,
                )
                now_token = now_token.copy_and_replace("typing_key", typing_key)
    
                ephemeral_by_room = {}
    
                for event in typing:
                    # we want to exclude the room_id from the event, but modifying the
                    # result returned by the event source is poor form (it might cache
                    # the object)
                    room_id = event["room_id"]
    
                    event_copy = {k: v for (k, v) in iteritems(event)
    
                                  if k != "room_id"}
                    ephemeral_by_room.setdefault(room_id, []).append(event_copy)
    
                receipt_key = since_token.receipt_key if since_token else "0"
    
                receipt_source = self.event_sources.sources["receipt"]
                receipts, receipt_key = yield receipt_source.get_new_events(
                    user=sync_config.user,
                    from_key=receipt_key,
                    limit=sync_config.filter_collection.ephemeral_limit(),
                    room_ids=room_ids,
                    is_guest=sync_config.is_guest,
                )
                now_token = now_token.copy_and_replace("receipt_key", receipt_key)
    
                for event in receipts:
                    room_id = event["room_id"]
                    # exclude room id, as above
    
                    event_copy = {k: v for (k, v) in iteritems(event)
    
                                  if k != "room_id"}
                    ephemeral_by_room.setdefault(room_id, []).append(event_copy)
    
            defer.returnValue((now_token, ephemeral_by_room))
    
        def _load_filtered_recents(self, room_id, sync_config, now_token,
                                   since_token=None, recents=None, newly_joined_room=False):
    
            Returns:
                a Deferred TimelineBatch
    
            with Measure(self.clock, "load_filtered_recents"):
                timeline_limit = sync_config.filter_collection.timeline_limit()
    
                block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline()
    
    Erik Johnston's avatar
    Erik Johnston committed
                if recents is None or newly_joined_room or timeline_limit < len(recents):
                    limited = True
                else:
                    limited = False
    
                if recents:
    
                    recents = sync_config.filter_collection.filter_room_timeline(recents)
    
    
                    # We check if there are any state events, if there are then we pass
                    # all current state events to the filter_events function. This is to
                    # ensure that we always include current state in the timeline
                    current_state_ids = frozenset()
                    if any(e.is_state() for e in recents):
                        current_state_ids = yield self.state.get_current_state_ids(room_id)
    
                        current_state_ids = frozenset(itervalues(current_state_ids))
    
                    recents = yield filter_events_for_client(
                        self.store,
    
                        sync_config.user.to_string(),
                        recents,
    
                        always_include_ids=current_state_ids,
    
                if not limited or block_all_timeline:
    
                    defer.returnValue(TimelineBatch(
                        events=recents,
                        prev_batch=now_token,
                        limited=False
                    ))
    
                filtering_factor = 2
                load_limit = max(timeline_limit * filtering_factor, 10)
                max_repeat = 5  # Only try a few times per room, otherwise
                room_key = now_token.room_key
                end_key = room_key
    
    
                since_key = None
                if since_token and not newly_joined_room:
                    since_key = since_token.room_key
    
                while limited and len(recents) < timeline_limit and max_repeat:
    
                    # If we have a since_key then we are trying to get any events
                    # that have happened since `since_key` up to `end_key`, so we
                    # can just use `get_room_events_stream_for_room`.
                    # Otherwise, we want to return the last N events in the room
                    # in toplogical ordering.
    
                    if since_key:
                        events, end_key = yield self.store.get_room_events_stream_for_room(
                            room_id,
                            limit=load_limit + 1,
                            from_key=since_key,
                            to_key=end_key,
                        )
                    else:
                        events, end_key = yield self.store.get_recent_events_for_room(
                            room_id,
                            limit=load_limit + 1,
                            end_token=end_key,
                        )
    
                    loaded_recents = sync_config.filter_collection.filter_room_timeline(
                        events
                    )
    
    
                    # We check if there are any state events, if there are then we pass
                    # all current state events to the filter_events function. This is to
                    # ensure that we always include current state in the timeline
                    current_state_ids = frozenset()
                    if any(e.is_state() for e in loaded_recents):
                        current_state_ids = yield self.state.get_current_state_ids(room_id)
    
                        current_state_ids = frozenset(itervalues(current_state_ids))
    
                    loaded_recents = yield filter_events_for_client(
                        self.store,
    
                        sync_config.user.to_string(),
                        loaded_recents,
    
                        always_include_ids=current_state_ids,
    
                    )
                    loaded_recents.extend(recents)
                    recents = loaded_recents
    
                    if len(events) <= load_limit:
                        limited = False
                        break
                    max_repeat -= 1
    
                if len(recents) > timeline_limit:
                    limited = True
                    recents = recents[-timeline_limit:]
                    room_key = recents[0].internal_metadata.before
    
                prev_batch_token = now_token.copy_and_replace(
                    "room_key", room_key
                )
    
            defer.returnValue(TimelineBatch(
    
                events=recents,
                prev_batch=prev_batch_token,
                limited=limited or newly_joined_room
    
    Mark Haines's avatar
    Mark Haines committed
        @defer.inlineCallbacks
    
        def get_state_after_event(self, event, types=None, filtered_types=None):
    
    Erik Johnston's avatar
    Erik Johnston committed
            """
            Get the room state after the given event
    
            Args:
                event(synapse.events.EventBase): event of interest
    
                types(list[(str, str|None)]|None): List of (type, state_key) tuples
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                    which are used to filter the state fetched. If `state_key` is None,
    
                    all events are returned of the given type.
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                    May be None, which matches any key.
    
                filtered_types(list[str]|None): Only apply filtering via `types` to this
                    list of event types.  Other types of events are returned unfiltered.
                    If None, `types` filtering is applied to all events.
    
    
            Returns:
                A Deferred map from ((type, state_key)->Event)
    
            state_ids = yield self.store.get_state_ids_for_event(
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                event.event_id, types, filtered_types=filtered_types,
    
                state_ids = state_ids.copy()
                state_ids[(event.type, event.state_key)] = event.event_id
            defer.returnValue(state_ids)
    
        def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
    
            """ Get the room state at a particular stream position
    
    
            Args:
                room_id(str): room for which to get state
                stream_position(StreamToken): point at which to get state
    
                types(list[(str, str|None)]|None): List of (type, state_key) tuples
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                    which are used to filter the state fetched. If `state_key` is None,
    
                    all events are returned of the given type.
                filtered_types(list[str]|None): Only apply filtering via `types` to this
                    list of event types.  Other types of events are returned unfiltered.
                    If None, `types` filtering is applied to all events.
    
    
            Returns:
                A Deferred map from ((type, state_key)->Event)
    
            # FIXME this claims to get the state at a stream position, but
            # get_recent_events_for_room operates by topo ordering. This therefore
            # does not reliably give you the state at the given stream position.
            # (https://github.com/matrix-org/synapse/issues/3305)
    
            last_events, _ = yield self.store.get_recent_events_for_room(
    
                room_id, end_token=stream_position.room_key, limit=1,
    
                last_event = last_events[-1]
    
                state = yield self.get_state_after_event(
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                    last_event, types, filtered_types=filtered_types,
    
                # no events in this room - so presumably no state
    
            defer.returnValue(state)
    
    
        @defer.inlineCallbacks
        def compute_summary(self, room_id, sync_config, batch, state, now_token):
            """ Works out a room summary block for this room, summarising the number
            of joined members in the room, and providing the 'hero' members if the
            room has no name so clients can consistently name rooms.  Also adds
            state events to 'state' if needed to describe the heroes.
    
            Args:
                room_id(str):
                sync_config(synapse.handlers.sync.SyncConfig):
                batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
                    the room that will be sent to the user.
                state(dict): dict of (type, state_key) -> Event as returned by
                    compute_state_delta
                now_token(str): Token of the end of the current batch.
    
            Returns:
                 A deferred dict describing the room summary
            """
    
            # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
            last_events, _ = yield self.store.get_recent_event_ids_for_room(
                room_id, end_token=now_token.room_key, limit=1,
            )
    
            if not last_events:
                defer.returnValue(None)
                return
    
            last_event = last_events[-1]
            state_ids = yield self.store.get_state_ids_for_event(
                last_event.event_id, [
                    (EventTypes.Member, None),
                    (EventTypes.Name, ''),
                    (EventTypes.CanonicalAlias, ''),
                ]
            )
    
            member_ids = {
                state_key: event_id
                for (t, state_key), event_id in state_ids.iteritems()
                if t == EventTypes.Member
            }
            name_id = state_ids.get((EventTypes.Name, ''))
            canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
    
            summary = {}
    
            # FIXME: it feels very heavy to load up every single membership event
            # just to calculate the counts.
            member_events = yield self.store.get_events(member_ids.values())
    
            joined_user_ids = []
            invited_user_ids = []
    
            for ev in member_events.values():
                if ev.content.get("membership") == Membership.JOIN:
                    joined_user_ids.append(ev.state_key)
                elif ev.content.get("membership") == Membership.INVITE:
                    invited_user_ids.append(ev.state_key)
    
            # TODO: only send these when they change.
            summary["m.joined_member_count"] = len(joined_user_ids)
            summary["m.invited_member_count"] = len(invited_user_ids)
    
            if name_id or canonical_alias_id:
                defer.returnValue(summary)
    
            # FIXME: order by stream ordering, not alphabetic
    
            me = sync_config.user.to_string()
            if (joined_user_ids or invited_user_ids):
                summary['m.heroes'] = sorted(
                    [
                        user_id
                        for user_id in (joined_user_ids + invited_user_ids)
                        if user_id != me
                    ]
                )[0:5]
            else:
                summary['m.heroes'] = sorted(
                    [user_id for user_id in member_ids.keys() if user_id != me]
                )[0:5]
    
            if not sync_config.filter_collection.lazy_load_members():
                defer.returnValue(summary)
    
            # ensure we send membership events for heroes if needed
            cache_key = (sync_config.user.to_string(), sync_config.device_id)
            cache = self.get_lazy_loaded_members_cache(cache_key)
    
            # track which members the client should already know about via LL:
            # Ones which are already in state...
            existing_members = set(
                user_id for (typ, user_id) in state.keys()
                if typ == EventTypes.Member
            )
    
            # ...or ones which are in the timeline...
            for ev in batch.events:
                if ev.type == EventTypes.Member:
                    existing_members.add(ev.state_key)
    
            # ...and then ensure any missing ones get included in state.
            missing_hero_event_ids = [
                member_ids[hero_id]
                for hero_id in summary['m.heroes']
                if (
                    cache.get(hero_id) != member_ids[hero_id] and
                    hero_id not in existing_members
                )
            ]
    
            missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
            missing_hero_state = missing_hero_state.values()
    
            for s in missing_hero_state:
                cache.set(s.state_key, s.event_id)
                state[(EventTypes.Member, s.state_key)] = s
    
            defer.returnValue(summary)
    
        def get_lazy_loaded_members_cache(self, cache_key):
            cache = self.lazy_loaded_members_cache.get(cache_key)
            if cache is None:
                logger.debug("creating LruCache for %r", cache_key)
                cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
                self.lazy_loaded_members_cache[cache_key] = cache
            else:
                logger.debug("found LruCache for %r", cache_key)
            return cache
    
    
        @defer.inlineCallbacks
        def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
    
            """ Works out the difference in state between the start of the timeline
    
            and the previous sync.
    
    
            Args:
                room_id(str):
                batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
                    the room that will be sent to the user.
                sync_config(synapse.handlers.sync.SyncConfig):
                since_token(str|None): Token of the end of the previous batch. May
                    be None.
                now_token(str): Token of the end of the current batch.
                full_state(bool): Whether to force returning the full state.
    
            Returns:
    
                 A deferred dict of (type, state_key) -> Event
    
            """
            # TODO(mjark) Check if the state events were received by the server
            # after the previous sync, since we need to include those state
            # updates even if they occured logically before the previous event.
            # TODO(mjark) Check for new redactions in the state events.
    
            with Measure(self.clock, "compute_state_delta"):
    
                lazy_load_members = sync_config.filter_collection.lazy_load_members()
                include_redundant_members = (
                    sync_config.filter_collection.include_redundant_members()
                )
    
    
                if lazy_load_members:
    
                    # We only request state for the members needed to display the
                    # timeline:
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                    types = [
    
                        (EventTypes.Member, state_key)
                        for state_key in set(
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                            event.sender  # FIXME: we also care about invite targets etc.
    
                            for event in batch.events
                        )
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                    ]
    
                    # only apply the filtering to room members
                    filtered_types = [EventTypes.Member]
    
                timeline_state = {
                    (event.type, event.state_key): event.event_id
                    for event in batch.events if event.is_state()
                }
    
    
                if full_state:
                    if batch:
    
                        current_state_ids = yield self.store.get_state_ids_for_event(
    
                            batch.events[-1].event_id, types=types,
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                            filtered_types=filtered_types,
    
                        state_ids = yield self.store.get_state_ids_for_event(
    
                            batch.events[0].event_id, types=types,
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                            filtered_types=filtered_types,
    
                        current_state_ids = yield self.get_state_at(
    
                            room_id, stream_position=now_token, types=types,
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                            filtered_types=filtered_types,
    
                        state_ids = current_state_ids
    
                    state_ids = _calculate_state(
    
                        timeline_contains=timeline_state,
    
                        timeline_start=state_ids,
    
                        previous={},
    
                        current=current_state_ids,
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                        lazy_load_members=lazy_load_members,
    
                    )
                elif batch.limited:
                    state_at_previous_sync = yield self.get_state_at(
    
                        room_id, stream_position=since_token, types=types,
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                        filtered_types=filtered_types,
    
                    current_state_ids = yield self.store.get_state_ids_for_event(
    
                        batch.events[-1].event_id, types=types,
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                        filtered_types=filtered_types,
    
                    state_at_timeline_start = yield self.store.get_state_ids_for_event(
    
                        batch.events[0].event_id, types=types,
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                        filtered_types=filtered_types,
    
                    state_ids = _calculate_state(
    
                        timeline_contains=timeline_state,
                        timeline_start=state_at_timeline_start,
                        previous=state_at_previous_sync,
    
                        current=current_state_ids,
    
    Matthew Hodgson's avatar
    Matthew Hodgson committed
                        lazy_load_members=lazy_load_members,
    
                    if lazy_load_members:
    
                            # We're returning an incremental sync, with no "gap" since
                            # the previous sync, so normally there would be no state to return
                            # But we're lazy-loading, so the client might need some more
                            # member events to understand the events in this timeline.
                            # So we fish out all the member events corresponding to the
                            # timeline here, and then dedupe any redundant ones below.
    
    
                            state_ids = yield self.store.get_state_ids_for_event(
    
                                batch.events[0].event_id, types=types,
    
                                filtered_types=None,  # we only want members!
    
                if lazy_load_members and not include_redundant_members:
                    cache_key = (sync_config.user.to_string(), sync_config.device_id)
    
                    cache = self.get_lazy_loaded_members_cache(cache_key)
    
    
                    # if it's a new sync sequence, then assume the client has had
                    # amnesia and doesn't want any recent lazy-loaded members
                    # de-duplicated.
                    if since_token is None:
                        logger.debug("clearing LruCache for %r", cache_key)
                        cache.clear()
                    else:
                        # only send members which aren't in our LruCache (either
                        # because they're new to this client or have been pushed out
                        # of the cache)
                        logger.debug("filtering state from %r...", state_ids)
                        state_ids = {
                            t: event_id
                            for t, event_id in state_ids.iteritems()
                            if cache.get(t[1]) != event_id
                        }
                        logger.debug("...to %r", state_ids)
    
                    # add any member IDs we are about to send into our LruCache
                    for t, event_id in itertools.chain(
                        state_ids.items(),
                        timeline_state.items(),
                    ):
                        if t[0] == EventTypes.Member:
                            cache.set(t[1], event_id)
    
    
            state = {}
            if state_ids:
    
                state = yield self.store.get_events(list(state_ids.values()))
    
            defer.returnValue({
                (e.type, e.state_key): e
    
                for e in sync_config.filter_collection.filter_room_state(list(state.values()))
    
        def unread_notifs_for_room_id(self, room_id, sync_config):
    
            with Measure(self.clock, "unread_notifs_for_room_id"):
    
                last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
                    user_id=sync_config.user.to_string(),
                    room_id=room_id,
                    receipt_type="m.read"
    
                notifs = []
                if last_unread_event_id:
                    notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
                        room_id, sync_config.user.to_string(), last_unread_event_id
                    )
                    defer.returnValue(notifs)
    
    
            # There is no new information in this period, so your notification
            # count is whatever it was last time.
            defer.returnValue(None)
    
    Erik Johnston's avatar
    Erik Johnston committed
        @defer.inlineCallbacks
        def generate_sync_result(self, sync_config, since_token=None, full_state=False):
    
    Erik Johnston's avatar
    Erik Johnston committed
            """Generates a sync result.
    
            Args:
                sync_config (SyncConfig)
                since_token (StreamToken)
                full_state (bool)
    
            Returns:
                Deferred(SyncResult)
            """
    
            logger.info("Calculating sync response for %r", sync_config.user)
    
    Erik Johnston's avatar
    Erik Johnston committed
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            # NB: The now_token gets changed by some of the generate_sync_* methods,
            # this is due to some of the underlying streams not supporting the ability
            # to query up to a given point.
            # Always use the `now_token` in `SyncResultBuilder`
    
    Erik Johnston's avatar
    Erik Johnston committed
            now_token = yield self.event_sources.get_current_token()
    
    
            user_id = sync_config.user.to_string()
            app_service = self.store.get_app_service_by_user_id(user_id)
            if app_service:
                # We no longer support AS users using /sync directly.
                # See https://github.com/matrix-org/matrix-doc/issues/1144
                raise NotImplementedError()
            else:
                joined_room_ids = yield self.get_rooms_for_user_at(
                    user_id, now_token.room_stream_id,
                )
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            sync_result_builder = SyncResultBuilder(
    
    Erik Johnston's avatar
    Erik Johnston committed
                sync_config, full_state,
                since_token=since_token,
                now_token=now_token,
    
                joined_room_ids=joined_room_ids,
    
    Erik Johnston's avatar
    Erik Johnston committed
            account_data_by_room = yield self._generate_sync_entry_for_account_data(
    
    Erik Johnston's avatar
    Erik Johnston committed
                sync_result_builder
    
    Erik Johnston's avatar
    Erik Johnston committed
            res = yield self._generate_sync_entry_for_rooms(
    
    Erik Johnston's avatar
    Erik Johnston committed
                sync_result_builder, account_data_by_room
    
    Erik Johnston's avatar
    Erik Johnston committed
            )
    
            newly_joined_rooms, newly_joined_users, _, _ = res
            _, _, newly_left_rooms, newly_left_users = res
    
            block_all_presence_data = (
                since_token is None and
                sync_config.filter_collection.blocks_all_presence()
    
    Erik Johnston's avatar
    Erik Johnston committed
            )
    
            if self.hs_config.use_presence and not block_all_presence_data:
    
                yield self._generate_sync_entry_for_presence(
                    sync_result_builder, newly_joined_rooms, newly_joined_users
                )
    
            yield self._generate_sync_entry_for_to_device(sync_result_builder)
    
    
            device_lists = yield self._generate_sync_entry_for_device_list(
    
                sync_result_builder,
                newly_joined_rooms=newly_joined_rooms,
                newly_joined_users=newly_joined_users,
    
                newly_left_rooms=newly_left_rooms,
    
                newly_left_users=newly_left_users,
    
            device_id = sync_config.device_id
            one_time_key_counts = {}
            if device_id:
                one_time_key_counts = yield self.store.count_e2e_one_time_keys(
                    user_id, device_id
                )
    
    
            yield self._generate_sync_entry_for_groups(sync_result_builder)
    
    
    Erik Johnston's avatar
    Erik Johnston committed
            defer.returnValue(SyncResult(
    
    Erik Johnston's avatar
    Erik Johnston committed
                presence=sync_result_builder.presence,
                account_data=sync_result_builder.account_data,
                joined=sync_result_builder.joined,
                invited=sync_result_builder.invited,
                archived=sync_result_builder.archived,
    
                to_device=sync_result_builder.to_device,
    
                device_lists=device_lists,
    
                groups=sync_result_builder.groups,
    
                device_one_time_keys_count=one_time_key_counts,
    
    Erik Johnston's avatar
    Erik Johnston committed
                next_batch=sync_result_builder.now_token,
    
        @measure_func("_generate_sync_entry_for_groups")
        @defer.inlineCallbacks
        def _generate_sync_entry_for_groups(self, sync_result_builder):
            user_id = sync_result_builder.sync_config.user.to_string()
            since_token = sync_result_builder.since_token
            now_token = sync_result_builder.now_token
    
            if since_token and since_token.groups_key:
                results = yield self.store.get_groups_changes_for_user(
                    user_id, since_token.groups_key, now_token.groups_key,
                )
            else:
                results = yield self.store.get_all_groups_for_user(
                    user_id, now_token.groups_key,
                )
    
            invited = {}
            joined = {}
            left = {}
            for result in results:
                membership = result["membership"]
                group_id = result["group_id"]
                gtype = result["type"]
                content = result["content"]
    
                if membership == "join":
                    if gtype == "membership":
    
                        content.pop("membership", None)
    
                        joined[group_id] = content["content"]
    
                    else:
                        joined.setdefault(group_id, {})[gtype] = content
                elif membership == "invite":
                    if gtype == "membership":
                        content.pop("membership", None)
                        invited[group_id] = content["content"]
                else:
                    if gtype == "membership":
                        left[group_id] = content["content"]
    
            sync_result_builder.groups = GroupsSyncResult(
                join=joined,
                invite=invited,
                leave=left,
            )
    
    
        @measure_func("_generate_sync_entry_for_device_list")
    
        @defer.inlineCallbacks
    
        def _generate_sync_entry_for_device_list(self, sync_result_builder,
                                                 newly_joined_rooms, newly_joined_users,
                                                 newly_left_rooms, newly_left_users):
    
            user_id = sync_result_builder.sync_config.user.to_string()
            since_token = sync_result_builder.since_token
    
            if since_token and since_token.device_list_key:
                changed = yield self.store.get_user_whose_devices_changed(
                    since_token.device_list_key
                )
    
    
                # TODO: Be more clever than this, i.e. remove users who we already
                # share a room with?
                for room_id in newly_joined_rooms:
                    joined_users = yield self.state.get_current_user_in_room(room_id)
                    newly_joined_users.update(joined_users)
    
                for room_id in newly_left_rooms:
                    left_users = yield self.state.get_current_user_in_room(room_id)
                    newly_left_users.update(left_users)
    
    
                # TODO: Check that these users are actually new, i.e. either they
                # weren't in the previous sync *or* they left and rejoined.
                changed.update(newly_joined_users)
    
                if not changed and not newly_left_users:
                    defer.returnValue(DeviceLists(
                        changed=[],
                        left=newly_left_users,
                    ))
    
    
                users_who_share_room = yield self.store.get_users_who_share_room_with_user(
                    user_id
                )
    
                defer.returnValue(DeviceLists(
                    changed=users_who_share_room & changed,
                    left=set(newly_left_users) - users_who_share_room,
                ))
    
                defer.returnValue(DeviceLists(
                    changed=[],
                    left=[],
                ))