Skip to content
Snippets Groups Projects
Commit 77c0dc43 authored by Tulir Asokan's avatar Tulir Asokan :cat2:
Browse files

Add experimental handling for message sync queue errors

parent 251eb81c
No related branches found
No related tags found
No related merge requests found
......@@ -343,16 +343,6 @@ class User(DBUser, BaseUser):
self.log.exception("Failed to automatically enable custom puppet")
await self._create_community()
# self.log.debug("Updating own puppet info")
# try:
# own_info = await self.client.fetch_thread_info([self.fbid]).__anext__()
# except Exception:
# self.log.warning("Error fetching own info, retrying...", exc_info=True)
# own_info = await self.client.fetch_thread_info([self.fbid]).__anext__()
# puppet = pu.Puppet.get_by_fbid(self.fbid, create=True)
# await puppet.update_info(source=self, info=cast(fbchat.UserData, own_info))
# FIXME
# await self.sync_contacts()
await self.sync_threads()
self.start_listen()
......@@ -397,20 +387,6 @@ class User(DBUser, BaseUser):
# This uses upsert instead of insert as a hacky fix for potential conflicts
await UserContact(user=self.fbid, contact=puppet.fbid, in_community=ic).upsert()
# async def sync_contacts(self):
# try:
# self.log.debug("Fetching contacts...")
# users = await self.client.fetch_users()
# self.log.debug(f"Fetched {len(users)} contacts")
# contacts = UserContact.all(self.fbid)
# update_avatars = self.config["bridge.update_avatar_initial_sync"]
# for user in users:
# puppet = pu.Puppet.get_by_fbid(user.id, create=True)
# await puppet.update_info(self, user, update_avatar=update_avatars)
# await self._add_community_puppet(contacts.get(puppet.fbid, None), puppet)
# except Exception:
# self.log.exception("Failed to sync contacts")
async def get_direct_chats(self) -> Dict[UserID, List[RoomID]]:
return {
pu.Puppet.get_mxid_from_id(portal.fbid): [portal.mxid]
......@@ -437,6 +413,8 @@ class User(DBUser, BaseUser):
# TODO paginate with 20 threads per request
resp = await self.client.fetch_thread_list(thread_count=sync_count)
self.seq_id = int(resp.sync_sequence_id)
if self.mqtt:
self.mqtt.seq_id = self.seq_id
if sync_count <= 0:
return
for thread in resp.nodes:
......@@ -581,10 +559,7 @@ class User(DBUser, BaseUser):
self._track_metric(METRIC_CONNECTED, True)
if not first_connect and disconnected_at + max_delay < now:
duration = int(now - disconnected_at)
self.log.debug("Disconnection lasted %d seconds, re-syncing threads...", duration)
await self.send_bridge_notice("Connected to Facebook Messenger after being "
f"disconnected for {duration} seconds, syncing chats...")
await self.sync_threads()
self.log.debug("Disconnection lasted %d seconds, not re-syncing threads...", duration)
elif self.temp_disconnect_notices:
await self.send_bridge_notice("Connected to Facebook Messenger")
......@@ -605,8 +580,6 @@ class User(DBUser, BaseUser):
if self.listen_task:
self.listen_task.cancel()
self.mqtt = None
# if self.client:
# self.client.sequence_id_callback = None
self.listen_task = None
async def on_logged_in(self, state: AndroidState) -> None:
......@@ -736,8 +709,10 @@ class User(DBUser, BaseUser):
# await portal.handle_facebook_admin(self, sender, user, make_admin)
async def on_message_sync_error(self, evt: mqtt_t.MessageSyncError) -> None:
# TODO handle error?
self.log.error(f"Message sync error: {evt.value}")
await self.send_bridge_notice(f"Message sync error: {evt.value}")
self.log.error(f"Message sync error: {evt.value}, resyncing...")
await self.send_bridge_notice(f"Message sync error: {evt.value}, resyncing...")
self.stop_listen()
await self.sync_threads()
self.start_listen()
# endregion
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment