Commit 3accee1a authored by Erik Johnston

Merge branch 'release-v0.21.0' of github.com:matrix-org/synapse

parents c45dc6c6 a5425b2e
Showing changes with 650 additions and 255 deletions
Changes in synapse v0.21.0 (2017-05-18)
=======================================
No changes since v0.21.0-rc3
Changes in synapse v0.21.0-rc3 (2017-05-17)
===========================================
Features:
* Add per user rate-limiting overrides (PR #2208)
* Add config option to limit maximum number of events requested by ``/sync``
and ``/messages`` (PR #2221) Thanks to @psaavedra!
Changes:
* Various small performance fixes (PR #2201, #2202, #2224, #2226, #2227, #2228,
#2229)
* Update username availability checker API (PR #2209, #2213)
* When purging, don't de-delta state groups we're about to delete (PR #2214)
* Documentation to check synapse version (PR #2215) Thanks to @hamber-dick!
* Add an index to event_search to speed up purge history API (PR #2218)
Bug fixes:
* Fix API to allow clients to upload one-time-keys with new sigs (PR #2206)
Changes in synapse v0.21.0-rc2 (2017-05-08)
===========================================
Changes:
* Always mark remotes as up if we receive a signed request from them (PR #2190)
Bug fixes:
* Fix bug where users got pushed for rooms they had muted (PR #2200)
Changes in synapse v0.21.0-rc1 (2017-05-08)
===========================================
Features:
* Add username availability checker API (PR #2183)
* Add read marker API (PR #2120)
Changes:
* Enable guest access for the 3pl/3pid APIs (PR #1986)
* Add setting to support TURN for guests (PR #2011)
* Various performance improvements (PR #2075, #2076, #2080, #2083, #2108,
#2158, #2176, #2185)
* Make synctl a bit more user friendly (PR #2078, #2127) Thanks @APwhitehat!
* Replace HTTP replication with TCP replication (PR #2082, #2097, #2098,
#2099, #2103, #2014, #2016, #2115, #2116, #2117)
* Support authenticated SMTP (PR #2102) Thanks @DanielDent!
* Add a counter metric for successfully-sent transactions (PR #2121)
* Propagate errors sensibly from proxied IS requests (PR #2147)
* Add more granular event send metrics (PR #2178)
Bug fixes:
* Fix nuke-room script to work with current schema (PR #1927) Thanks
@zuckschwerdt!
* Fix db port script to not assume postgres tables are in the public schema
(PR #2024) Thanks @jerrykan!
* Fix getting latest device IP for user with no devices (PR #2118)
* Fix rejection of invites to unreachable servers (PR #2145)
* Fix code for reporting old verify keys in synapse (PR #2156)
* Fix invite state to always include all events (PR #2163)
* Fix bug where synapse would always fetch state for any missing event (PR #2170)
* Fix a leak with timed out HTTP connections (PR #2180)
* Fix bug where we didn't time out HTTP requests to ASes (PR #2192)
Docs:
* Clarify doc for SQLite to PostgreSQL port (PR #1961) Thanks @benhylau!
* Fix typo in synctl help (PR #2107) Thanks @HarHarLinks!
* ``web_client_location`` documentation fix (PR #2131) Thanks @matthewjwolff!
* Update README.rst with FreeBSD changes (PR #2132) Thanks @feld!
* Clarify setting up metrics (PR #2149) Thanks @encks!
Changes in synapse v0.20.0 (2017-04-11)
=======================================
......
@@ -109,10 +109,10 @@ Installing prerequisites on ArchLinux::
     sudo pacman -S base-devel python2 python-pip \
                    python-setuptools python-virtualenv sqlite3

-Installing prerequisites on CentOS 7::
+Installing prerequisites on CentOS 7 or Fedora 25::

     sudo yum install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
-                     lcms2-devel libwebp-devel tcl-devel tk-devel \
+                     lcms2-devel libwebp-devel tcl-devel tk-devel redhat-rpm-config \
                      python-virtualenv libffi-devel openssl-devel
     sudo yum groupinstall "Development Tools"
@@ -246,6 +246,25 @@ Setting up a TURN server

 For reliable VoIP calls to be routed via this homeserver, you MUST configure
 a TURN server. See `<docs/turn-howto.rst>`_ for details.

+IPv6
+----
+
+As of Synapse 0.19 we finally support IPv6, many thanks to @kyrias and @glyph
+for providing PR #1696.
+
+However, for federation to work on hosts with IPv6 DNS servers you **must**
+be running Twisted 17.1.0 or later - see https://github.com/matrix-org/synapse/issues/1002
+for details. We can't make Synapse depend on Twisted 17.1 by default
+yet as it will break most older distributions (see https://github.com/matrix-org/synapse/pull/1909)
+so if you are using operating system dependencies you'll have to install your
+own Twisted 17.1 package via pip or backports etc.
+
+If you're running in a virtualenv then pip should have installed the newest
+Twisted automatically, but if your virtualenv is old you will need to manually
+upgrade to a newer Twisted dependency via::
+
+    pip install "Twisted>=17.1.0"
Running Synapse
===============
@@ -336,8 +355,11 @@ ArchLinux
 ---------

 The quickest way to get up and running with ArchLinux is probably with the community package
-https://www.archlinux.org/packages/community/any/matrix-synapse/, which should pull in all
-the necessary dependencies.
+https://www.archlinux.org/packages/community/any/matrix-synapse/, which should pull in most of
+the necessary dependencies. If the default web client is to be served (enabled by default in
+the generated config),
+https://www.archlinux.org/packages/community/any/python2-matrix-angular-sdk/ will also need to
+be installed.

 Alternatively, to install using pip a few changes may be needed as ArchLinux
 defaults to python 3, but synapse currently assumes python 2.7 by default:
@@ -374,7 +396,7 @@ FreeBSD

 Synapse can be installed via FreeBSD Ports or Packages contributed by Brendan Molloy from:

-- Ports: ``cd /usr/ports/net/py-matrix-synapse && make install clean``
+- Ports: ``cd /usr/ports/net-im/py-matrix-synapse && make install clean``
 - Packages: ``pkg install py27-matrix-synapse``
......
@@ -28,6 +28,15 @@ running:
     git pull
     # Update the versions of synapse's python dependencies.
     python synapse/python_dependencies.py | xargs -n1 pip install --upgrade

+To check whether your update was successful, run:
+
+.. code:: bash
+
+    # replace your.server.domain with the domain of your synapse homeserver
+    curl https://<your.server.domain>/_matrix/federation/v1/version
+
+So for the matrix.org homeserver the URL would be: https://matrix.org/_matrix/federation/v1/version.

Upgrading to v0.15.0
......
@@ -36,15 +36,13 @@ class HttpClient(object):
             the request body. This will be encoded as JSON.

         Returns:
-            Deferred: Succeeds when we get *any* HTTP response.
-
-            The result of the deferred is a tuple of `(code, response)`,
-            where `response` is a dict representing the decoded JSON body.
+            Deferred: Succeeds when we get a 2xx HTTP response. The result
+            will be the decoded JSON body.
         """
         pass

     def get_json(self, url, args=None):
-        """ Get's some json from the given host homeserver and path
+        """ Gets some json from the given host homeserver and path

         Args:
             url (str): The URL to GET data from.
@@ -54,10 +52,8 @@ class HttpClient(object):
                 and *not* a string.

         Returns:
-            Deferred: Succeeds when we get *any* HTTP response.
-
-            The result of the deferred is a tuple of `(code, response)`,
-            where `response` is a dict representing the decoded JSON body.
+            Deferred: Succeeds when we get a 2xx HTTP response. The result
+            will be the decoded JSON body.
         """
         pass
@@ -214,4 +210,4 @@ class _JsonProducer(object):
         pass

     def stopProducing(self):
         pass
\ No newline at end of file
Query Account
=============
This API returns information about a specific user account.
The API is::

    GET /_matrix/client/r0/admin/whois/<user_id>

including an ``access_token`` of a server admin.
It returns a JSON body like the following:
.. code:: json

    {
        "user_id": "<user_id>",
        "devices": {
            "": {
                "sessions": [
                    {
                        "connections": [
                            {
                                "ip": "1.2.3.4",
                                "last_seen": 1417222374433,
                                "user_agent": "Mozilla/5.0 ..."
                            },
                            {
                                "ip": "1.2.3.10",
                                "last_seen": 1417222374500,
                                "user_agent": "Dalvik/2.1.0 ..."
                            }
                        ]
                    }
                ]
            }
        }
    }
``last_seen`` is measured in milliseconds since the Unix epoch.
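
For illustration, here is a minimal sketch of calling this endpoint with the
Python ``requests`` library; the homeserver URL, admin token and user ID below
are placeholder values, not real ones:

.. code:: python

    import requests

    # Placeholder values - substitute your own homeserver, admin token and user.
    HOMESERVER = "https://matrix.example.com"
    ADMIN_TOKEN = "secret_admin_access_token"
    USER_ID = "@someone:example.com"

    resp = requests.get(
        "%s/_matrix/client/r0/admin/whois/%s" % (HOMESERVER, USER_ID),
        params={"access_token": ADMIN_TOKEN},
    )
    resp.raise_for_status()

    # Walk the nested devices/sessions/connections structure shown above.
    for device_id, device in resp.json()["devices"].items():
        for session in device["sessions"]:
            for connection in session["connections"]:
                print(device_id, connection["ip"], connection["user_agent"])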
Deactivate Account
==================
This API deactivates an account. It removes active access tokens, resets the
password, and deletes third-party IDs (to prevent the user requesting a
password reset).
The API is::

    POST /_matrix/client/r0/admin/deactivate/<user_id>

including an ``access_token`` of a server admin, and an empty request body.
Reset password
==============
Changes the password of another user.
The API is::

    POST /_matrix/client/r0/admin/reset_password/<user_id>

with a body of:

.. code:: json

    {
        "new_password": "<secret>"
    }
including an ``access_token`` of a server admin.
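
As with the whois endpoint above, a short illustrative sketch of driving the
deactivate and reset-password endpoints with ``requests``; all values are
placeholders:

.. code:: python

    import requests

    HOMESERVER = "https://matrix.example.com"  # placeholder homeserver
    ADMIN_TOKEN = "secret_admin_access_token"  # placeholder admin token
    USER_ID = "@someone:example.com"           # placeholder user
    params = {"access_token": ADMIN_TOKEN}

    # Deactivate an account: POST with an empty JSON body.
    requests.post(
        "%s/_matrix/client/r0/admin/deactivate/%s" % (HOMESERVER, USER_ID),
        params=params, json={},
    ).raise_for_status()

    # Reset another user's password.
    requests.post(
        "%s/_matrix/client/r0/admin/reset_password/%s" % (HOMESERVER, USER_ID),
        params=params, json={"new_password": "a-new-secret"},
    ).raise_for_status()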
@@ -21,13 +21,12 @@ How to monitor Synapse metrics using Prometheus

 3. Add a prometheus target for synapse.

-   It needs to set the ``metrics_path`` to a non-default value::
+   It needs to set the ``metrics_path`` to a non-default value (under ``scrape_configs``)::

     - job_name: "synapse"
       metrics_path: "/_synapse/metrics"
       static_configs:
-       - targets:
-          "my.server.here:9092"
+       - targets: ["my.server.here:9092"]

 If your prometheus is older than 1.5.2, you will need to replace
 ``static_configs`` in the above with ``target_groups``.
......
@@ -112,9 +112,9 @@ script one last time, e.g. if the SQLite database is at ``homeserver.db``
 run::

     synapse_port_db --sqlite-database homeserver.db \
-        --postgres-config database_config.yaml
+        --postgres-config homeserver-postgres.yaml

 Once that has completed, change the synapse config to point at the PostgreSQL
-database configuration file using the ``database_config`` parameter (see
-`Synapse Config`_) and restart synapse. Synapse should now be running against
+database configuration file ``homeserver-postgres.yaml`` (i.e. rename it to
+``homeserver.yaml``) and restart synapse. Synapse should now be running against
 PostgreSQL.
@@ -26,28 +26,10 @@ expose the append-only log to the readers should be fairly minimal.

 Architecture
 ------------

-The Replication API
-~~~~~~~~~~~~~~~~~~~
-
-Synapse will optionally expose a long poll HTTP API for extracting updates. The
-API will have a similar shape to /sync in that clients provide tokens
-indicating where in the log they have reached and a timeout. The synapse server
-then either responds with updates immediately if it already has updates or it
-waits until the timeout for more updates. If the timeout expires and nothing
-happened then the server returns an empty response.
-
-However unlike the /sync API this replication API is returning synapse specific
-data rather than trying to implement a matrix specification. The replication
-results are returned as arrays of rows where the rows are mostly lifted
-directly from the database. This avoids unnecessary JSON parsing on the server
-and hopefully avoids an impedance mismatch between the data returned and the
-required updates to the datastore.
-
-This does not replicate all the database tables as many of the database tables
-are indexes that can be recovered from the contents of other tables.
-
-The format and parameters for the api are documented in
-``synapse/replication/resource.py``.
+The Replication Protocol
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+See ``tcp_replication.rst``

 The Slaved DataStore
......
TCP Replication
===============
Motivation
----------
Previously the workers used an HTTP long poll mechanism to get updates from the
master, which had the problem of causing a lot of duplicate work on the server.
This TCP protocol replaces those APIs with the aim of increased efficiency.
Overview
--------
The protocol is based on fire-and-forget, line-based commands. An example flow
would be (where '>' indicates master-to-worker and '<' worker-to-master traffic)::

    > SERVER example.com
    < REPLICATE events 53
    > RDATA events 54 ["$foo1:bar.com", ...]
    > RDATA events 55 ["$foo4:bar.com", ...]
The example shows the server accepting a new connection and sending its identity
with the ``SERVER`` command, followed by the client asking to subscribe to the
``events`` stream from the token ``53``. The server then periodically sends ``RDATA``
commands which have the format ``RDATA <stream_name> <token> <row>``, where the
format of ``<row>`` is defined by the individual streams.
Error reporting happens by either the client or server sending an ``ERROR``
command, and usually the connection will be closed.
Since the protocol is a simple line-based one, it's possible to manually connect
to the server using a tool like netcat. A few things should be noted when
manually using the protocol:
* When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can
be used to get all future updates. The special stream name ``ALL`` can be used
with ``NOW`` to subscribe to all available streams.
* The federation stream is only available if federation sending has been
disabled on the main process.
* The server will only time connections out that have sent a ``PING`` command.
If a ping is sent then the connection will be closed if no further commands
are received within 15s. Both the client and server protocol implementations
will send an initial PING on connection and ensure at least one command every
5s is sent (not necessarily ``PING``).
* ``RDATA`` commands *usually* include a numeric token, however if the stream
has multiple rows to replicate per token the server will send multiple
``RDATA`` commands, with all but the last having a token of ``batch``. See
the documentation on ``commands.RdataCommand`` for further details.
Architecture
------------
The basic structure of the protocol is line based, where the initial word of
each line specifies the command. The rest of the line is parsed based on the
command. For example, the ``RDATA`` command is defined as::

    RDATA <stream_name> <token> <row_json>

(Note that ``<row_json>`` may contain spaces, but cannot contain newlines.)
Blank lines are ignored.
Keep alives
~~~~~~~~~~~
Both sides are expected to send at least one command every 5s or so, and
should send a ``PING`` command if necessary. If either side does not receive a
command within e.g. 15s then the connection should be closed.
Because the server may be connected to manually using e.g. netcat, the timeouts
aren't enabled until an initial ``PING`` command is seen. Both the client and
server implementations below send a ``PING`` command immediately on connection to
ensure the timeouts are enabled.
This ensures that both sides can quickly realise if the TCP connection has gone away
and handle the situation appropriately.
Start up
~~~~~~~~
When a new connection is made, the server:
* Sends a ``SERVER`` command, which includes the identity of the server, allowing
the client to detect if it's connected to the expected server
* Sends a ``PING`` command as above, to enable the client to time out connections
promptly.
The client:
* Sends a ``NAME`` command, allowing the server to associate a human friendly
name with the connection. This is optional.
* Sends a ``PING`` as above
* For each stream the client wishes to subscribe to it sends a ``REPLICATE``
with the stream_name and token it wants to subscribe from.
* On receipt of a ``SERVER`` command, checks that the server name matches the
expected server name.
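
To make the handshake concrete, the steps above can be driven by hand. The
following is a rough, illustrative Python sketch of a minimal client; the host,
port and connection name are placeholders, and the real implementations live
under ``synapse/replication/tcp/``::

    import socket
    import time

    # Connect to the master's replication listener (assumed here to be on
    # localhost:9092, as in the worker documentation).
    sock = socket.create_connection(("localhost", 9092))
    reader = sock.makefile("r")

    def send(line):
        sock.sendall((line + "\n").encode("utf-8"))

    # The start-up steps described above: name ourselves, enable the timeouts
    # with an initial PING, then subscribe to every stream from now onwards.
    send("NAME manual-test-client")
    send("PING %d" % int(time.time() * 1000))
    send("REPLICATE ALL NOW")

    for line in reader:
        cmd, _, rest = line.rstrip("\n").partition(" ")
        if cmd == "SERVER":
            print("talking to %s" % rest)  # check this is the expected server
        elif cmd == "RDATA":
            stream_name, token, row_json = rest.split(" ", 2)
            print("stream %s token %s row %s" % (stream_name, token, row_json))
        elif cmd == "PING":
            send("PING %d" % int(time.time() * 1000))  # crude keep-alive
        elif cmd == "ERROR":
            print("server error: %s" % rest)
            break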
Error handling
~~~~~~~~~~~~~~
If either side detects an error it can send an ``ERROR`` command and close the
connection.
If the client side loses the connection to the server it should reconnect,
following the steps above.
Congestion
~~~~~~~~~~
If the server sends messages faster than the client can consume them the server
will first buffer a (fairly large) number of commands and then disconnect the
client. This ensures that we don't queue up an unbounded number of commands in
memory and gives us a potential opportunity to squawk loudly. When/if the client
recovers it can reconnect to the server and ask for missed messages.
Reliability
~~~~~~~~~~~
In general the replication stream should be considered an unreliable transport
since e.g. commands are not resent if the connection disappears.
The exception to that are the replication streams, i.e. RDATA commands, since
these include tokens which can be used to restart the stream on connection
errors.
The client should keep track of the token in the last RDATA command received
for each stream so that on reconnection it can start streaming from the correct
place. Note: not all RDATA have valid tokens due to batching. See
``RdataCommand`` for more details.
Example
~~~~~~~
An example interaction is shown below. Each line is prefixed with '>' or '<' to
indicate which side is sending; these are *not* included on the wire::

    * connection established *
    > SERVER localhost:8823
    > PING 1490197665618
    < NAME synapse.app.appservice
    < PING 1490197665618
    < REPLICATE events 1
    < REPLICATE backfill 1
    < REPLICATE caches 1
    > POSITION events 1
    > POSITION backfill 1
    > POSITION caches 1
    > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
    > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
        "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
    < PING 1490197675618
    > ERROR server stopping
    * connection closed by server *
The ``POSITION`` command sent by the server is used to set the clients position
without needing to send data with the ``RDATA`` command.
An example of a batched set of ``RDATA`` is::

    > RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
    > RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
    > RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
    > RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]
In this case the client shouldn't advance its ``caches`` stream token until it
sees the last ``RDATA``.
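
A client-side sketch of that batching rule (the helper below is illustrative,
not part of synapse; see ``commands.RdataCommand`` for the real handling)::

    pending = {}    # stream_name -> rows buffered for the current position
    positions = {}  # stream_name -> last token it is safe to resume from

    def on_rdata(stream_name, token, row):
        pending.setdefault(stream_name, []).append(row)
        if token == "batch":
            return  # more rows are coming for the same position
        rows = pending.pop(stream_name)
        positions[stream_name] = int(token)
        print("stream %s advanced to %s with %d rows" % (stream_name, token, len(rows)))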
List of commands
~~~~~~~~~~~~~~~~
The list of valid commands, with which side can send it: server (S) or client (C):
SERVER (S)
Sent at the start to identify which server the client is talking to
RDATA (S)
A single update in a stream
POSITION (S)
The position of the stream has been updated
ERROR (S, C)
There was an error
PING (S, C)
Sent periodically to ensure the connection is still alive
NAME (C)
Sent at the start by client to inform the server who they are
REPLICATE (C)
Asks the server to replicate a given stream
USER_SYNC (C)
A user has started or stopped syncing
FEDERATION_ACK (C)
Acknowledge receipt of some federation data
REMOVE_PUSHER (C)
Inform the server a pusher should be removed
INVALIDATE_CACHE (C)
Inform the server a cache should be invalidated
SYNC (S, C)
Used exclusively in tests
See ``synapse/replication/tcp/commands.py`` for a detailed description and the
format of each command.
@@ -50,14 +50,37 @@ You may be able to setup coturn via your package manager, or set it up manually

     pwgen -s 64 1

-5. Ensure youe firewall allows traffic into the TURN server on
+5. Consider your security settings. TURN lets users request a relay
+   which will connect to arbitrary IP addresses and ports. At the least
+   we recommend:
+
+       # VoIP traffic is all UDP. There is no reason to let users connect to arbitrary TCP endpoints via the relay.
+       no-tcp-relay
+
+       # don't let the relay ever try to connect to private IP address ranges within your network (if any)
+       # given the turn server is likely behind your firewall, remember to include any privileged public IPs too.
+       denied-peer-ip=10.0.0.0-10.255.255.255
+       denied-peer-ip=192.168.0.0-192.168.255.255
+       denied-peer-ip=172.16.0.0-172.31.255.255
+
+       # special case the turn server itself so that client->TURN->TURN->client flows work
+       allowed-peer-ip=10.0.0.1
+
+       # consider whether you want to limit the quota of relayed streams per user (or total) to avoid risk of DoS.
+       user-quota=12 # 4 streams per video call, so 12 streams = 3 simultaneous relayed calls per user.
+       total-quota=1200
+
+   Ideally coturn should refuse to relay traffic which isn't SRTP;
+   see https://github.com/matrix-org/synapse/issues/2009
+
+6. Ensure your firewall allows traffic into the TURN server on
    the ports you've configured it to listen on (remember to allow
-   both TCP and UDP if you've enabled both).
+   both TCP and UDP TURN traffic)

-6. If you've configured coturn to support TLS/DTLS, generate or
+7. If you've configured coturn to support TLS/DTLS, generate or
    import your private key and certificate.

-7. Start the turn server::
+8. Start the turn server::

    bin/turnserver -o
@@ -83,12 +106,19 @@ Your home server configuration file needs the following extra keys:
    to refresh credentials. The TURN REST API specification recommends
    one day (86400000).

+4. "turn_allow_guests": Whether to allow guest users to use the TURN
+   server. This is enabled by default, as otherwise VoIP will not
+   work reliably for guests. However, it does introduce a security risk
+   as it lets guests connect to arbitrary endpoints without having gone
+   through a CAPTCHA or similar to register a real account.
+
 As an example, here is the relevant section of the config file for
 matrix.org::

    turn_uris: [ "turn:turn.matrix.org:3478?transport=udp", "turn:turn.matrix.org:3478?transport=tcp" ]
    turn_shared_secret: n0t4ctuAllymatr1Xd0TorgSshar3d5ecret4obvIousreAsons
    turn_user_lifetime: 86400000
+   turn_allow_guests: True

 Now, restart synapse::
......
@@ -12,7 +12,7 @@ across multiple processes is a recipe for disaster, plus you should be using
 postgres anyway if you care about scalability).

 The workers communicate with the master synapse process via a synapse-specific
-HTTP protocol called 'replication' - analogous to MySQL or Postgres style
+TCP protocol called 'replication' - analogous to MySQL or Postgres style
 database replication; feeding a stream of relevant data to the workers so they
 can be kept in sync with the main synapse process and database state.
@@ -21,16 +21,11 @@ To enable workers, you need to add a replication listener to the master synapse,

     listeners:
      - port: 9092
        bind_address: '127.0.0.1'
-       type: http
-       tls: false
-       x_forwarded: false
-       resources:
-        - names: [replication]
-          compress: false
+       type: replication

 Under **no circumstances** should this replication API listener be exposed to the
 public internet; it currently implements no authentication whatsoever and is
-unencrypted HTTP.
+unencrypted.

 You then create a set of configs for the various worker processes. These
 worker configuration files should be stored in a dedicated subdirectory, to allow
@@ -50,14 +45,16 @@ e.g. the HTTP listener that it provides (if any); logging configuration; etc.

 You should minimise the number of overrides though to maintain a usable config.

 You must specify the type of worker application (worker_app) and the replication
-endpoint that it's talking to on the main synapse process (worker_replication_url).
+endpoint that it's talking to on the main synapse process (worker_replication_host
+and worker_replication_port).

 For instance::

     worker_app: synapse.app.synchrotron

     # The replication listener on the synapse to talk to.
-    worker_replication_url: http://127.0.0.1:9092/_synapse/replication
+    worker_replication_host: 127.0.0.1
+    worker_replication_port: 9092

     worker_listeners:
      - type: http
@@ -95,4 +92,3 @@ To manipulate a specific worker, you pass the -w option to synctl::

 All of the above is highly experimental and subject to change as Synapse evolves,
 but documenting it here to help folks needing highly scalable Synapses similar
 to the one running matrix.org!
@@ -9,16 +9,39 @@

 ROOMID="$1"

 sqlite3 homeserver.db <<EOF
-DELETE FROM context_depth WHERE context = '$ROOMID';
-DELETE FROM current_state WHERE context = '$ROOMID';
-DELETE FROM feedback WHERE room_id = '$ROOMID';
-DELETE FROM messages WHERE room_id = '$ROOMID';
-DELETE FROM pdu_backward_extremities WHERE context = '$ROOMID';
-DELETE FROM pdu_edges WHERE context = '$ROOMID';
-DELETE FROM pdu_forward_extremities WHERE context = '$ROOMID';
-DELETE FROM pdus WHERE context = '$ROOMID';
-DELETE FROM room_data WHERE room_id = '$ROOMID';
+DELETE FROM event_forward_extremities WHERE room_id = '$ROOMID';
+DELETE FROM event_backward_extremities WHERE room_id = '$ROOMID';
+DELETE FROM event_edges WHERE room_id = '$ROOMID';
+DELETE FROM room_depth WHERE room_id = '$ROOMID';
+DELETE FROM state_forward_extremities WHERE room_id = '$ROOMID';
+DELETE FROM events WHERE room_id = '$ROOMID';
+DELETE FROM event_json WHERE room_id = '$ROOMID';
+DELETE FROM state_events WHERE room_id = '$ROOMID';
+DELETE FROM current_state_events WHERE room_id = '$ROOMID';
 DELETE FROM room_memberships WHERE room_id = '$ROOMID';
+DELETE FROM feedback WHERE room_id = '$ROOMID';
+DELETE FROM topics WHERE room_id = '$ROOMID';
+DELETE FROM room_names WHERE room_id = '$ROOMID';
 DELETE FROM rooms WHERE room_id = '$ROOMID';
-DELETE FROM state_pdus WHERE context = '$ROOMID';
+DELETE FROM room_hosts WHERE room_id = '$ROOMID';
+DELETE FROM room_aliases WHERE room_id = '$ROOMID';
+DELETE FROM state_groups WHERE room_id = '$ROOMID';
+DELETE FROM state_groups_state WHERE room_id = '$ROOMID';
+DELETE FROM receipts_graph WHERE room_id = '$ROOMID';
+DELETE FROM receipts_linearized WHERE room_id = '$ROOMID';
+DELETE FROM event_search_content WHERE c1room_id = '$ROOMID';
+DELETE FROM guest_access WHERE room_id = '$ROOMID';
+DELETE FROM history_visibility WHERE room_id = '$ROOMID';
+DELETE FROM room_tags WHERE room_id = '$ROOMID';
+DELETE FROM room_tags_revisions WHERE room_id = '$ROOMID';
+DELETE FROM room_account_data WHERE room_id = '$ROOMID';
+DELETE FROM event_push_actions WHERE room_id = '$ROOMID';
+DELETE FROM local_invites WHERE room_id = '$ROOMID';
+DELETE FROM pusher_throttle WHERE room_id = '$ROOMID';
+DELETE FROM event_reports WHERE room_id = '$ROOMID';
+DELETE FROM public_room_list_stream WHERE room_id = '$ROOMID';
+DELETE FROM stream_ordering_to_exterm WHERE room_id = '$ROOMID';
+DELETE FROM event_auth WHERE room_id = '$ROOMID';
+DELETE FROM appservice_room_list WHERE room_id = '$ROOMID';
+
+VACUUM;
 EOF
@@ -447,9 +447,7 @@ class Porter(object):

         postgres_tables = yield self.postgres_store._simple_select_onecol(
             table="information_schema.tables",
-            keyvalues={
-                "table_schema": "public",
-            },
+            keyvalues={},
             retcol="distinct table_name",
         )
......
@@ -16,4 +16,4 @@

 """ This is a reference implementation of a Matrix home server.
 """

-__version__ = "0.20.0"
+__version__ = "0.21.0"
@@ -66,6 +66,17 @@ class CodeMessageException(RuntimeError):
         return cs_error(self.msg)


+class MatrixCodeMessageException(CodeMessageException):
+    """An error from a general matrix endpoint, e.g. from a proxied Matrix API call.
+
+    Attributes:
+        errcode (str): Matrix error code, e.g. 'M_FORBIDDEN'
+    """
+    def __init__(self, code, msg, errcode=Codes.UNKNOWN):
+        super(MatrixCodeMessageException, self).__init__(code, msg)
+        self.errcode = errcode
+
+
 class SynapseError(CodeMessageException):
     """A base exception type for matrix errors which have an errcode and error
     message (as well as an HTTP status code).
......
@@ -26,17 +26,17 @@ from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
 from synapse import events

-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.web.resource import Resource

 from daemonize import Daemonize
@@ -120,30 +120,25 @@ class AppserviceServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])

-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-        appservice_handler = self.get_application_service_handler()
-
-        @defer.inlineCallbacks
-        def replicate(results):
-            stream = results.get("events")
-            if stream:
-                max_stream_id = stream["position"]
-                yield appservice_handler.notify_interested_services(max_stream_id)
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-                replicate(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(30)
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return ASReplicationHandler(self)
+
+
+class ASReplicationHandler(ReplicationClientHandler):
+    def __init__(self, hs):
+        super(ASReplicationHandler, self).__init__(hs.get_datastore())
+        self.appservice_handler = hs.get_application_service_handler()
+
+    def on_rdata(self, stream_name, token, rows):
+        super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+
+        if stream_name == "events":
+            max_stream_id = self.store.get_room_max_stream_ordering()
+            preserve_fn(
+                self.appservice_handler.notify_interested_services
+            )(max_stream_id)


 def start(config_options):

@@ -199,7 +194,6 @@ def start(config_options):
     reactor.run()

     def start():
-        ps.replicate()
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
......
@@ -30,11 +30,11 @@ from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import PublicRoomListRestServlet
 from synapse.server import HomeServer
 from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole

@@ -45,7 +45,7 @@ from synapse.crypto import context_factory
 from synapse import events

-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.web.resource import Resource

 from daemonize import Daemonize
@@ -145,21 +145,10 @@ class ClientReaderServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])

-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(5)
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return ReplicationClientHandler(self.get_datastore())


 def start(config_options):

@@ -209,7 +198,6 @@ def start(config_options):

     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
-        ss.replicate()

     reactor.callWhenRunning(start)
......
@@ -27,9 +27,9 @@ from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole

@@ -42,7 +42,7 @@ from synapse.crypto import context_factory
 from synapse import events

-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.web.resource import Resource

 from daemonize import Daemonize
@@ -134,21 +134,10 @@ class FederationReaderServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])

-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(5)
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return ReplicationClientHandler(self.get_datastore())


 def start(config_options):

@@ -198,7 +187,6 @@ def start(config_options):

     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
-        ss.replicate()

     reactor.callWhenRunning(start)
......
@@ -23,19 +23,19 @@ from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
 from synapse.http.site import SynapseSite
 from synapse.federation import send_queue
-from synapse.federation.units import Edu
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
-from synapse.storage.presence import UserPresenceState
-from synapse.util.async import sleep
+from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -50,16 +50,36 @@ from daemonize import Daemonize

 import sys
 import logging
 import gc
-import ujson as json

 logger = logging.getLogger("synapse.app.appservice")


 class FederationSenderSlaveStore(
     SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
-    SlavedRegistrationStore, SlavedDeviceStore,
+    SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
 ):
-    pass
+    def __init__(self, db_conn, hs):
+        super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
+
+        # We pull out the current federation stream position now so that we
+        # always have a known value for the federation position in memory so
+        # that we don't have to bounce via a deferred once when we start the
+        # replication streams.
+        self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
+
+    def _get_federation_out_pos(self, db_conn):
+        sql = (
+            "SELECT stream_id FROM federation_stream_position"
+            " WHERE type = ?"
+        )
+        sql = self.database_engine.convert_param_style(sql)
+
+        txn = db_conn.cursor()
+        txn.execute(sql, ("federation",))
+        rows = txn.fetchall()
+        txn.close()
+
+        return rows[0][0] if rows else -1


 class FederationSenderServer(HomeServer):
@@ -127,26 +147,27 @@ class FederationSenderServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])

-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-        send_handler = FederationSenderHandler(self)
-
-        send_handler.on_start()
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args.update((yield send_handler.stream_positions()))
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-                yield send_handler.process_replication(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(30)
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return FederationSenderReplicationHandler(self)
+
+
+class FederationSenderReplicationHandler(ReplicationClientHandler):
+    def __init__(self, hs):
+        super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
+        self.send_handler = FederationSenderHandler(hs, self)
+
+    def on_rdata(self, stream_name, token, rows):
+        super(FederationSenderReplicationHandler, self).on_rdata(
+            stream_name, token, rows
+        )
+        self.send_handler.process_replication_rows(stream_name, token, rows)
+
+    def get_streams_to_replicate(self):
+        args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate()
+        args.update(self.send_handler.stream_positions())
+        return args


 def start(config_options):

@@ -205,7 +226,6 @@ def start(config_options):
     reactor.run()

     def start():
-        ps.replicate()
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
@@ -229,9 +249,15 @@ class FederationSenderHandler(object):
     """Processes the replication stream and forwards the appropriate entries
     to the federation sender.
     """
-    def __init__(self, hs):
+    def __init__(self, hs, replication_client):
         self.store = hs.get_datastore()
         self.federation_sender = hs.get_federation_sender()
+        self.replication_client = replication_client
+
+        self.federation_position = self.store.federation_out_pos_startup
+        self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
+
+        self._last_ack = self.federation_position

         self._room_serials = {}
         self._room_typing = {}
@@ -243,98 +269,35 @@ class FederationSenderHandler(object):
             self.store.get_room_max_stream_ordering()
         )

-    @defer.inlineCallbacks
     def stream_positions(self):
-        stream_id = yield self.store.get_federation_out_pos("federation")
-        defer.returnValue({
-            "federation": stream_id,
-
-            # Ack stuff we've "processed", this should only be called from
-            # one process.
-            "federation_ack": stream_id,
-        })
-
-    @defer.inlineCallbacks
-    def process_replication(self, result):
+        return {"federation": self.federation_position}
+
+    def process_replication_rows(self, stream_name, token, rows):
         # The federation stream contains things that we want to send out, e.g.
         # presence, typing, etc.
-        fed_stream = result.get("federation")
-        if fed_stream:
-            latest_id = int(fed_stream["position"])
-
-            # The federation stream containis a bunch of different types of
-            # rows that need to be handled differently. We parse the rows, put
-            # them into the appropriate collection and then send them off.
-            presence_to_send = {}
-            keyed_edus = {}
-            edus = {}
-            failures = {}
-            device_destinations = set()
-
-            # Parse the rows in the stream
-            for row in fed_stream["rows"]:
-                position, typ, content_js = row
-                content = json.loads(content_js)
-
-                if typ == send_queue.PRESENCE_TYPE:
-                    destination = content["destination"]
-                    state = UserPresenceState.from_dict(content["state"])
-
-                    presence_to_send.setdefault(destination, []).append(state)
-                elif typ == send_queue.KEYED_EDU_TYPE:
-                    key = content["key"]
-                    edu = Edu(**content["edu"])
-
-                    keyed_edus.setdefault(
-                        edu.destination, {}
-                    )[(edu.destination, tuple(key))] = edu
-                elif typ == send_queue.EDU_TYPE:
-                    edu = Edu(**content)
-                    edus.setdefault(edu.destination, []).append(edu)
-                elif typ == send_queue.FAILURE_TYPE:
-                    destination = content["destination"]
-                    failure = content["failure"]
-                    failures.setdefault(destination, []).append(failure)
-                elif typ == send_queue.DEVICE_MESSAGE_TYPE:
-                    device_destinations.add(content["destination"])
-                else:
-                    raise Exception("Unrecognised federation type: %r", typ)
-
-            # We've finished collecting, send everything off
-            for destination, states in presence_to_send.items():
-                self.federation_sender.send_presence(destination, states)
-
-            for destination, edu_map in keyed_edus.items():
-                for key, edu in edu_map.items():
-                    self.federation_sender.send_edu(
-                        edu.destination, edu.edu_type, edu.content, key=key,
-                    )
-
-            for destination, edu_list in edus.items():
-                for edu in edu_list:
-                    self.federation_sender.send_edu(
-                        edu.destination, edu.edu_type, edu.content, key=None,
-                    )
-
-            for destination, failure_list in failures.items():
-                for failure in failure_list:
-                    self.federation_sender.send_failure(destination, failure)
-
-            for destination in device_destinations:
-                self.federation_sender.send_device_messages(destination)
-
-            # Record where we are in the stream.
-            yield self.store.update_federation_out_pos(
-                "federation", latest_id
-            )
+        if stream_name == "federation":
+            send_queue.process_rows_for_federation(self.federation_sender, rows)
+            preserve_fn(self.update_token)(token)

         # We also need to poke the federation sender when new events happen
-        event_stream = result.get("events")
-        if event_stream:
-            latest_pos = event_stream["position"]
-            self.federation_sender.notify_new_events(latest_pos)
+        elif stream_name == "events":
+            self.federation_sender.notify_new_events(token)
+
+    @defer.inlineCallbacks
+    def update_token(self, token):
+        self.federation_position = token
+
+        # We linearize here to ensure we don't have races updating the token
+        with (yield self._fed_position_linearizer.queue(None)):
+            if self._last_ack < self.federation_position:
+                yield self.store.update_federation_out_pos(
+                    "federation", self.federation_position
+                )
+
+                # We ACK this token over replication so that the master can drop
+                # its in memory queues
+                self.replication_client.send_federation_ack(self.federation_position)
+                self._last_ack = self.federation_position


 if __name__ == '__main__':
......
@@ -25,7 +25,7 @@ import synapse.config.logger
 from synapse.config._base import ConfigError
 from synapse.python_dependencies import (
-    check_requirements, DEPENDENCY_LINKS
+    check_requirements, CONDITIONAL_REQUIREMENTS
 )
 from synapse.rest import ClientRestResource

@@ -55,7 +55,7 @@ from synapse.crypto import context_factory
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.metrics import register_memory_metrics, get_metrics_for
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
+from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.util.rlimit import change_resource_limit

@@ -92,7 +92,7 @@ def build_resource_for_web_client(hs):
         "\n"
         "You can also disable hosting of the webclient via the\n"
         "configuration option `web_client`\n"
-        % {"dep": DEPENDENCY_LINKS["matrix-angular-sdk"]}
+        % {"dep": CONDITIONAL_REQUIREMENTS["web_client"].keys()[0]}
     )
     syweb_path = os.path.dirname(syweb.__file__)
     webclient_path = os.path.join(syweb_path, "webclient")

@@ -166,9 +166,6 @@ class SynapseHomeServer(HomeServer):
         if name == "metrics" and self.get_config().enable_metrics:
             resources[METRICS_PREFIX] = MetricsResource(self)

-        if name == "replication":
-            resources[REPLICATION_PREFIX] = ReplicationResource(self)
-
         if WEB_CLIENT_PREFIX in resources:
             root_resource = RootRedirect(WEB_CLIENT_PREFIX)
         else:

@@ -222,6 +219,16 @@ class SynapseHomeServer(HomeServer):
                 ),
                 interface=address
             )
+        elif listener["type"] == "replication":
+            bind_addresses = listener["bind_addresses"]
+            for address in bind_addresses:
+                factory = ReplicationStreamProtocolFactory(self)
+                server_listener = reactor.listenTCP(
+                    listener["port"], factory, interface=address
+                )
+                reactor.addSystemEventTrigger(
+                    "before", "shutdown", server_listener.stopListening,
+                )
         else:
             logger.warn("Unrecognized listener type: %s", listener["type"])
......