Skip to content
Snippets Groups Projects
Commit 45e22c43 authored by Tulir Asokan's avatar Tulir Asokan :cat2:
Browse files

Re-add connection state tracking

parent 0435298c
No related branches found
No related tags found
No related merge requests found
Pipeline #2876 passed
......@@ -33,7 +33,7 @@ class LoginComplete(Exception):
class Client(RPCClient):
async def start(self, session_data: Any = None) -> StartStatus:
await self.connect()
await self.send_start(session_data)
return await self.send_start(session_data)
async def send_start(self, session_data: Any = None) -> StartStatus:
return StartStatus.deserialize(await self.request("start", session_data=session_data))
......
......@@ -33,6 +33,7 @@ class RPCClient:
log: TraceLogger = logging.getLogger("mau.rpc")
user_id: UserID
rpc_connected: bool
_reader: Optional[asyncio.StreamReader]
_writer: Optional[asyncio.StreamWriter]
_req_id: int
......@@ -45,6 +46,7 @@ class RPCClient:
self.log = self.log.getChild(user_id)
self.loop = asyncio.get_running_loop()
self.user_id = user_id
self.rpc_connected = False
self._req_id = 0
self._min_broadcast_id = 0
self._event_handlers = {}
......@@ -94,16 +96,18 @@ class RPCClient:
read_loop = asyncio.create_task(self._try_read_loop())
await self.request("register", user_id=self.user_id)
self.rpc_connected = True
if initial_connect:
self.log.debug("RPC connected")
initial_connect.set_result(None)
initial_connect = None
else:
self.log.debug("RPC reconnected")
await self._run_internal_handler("reconnect")
await self._run_internal_handler("rpc_reconnect")
await read_loop
self.rpc_connected = False
self.log.debug("RPC disconnected")
await self._run_internal_handler("disconnect")
await self._run_internal_handler("rpc_disconnect")
async def disconnect(self) -> None:
if self._writer is not None:
......@@ -112,6 +116,7 @@ class RPCClient:
if self._connect_task:
self._connect_task.cancel()
self._connect_task = None
self.rpc_connected = False
self._writer = None
self._reader = None
......
# mautrix-amp - A very hacky Matrix-SMS bridge based on using Android Messages for Web in Puppeteer
# Copyright (C) 2020 Tulir Asokan
# mautrix-amp - A hacky Matrix-SMS bridge using the JS from Android Messages for Web
# Copyright (C) 2021 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
......@@ -69,6 +69,4 @@ class Message(SerializableAttrs['Message']):
@dataclass
class StartStatus(SerializableAttrs['StartStatus']):
started: bool
# is_logged_in: bool
# is_connected: bool
# is_permanently_disconnected: bool
is_connected: bool
......@@ -68,7 +68,7 @@ class User(DBUser, BaseUser):
async def is_logged_in(self) -> bool:
try:
return bool(self.client) and bool(self.session_data)
return bool(self.client) and self.client.rpc_connected and bool(self.session_data)
except Exception:
return False
......@@ -88,39 +88,30 @@ class User(DBUser, BaseUser):
self.loop.create_task(self.connect_double_puppet())
self.client = Client(self.mxid)
self.log.debug("Starting client")
await self.client.start(session_data=self.session_data)
state = await self.client.start(session_data=self.session_data)
self.client.on_message(self.handle_message)
self.client.on_chat_update(self.handle_chat_update)
self.client.add_event_handler("session_data", self.handle_session_data)
self.client.add_event_handler("disconnect", self.handle_disconnect)
self.client.add_event_handler("reconnect", self.handle_reconnect)
# if state.is_connected:
# self._track_metric(METRIC_CONNECTED, True)
# if state.is_logged_in:
# self.loop.create_task(self._try_sync())
self.client.add_event_handler("rpc_disconnect", self.handle_disconnect)
self.client.add_event_handler("rpc_reconnect", self.handle_reconnect)
self.client.add_event_handler("connection", self.handle_connection)
if state.is_connected:
self._track_metric(METRIC_CONNECTED, True)
async def handle_disconnect(self, _) -> None:
pass
self._track_metric(METRIC_CONNECTED, False)
async def handle_reconnect(self, _) -> None:
self.log.debug("Re-sending session data to puppet script after reconnect")
await self.client.send_start(session_data=self.session_data)
# async def _try_sync(self) -> None:
# try:
# await self.sync()
# except Exception:
# self.log.exception("Exception while syncing")
#
# async def _check_connection_loop(self) -> None:
# while True:
# self._track_metric(METRIC_CONNECTED, await self.client.is_connected())
# await asyncio.sleep(5)
state = await self.client.send_start(session_data=self.session_data)
if state.is_connected:
self._track_metric(METRIC_CONNECTED, True)
async def handle_connection(self, evt: Dict[str, Any]) -> None:
self.log.trace("Connection state: %s", evt)
self._track_metric(METRIC_CONNECTED, evt["data"]["connected"])
async def stop(self) -> None:
# if self._connection_check_task:
# self._connection_check_task.cancel()
# self._connection_check_task = None
if self.client:
await self.client.stop()
......
......@@ -97,6 +97,7 @@ class ProvisioningAPI:
"amp": {
"has_session": user.session_data is not None,
"connected": await user.is_logged_in(),
"poll_connected": await user.client.is_connected(),
},
}
return web.json_response(data, headers=self._acao_headers)
......@@ -105,8 +106,8 @@ class ProvisioningAPI:
user = await self.check_token(request)
status = await user.client.start()
# if status.is_logged_in:
# raise web.HTTPConflict(text='{"error": "Already logged in"}', headers=self._headers)
if status.is_connected:
raise web.HTTPConflict(text='{"error": "Already logged in"}', headers=self._headers)
ws = web.WebSocketResponse(protocols=["net.maunium.amp.login"])
await ws.prepare(request)
......
Subproject commit 082f349c400c03b524b76866ed485ef6afcce9ca
Subproject commit a62f7f98c9d41aa6348659dc72eb616c60b819ca
......@@ -182,24 +182,54 @@ export default class Client {
})
}
sendMessageUpdate = async (data) => {
this.log("Got message update:", data)
trackMessageUpdate = async (data) => {
this.log("Got message update", data.length)
}
trackReceiveMessage = async (data) => {
this.log("Got receive message", data.length)
}
sendError = async msg => {
this.error("api:", msg)
await this._write({
id: --this.notificationID,
command: "connection",
data: { connected: false },
})
}
trackDebug = async msg => {
this.log("api:", msg)
if (msg === "Connection complete") {
await this._write({
id: --this.notificationID,
command: "connection",
data: { connected: true },
})
}
}
addHandlers() {
this.client.on("qrcode", this.sendQRCode)
this.client.on("sessiondata", this.sendSessionData)
// this.client.on("messageupdate", this.sendMessageUpdate)
this.client.on("messageupdate", this.trackMessageUpdate)
this.client.on("receivemessage", this.trackReceiveMessage)
this.client.on("messagelist", this.sendMessageList)
this.client.on("convlist", this.sendConversationList)
this.client.on("error", this.sendError)
this.client.on("debug", this.trackDebug)
}
removeHandlers() {
this.client.off("qrcode", this.sendQRCode)
this.client.off("sessiondata", this.sendSessionData)
// this.client.off("messageupdate", this.sendMessageUpdate)
this.client.off("messageupdate", this.trackMessageUpdate)
this.client.off("receivemessage", this.trackReceiveMessage)
this.client.off("messagelist", this.sendMessageList)
this.client.off("convlist", this.sendConversationList)
this.client.off("error", this.sendError)
this.client.off("debug", this.trackDebug)
}
handleStart = async (req) => {
......@@ -208,22 +238,18 @@ export default class Client {
this.log("Opening new MessagesClient for", this.userID)
this.client = new MessagesClient()
this.addHandlers()
this.client.on("chunk", async data => this.log("Got chunk:", data))
this.client.on("invalidtoken", async sd => this.error("Got invalid token error:", sd))
this.client.on("closed", async m => this.error("API is closing:", m))
this.client.on("debug", async msg => this.log("api:", msg))
this.client.on("error", async msg => this.error("api:", msg))
this.manager.msgClients.set(this.userID, this.client)
started = true
}
if (req.session_data) {
this.log("Setting up client with", req.session_data)
await this.client.Setup(req.session_data)
}
return {
started,
// is_logged_in: await this.puppet.isLoggedIn(),
// is_connected: !await this.puppet.isDisconnected(),
// is_permanently_disconnected: await this.puppet.isPermanentlyDisconnected(),
is_connected: this.client.connected,
}
}
......@@ -330,12 +356,8 @@ export default class Client {
tempid: req.temp_id,
text: req.text,
}),
// set_last_message_ids: req => this.puppet.setLastMessageIDs(req.msg_ids),
// get_chats: () => this.puppet.getRecentChats(),
// get_chat: req => this.puppet.getChatInfo(req.chat_id),
get_messages: req => this.client.GetMessages(req.chat_id.toString()),
//is_connected: async () => ({ is_connected: !await this.puppet.isDisconnected() }),
is_connected: async () => ({ is_connected: true }),
is_connected: async () => ({ is_connected: this.client.connected }),
}[req.command] || this.handleUnknownCommand
}
const resp = { id: req.id }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment