Skip to content
Snippets Groups Projects
Commit e31e5dee authored by Richard van der Hoff's avatar Richard van der Hoff
Browse files

Add CPU metrics for _fetch_event_list

add a Measure block on _fetch_event_list, in the hope that we can better
measure CPU usage here.
parent 395fa8d1
No related branches found
No related tags found
No related merge requests found
...@@ -222,25 +222,39 @@ class EventsWorkerStore(SQLBaseStore): ...@@ -222,25 +222,39 @@ class EventsWorkerStore(SQLBaseStore):
"""Takes a database connection and waits for requests for events from """Takes a database connection and waits for requests for events from
the _event_fetch_list queue. the _event_fetch_list queue.
""" """
event_list = []
i = 0 i = 0
while True: while True:
try: with self._event_fetch_lock:
with self._event_fetch_lock: event_list = self._event_fetch_list
event_list = self._event_fetch_list self._event_fetch_list = []
self._event_fetch_list = []
if not event_list:
if not event_list: single_threaded = self.database_engine.single_threaded
single_threaded = self.database_engine.single_threaded if single_threaded or i > EVENT_QUEUE_ITERATIONS:
if single_threaded or i > EVENT_QUEUE_ITERATIONS: self._event_fetch_ongoing -= 1
self._event_fetch_ongoing -= 1 return
return else:
else: self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) i += 1
i += 1 continue
continue i = 0
i = 0
self._fetch_event_list(conn, event_list)
def _fetch_event_list(self, conn, event_list):
"""Handle a load of requests from the _event_fetch_list queue
Args:
conn (twisted.enterprise.adbapi.Connection): database connection
event_list (list[Tuple[list[str], Deferred]]):
The fetch requests. Each entry consists of a list of event
ids to be fetched, and a deferred to be completed once the
events have been fetched.
"""
with Measure(self._clock, "_fetch_event_list"):
try:
event_id_lists = zip(*event_list)[0] event_id_lists = zip(*event_list)[0]
event_ids = [ event_ids = [
item for sublist in event_id_lists for item in sublist item for sublist in event_id_lists for item in sublist
...@@ -280,9 +294,8 @@ class EventsWorkerStore(SQLBaseStore): ...@@ -280,9 +294,8 @@ class EventsWorkerStore(SQLBaseStore):
with PreserveLoggingContext(): with PreserveLoggingContext():
d.errback(e) d.errback(e)
if event_list: with PreserveLoggingContext():
with PreserveLoggingContext(): self.hs.get_reactor().callFromThread(fire, event_list)
self.hs.get_reactor().callFromThread(fire, event_list)
@defer.inlineCallbacks @defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment