Newer
Older
# mautrix-facebook - A Matrix-Facebook Messenger puppeting bridge
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import (Any, Dict, Iterator, Optional, Iterable, Type, Callable, Awaitable,
Union, TYPE_CHECKING)
from mautrix.types import UserID, PresenceState

Tulir Asokan
committed
from mautrix.client import Client as MxClient
from mautrix.bridge._community import CommunityHelper, CommunityID
from .config import Config
from .commands import enter_2fa_code
from .db import User as DBUser, UserPortal as DBUserPortal, Contact as DBContact, ThreadType
from . import portal as po, puppet as pu
if TYPE_CHECKING:
from .context import Context
# Bridge configuration shared by this module; init() rebinds it via `global config`.
config: Config
# NOTE(review): the declarations from here through `_community_id` look like
# attribute annotations of the `User` class (they are accessed below as
# `self.<name>` / `cls.<name>`), but the `class` header and all indentation are
# absent from this copy of the file — confirm against the real source.
log: logging.Logger = logging.getLogger("mau.user")
# Users keyed by Facebook ID: looked up in get_by_fbid(), filled in by the post-login code.
by_fbid: Dict[str, 'User'] = {}
# fbchat handles; __init__ sets all four of these to None.
session: Optional[fbchat.Session]
client: Optional[fbchat.Client]
listener: Optional[fbchat.Listener]
listen_task: Optional[asyncio.Task]
# State of an in-progress bridge command. The 2FA prompt code reads its
# "action" and "room_id" keys and stores "future" and "next" in it.
command_status: Optional[Dict[str, Any]]
# The next three are filled from config.get_permissions(mxid) in __init__.
is_whitelisted: bool
is_admin: bool
permission_level: str
# Saved session data handed to fbchat.Session.from_cookies() by load_session().
_session_data: Optional[Dict[str, str]]
# Database row for this user; the db_instance property creates it on demand.
_db_instance: Optional[DBUser]

# NOTE(review): the next two lines are not Python — they look like residue
# from a web blame view and should be removed from the real file.
Tulir Asokan
committed
# Helper for the personal filtering community; assigned once in init().
_community_helper: CommunityHelper
# ID returned by _community_helper.create(); None until _create_community() runs.
_community_id: Optional[CommunityID]
def __init__(self, mxid: UserID, session: Optional[Dict[str, str]] = None,
             db_instance: Optional[DBUser] = None) -> None:
    """Create the in-memory user for ``mxid`` and register it in ``by_mxid``.

    :param mxid: Matrix user ID this object represents.
    :param session: Previously saved session data, if any. ``load_session``
        passes it to ``fbchat.Session.from_cookies``.
    :param db_instance: Already-loaded database row for this user, if any.
    """
    self.mxid = mxid
    self.by_mxid[mxid] = self
    self.command_status = None
    self.is_whitelisted, self.is_admin, self.permission_level = config.get_permissions(mxid)
    # load_session() and is_logged_in() read this attribute, so it has to
    # exist before either runs. is_logged_in() treats None as "re-check".
    self._is_logged_in = None
    self._session_data = session
    self._db_instance = db_instance
    self._community_id = None

    # Nothing is connected until load_session() or on_logged_in() runs.
    self.client = None
    self.session = None
    self.listener = None
    self.listen_task = None
@property
def fbid(self) -> Optional[str]:
    """Facebook user ID of the current session, or ``None`` without a session.

    Exposed as a property because the rest of the file reads it as a plain
    attribute (``self.fbid``), e.g. when building database rows.
    """
    if not self.session:
        return None
    return self.session.user.id
@property
def db_instance(self) -> DBUser:
    """Database row for this user, created lazily on first access.

    The row is built from the current mxid, saved session data and Facebook
    ID. It is only constructed here; inserting it is left to the caller.
    """
    if not self._db_instance:
        self._db_instance = DBUser(mxid=self.mxid, session=self._session_data, fbid=self.fbid)
    # Without this return the getter yields None and callers such as
    # save() fail on `self.db_instance.edit(...)`.
    return self._db_instance
def save(self, _update_session_data: bool = True) -> None:
    """Write the session data and Facebook ID to the user's database row.

    :param _update_session_data: When true and a session exists, refresh the
        stored session data from ``session.get_cookies()`` before writing.
        Pass ``False`` to write ``_session_data`` exactly as it currently is.
    """
    self.log.debug("Saving session")
    if _update_session_data and self.session:
        self._session_data = self.session.get_cookies()
    # The write happens regardless of the flag; the flag only controls the refresh.
    self.db_instance.edit(session=self._session_data, fbid=self.fbid)
@classmethod
def from_db(cls, db_user: DBUser) -> 'User':
    """Build a user object from a database row.

    Uses ``cls`` rather than a hard-coded class name so the alternate
    constructor matches ``get_by_mxid`` and keeps working for subclasses.
    """
    return cls(mxid=db_user.mxid, session=db_user.session, db_instance=db_user)
@classmethod
def get_all(cls) -> Iterator['User']:
    """Lazily yield a user object for every row returned by ``DBUser.all()``."""
    for db_user in DBUser.all():
        yield cls.from_db(db_user)
@classmethod
def get_by_mxid(cls, mxid: UserID, create: bool = True) -> Optional['User']:
    """Look up the user for a Matrix ID, optionally creating it.

    :param mxid: Matrix user ID to look up.
    :param create: When true, create and insert a new user if none is stored.
    :return: The user, or ``None`` when ``mxid`` belongs to a puppet or to
        the bridge bot, or when nothing is stored and ``create`` is false.
    """
    if pu.Puppet.get_id_from_mxid(mxid) is not None or mxid == cls.az.bot_mxid:
        return None

    # Prefer the instance __init__ registered in by_mxid, the same way
    # get_by_fbid consults by_fbid before touching the database.
    try:
        return cls.by_mxid[mxid]
    except KeyError:
        pass

    db_user = DBUser.get_by_mxid(mxid)
    if db_user:
        return cls.from_db(db_user)

    if create:
        user = cls(mxid)
        user.db_instance.insert()
        return user

    return None
@classmethod
def get_by_fbid(cls, fbid: str) -> Optional['User']:
    """Look up the user logged in with the given Facebook ID.

    The in-memory ``by_fbid`` cache is checked first; on a miss the database
    is queried.

    :return: The user, or ``None`` if no stored user has that Facebook ID.
    """
    try:
        return cls.by_fbid[fbid]
    except KeyError:
        pass

    db_user = DBUser.get_by_fbid(fbid)
    if db_user:
        return cls.from_db(db_user)

    return None
async def load_session(self) -> bool:
    """Restore the Facebook session from the saved session data.

    On success the client is created, the listener task is (re)started and
    the post-login actions are scheduled in the background.

    :return: ``True`` if the user is already logged in or the session was
        restored; ``False`` if nothing is saved or restoring failed.
    """
    if self._is_logged_in:
        return True
    elif not self._session_data:
        return False

    try:
        session = await fbchat.Session.from_cookies(self._session_data)
    # NOTE(review): the `except` line is missing from the source this was
    # rebuilt from. FacebookError mirrors the handler around
    # session.logout() elsewhere in this file — confirm the exception type.
    except fbchat.FacebookError:
        self.log.exception("Failed to restore session")
        return False

    self.log.info("Loaded session successfully")
    self.session = session
    self.client = fbchat.Client(session=self.session)

    # Replace any listener left over from a previous session.
    if self.listen_task:
        self.listen_task.cancel()
    self.listen_task = self.loop.create_task(self.try_listen())

    asyncio.ensure_future(self.post_login(), loop=self.loop)
    # The signature promises a bool; report success explicitly.
    return True
# NOTE(review): this span appears to be two definitions fused together. The
# header and the first two lines belong to is_logged_in(); from `try:` onward
# the statements log the session out, clear the cached login flag, save
# without refreshing the session data and return `ok`, which reads like the
# tail of a separate logout method. The lines in between (the rest of
# is_logged_in, the logout header and the initial value of `ok`) are missing
# from this copy, and the indentation is gone — restore from the real source
# rather than guessing.
async def is_logged_in(self, _override: bool = False) -> bool:
# Re-check only when the cached state is unknown (None) or the caller forces it.
if self._is_logged_in is None or _override:
if self.session:
try:
await self.session.logout()
except fbchat.FacebookError:
# A failed logout is logged and reported through the return value, not raised.
self.log.exception("Error while logging out")
ok = False
self._is_logged_in = False
# Save without refreshing _session_data from the session.
self.save(_update_session_data=False)
return ok
# NOTE(review): this is a method body whose `async def` header is missing from
# this copy (it uses `await` and `self`). It is presumably post_login(), which
# load_session() and on_logged_in() schedule — confirm against the real source.
self.log.info("Running post-login actions")
# Register this user in the Facebook-ID cache consulted by get_by_fbid().
self.by_fbid[self.fbid] = self
# Best effort: switch the user's own puppet to their real Matrix account when
# the puppet reports that automatic login is possible. Failures are logged only.
try:
puppet = pu.Puppet.get_by_fbid(self.fbid)
if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
self.log.info(f"Automatically enabling custom puppet")
await puppet.switch_mxid(access_token="auto", mxid=self.mxid)
except Exception:
self.log.exception("Failed to automatically enable custom puppet")

# NOTE(review): the next two lines are not Python — they look like residue
# from a web blame view and should be removed from the real file.
Tulir Asokan
committed
await self._create_community()
await self.sync_contacts()
self.log.debug("Updating own puppet info")
# TODO this might not be right (if it is, check that we got something sensible?)
# Takes only the first item produced by fetch_thread_info() for our own ID.
own_info = await self.client.fetch_thread_info([self.fbid]).__anext__()
puppet = pu.Puppet.get_by_fbid(self.fbid, create=True)
await puppet.update_info(source=self, info=own_info)

Tulir Asokan
committed
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
async def _create_community(self) -> None:
    """Create this user's personal filtering community, if one is configured.

    Does nothing when ``bridge.community_template`` is empty. Otherwise the
    template is formatted with the localpart and server of the user's mxid
    and the resulting community ID is stored in ``_community_id``.
    """
    template = config["bridge.community_template"]
    if not template:
        return

    localpart, server = MxClient.parse_user_id(self.mxid)
    community_localpart = template.format(localpart=localpart, server=server)
    self.log.debug(f"Creating personal filtering community {community_localpart}...")
    self._community_id, created = await self._community_helper.create(community_localpart)
    if created:
        await self._community_helper.update(self._community_id, name="Facebook Messenger",
                                            avatar_url=config["appservice.bot_avatar"],
                                            short_desc="Your Facebook bridged chats")
        # NOTE(review): the source had lost its indentation. Placing the
        # invite under `if created` (invite only for a freshly created
        # community) is the inferred reading — confirm.
        await self._community_helper.invite(self._community_id, self.mxid)
async def _add_community(self, up: Optional[DBUserPortal], contact: Optional[DBContact],
portal: 'po.Portal', puppet: Optional['pu.Puppet']) -> None:
if portal.mxid:
if not up or not up.in_community:
ic = await self._community_helper.add_room(self._community_id, portal.mxid)
if up and ic:
up.edit(in_community=True)
elif not up:
DBUserPortal(user=self.fbid, in_community=ic, portal=portal.fbid,
portal_receiver=portal.fb_receiver).insert()
if puppet:
await self._add_community_puppet(contact, puppet)
async def _add_community_puppet(self, contact: Optional[DBContact],
puppet: 'pu.Puppet') -> None:
if not contact or not contact.in_community:
await puppet.default_mxid_intent.ensure_registered()
ic = await self._community_helper.join(self._community_id,
puppet.default_mxid_intent)
if contact and ic:
contact.edit(in_community=True)
elif not contact:
DBContact(user=self.fbid, contact=puppet.fbid, in_community=ic).insert()
async def sync_contacts(self):
    """Update puppet info for the user's contacts and add them to the community.

    Best effort: any failure is logged and swallowed so the caller can carry
    on with the rest of the post-login work.
    """
    try:
        self.log.debug("Fetching contacts...")
        # NOTE(review): the statement binding `users` is missing from the
        # source this was rebuilt from. fetch_users() is the assumed fbchat
        # call — confirm against the fbchat version in use.
        users = await self.client.fetch_users()
        self.log.debug(f"Fetched {len(users)} contacts")
        contacts = DBContact.all(self.fbid)
        update_avatars = config["bridge.update_avatar_initial_sync"]
        for user in users:
            # NOTE(review): the statement binding `puppet` is missing too.
            # This mirrors how the thread sync code obtains puppets
            # (get_by_fbid(..., create=True)) — confirm.
            puppet = pu.Puppet.get_by_fbid(user.id, create=True)
            await puppet.update_info(self, user, update_avatar=update_avatars)
            await self._add_community_puppet(contacts.get(puppet.fbid, None), puppet)
    except Exception:
        self.log.exception("Failed to sync contacts")
# NOTE(review): this is a method body whose `async def` header and opening
# `try:` are missing from this copy — the closing `except Exception:` below has
# no visible partner, and the indentation is gone. Restore from the real source.
# Sync at most 20 recent threads; a configured value of zero or less disables the sync.
sync_count = min(20, config["bridge.initial_chat_sync"])
if sync_count <= 0:
return

# NOTE(review): the next two lines are not Python — blame-view residue.
Tulir Asokan
committed
# Existing rows, looked up per thread below with .get(<fbid>, None).
ups = DBUserPortal.all(self.fbid)
contacts = DBContact.all(self.fbid)
async for thread in self.client.fetch_threads(limit=sync_count):
# Only user, page and group threads are bridged; everything else is skipped.
if not isinstance(thread, (fbchat.UserData, fbchat.PageData, fbchat.GroupData)):
# TODO log?
continue
self.log.debug(f"Syncing thread {thread.id} {thread.name}")
# Portals for user threads are keyed with this user's own Facebook ID as receiver.
fb_receiver = self.fbid if isinstance(thread, fbchat.User) else None
portal = po.Portal.get_by_thread(thread, fb_receiver)

# NOTE(review): the next two lines are not Python — blame-view residue.
Tulir Asokan
committed
# Only user threads have a puppet to add to the community.
puppet = None
if isinstance(thread, fbchat.UserData):
puppet = pu.Puppet.get_by_fbid(thread.id, create=True)

# NOTE(review): the next two lines are not Python — blame-view residue.
Tulir Asokan
committed
await self._add_community(ups.get(portal.fbid, None),
contacts.get(puppet.fbid, None) if puppet else None,
portal, puppet)
await portal.create_matrix_room(self, thread)
except Exception:
self.log.exception("Failed to sync threads")
# NOTE(review): this is a method body whose `async def` header is missing from
# this copy; judging by the warning text it is the two-factor code callback.
# Restore the header and indentation from the real source.
# Only prompt when the user is in the middle of a "Login" command.
if self.command_status and self.command_status.get("action", "") == "Login":
# The future and the follow-up handler are stored in the command state;
# presumably enter_2fa_code resolves the future — verify in .commands.
future = self.loop.create_future()
self.command_status["future"] = future
self.command_status["next"] = enter_2fa_code
await self.az.intent.send_notice(self.command_status["room_id"],
"You have two-factor authentication enabled. "
"Please send the code here.")
# Wait here until the future is resolved, then return its result.
return await future
self.log.warning("Unexpected on2FACode call")
# region Facebook event handling
async def try_listen(self) -> None:
    """Run ``listen()`` and log, rather than propagate, any exception it raises.

    This is the coroutine wrapped in ``listen_task``.
    """
    try:
        await self.listen()
    except Exception:
        self.log.exception("Fatal error in listener")
async def listen(self) -> None:
    """Create the fbchat listener and dispatch its events to the ``on_*`` handlers.

    Events are dispatched by exact type. Unknown types are logged at debug
    level, and an exception in one handler is logged without stopping the
    loop.
    """
    self.listener = fbchat.Listener(session=self.session, chat_on=True, foreground=False)
    handlers: Dict[Type[fbchat.Event], Callable[[Any], Awaitable[None]]] = {
        # on_message and on_title_change are defined in this file for these
        # event types (per their annotations) but were never registered, so
        # those events fell through to the "unknown event type" branch.
        fbchat.MessageEvent: self.on_message,
        fbchat.MessageReplyEvent: self.on_message,
        fbchat.TitleSet: self.on_title_change,
        fbchat.UnsendEvent: self.on_message_unsent,
        fbchat.ThreadsRead: self.on_message_seen,
        fbchat.ReactionEvent: self.on_reaction,
        fbchat.Presence: self.on_presence,
        fbchat.Typing: self.on_typing,
        fbchat.PeopleAdded: self.on_members_added,
        fbchat.PersonRemoved: self.on_member_removed,
    }

    self.log.debug("Starting fbchat listener")
    async for event in self.listener.listen():
        self.log.debug("Handling facebook event %s", event)
        try:
            handler = handlers[type(event)]
        except KeyError:
            self.log.debug(f"Received unknown event type {type(event)}")
        else:
            try:
                await handler(event)
            except Exception:
                self.log.exception("Failed to handle facebook event")
def stop_listening(self) -> None:
    """Disconnect the listener and cancel the listener task, whichever exist."""
    if self.listener:
        self.listener.disconnect()
    if self.listen_task:
        self.listen_task.cancel()
async def on_logged_in(self, session: fbchat.Session) -> None:
    """Adopt a freshly logged-in session.

    Stores the session, creates the client, saves the session data, restarts
    the listener task and schedules the post-login actions in the background.

    :param session: The session produced by a successful login.
    """
    self.session = session
    self.client = fbchat.Client(session=session)
    self.save()

    # Replace any listener left over from a previous session.
    if self.listen_task:
        self.listen_task.cancel()
    self.listen_task = self.loop.create_task(self.try_listen())

    asyncio.ensure_future(self.post_login(), loop=self.loop)
async def on_message(self, evt: Union[fbchat.MessageEvent, fbchat.MessageReplyEvent]) -> None:
    """Forward an incoming Facebook message to its portal.

    The author's puppet info is refreshed first when the puppet has no name yet.
    """
    # Portals for user threads are keyed with this user's own Facebook ID as receiver.
    fb_receiver = self.fbid if isinstance(evt.thread, fbchat.User) else None
    portal = po.Portal.get_by_thread(evt.thread, fb_receiver)
    puppet = pu.Puppet.get_by_fbid(evt.author.id)
    if not puppet.name:
        await puppet.update_info(self)
    await portal.handle_facebook_message(self, puppet, evt.message)
async def on_title_change(self, evt: fbchat.TitleSet) -> None:
    """Forward a group title change to the group's portal.

    Returns without doing anything if either the portal or the sender's
    puppet cannot be resolved.
    """
    assert isinstance(evt.thread, fbchat.Group)
    portal = po.Portal.get_by_thread(evt.thread)
    if not portal:
        return
    # `sender` was read without ever being assigned; resolve it from the
    # event author the same way the other handlers in this file do.
    sender = pu.Puppet.get_by_fbid(evt.author.id)
    if not sender:
        return
    await portal.handle_facebook_name(self, sender, evt.title, str(evt.at.timestamp()))
async def on_image_change(self, mid: str = None, author_id: str = None, new_image: str = None,
                          thread_id: str = None, thread_type: ThreadType = ThreadType.GROUP,
                          at: int = None, msg: Any = None) -> None:
    """Forward a thread photo change to its portal.

    Returns without doing anything if either the portal or the sender's
    puppet cannot be resolved. ``at`` and ``msg`` are accepted but unused.
    """
    # FIXME this method isn't called
    #       It seems to be a manually fetched event in fbchat.UnfetchedThreadEvent
    #       But the Message.fetch() doesn't return the necessary info
    fb_receiver = self.fbid if thread_type == ThreadType.USER else None
    portal = po.Portal.get_by_fbid(thread_id, fb_receiver)
    if not portal:
        return
    sender = pu.Puppet.get_by_fbid(author_id)
    if not sender:
        return
    await portal.handle_facebook_photo(self, sender, new_image, mid)
async def on_message_seen(self, evt: fbchat.ThreadsRead) -> None:
    """Forward a read receipt to every affected portal that has a Matrix room."""
    puppet = pu.Puppet.get_by_fbid(evt.author.id)
    for thread in evt.threads:
        # Portals for user threads are keyed with this user's own Facebook ID as receiver.
        fb_receiver = self.fbid if isinstance(thread, fbchat.User) else None
        portal = po.Portal.get_by_thread(thread, fb_receiver)
        if portal.mxid:
            await portal.handle_facebook_seen(self, puppet)
async def on_message_unsent(self, evt: fbchat.UnsendEvent) -> None:
    """Forward a message unsend to its portal, if the portal has a Matrix room."""
    fb_receiver = self.fbid if isinstance(evt.thread, fbchat.User) else None
    portal = po.Portal.get_by_thread(evt.thread, fb_receiver)
    if portal.mxid:
        puppet = pu.Puppet.get_by_fbid(evt.author.id)
        await portal.handle_facebook_unsend(self, puppet, evt.message.id)
async def on_reaction(self, evt: fbchat.ReactionEvent) -> None:
    """Forward a reaction change to its portal, if the portal has a Matrix room.

    An event whose ``reaction`` is ``None`` is handled as a removal; any
    other value is handled as an added reaction.
    """
    fb_receiver = self.fbid if isinstance(evt.thread, fbchat.User) else None
    portal = po.Portal.get_by_thread(evt.thread, fb_receiver)
    if not portal.mxid:
        return
    puppet = pu.Puppet.get_by_fbid(evt.author.id)
    if evt.reaction is None:
        await portal.handle_facebook_reaction_remove(self, puppet, evt.message.id)
    else:
        # Without this `else`, a removal was followed by an "add" call
        # carrying reaction=None.
        await portal.handle_facebook_reaction_add(self, puppet, evt.message.id, evt.reaction)
async def on_presence(self, evt: fbchat.Presence) -> None:
    """Mirror Facebook presence onto the puppets that already exist.

    Users without an existing puppet are skipped (``create=False``).
    """
    for user, status in evt.statuses.items():
        puppet = pu.Puppet.get_by_fbid(user, create=False)
        if puppet:
            await puppet.default_mxid_intent.set_presence(
                presence=PresenceState.ONLINE if status.active else PresenceState.OFFLINE,
                ignore_cache=True)
async def on_typing(self, evt: fbchat.Typing) -> None:
    """Mirror a typing notification into the portal room, if the portal has one."""
    fb_receiver = self.fbid if isinstance(evt.thread, fbchat.User) else None
    portal = po.Portal.get_by_thread(evt.thread, fb_receiver)
    if portal.mxid:
        puppet = pu.Puppet.get_by_fbid(evt.author.id)
        await puppet.intent.set_typing(portal.mxid, is_typing=evt.status, timeout=120000)
async def on_members_added(self, evt: fbchat.PeopleAdded) -> None:
    """Forward a "people added" event to the group's portal, if it has a Matrix room."""
    assert isinstance(evt.thread, fbchat.Group)
    portal = po.Portal.get_by_thread(evt.thread)
    if portal.mxid:
        sender = pu.Puppet.get_by_fbid(evt.author.id)
        users = [pu.Puppet.get_by_fbid(user.id) for user in evt.added]
        await portal.handle_facebook_join(self, sender, users)
async def on_member_removed(self, evt: fbchat.PersonRemoved) -> None:
    """Forward a "person removed" event to the group's portal, if it has a Matrix room."""
    assert isinstance(evt.thread, fbchat.Group)
    portal = po.Portal.get_by_thread(evt.thread)
    if portal.mxid:
        sender = pu.Puppet.get_by_fbid(evt.author.id)
        user = pu.Puppet.get_by_fbid(evt.removed.id)
        await portal.handle_facebook_leave(self, sender, user)
def init(context: 'Context') -> Iterable[Awaitable[bool]]:
    """Wire the module and the ``User`` class to the bridge context.

    Unpacks ``context.core`` into ``User.az``, the module-level ``config``
    and ``User.loop``, then creates the shared community helper.

    :return: A lazy iterable with one ``load_session()`` awaitable per stored
        user. Nothing is loaded until the caller iterates and awaits them.
    """
    global config
    User.az, config, User.loop = context.core
    User._community_helper = CommunityHelper(User.az)
    return (user.load_session() for user in User.get_all())