Skip to content
Snippets Groups Projects
Unverified Commit be3c7b08 authored by Erik Johnston's avatar Erik Johnston Committed by GitHub
Browse files

Fix deleting device inbox when using background worker (#16311)

Introduced in #16240

The action for the task was only defined on the "master" handler, rather than the base worker one.
parent ab13fb08
Branches
Tags
No related merge requests found
Delete device messages asynchronously and in staged batches using the task scheduler.
...@@ -91,9 +91,14 @@ class DeviceWorkerHandler: ...@@ -91,9 +91,14 @@ class DeviceWorkerHandler:
self._query_appservices_for_keys = ( self._query_appservices_for_keys = (
hs.config.experimental.msc3984_appservice_key_query hs.config.experimental.msc3984_appservice_key_query
) )
self._task_scheduler = hs.get_task_scheduler()
self.device_list_updater = DeviceListWorkerUpdater(hs) self.device_list_updater = DeviceListWorkerUpdater(hs)
self._task_scheduler.register_action(
self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
)
@trace @trace
async def get_devices_by_user(self, user_id: str) -> List[JsonDict]: async def get_devices_by_user(self, user_id: str) -> List[JsonDict]:
""" """
...@@ -383,6 +388,32 @@ class DeviceWorkerHandler: ...@@ -383,6 +388,32 @@ class DeviceWorkerHandler:
"Trying handling device list state for partial join: not supported on workers." "Trying handling device list state for partial join: not supported on workers."
) )
DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
async def _delete_device_messages(
self,
task: ScheduledTask,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
"""Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
assert task.params is not None
user_id = task.params["user_id"]
device_id = task.params["device_id"]
up_to_stream_id = task.params["up_to_stream_id"]
res = await self.store.delete_messages_for_device(
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)
if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
return TaskStatus.COMPLETE, None, None
else:
# There is probably still device messages to be deleted, let's keep the task active and it will be run
# again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
return TaskStatus.ACTIVE, None, None
class DeviceHandler(DeviceWorkerHandler): class DeviceHandler(DeviceWorkerHandler):
device_list_updater: "DeviceListUpdater" device_list_updater: "DeviceListUpdater"
...@@ -394,7 +425,6 @@ class DeviceHandler(DeviceWorkerHandler): ...@@ -394,7 +425,6 @@ class DeviceHandler(DeviceWorkerHandler):
self._account_data_handler = hs.get_account_data_handler() self._account_data_handler = hs.get_account_data_handler()
self._storage_controllers = hs.get_storage_controllers() self._storage_controllers = hs.get_storage_controllers()
self.db_pool = hs.get_datastores().main.db_pool self.db_pool = hs.get_datastores().main.db_pool
self._task_scheduler = hs.get_task_scheduler()
self.device_list_updater = DeviceListUpdater(hs, self) self.device_list_updater = DeviceListUpdater(hs, self)
...@@ -428,10 +458,6 @@ class DeviceHandler(DeviceWorkerHandler): ...@@ -428,10 +458,6 @@ class DeviceHandler(DeviceWorkerHandler):
self._delete_stale_devices, self._delete_stale_devices,
) )
self._task_scheduler.register_action(
self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
)
def _check_device_name_length(self, name: Optional[str]) -> None: def _check_device_name_length(self, name: Optional[str]) -> None:
""" """
Checks whether a device name is longer than the maximum allowed length. Checks whether a device name is longer than the maximum allowed length.
...@@ -590,32 +616,6 @@ class DeviceHandler(DeviceWorkerHandler): ...@@ -590,32 +616,6 @@ class DeviceHandler(DeviceWorkerHandler):
await self.notify_device_update(user_id, device_ids) await self.notify_device_update(user_id, device_ids)
DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
async def _delete_device_messages(
self,
task: ScheduledTask,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
"""Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
assert task.params is not None
user_id = task.params["user_id"]
device_id = task.params["device_id"]
up_to_stream_id = task.params["up_to_stream_id"]
res = await self.store.delete_messages_for_device(
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)
if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
return TaskStatus.COMPLETE, None, None
else:
# There is probably still device messages to be deleted, let's keep the task active and it will be run
# again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
return TaskStatus.ACTIVE, None, None
async def update_device(self, user_id: str, device_id: str, content: dict) -> None: async def update_device(self, user_id: str, device_id: str, content: dict) -> None:
"""Update the given device """Update the given device
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment