Skip to content
Snippets Groups Projects
Commit 50f379a2 authored by Tulir Asokan's avatar Tulir Asokan :cat2:
Browse files

Add option to refresh if a normal reconnection fails

parent 21ed0d42
No related branches found
No related tags found
No related merge requests found
......@@ -113,7 +113,7 @@ class MessengerBridge(Bridge):
elif mode == "reconnect":
user.listener.disconnect()
await user.listen_task
user.listen_task = user.loop.create_task(user.try_listen())
user.start_listen()
except asyncio.CancelledError:
log.debug("Periodic reconnect loop stopped")
return
......
......@@ -43,7 +43,7 @@ async def connect(evt: CommandEvent) -> None:
if evt.sender.listen_task and not evt.sender.listen_task.done():
await evt.reply("You already have a Messenger MQTT connection")
return
evt.sender.listen_task = evt.loop.create_task(evt.sender.try_listen())
evt.sender.start_listen()
@command_handler(needs_auth=True, management_only=True, help_section=SECTION_CONNECTION,
......
......@@ -74,6 +74,7 @@ class Config(BaseBridgeConfig):
copy("bridge.periodic_reconnect.always")
copy("bridge.resync_max_disconnected_time")
copy("bridge.temporary_disconnect_notices")
copy("bridge.refresh_on_reconnection_fail")
copy_dict("bridge.permissions")
......
......@@ -156,6 +156,9 @@ bridge:
# If this is false, disconnections will never send messages and connections will only send
# messages if it was disconnected for more than resync_max_disconnected_time seconds.
temporary_disconnect_notices: true
# Whether or not the bridge should try to "refresh" the connection if a normal reconnection
# attempt fails.
refresh_on_reconnection_fail: false
# Permissions for using the bridge.
# Permitted values:
......
......@@ -55,6 +55,7 @@ class User(BaseUser):
_is_connected: Optional[bool]
_connection_time: float
_prev_thread_sync: float
_prev_reconnect_fail_refresh: float
_session_data: Optional[Dict[str, str]]
_db_instance: Optional[DBUser]
_sync_lock: SimpleLock
......@@ -78,6 +79,7 @@ class User(BaseUser):
self._is_connected = None
self._connection_time = time.monotonic()
self._prev_thread_sync = -10
self._prev_reconnect_fail_refresh = time.monotonic()
self._session_data = session
self._db_instance = db_instance
self._community_id = None
......@@ -203,7 +205,7 @@ class User(BaseUser):
self.is_connected = None
if self.listen_task:
self.listen_task.cancel()
self.listen_task = self.loop.create_task(self.try_listen())
self.start_listen()
asyncio.ensure_future(self.post_login(), loop=self.loop)
return True
return False
......@@ -217,13 +219,22 @@ class User(BaseUser):
# endregion
async def try_refresh(self) -> None:
try:
await self.refresh()
except Exception:
self.log.exception("Fatal error while trying to refresh after connection error")
await self.send_bridge_notice("Fatal error while trying to refresh after connection "
"error (see logs for more info)", important=True)
async def refresh(self) -> None:
event_id = None
if self.listener:
event_id = await self.send_bridge_notice("Disconnecting Messenger MQTT connection "
"for session refresh...")
self.listener.disconnect()
await self.listen_task
if self.listen_task:
await self.listen_task
event_id = await self.send_bridge_notice("Refreshing session...", edit=event_id)
try:
ok = await self.load_session(_override=True, _raise_errors=True)
......@@ -357,7 +368,7 @@ class User(BaseUser):
self.log.exception("Failed to sync threads")
async def _sync_thread(self, thread: Union[fbchat.UserData, fbchat.PageData, fbchat.GroupData],
ups: Dict[str, 'DBUserPortal'], contacts: Dict[str, 'DBContact']
ups: Dict[str, DBUserPortal], contacts: Dict[str, DBContact]
) -> None:
self.log.debug(f"Syncing thread {thread.id} {thread.name}")
fb_receiver = self.fbid if isinstance(thread, fbchat.User) else None
......@@ -420,27 +431,41 @@ class User(BaseUser):
# region Facebook event handling
async def try_listen(self) -> None:
def start_listen(self) -> None:
self.listen_task = self.loop.create_task(self._try_listen())
def _disconnect_listener_after_error(self) -> None:
try:
await self.listen()
except Exception as e:
self.is_connected = False
if isinstance(e, fbchat.NotLoggedIn):
message = f"Disconnected from Facebook Messenger: {e}"
self.log.warning(message)
elif isinstance(e, fbchat.NotConnected):
message = f"Failed to connect to Facebook Messenger: {e}"
self.log.warning(message)
self.listener.disconnect()
except Exception:
self.log.debug("Error disconnecting listener after error", exc_info=True)
async def _try_listen(self) -> None:
try:
await self._listen()
return
except (fbchat.NotLoggedIn, fbchat.NotConnected) as e:
refresh = (config["bridge.refresh_on_reconnection_fail"]
and self._prev_reconnect_fail_refresh + 120 < time.monotonic())
next_action = ("Refreshing session..." if refresh else "Not retrying!")
event = ("Disconnected from" if isinstance(e, fbchat.NotLoggedIn)
else "Failed to connect to")
message = f"{event} Facebook Messenger: {e}. {next_action}"
self.log.warning(message)
await self.send_bridge_notice(message, important=not refresh)
if refresh:
self._prev_reconnect_fail_refresh = time.monotonic()
self.loop.create_task(self.try_refresh())
else:
message = "Fatal error in listener (see logs for more info)"
self.log.exception("Fatal error in listener")
await self.send_bridge_notice(message, important=True)
try:
self.listener.disconnect()
except Exception:
self.log.debug("Error disconnecting listener after error", exc_info=True)
self._disconnect_listener_after_error()
except Exception:
self.is_connected = False
self.log.exception("Fatal error in listener")
await self.send_bridge_notice("Fatal error in listener (see logs for more info)",
important=True)
self._disconnect_listener_after_error()
async def listen(self) -> None:
async def _listen(self) -> None:
if not self.listener:
self.listener = fbchat.Listener(session=self.session, chat_on=True, foreground=False)
......@@ -503,7 +528,7 @@ class User(BaseUser):
self.save()
if self.listen_task:
self.listen_task.cancel()
self.listen_task = self.loop.create_task(self.try_listen())
self.start_listen()
asyncio.ensure_future(self.post_login(), loop=self.loop)
async def on_message(self, evt: Union[fbchat.MessageEvent, fbchat.MessageReplyEvent]) -> None:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment