Skip to content
Snippets Groups Projects
Unverified Commit b9391c95 authored by Erik Johnston's avatar Erik Johnston Committed by GitHub
Browse files

Add typing to SyncHandler (#6821)

parent ae5b3104
No related branches found
No related tags found
No related merge requests found
Add type hints to `SyncHandler`.
......@@ -189,8 +189,14 @@ class EventBase(object):
redacts = _event_dict_property("redacts", None)
room_id = _event_dict_property("room_id")
sender = _event_dict_property("sender")
state_key = _event_dict_property("state_key")
type = _event_dict_property("type")
user_id = _event_dict_property("sender")
@property
def event_id(self) -> str:
raise NotImplementedError()
@property
def membership(self):
return self.content["membership"]
......@@ -281,10 +287,7 @@ class FrozenEvent(EventBase):
else:
frozen_dict = event_dict
self.event_id = event_dict["event_id"]
self.type = event_dict["type"]
if "state_key" in event_dict:
self.state_key = event_dict["state_key"]
self._event_id = event_dict["event_id"]
super(FrozenEvent, self).__init__(
frozen_dict,
......@@ -294,6 +297,10 @@ class FrozenEvent(EventBase):
rejected_reason=rejected_reason,
)
@property
def event_id(self) -> str:
return self._event_id
def __str__(self):
return self.__repr__()
......@@ -332,9 +339,6 @@ class FrozenEventV2(EventBase):
frozen_dict = event_dict
self._event_id = None
self.type = event_dict["type"]
if "state_key" in event_dict:
self.state_key = event_dict["state_key"]
super(FrozenEventV2, self).__init__(
frozen_dict,
......
......@@ -14,20 +14,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import itertools
import logging
from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple
from six import iteritems, itervalues
import attr
from prometheus_client import Counter
from synapse.api.constants import EventTypes, Membership
from synapse.api.filtering import FilterCollection
from synapse.events import EventBase
from synapse.logging.context import LoggingContext
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.types import (
Collection,
JsonDict,
RoomStreamToken,
StateMap,
StreamToken,
UserID,
)
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
......@@ -62,17 +72,22 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
SyncConfig = collections.namedtuple(
"SyncConfig", ["user", "filter_collection", "is_guest", "request_key", "device_id"]
)
@attr.s(slots=True, frozen=True)
class SyncConfig:
user = attr.ib(type=UserID)
filter_collection = attr.ib(type=FilterCollection)
is_guest = attr.ib(type=bool)
request_key = attr.ib(type=Tuple[Any, ...])
device_id = attr.ib(type=str)
class TimelineBatch(
collections.namedtuple("TimelineBatch", ["prev_batch", "events", "limited"])
):
__slots__ = []
@attr.s(slots=True, frozen=True)
class TimelineBatch:
prev_batch = attr.ib(type=StreamToken)
events = attr.ib(type=List[EventBase])
limited = attr.ib(bool)
def __nonzero__(self):
def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
......@@ -81,23 +96,17 @@ class TimelineBatch(
__bool__ = __nonzero__ # python3
class JoinedSyncResult(
collections.namedtuple(
"JoinedSyncResult",
[
"room_id", # str
"timeline", # TimelineBatch
"state", # dict[(str, str), FrozenEvent]
"ephemeral",
"account_data",
"unread_notifications",
"summary",
],
)
):
__slots__ = []
def __nonzero__(self):
@attr.s(slots=True, frozen=True)
class JoinedSyncResult:
room_id = attr.ib(type=str)
timeline = attr.ib(type=TimelineBatch)
state = attr.ib(type=StateMap[EventBase])
ephemeral = attr.ib(type=List[JsonDict])
account_data = attr.ib(type=List[JsonDict])
unread_notifications = attr.ib(type=JsonDict)
summary = attr.ib(type=Optional[JsonDict])
def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
......@@ -113,20 +122,14 @@ class JoinedSyncResult(
__bool__ = __nonzero__ # python3
class ArchivedSyncResult(
collections.namedtuple(
"ArchivedSyncResult",
[
"room_id", # str
"timeline", # TimelineBatch
"state", # dict[(str, str), FrozenEvent]
"account_data",
],
)
):
__slots__ = []
def __nonzero__(self):
@attr.s(slots=True, frozen=True)
class ArchivedSyncResult:
room_id = attr.ib(type=str)
timeline = attr.ib(type=TimelineBatch)
state = attr.ib(type=StateMap[EventBase])
account_data = attr.ib(type=List[JsonDict])
def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
......@@ -135,70 +138,88 @@ class ArchivedSyncResult(
__bool__ = __nonzero__ # python3
class InvitedSyncResult(
collections.namedtuple(
"InvitedSyncResult",
["room_id", "invite"], # str # FrozenEvent: the invite event
)
):
__slots__ = []
@attr.s(slots=True, frozen=True)
class InvitedSyncResult:
room_id = attr.ib(type=str)
invite = attr.ib(type=EventBase)
def __nonzero__(self):
def __nonzero__(self) -> bool:
"""Invited rooms should always be reported to the client"""
return True
__bool__ = __nonzero__ # python3
class GroupsSyncResult(
collections.namedtuple("GroupsSyncResult", ["join", "invite", "leave"])
):
__slots__ = []
@attr.s(slots=True, frozen=True)
class GroupsSyncResult:
join = attr.ib(type=JsonDict)
invite = attr.ib(type=JsonDict)
leave = attr.ib(type=JsonDict)
def __nonzero__(self):
def __nonzero__(self) -> bool:
return bool(self.join or self.invite or self.leave)
__bool__ = __nonzero__ # python3
class DeviceLists(
collections.namedtuple(
"DeviceLists",
[
"changed", # list of user_ids whose devices may have changed
"left", # list of user_ids whose devices we no longer track
],
)
):
__slots__ = []
@attr.s(slots=True, frozen=True)
class DeviceLists:
"""
Attributes:
changed: List of user_ids whose devices may have changed
left: List of user_ids whose devices we no longer track
"""
changed = attr.ib(type=Collection[str])
left = attr.ib(type=Collection[str])
def __nonzero__(self):
def __nonzero__(self) -> bool:
return bool(self.changed or self.left)
__bool__ = __nonzero__ # python3
class SyncResult(
collections.namedtuple(
"SyncResult",
[
"next_batch", # Token for the next sync
"presence", # List of presence events for the user.
"account_data", # List of account_data events for the user.
"joined", # JoinedSyncResult for each joined room.
"invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
"to_device", # List of direct messages for the device.
"device_lists", # List of user_ids whose devices have changed
"device_one_time_keys_count", # Dict of algorithm to count for one time keys
# for this device
"groups",
],
)
):
__slots__ = []
def __nonzero__(self):
@attr.s
class _RoomChanges:
"""The set of room entries to include in the sync, plus the set of joined
and left room IDs since last sync.
"""
room_entries = attr.ib(type=List["RoomSyncResultBuilder"])
invited = attr.ib(type=List[InvitedSyncResult])
newly_joined_rooms = attr.ib(type=List[str])
newly_left_rooms = attr.ib(type=List[str])
@attr.s(slots=True, frozen=True)
class SyncResult:
"""
Attributes:
next_batch: Token for the next sync
presence: List of presence events for the user.
account_data: List of account_data events for the user.
joined: JoinedSyncResult for each joined room.
invited: InvitedSyncResult for each invited room.
archived: ArchivedSyncResult for each archived room.
to_device: List of direct messages for the device.
device_lists: List of user_ids whose devices have changed
device_one_time_keys_count: Dict of algorithm to count for one time keys
for this device
groups: Group updates, if any
"""
next_batch = attr.ib(type=StreamToken)
presence = attr.ib(type=List[JsonDict])
account_data = attr.ib(type=List[JsonDict])
joined = attr.ib(type=List[JoinedSyncResult])
invited = attr.ib(type=List[InvitedSyncResult])
archived = attr.ib(type=List[ArchivedSyncResult])
to_device = attr.ib(type=List[JsonDict])
device_lists = attr.ib(type=DeviceLists)
device_one_time_keys_count = attr.ib(type=JsonDict)
groups = attr.ib(type=Optional[GroupsSyncResult])
def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if the notifier needs to wait for more events when polling for
events.
......@@ -240,13 +261,15 @@ class SyncHandler(object):
)
async def wait_for_sync_for_user(
self, sync_config, since_token=None, timeout=0, full_state=False
):
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Returns:
Deferred[SyncResult]
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
......@@ -265,8 +288,12 @@ class SyncHandler(object):
return res
async def _wait_for_sync_for_user(
self, sync_config, since_token, timeout, full_state
):
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult:
if since_token is None:
sync_type = "initial_sync"
elif full_state:
......@@ -305,25 +332,33 @@ class SyncHandler(object):
return result
def current_sync_for_user(self, sync_config, since_token=None, full_state=False):
async def current_sync_for_user(
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
"""Get the sync for client needed to match what the server has now.
Returns:
A Deferred SyncResult.
"""
return self.generate_sync_result(sync_config, since_token, full_state)
return await self.generate_sync_result(sync_config, since_token, full_state)
async def push_rules_for_user(self, user):
async def push_rules_for_user(self, user: UserID) -> JsonDict:
user_id = user.to_string()
rules = await self.store.get_push_rules_for_user(user_id)
rules = format_push_rules_for_user(user, rules)
return rules
async def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None):
async def ephemeral_by_room(
self,
sync_result_builder: "SyncResultBuilder",
now_token: StreamToken,
since_token: Optional[StreamToken] = None,
) -> Tuple[StreamToken, Dict[str, List[JsonDict]]]:
"""Get the ephemeral events for each room the user is in
Args:
sync_result_builder(SyncResultBuilder)
now_token (StreamToken): Where the server is currently up to.
since_token (StreamToken): Where the server was when the client
sync_result_builder
now_token: Where the server is currently up to.
since_token: Where the server was when the client
last synced.
Returns:
A tuple of the now StreamToken, updated to reflect the which typing
......@@ -348,7 +383,7 @@ class SyncHandler(object):
)
now_token = now_token.copy_and_replace("typing_key", typing_key)
ephemeral_by_room = {}
ephemeral_by_room = {} # type: JsonDict
for event in typing:
# we want to exclude the room_id from the event, but modifying the
......@@ -380,13 +415,13 @@ class SyncHandler(object):
async def _load_filtered_recents(
self,
room_id,
sync_config,
now_token,
since_token=None,
recents=None,
newly_joined_room=False,
):
room_id: str,
sync_config: SyncConfig,
now_token: StreamToken,
since_token: Optional[StreamToken] = None,
potential_recents: Optional[List[EventBase]] = None,
newly_joined_room: bool = False,
) -> TimelineBatch:
"""
Returns:
a Deferred TimelineBatch
......@@ -397,21 +432,29 @@ class SyncHandler(object):
sync_config.filter_collection.blocks_all_room_timeline()
)
if recents is None or newly_joined_room or timeline_limit < len(recents):
if (
potential_recents is None
or newly_joined_room
or timeline_limit < len(potential_recents)
):
limited = True
else:
limited = False
if recents:
recents = sync_config.filter_collection.filter_room_timeline(recents)
if potential_recents:
recents = sync_config.filter_collection.filter_room_timeline(
potential_recents
)
# We check if there are any state events, if there are then we pass
# all current state events to the filter_events function. This is to
# ensure that we always include current state in the timeline
current_state_ids = frozenset()
current_state_ids = frozenset() # type: FrozenSet[str]
if any(e.is_state() for e in recents):
current_state_ids = await self.state.get_current_state_ids(room_id)
current_state_ids = frozenset(itervalues(current_state_ids))
current_state_ids_map = await self.state.get_current_state_ids(
room_id
)
current_state_ids = frozenset(itervalues(current_state_ids_map))
recents = await filter_events_for_client(
self.storage,
......@@ -463,8 +506,10 @@ class SyncHandler(object):
# ensure that we always include current state in the timeline
current_state_ids = frozenset()
if any(e.is_state() for e in loaded_recents):
current_state_ids = await self.state.get_current_state_ids(room_id)
current_state_ids = frozenset(itervalues(current_state_ids))
current_state_ids_map = await self.state.get_current_state_ids(
room_id
)
current_state_ids = frozenset(itervalues(current_state_ids_map))
loaded_recents = await filter_events_for_client(
self.storage,
......@@ -493,17 +538,15 @@ class SyncHandler(object):
limited=limited or newly_joined_room,
)
async def get_state_after_event(self, event, state_filter=StateFilter.all()):
async def get_state_after_event(
self, event: EventBase, state_filter: StateFilter = StateFilter.all()
) -> StateMap[str]:
"""
Get the room state after the given event
Args:
event(synapse.events.EventBase): event of interest
state_filter (StateFilter): The state filter used to fetch state
from the database.
Returns:
A Deferred map from ((type, state_key)->Event)
event: event of interest
state_filter: The state filter used to fetch state from the database.
"""
state_ids = await self.state_store.get_state_ids_for_event(
event.event_id, state_filter=state_filter
......@@ -514,18 +557,17 @@ class SyncHandler(object):
return state_ids
async def get_state_at(
self, room_id, stream_position, state_filter=StateFilter.all()
):
self,
room_id: str,
stream_position: StreamToken,
state_filter: StateFilter = StateFilter.all(),
) -> StateMap[str]:
""" Get the room state at a particular stream position
Args:
room_id(str): room for which to get state
stream_position(StreamToken): point at which to get state
state_filter (StateFilter): The state filter used to fetch state
from the database.
Returns:
A Deferred map from ((type, state_key)->Event)
room_id: room for which to get state
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
"""
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
......@@ -546,23 +588,25 @@ class SyncHandler(object):
state = {}
return state
async def compute_summary(self, room_id, sync_config, batch, state, now_token):
async def compute_summary(
self,
room_id: str,
sync_config: SyncConfig,
batch: TimelineBatch,
state: StateMap[EventBase],
now_token: StreamToken,
) -> Optional[JsonDict]:
""" Works out a room summary block for this room, summarising the number
of joined members in the room, and providing the 'hero' members if the
room has no name so clients can consistently name rooms. Also adds
state events to 'state' if needed to describe the heroes.
Args:
room_id(str):
sync_config(synapse.handlers.sync.SyncConfig):
batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
the room that will be sent to the user.
state(dict): dict of (type, state_key) -> Event as returned by
compute_state_delta
now_token(str): Token of the end of the current batch.
Returns:
A deferred dict describing the room summary
Args
room_id
sync_config
batch: The timeline batch for the room that will be sent to the user.
state: State as returned by compute_state_delta
now_token: Token of the end of the current batch.
"""
# FIXME: we could/should get this from room_stats when matthew/stats lands
......@@ -681,7 +725,7 @@ class SyncHandler(object):
return summary
def get_lazy_loaded_members_cache(self, cache_key):
def get_lazy_loaded_members_cache(self, cache_key: Tuple[str, str]) -> LruCache:
cache = self.lazy_loaded_members_cache.get(cache_key)
if cache is None:
logger.debug("creating LruCache for %r", cache_key)
......@@ -692,23 +736,24 @@ class SyncHandler(object):
return cache
async def compute_state_delta(
self, room_id, batch, sync_config, since_token, now_token, full_state
):
self,
room_id: str,
batch: TimelineBatch,
sync_config: SyncConfig,
since_token: Optional[StreamToken],
now_token: StreamToken,
full_state: bool,
) -> StateMap[EventBase]:
""" Works out the difference in state between the start of the timeline
and the previous sync.
Args:
room_id(str):
batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
the room that will be sent to the user.
sync_config(synapse.handlers.sync.SyncConfig):
since_token(str|None): Token of the end of the previous batch. May
be None.
now_token(str): Token of the end of the current batch.
full_state(bool): Whether to force returning the full state.
Returns:
A deferred dict of (type, state_key) -> Event
room_id:
batch: The timeline batch for the room that will be sent to the user.
sync_config:
since_token: Token of the end of the previous batch. May be None.
now_token: Token of the end of the current batch.
full_state: Whether to force returning the full state.
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
......@@ -800,6 +845,10 @@ class SyncHandler(object):
# about them).
state_filter = StateFilter.all()
# If this is an initial sync then full_state should be set, and
# that case is handled above. We assert here to ensure that this
# is indeed the case.
assert since_token is not None
state_at_previous_sync = await self.get_state_at(
room_id, stream_position=since_token, state_filter=state_filter
)
......@@ -874,7 +923,7 @@ class SyncHandler(object):
if t[0] == EventTypes.Member:
cache.set(t[1], event_id)
state = {}
state = {} # type: Dict[str, EventBase]
if state_ids:
state = await self.store.get_events(list(state_ids.values()))
......@@ -885,7 +934,9 @@ class SyncHandler(object):
)
}
async def unread_notifs_for_room_id(self, room_id, sync_config):
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> Optional[Dict[str, str]]:
with Measure(self.clock, "unread_notifs_for_room_id"):
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
......@@ -893,7 +944,6 @@ class SyncHandler(object):
receipt_type="m.read",
)
notifs = []
if last_unread_event_id:
notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
......@@ -905,17 +955,12 @@ class SyncHandler(object):
return None
async def generate_sync_result(
self, sync_config, since_token=None, full_state=False
):
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
"""Generates a sync result.
Args:
sync_config (SyncConfig)
since_token (StreamToken)
full_state (bool)
Returns:
Deferred(SyncResult)
"""
# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
......@@ -977,7 +1022,7 @@ class SyncHandler(object):
)
device_id = sync_config.device_id
one_time_key_counts = {}
one_time_key_counts = {} # type: JsonDict
if device_id:
one_time_key_counts = await self.store.count_e2e_one_time_keys(
user_id, device_id
......@@ -1007,7 +1052,9 @@ class SyncHandler(object):
)
@measure_func("_generate_sync_entry_for_groups")
async def _generate_sync_entry_for_groups(self, sync_result_builder):
async def _generate_sync_entry_for_groups(
self, sync_result_builder: "SyncResultBuilder"
) -> None:
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
......@@ -1052,27 +1099,22 @@ class SyncHandler(object):
@measure_func("_generate_sync_entry_for_device_list")
async def _generate_sync_entry_for_device_list(
self,
sync_result_builder,
newly_joined_rooms,
newly_joined_or_invited_users,
newly_left_rooms,
newly_left_users,
):
sync_result_builder: "SyncResultBuilder",
newly_joined_rooms: Set[str],
newly_joined_or_invited_users: Set[str],
newly_left_rooms: Set[str],
newly_left_users: Set[str],
) -> DeviceLists:
"""Generate the DeviceLists section of sync
Args:
sync_result_builder (SyncResultBuilder)
newly_joined_rooms (set[str]): Set of rooms user has joined since
previous sync
newly_joined_or_invited_users (set[str]): Set of users that have
joined or been invited to a room since previous sync.
newly_left_rooms (set[str]): Set of rooms user has left since
sync_result_builder
newly_joined_rooms: Set of rooms user has joined since previous sync
newly_joined_or_invited_users: Set of users that have joined or
been invited to a room since previous sync.
newly_left_rooms: Set of rooms user has left since previous sync
newly_left_users: Set of users that have left a room we're in since
previous sync
newly_left_users (set[str]): Set of users that have left a room
we're in since previous sync
Returns:
Deferred[DeviceLists]
"""
user_id = sync_result_builder.sync_config.user.to_string()
......@@ -1133,15 +1175,11 @@ class SyncHandler(object):
else:
return DeviceLists(changed=[], left=[])
async def _generate_sync_entry_for_to_device(self, sync_result_builder):
async def _generate_sync_entry_for_to_device(
self, sync_result_builder: "SyncResultBuilder"
) -> None:
"""Generates the portion of the sync response. Populates
`sync_result_builder` with the result.
Args:
sync_result_builder(SyncResultBuilder)
Returns:
Deferred(dict): A dictionary containing the per room account data.
"""
user_id = sync_result_builder.sync_config.user.to_string()
device_id = sync_result_builder.sync_config.device_id
......@@ -1179,15 +1217,17 @@ class SyncHandler(object):
else:
sync_result_builder.to_device = []
async def _generate_sync_entry_for_account_data(self, sync_result_builder):
async def _generate_sync_entry_for_account_data(
self, sync_result_builder: "SyncResultBuilder"
) -> Dict[str, Dict[str, JsonDict]]:
"""Generates the account data portion of the sync response. Populates
`sync_result_builder` with the result.
Args:
sync_result_builder(SyncResultBuilder)
sync_result_builder
Returns:
Deferred(dict): A dictionary containing the per room account data.
A dictionary containing the per room account data.
"""
sync_config = sync_result_builder.sync_config
user_id = sync_result_builder.sync_config.user.to_string()
......@@ -1231,18 +1271,21 @@ class SyncHandler(object):
return account_data_by_room
async def _generate_sync_entry_for_presence(
self, sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
):
self,
sync_result_builder: "SyncResultBuilder",
newly_joined_rooms: Set[str],
newly_joined_or_invited_users: Set[str],
) -> None:
"""Generates the presence portion of the sync response. Populates the
`sync_result_builder` with the result.
Args:
sync_result_builder(SyncResultBuilder)
newly_joined_rooms(list): List of rooms that the user has joined
since the last sync (or empty if an initial sync)
newly_joined_or_invited_users(list): List of users that have joined
or been invited to rooms since the last sync (or empty if an initial
sync)
sync_result_builder
newly_joined_rooms: Set of rooms that the user has joined since
the last sync (or empty if an initial sync)
newly_joined_or_invited_users: Set of users that have joined or
been invited to rooms since the last sync (or empty if an
initial sync)
"""
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
......@@ -1286,17 +1329,19 @@ class SyncHandler(object):
sync_result_builder.presence = presence
async def _generate_sync_entry_for_rooms(
self, sync_result_builder, account_data_by_room
):
self,
sync_result_builder: "SyncResultBuilder",
account_data_by_room: Dict[str, Dict[str, JsonDict]],
) -> Tuple[Set[str], Set[str], Set[str], Set[str]]:
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
Args:
sync_result_builder(SyncResultBuilder)
account_data_by_room(dict): Dictionary of per room account data
sync_result_builder
account_data_by_room: Dictionary of per room account data
Returns:
Deferred(tuple): Returns a 4-tuple of
Returns a 4-tuple of
`(newly_joined_rooms, newly_joined_or_invited_users,
newly_left_rooms, newly_left_users)`
"""
......@@ -1307,7 +1352,7 @@ class SyncHandler(object):
)
if block_all_room_ephemeral:
ephemeral_by_room = {}
ephemeral_by_room = {} # type: Dict[str, List[JsonDict]]
else:
now_token, ephemeral_by_room = await self.ephemeral_by_room(
sync_result_builder,
......@@ -1328,7 +1373,7 @@ class SyncHandler(object):
)
if not tags_by_room:
logger.debug("no-oping sync")
return [], [], [], []
return set(), set(), set(), set()
ignored_account_data = await self.store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id=user_id
......@@ -1340,19 +1385,22 @@ class SyncHandler(object):
ignored_users = frozenset()
if since_token:
res = await self._get_rooms_changed(sync_result_builder, ignored_users)
room_entries, invited, newly_joined_rooms, newly_left_rooms = res
room_changes = await self._get_rooms_changed(
sync_result_builder, ignored_users
)
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
else:
res = await self._get_all_rooms(sync_result_builder, ignored_users)
room_entries, invited, newly_joined_rooms = res
newly_left_rooms = []
room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
tags_by_room = await self.store.get_tags_for_user(user_id)
room_entries = room_changes.room_entries
invited = room_changes.invited
newly_joined_rooms = room_changes.newly_joined_rooms
newly_left_rooms = room_changes.newly_left_rooms
def handle_room_entries(room_entry):
return self._generate_room_entry(
sync_result_builder,
......@@ -1392,13 +1440,15 @@ class SyncHandler(object):
newly_left_users -= newly_joined_or_invited_users
return (
newly_joined_rooms,
set(newly_joined_rooms),
newly_joined_or_invited_users,
newly_left_rooms,
set(newly_left_rooms),
newly_left_users,
)
async def _have_rooms_changed(self, sync_result_builder):
async def _have_rooms_changed(
self, sync_result_builder: "SyncResultBuilder"
) -> bool:
"""Returns whether there may be any new events that should be sent down
the sync. Returns True if there are.
"""
......@@ -1422,22 +1472,10 @@ class SyncHandler(object):
return True
return False
async def _get_rooms_changed(self, sync_result_builder, ignored_users):
async def _get_rooms_changed(
self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
) -> _RoomChanges:
"""Gets the the changes that have happened since the last sync.
Args:
sync_result_builder(SyncResultBuilder)
ignored_users(set(str)): Set of users ignored by user.
Returns:
Deferred(tuple): Returns a tuple of the form:
`(room_entries, invited_rooms, newly_joined_rooms, newly_left_rooms)`
where:
room_entries is a list [RoomSyncResultBuilder]
invited_rooms is a list [InvitedSyncResult]
newly_joined_rooms is a list[str] of room ids
newly_left_rooms is a list[str] of room ids
"""
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
......@@ -1451,7 +1489,7 @@ class SyncHandler(object):
user_id, since_token.room_key, now_token.room_key
)
mem_change_events_by_room_id = {}
mem_change_events_by_room_id = {} # type: Dict[str, List[EventBase]]
for event in rooms_changed:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
......@@ -1570,7 +1608,7 @@ class SyncHandler(object):
# This is all screaming out for a refactor, as the logic here is
# subtle and the moving parts numerous.
if leave_event.internal_metadata.is_out_of_band_membership():
batch_events = [leave_event]
batch_events = [leave_event] # type: Optional[List[EventBase]]
else:
batch_events = None
......@@ -1636,18 +1674,17 @@ class SyncHandler(object):
)
room_entries.append(entry)
return room_entries, invited, newly_joined_rooms, newly_left_rooms
return _RoomChanges(room_entries, invited, newly_joined_rooms, newly_left_rooms)
async def _get_all_rooms(self, sync_result_builder, ignored_users):
async def _get_all_rooms(
self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
) -> _RoomChanges:
"""Returns entries for all rooms for the user.
Args:
sync_result_builder(SyncResultBuilder)
ignored_users(set(str)): Set of users ignored by user.
sync_result_builder
ignored_users: Set of users ignored by user.
Returns:
Deferred(tuple): Returns a tuple of the form:
`([RoomSyncResultBuilder], [InvitedSyncResult], [])`
"""
user_id = sync_result_builder.sync_config.user.to_string()
......@@ -1709,30 +1746,30 @@ class SyncHandler(object):
)
)
return room_entries, invited, []
return _RoomChanges(room_entries, invited, [], [])
async def _generate_room_entry(
self,
sync_result_builder,
ignored_users,
room_builder,
ephemeral,
tags,
account_data,
always_include=False,
sync_result_builder: "SyncResultBuilder",
ignored_users: Set[str],
room_builder: "RoomSyncResultBuilder",
ephemeral: List[JsonDict],
tags: Optional[List[JsonDict]],
account_data: Dict[str, JsonDict],
always_include: bool = False,
):
"""Populates the `joined` and `archived` section of `sync_result_builder`
based on the `room_builder`.
Args:
sync_result_builder(SyncResultBuilder)
ignored_users(set(str)): Set of users ignored by user.
room_builder(RoomSyncResultBuilder)
ephemeral(list): List of new ephemeral events for room
tags(list): List of *all* tags for room, or None if there has been
sync_result_builder
ignored_users: Set of users ignored by user.
room_builder
ephemeral: List of new ephemeral events for room
tags: List of *all* tags for room, or None if there has been
no change.
account_data(list): List of new account data for room
always_include(bool): Always include this room in the sync response,
account_data: List of new account data for room
always_include: Always include this room in the sync response,
even if empty.
"""
newly_joined = room_builder.newly_joined
......@@ -1758,7 +1795,7 @@ class SyncHandler(object):
sync_config,
now_token=upto_token,
since_token=since_token,
recents=events,
potential_recents=events,
newly_joined_room=newly_joined,
)
......@@ -1809,7 +1846,7 @@ class SyncHandler(object):
room_id, batch, sync_config, since_token, now_token, full_state=full_state
)
summary = {}
summary = {} # type: Optional[JsonDict]
# we include a summary in room responses when we're lazy loading
# members (as the client otherwise doesn't have enough info to form
......@@ -1833,7 +1870,7 @@ class SyncHandler(object):
)
if room_builder.rtype == "joined":
unread_notifications = {}
unread_notifications = {} # type: Dict[str, str]
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
......@@ -1860,18 +1897,20 @@ class SyncHandler(object):
% (room_id, user_id, len(state))
)
elif room_builder.rtype == "archived":
room_sync = ArchivedSyncResult(
archived_room_sync = ArchivedSyncResult(
room_id=room_id,
timeline=batch,
state=state,
account_data=account_data_events,
)
if room_sync or always_include:
sync_result_builder.archived.append(room_sync)
if archived_room_sync or always_include:
sync_result_builder.archived.append(archived_room_sync)
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
async def get_rooms_for_user_at(self, user_id, stream_ordering):
async def get_rooms_for_user_at(
self, user_id: str, stream_ordering: int
) -> FrozenSet[str]:
"""Get set of joined rooms for a user at the given stream ordering.
The stream ordering *must* be recent, otherwise this may throw an
......@@ -1879,12 +1918,11 @@ class SyncHandler(object):
current token, which should be perfectly fine).
Args:
user_id (str)
stream_ordering (int)
user_id
stream_ordering
ReturnValue:
Deferred[frozenset[str]]: Set of room_ids the user is in at given
stream_ordering.
Set of room_ids the user is in at given stream_ordering.
"""
joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)
......@@ -1911,11 +1949,10 @@ class SyncHandler(object):
if user_id in users_in_room:
joined_room_ids.add(room_id)
joined_room_ids = frozenset(joined_room_ids)
return joined_room_ids
return frozenset(joined_room_ids)
def _action_has_highlight(actions):
def _action_has_highlight(actions: List[JsonDict]) -> bool:
for action in actions:
try:
if action.get("set_tweak", None) == "highlight":
......@@ -1927,22 +1964,23 @@ def _action_has_highlight(actions):
def _calculate_state(
timeline_contains, timeline_start, previous, current, lazy_load_members
):
timeline_contains: StateMap[str],
timeline_start: StateMap[str],
previous: StateMap[str],
current: StateMap[str],
lazy_load_members: bool,
) -> StateMap[str]:
"""Works out what state to include in a sync response.
Args:
timeline_contains (dict): state in the timeline
timeline_start (dict): state at the start of the timeline
previous (dict): state at the end of the previous sync (or empty dict
timeline_contains: state in the timeline
timeline_start: state at the start of the timeline
previous: state at the end of the previous sync (or empty dict
if this is an initial sync)
current (dict): state at the end of the timeline
lazy_load_members (bool): whether to return members from timeline_start
current: state at the end of the timeline
lazy_load_members: whether to return members from timeline_start
or not. assumes that timeline_start has already been filtered to
include only the members the client needs to know about.
Returns:
dict
"""
event_id_to_key = {
e: key
......@@ -1979,15 +2017,16 @@ def _calculate_state(
return {event_id_to_key[e]: e for e in state_ids}
class SyncResultBuilder(object):
@attr.s
class SyncResultBuilder:
"""Used to help build up a new SyncResult for a user
Attributes:
sync_config (SyncConfig)
full_state (bool)
since_token (StreamToken)
now_token (StreamToken)
joined_room_ids (list[str])
sync_config
full_state: The full_state flag as specified by user
since_token: The token supplied by user, or None.
now_token: The token to sync up to.
joined_room_ids: List of rooms the user is joined to
# The following mirror the fields in a sync response
presence (list)
......@@ -1995,61 +2034,45 @@ class SyncResultBuilder(object):
joined (list[JoinedSyncResult])
invited (list[InvitedSyncResult])
archived (list[ArchivedSyncResult])
device (list)
groups (GroupsSyncResult|None)
to_device (list)
"""
def __init__(
self, sync_config, full_state, since_token, now_token, joined_room_ids
):
"""
Args:
sync_config (SyncConfig)
full_state (bool): The full_state flag as specified by user
since_token (StreamToken): The token supplied by user, or None.
now_token (StreamToken): The token to sync up to.
joined_room_ids (list[str]): List of rooms the user is joined to
"""
self.sync_config = sync_config
self.full_state = full_state
self.since_token = since_token
self.now_token = now_token
self.joined_room_ids = joined_room_ids
self.presence = []
self.account_data = []
self.joined = []
self.invited = []
self.archived = []
self.device = []
self.groups = None
self.to_device = []
sync_config = attr.ib(type=SyncConfig)
full_state = attr.ib(type=bool)
since_token = attr.ib(type=Optional[StreamToken])
now_token = attr.ib(type=StreamToken)
joined_room_ids = attr.ib(type=FrozenSet[str])
presence = attr.ib(type=List[JsonDict], default=attr.Factory(list))
account_data = attr.ib(type=List[JsonDict], default=attr.Factory(list))
joined = attr.ib(type=List[JoinedSyncResult], default=attr.Factory(list))
invited = attr.ib(type=List[InvitedSyncResult], default=attr.Factory(list))
archived = attr.ib(type=List[ArchivedSyncResult], default=attr.Factory(list))
groups = attr.ib(type=Optional[GroupsSyncResult], default=None)
to_device = attr.ib(type=List[JsonDict], default=attr.Factory(list))
@attr.s
class RoomSyncResultBuilder(object):
"""Stores information needed to create either a `JoinedSyncResult` or
`ArchivedSyncResult`.
Attributes:
room_id
rtype: One of `"joined"` or `"archived"`
events: List of events to include in the room (more events may be added
when generating result).
newly_joined: If the user has newly joined the room
full_state: Whether the full state should be sent in result
since_token: Earliest point to return events from, or None
upto_token: Latest point to return events from.
"""
def __init__(
self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token
):
"""
Args:
room_id(str)
rtype(str): One of `"joined"` or `"archived"`
events(list[FrozenEvent]): List of events to include in the room
(more events may be added when generating result).
newly_joined(bool): If the user has newly joined the room
full_state(bool): Whether the full state should be sent in result
since_token(StreamToken): Earliest point to return events from, or None
upto_token(StreamToken): Latest point to return events from.
"""
self.room_id = room_id
self.rtype = rtype
self.events = events
self.newly_joined = newly_joined
self.full_state = full_state
self.since_token = since_token
self.upto_token = upto_token
room_id = attr.ib(type=str)
rtype = attr.ib(type=str)
events = attr.ib(type=Optional[List[EventBase]])
newly_joined = attr.ib(type=bool)
full_state = attr.ib(type=bool)
since_token = attr.ib(type=Optional[StreamToken])
upto_token = attr.ib(type=StreamToken)
......@@ -238,8 +238,11 @@ class RedactionTestCase(unittest.HomeserverTestCase):
@defer.inlineCallbacks
def build(self, prev_event_ids):
built_event = yield self._base_builder.build(prev_event_ids)
built_event.event_id = self._event_id
built_event._event_id = self._event_id
built_event._event_dict["event_id"] = self._event_id
assert built_event.event_id == self._event_id
return built_event
@property
......
......@@ -180,6 +180,7 @@ commands = mypy \
synapse/api \
synapse/config/ \
synapse/federation/transport \
synapse/handlers/sync.py \
synapse/handlers/ui_auth \
synapse/logging/ \
synapse/module_api \
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment