Newer
Older
# Copyright 2018, 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import (
TYPE_CHECKING,
Any,
Collection,
Dict,
FrozenSet,
List,
Optional,
Set,
Tuple,
)
from prometheus_client import Counter
from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.filtering import FilterCollection
from synapse.events import EventBase
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import (
JsonDict,
MutableStateMap,
RoomStreamToken,
StateMap,
StreamToken,
UserID,
)
Amber Brown
committed
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
from synapse.util.metrics import Measure, measure_func
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
# Debug logger for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
# "true" or "false" depending on if the request asked for lazy loaded members or
# not.
non_empty_sync_counter = Counter(
"synapse_handlers_sync_nonempty_total",
"Count of non empty sync responses. type is initial_sync/full_state_sync"
"/incremental_sync. lazy_loaded indicates if lazy loaded members were "
"enabled for that request.",
["type", "lazy_loaded"],
# Store the cache that tracks which lazy-loaded members have been sent to a given
# client for no more than 30 minutes.
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
# Remember the last 100 members we sent to a client for the purposes of
# avoiding redundantly sending the same lazy-loaded members to the client
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
@attr.s(slots=True, frozen=True)
class SyncConfig:
user = attr.ib(type=UserID)
filter_collection = attr.ib(type=FilterCollection)
is_guest = attr.ib(type=bool)
request_key = attr.ib(type=Tuple[Any, ...])
device_id = attr.ib(type=Optional[str])
@attr.s(slots=True, frozen=True)
class TimelineBatch:
prev_batch = attr.ib(type=StreamToken)
events = attr.ib(type=List[EventBase])
limited = attr.ib(type=bool)
def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.events)
# We can't freeze this class, because we need to update it after it's instantiated to
# update its unread count. This is because we calculate the unread count for a room only
# if there are updates for it, which we check after the instance has been created.
# This should not be a big deal because we update the notification counts afterwards as
# well anyway.
@attr.s(slots=True)
class JoinedSyncResult:
room_id = attr.ib(type=str)
timeline = attr.ib(type=TimelineBatch)
state = attr.ib(type=StateMap[EventBase])
ephemeral = attr.ib(type=List[JsonDict])
account_data = attr.ib(type=List[JsonDict])
unread_notifications = attr.ib(type=JsonDict)
summary = attr.ib(type=Optional[JsonDict])
unread_count = attr.ib(type=int)
def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(
self.timeline
or self.state
or self.ephemeral
# nb the notification count does not, er, count: if there's nothing
# else in the result, we don't need to send it.
@attr.s(slots=True, frozen=True)
class ArchivedSyncResult:
room_id = attr.ib(type=str)
timeline = attr.ib(type=TimelineBatch)
state = attr.ib(type=StateMap[EventBase])
account_data = attr.ib(type=List[JsonDict])
def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.timeline or self.state or self.account_data)
@attr.s(slots=True, frozen=True)
class InvitedSyncResult:
room_id = attr.ib(type=str)
invite = attr.ib(type=EventBase)
def __bool__(self) -> bool:
"""Invited rooms should always be reported to the client"""
return True
@attr.s(slots=True, frozen=True)
class GroupsSyncResult:
join = attr.ib(type=JsonDict)
invite = attr.ib(type=JsonDict)
leave = attr.ib(type=JsonDict)
def __bool__(self) -> bool:
return bool(self.join or self.invite or self.leave)
@attr.s(slots=True, frozen=True)
class DeviceLists:
"""
Attributes:
changed: List of user_ids whose devices may have changed
left: List of user_ids whose devices we no longer track
"""
changed = attr.ib(type=Collection[str])
left = attr.ib(type=Collection[str])
def __bool__(self) -> bool:
return bool(self.changed or self.left)
class _RoomChanges:
"""The set of room entries to include in the sync, plus the set of joined
and left room IDs since last sync.
"""
room_entries = attr.ib(type=List["RoomSyncResultBuilder"])
invited = attr.ib(type=List[InvitedSyncResult])
newly_joined_rooms = attr.ib(type=List[str])
newly_left_rooms = attr.ib(type=List[str])
@attr.s(slots=True, frozen=True)
class SyncResult:
"""
Attributes:
next_batch: Token for the next sync
presence: List of presence events for the user.
account_data: List of account_data events for the user.
joined: JoinedSyncResult for each joined room.
invited: InvitedSyncResult for each invited room.
archived: ArchivedSyncResult for each archived room.
to_device: List of direct messages for the device.
device_lists: List of user_ids whose devices have changed
device_one_time_keys_count: Dict of algorithm to count for one time keys
for this device
device_unused_fallback_key_types: List of key types that have an unused fallback
key
groups: Group updates, if any
"""
next_batch = attr.ib(type=StreamToken)
presence = attr.ib(type=List[JsonDict])
account_data = attr.ib(type=List[JsonDict])
joined = attr.ib(type=List[JoinedSyncResult])
invited = attr.ib(type=List[InvitedSyncResult])
archived = attr.ib(type=List[ArchivedSyncResult])
to_device = attr.ib(type=List[JsonDict])
device_lists = attr.ib(type=DeviceLists)
device_one_time_keys_count = attr.ib(type=JsonDict)
device_unused_fallback_key_types = attr.ib(type=List[str])
groups = attr.ib(type=Optional[GroupsSyncResult])
def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if the notifier needs to wait for more events when polling for
events.
"""
self.presence
or self.joined
or self.invited
or self.archived
or self.account_data
or self.to_device
or self.device_lists
or self.groups
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
self.response_cache = ResponseCache(
) # type: ResponseCache[Tuple[Any, ...]]
self.state = hs.get_state_handler()
self.storage = hs.get_storage()
self.state_store = self.storage.state
# ExpiringCache((User, Device)) -> LruCache(user_id => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
"lazy_loaded_members_cache",
self.clock,
max_len=0,
expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
) # type: ExpiringCache[Tuple[str, Optional[str]], LruCache[str, str]]
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
# auth_blocking will occur)
user_id = sync_config.user.to_string()
await self.auth.check_auth_blocking(requester=requester)
res = await self.response_cache.wrap(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config,
since_token,
timeout,
full_state,
logger.debug("Returning sync response for %s", user_id)
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult:
if since_token is None:
sync_type = "initial_sync"
elif full_state:
sync_type = "full_state_sync"
else:
sync_type = "incremental_sync"
context = current_context()
context.tag = sync_type
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result = await self.current_sync_for_user(
def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
timeout,
current_sync_callback,
if result:
if sync_config.filter_collection.lazy_load_members():
lazy_loaded = "true"
else:
lazy_loaded = "false"
non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
async def current_sync_for_user(
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
"""Get the sync for client needed to match what the server has now."""
with start_active_span("current_sync_for_user"):
log_kv({"since_token": since_token})
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)
set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
async def push_rules_for_user(self, user: UserID) -> JsonDict:
user_id = user.to_string()
rules = await self.store.get_push_rules_for_user(user_id)
rules = format_push_rules_for_user(user, rules)
async def ephemeral_by_room(
self,
sync_result_builder: "SyncResultBuilder",
now_token: StreamToken,
since_token: Optional[StreamToken] = None,
) -> Tuple[StreamToken, Dict[str, List[JsonDict]]]:
"""Get the ephemeral events for each room the user is in
sync_result_builder
now_token: Where the server is currently up to.
since_token: Where the server was when the client
last synced.
Returns:
A tuple of the now StreamToken, updated to reflect the which typing
events are included, and a dict mapping from room_id to a list of
typing events for that room.
"""
sync_config = sync_result_builder.sync_config
with Measure(self.clock, "ephemeral_by_room"):
typing_key = since_token.typing_key if since_token else 0
room_ids = sync_result_builder.joined_room_ids
typing_source = self.event_sources.sources["typing"]
typing, typing_key = await typing_source.get_new_events(
user=sync_config.user,
from_key=typing_key,
limit=sync_config.filter_collection.ephemeral_limit(),
room_ids=room_ids,
is_guest=sync_config.is_guest,
)
now_token = now_token.copy_and_replace("typing_key", typing_key)
ephemeral_by_room = {} # type: JsonDict
for event in typing:
# we want to exclude the room_id from the event, but modifying the
# result returned by the event source is poor form (it might cache
# the object)
room_id = event["room_id"]
event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
receipt_key = since_token.receipt_key if since_token else 0
receipt_source = self.event_sources.sources["receipt"]
receipts, receipt_key = await receipt_source.get_new_events(
user=sync_config.user,
from_key=receipt_key,
limit=sync_config.filter_collection.ephemeral_limit(),
room_ids=room_ids,
is_guest=sync_config.is_guest,
)
now_token = now_token.copy_and_replace("receipt_key", receipt_key)
for event in receipts:
room_id = event["room_id"]
# exclude room id, as above
event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
return now_token, ephemeral_by_room
room_id: str,
sync_config: SyncConfig,
now_token: StreamToken,
since_token: Optional[StreamToken] = None,
potential_recents: Optional[List[EventBase]] = None,
newly_joined_room: bool = False,
) -> TimelineBatch:
with Measure(self.clock, "load_filtered_recents"):
timeline_limit = sync_config.filter_collection.timeline_limit()
block_all_timeline = (
sync_config.filter_collection.blocks_all_room_timeline()
)
if (
potential_recents is None
or newly_joined_room
or timeline_limit < len(potential_recents)
):
if potential_recents:
recents = sync_config.filter_collection.filter_room_timeline(
potential_recents
)
# We check if there are any state events, if there are then we pass
# all current state events to the filter_events function. This is to
# ensure that we always include current state in the timeline
current_state_ids = frozenset() # type: FrozenSet[str]
if any(e.is_state() for e in recents):
current_state_ids_map = await self.store.get_current_state_ids(
current_state_ids = frozenset(current_state_ids_map.values())
recents = await filter_events_for_client(
sync_config.user.to_string(),
recents,
always_include_ids=current_state_ids,
if not limited or block_all_timeline:
prev_batch_token = now_token
if recents:
room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace("room_key", room_key)
events=recents, prev_batch=prev_batch_token, limited=False
filtering_factor = 2
load_limit = max(timeline_limit * filtering_factor, 10)
max_repeat = 5 # Only try a few times per room, otherwise
room_key = now_token.room_key
end_key = room_key
since_key = None
if since_token and not newly_joined_room:
since_key = since_token.room_key
while limited and len(recents) < timeline_limit and max_repeat:
# If we have a since_key then we are trying to get any events
# that have happened since `since_key` up to `end_key`, so we
# can just use `get_room_events_stream_for_room`.
# Otherwise, we want to return the last N events in the room
# in toplogical ordering.
events, end_key = await self.store.get_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
from_key=since_key,
to_key=end_key,
)
else:
events, end_key = await self.store.get_recent_events_for_room(
loaded_recents = sync_config.filter_collection.filter_room_timeline(
events
)
# We check if there are any state events, if there are then we pass
# all current state events to the filter_events function. This is to
# ensure that we always include current state in the timeline
current_state_ids = frozenset()
if any(e.is_state() for e in loaded_recents):
current_state_ids_map = await self.store.get_current_state_ids(
current_state_ids = frozenset(current_state_ids_map.values())
loaded_recents = await filter_events_for_client(
sync_config.user.to_string(),
loaded_recents,
always_include_ids=current_state_ids,
)
loaded_recents.extend(recents)
recents = loaded_recents
if len(events) <= load_limit:
limited = False
break
max_repeat -= 1
Mark Haines
committed
if len(recents) > timeline_limit:
limited = True
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
Mark Haines
committed
prev_batch_token = now_token.copy_and_replace("room_key", room_key)
Mark Haines
committed
return TimelineBatch(
events=recents,
prev_batch=prev_batch_token,
limited=limited or newly_joined_room,
Mark Haines
committed
self, event: EventBase, state_filter: Optional[StateFilter] = None
event: event of interest
state_filter: The state filter used to fetch state from the database.
state_ids = await self.state_store.get_state_ids_for_event(
event.event_id, state_filter=state_filter or StateFilter.all()
if event.is_state():
state_ids = dict(state_ids)
state_ids[(event.type, event.state_key)] = event.event_id
self,
room_id: str,
stream_position: StreamToken,
state_filter: Optional[StateFilter] = None,
"""Get the room state at a particular stream position
room_id: room for which to get state
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = await self.store.get_recent_events_for_room(
last_event = last_events[-1]
state = await self.get_state_after_event(
last_event, state_filter=state_filter or StateFilter.all()
# no events in this room - so presumably no state
Richard van der Hoff
committed
state = {}
async def compute_summary(
self,
room_id: str,
sync_config: SyncConfig,
batch: TimelineBatch,
state: MutableStateMap[EventBase],
now_token: StreamToken,
) -> Optional[JsonDict]:
"""Works out a room summary block for this room, summarising the number
of joined members in the room, and providing the 'hero' members if the
room has no name so clients can consistently name rooms. Also adds
state events to 'state' if needed to describe the heroes.
Args
room_id
sync_config
batch: The timeline batch for the room that will be sent to the user.
state: State as returned by compute_state_delta
now_token: Token of the end of the current batch.
# FIXME: we could/should get this from room_stats when matthew/stats lands
# FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
last_events, _ = await self.store.get_recent_event_ids_for_room(
)
if not last_events:
last_event = last_events[-1]
state_ids = await self.state_store.get_state_ids_for_event(
last_event.event_id,
state_filter=StateFilter.from_types(
[(EventTypes.Name, ""), (EventTypes.CanonicalAlias, "")]
),
details = await self.store.get_room_summary(room_id)
name_id = state_ids.get((EventTypes.Name, ""))
canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ""))
# TODO: only send these when they change.
summary["m.joined_member_count"] = details.get(Membership.JOIN, empty_ms).count
summary["m.invited_member_count"] = details.get(
Membership.INVITE, empty_ms
).count
# if the room has a name or canonical_alias set, we can skip
# calculating heroes. Empty strings are falsey, so we check
# for the "name" value and default to an empty string.
if name_id:
name = await self.store.get_event(name_id, allow_none=True)
if canonical_alias_id:
canonical_alias = await self.store.get_event(
if canonical_alias and canonical_alias.content.get("alias"):
Brendan Abolivier
committed
me = sync_config.user.to_string()
r[0] for r in details.get(Membership.JOIN, empty_ms).members if r[0] != me
r[0] for r in details.get(Membership.INVITE, empty_ms).members if r[0] != me
gone_user_ids = [
r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me
] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me]
# FIXME: only build up a member_ids list for our heroes
member_ids = {}
for membership in (
Membership.JOIN,
Membership.INVITE,
Membership.LEAVE,
):
for user_id, event_id in details.get(membership, empty_ms).members:
member_ids[user_id] = event_id
# FIXME: order by stream ordering rather than as returned by SQL
summary["m.heroes"] = sorted(joined_user_ids + invited_user_ids)[0:5]
summary["m.heroes"] = sorted(gone_user_ids)[0:5]
if not sync_config.filter_collection.lazy_load_members():
# ensure we send membership events for heroes if needed
cache_key = (sync_config.user.to_string(), sync_config.device_id)
cache = self.get_lazy_loaded_members_cache(cache_key)
# track which members the client should already know about via LL:
# Ones which are already in state...
existing_members = {
user_id for (typ, user_id) in state.keys() if typ == EventTypes.Member
}
# ...or ones which are in the timeline...
for ev in batch.events:
if ev.type == EventTypes.Member:
existing_members.add(ev.state_key)
# ...and then ensure any missing ones get included in state.
missing_hero_event_ids = [
member_ids[hero_id]
cache.get(hero_id) != member_ids[hero_id]
and hero_id not in existing_members
missing_hero_state = await self.store.get_events(missing_hero_event_ids)
for s in missing_hero_state.values():
cache.set(s.state_key, s.event_id)
state[(EventTypes.Member, s.state_key)] = s
def get_lazy_loaded_members_cache(
self, cache_key: Tuple[str, Optional[str]]
) -> LruCache[str, str]:
cache = self.lazy_loaded_members_cache.get(
cache_key
) # type: Optional[LruCache[str, str]]
if cache is None:
logger.debug("creating LruCache for %r", cache_key)
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
self.lazy_loaded_members_cache[cache_key] = cache
else:
logger.debug("found LruCache for %r", cache_key)
return cache
self,
room_id: str,
batch: TimelineBatch,
sync_config: SyncConfig,
since_token: Optional[StreamToken],
now_token: StreamToken,
full_state: bool,
) -> MutableStateMap[EventBase]:
"""Works out the difference in state between the start of the timeline
room_id:
batch: The timeline batch for the room that will be sent to the user.
sync_config:
since_token: Token of the end of the previous batch. May be None.
now_token: Token of the end of the current batch.
full_state: Whether to force returning the full state.
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
# updates even if they occurred logically before the previous event.
# TODO(mjark) Check for new redactions in the state events.
Richard van der Hoff
committed
with Measure(self.clock, "compute_state_delta"):
members_to_fetch = None
lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
sync_config.filter_collection.include_redundant_members()
)
# We only request state for the members needed to display the
# timeline:
members_to_fetch = {
event.sender # FIXME: we also care about invite targets etc.
for event in batch.events
}
if full_state:
# always make sure we LL ourselves so we know we're in the room
# (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
# We only need apply this on full state syncs given we disabled
# LL for incr syncs in #3840.
members_to_fetch.add(sync_config.user.to_string())
state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
else:
state_filter = StateFilter.all()
timeline_state = {
(event.type, event.state_key): event.event_id
current_state_ids = await self.state_store.get_state_ids_for_event(
Erik Johnston
committed
)
state_ids = await self.state_store.get_state_ids_for_event(
current_state_ids = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
Erik Johnston
committed
)
state_at_timeline_start = (
await self.state_store.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
)
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
)
# for now, we disable LL for gappy syncs - see
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
# N.B. this slows down incr syncs as we are now processing way
# more state in the server than if we were LLing.
#
# We still have to filter timeline_start to LL entries (above) in order
# for _calculate_state's LL logic to work, as we have to include LL
# members for timeline senders in case they weren't loaded in the initial
# sync. We do this by (counterintuitively) by filtering timeline_start
# members to just be ones which were timeline senders, which then ensures
# all of the rest get included in the state block (if we need to know
# about them).
state_filter = StateFilter.all()
# If this is an initial sync then full_state should be set, and
# that case is handled above. We assert here to ensure that this
# is indeed the case.
assert since_token is not None
state_at_previous_sync = await self.get_state_at(
room_id, stream_position=since_token, state_filter=state_filter
current_state_ids = await self.state_store.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
)
else:
# Its not clear how we get here, but empirically we do
# (#5407). Logging has been added elsewhere to try and
# figure out where this state comes from.
current_state_ids = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
)
Erik Johnston
committed
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
previous=state_at_previous_sync,
# we have to include LL members in case LL initial sync missed them
state_ids = {}
if members_to_fetch and batch.events:
# We're returning an incremental sync, with no
# "gap" since the previous sync, so normally there would be
# no state to return.
# But we're lazy-loading, so the client might need some more
# member events to understand the events in this timeline.
# So we fish out all the member events corresponding to the
# timeline here, and then dedupe any redundant ones below.
state_ids = await self.state_store.get_state_ids_for_event(
batch.events[0].event_id,
# we only want members!
state_filter=StateFilter.from_types(
(EventTypes.Member, member)
for member in members_to_fetch
),
if lazy_load_members and not include_redundant_members:
cache_key = (sync_config.user.to_string(), sync_config.device_id)
cache = self.get_lazy_loaded_members_cache(cache_key)
# if it's a new sync sequence, then assume the client has had
# amnesia and doesn't want any recent lazy-loaded members
# de-duplicated.
if since_token is None:
logger.debug("clearing LruCache for %r", cache_key)
cache.clear()
else:
# only send members which aren't in our LruCache (either
# because they're new to this client or have been pushed out
# of the cache)
logger.debug("filtering state from %r...", state_ids)
state_ids = {
t: event_id
for t, event_id in state_ids.items()
if cache.get(t[1]) != event_id
}
logger.debug("...to %r", state_ids)
# add any member IDs we are about to send into our LruCache
for t, event_id in itertools.chain(
):
if t[0] == EventTypes.Member:
cache.set(t[1], event_id)
state = {} # type: Dict[str, EventBase]
state = await self.store.get_events(list(state_ids.values()))
return {
(e.type, e.state_key): e
for e in sync_config.filter_collection.filter_room_state(
list(state.values())
)
if e.type != EventTypes.Aliases # until MSC2261 or alternative solution
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
with Measure(self.clock, "unread_notifs_for_room_id"):
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
room_id=room_id,
)
notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
return notifs
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
"""Generates a sync result."""
# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})
"Calculating sync response for %r between %s and %s",
user_id = sync_config.user.to_string()
app_service = self.store.get_app_service_by_user_id(user_id)
if app_service:
# We no longer support AS users using /sync directly.
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
else: