Skip to content
Snippets Groups Projects
Unverified Commit 9eebd460 authored by Amber Brown's avatar Amber Brown Committed by GitHub
Browse files

Improve the performance of structured logging (#6322)

parent 78cfc05f
No related branches found
No related tags found
No related merge requests found
Improve the performance of outputting structured logging.
...@@ -261,6 +261,18 @@ def parse_drain_configs( ...@@ -261,6 +261,18 @@ def parse_drain_configs(
) )
class StoppableLogPublisher(LogPublisher):
"""
A log publisher that can tell its observers to shut down any external
communications.
"""
def stop(self):
for obs in self._observers:
if hasattr(obs, "stop"):
obs.stop()
def setup_structured_logging( def setup_structured_logging(
hs, hs,
config, config,
...@@ -336,7 +348,7 @@ def setup_structured_logging( ...@@ -336,7 +348,7 @@ def setup_structured_logging(
# We should never get here, but, just in case, throw an error. # We should never get here, but, just in case, throw an error.
raise ConfigError("%s drain type cannot be configured" % (observer.type,)) raise ConfigError("%s drain type cannot be configured" % (observer.type,))
publisher = LogPublisher(*observers) publisher = StoppableLogPublisher(*observers)
log_filter = LogLevelFilterPredicate() log_filter = LogLevelFilterPredicate()
for namespace, namespace_config in log_config.get( for namespace, namespace_config in log_config.get(
......
...@@ -17,25 +17,29 @@ ...@@ -17,25 +17,29 @@
Log formatters that output terse JSON. Log formatters that output terse JSON.
""" """
import json
import sys import sys
import traceback
from collections import deque from collections import deque
from ipaddress import IPv4Address, IPv6Address, ip_address from ipaddress import IPv4Address, IPv6Address, ip_address
from math import floor from math import floor
from typing import IO from typing import IO, Optional
import attr import attr
from simplejson import dumps
from zope.interface import implementer from zope.interface import implementer
from twisted.application.internet import ClientService from twisted.application.internet import ClientService
from twisted.internet.defer import Deferred
from twisted.internet.endpoints import ( from twisted.internet.endpoints import (
HostnameEndpoint, HostnameEndpoint,
TCP4ClientEndpoint, TCP4ClientEndpoint,
TCP6ClientEndpoint, TCP6ClientEndpoint,
) )
from twisted.internet.interfaces import IPushProducer, ITransport
from twisted.internet.protocol import Factory, Protocol from twisted.internet.protocol import Factory, Protocol
from twisted.logger import FileLogObserver, ILogObserver, Logger from twisted.logger import FileLogObserver, ILogObserver, Logger
from twisted.python.failure import Failure
_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))
def flatten_event(event: dict, metadata: dict, include_time: bool = False): def flatten_event(event: dict, metadata: dict, include_time: bool = False):
...@@ -141,11 +145,49 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb ...@@ -141,11 +145,49 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb
def formatEvent(_event: dict) -> str: def formatEvent(_event: dict) -> str:
flattened = flatten_event(_event, metadata) flattened = flatten_event(_event, metadata)
return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n" return _encoder.encode(flattened) + "\n"
return FileLogObserver(outFile, formatEvent) return FileLogObserver(outFile, formatEvent)
@attr.s
@implementer(IPushProducer)
class LogProducer(object):
"""
An IPushProducer that writes logs from its buffer to its transport when it
is resumed.
Args:
buffer: Log buffer to read logs from.
transport: Transport to write to.
"""
transport = attr.ib(type=ITransport)
_buffer = attr.ib(type=deque)
_paused = attr.ib(default=False, type=bool, init=False)
def pauseProducing(self):
self._paused = True
def stopProducing(self):
self._paused = True
self._buffer = None
def resumeProducing(self):
self._paused = False
while self._paused is False and (self._buffer and self.transport.connected):
try:
event = self._buffer.popleft()
self.transport.write(_encoder.encode(event).encode("utf8"))
self.transport.write(b"\n")
except Exception:
# Something has gone wrong writing to the transport -- log it
# and break out of the while.
traceback.print_exc(file=sys.__stderr__)
break
@attr.s @attr.s
@implementer(ILogObserver) @implementer(ILogObserver)
class TerseJSONToTCPLogObserver(object): class TerseJSONToTCPLogObserver(object):
...@@ -165,8 +207,9 @@ class TerseJSONToTCPLogObserver(object): ...@@ -165,8 +207,9 @@ class TerseJSONToTCPLogObserver(object):
metadata = attr.ib(type=dict) metadata = attr.ib(type=dict)
maximum_buffer = attr.ib(type=int) maximum_buffer = attr.ib(type=int)
_buffer = attr.ib(default=attr.Factory(deque), type=deque) _buffer = attr.ib(default=attr.Factory(deque), type=deque)
_writer = attr.ib(default=None) _connection_waiter = attr.ib(default=None, type=Optional[Deferred])
_logger = attr.ib(default=attr.Factory(Logger)) _logger = attr.ib(default=attr.Factory(Logger))
_producer = attr.ib(default=None, type=Optional[LogProducer])
def start(self) -> None: def start(self) -> None:
...@@ -187,38 +230,43 @@ class TerseJSONToTCPLogObserver(object): ...@@ -187,38 +230,43 @@ class TerseJSONToTCPLogObserver(object):
factory = Factory.forProtocol(Protocol) factory = Factory.forProtocol(Protocol)
self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor()) self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
self._service.startService() self._service.startService()
self._connect()
def _write_loop(self) -> None: def stop(self):
self._service.stopService()
def _connect(self) -> None:
""" """
Implement the write loop. Triggers an attempt to connect then write to the remote if not already writing.
""" """
if self._writer: if self._connection_waiter:
return return
self._writer = self._service.whenConnected() self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
@self._connection_waiter.addErrback
def fail(r):
r.printTraceback(file=sys.__stderr__)
self._connection_waiter = None
self._connect()
@self._writer.addBoth @self._connection_waiter.addCallback
def writer(r): def writer(r):
if isinstance(r, Failure): # We have a connection. If we already have a producer, and its
r.printTraceback(file=sys.__stderr__) # transport is the same, just trigger a resumeProducing.
self._writer = None if self._producer and r.transport is self._producer.transport:
self.hs.get_reactor().callLater(1, self._write_loop) self._producer.resumeProducing()
return return
try: # If the producer is still producing, stop it.
for event in self._buffer: if self._producer:
r.transport.write( self._producer.stopProducing()
dumps(event, ensure_ascii=False, separators=(",", ":")).encode(
"utf8" # Make a new producer and start it.
) self._producer = LogProducer(buffer=self._buffer, transport=r.transport)
) r.transport.registerProducer(self._producer, True)
r.transport.write(b"\n") self._producer.resumeProducing()
self._buffer.clear() self._connection_waiter = None
except Exception as e:
sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),))
self._writer = False
self.hs.get_reactor().callLater(1, self._write_loop)
def _handle_pressure(self) -> None: def _handle_pressure(self) -> None:
""" """
...@@ -277,4 +325,4 @@ class TerseJSONToTCPLogObserver(object): ...@@ -277,4 +325,4 @@ class TerseJSONToTCPLogObserver(object):
self._logger.failure("Failed clearing backpressure") self._logger.failure("Failed clearing backpressure")
# Try and write immediately. # Try and write immediately.
self._write_loop() self._connect()
...@@ -379,6 +379,7 @@ class FakeTransport(object): ...@@ -379,6 +379,7 @@ class FakeTransport(object):
disconnecting = False disconnecting = False
disconnected = False disconnected = False
connected = True
buffer = attr.ib(default=b"") buffer = attr.ib(default=b"")
producer = attr.ib(default=None) producer = attr.ib(default=None)
autoflush = attr.ib(default=True) autoflush = attr.ib(default=True)
...@@ -402,6 +403,7 @@ class FakeTransport(object): ...@@ -402,6 +403,7 @@ class FakeTransport(object):
"FakeTransport: Delaying disconnect until buffer is flushed" "FakeTransport: Delaying disconnect until buffer is flushed"
) )
else: else:
self.connected = False
self.disconnected = True self.disconnected = True
def abortConnection(self): def abortConnection(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment