Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • maunium/synapse
  • leytilera/synapse
2 results
Show changes
Showing
with 746 additions and 238 deletions
......@@ -7,7 +7,7 @@ ARG FROM=matrixdotorg/synapse:$SYNAPSE_VERSION
# target image. For repeated rebuilds, this is much faster than apt installing
# each time.
FROM debian:bullseye-slim AS deps_base
FROM docker.io/library/debian:bookworm-slim AS deps_base
RUN \
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
......@@ -21,7 +21,7 @@ FROM debian:bullseye-slim AS deps_base
# which makes it much easier to copy (but we need to make sure we use an image
# based on the same debian version as the synapse image, to make sure we get
# the expected version of libc.
FROM redis:6-bullseye AS redis_base
FROM docker.io/library/redis:7-bookworm AS redis_base
# now build the final image, based on the the regular Synapse docker image
FROM $FROM
......
......@@ -15,7 +15,7 @@ and run Synapse against Complement.
Consult the [contributing guide][guideComplementSh] for instructions on how to use it.
[guideComplementSh]: https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#run-the-integration-tests-complement
[guideComplementSh]: https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-integration-tests-complement
## Building and running the images manually
......
......@@ -45,7 +45,7 @@ docker run -it --rm \
```
For information on picking a suitable server name, see
https://matrix-org.github.io/synapse/latest/setup/installation.html.
https://element-hq.github.io/synapse/latest/setup/installation.html.
The above command will generate a `homeserver.yaml` in (typically)
`/var/lib/docker/volumes/synapse-data/_data`. You should check this file, and
......@@ -73,11 +73,12 @@ The following environment variables are supported in `generate` mode:
will log sensitive information such as access tokens.
This should not be needed unless you are a developer attempting to debug something
particularly tricky.
* `SYNAPSE_LOG_TESTING`: if set, Synapse will log additional information useful
for testing.
## Postgres
By default the config will use SQLite. See the [docs on using Postgres](https://github.com/matrix-org/synapse/blob/develop/docs/postgres.md) for more info on how to use Postgres. Until this section is improved [this issue](https://github.com/matrix-org/synapse/issues/8304) may provide useful information.
By default the config will use SQLite. See the [docs on using Postgres](https://github.com/element-hq/synapse/blob/develop/docs/postgres.md) for more info on how to use Postgres. Until this section is improved [this issue](https://github.com/element-hq/synapse/issues/8304) may provide useful information.
## Running synapse
......@@ -113,6 +114,9 @@ The following environment variables are supported in `run` mode:
is set via `docker run --user`, defaults to `991`, `991`. Note that this user
must have permission to read the config files, and write to the data directories.
* `TZ`: the [timezone](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones) the container will run with. Defaults to `UTC`.
* `SYNAPSE_HTTP_PROXY`: Passed through to the Synapse process as the `http_proxy` environment variable.
* `SYNAPSE_HTTPS_PROXY`: Passed through to the Synapse process as the `https_proxy` environment variable.
* `SYNAPSE_NO_PROXY`: Passed through to the Synapse process as `no_proxy` environment variable.
For more complex setups (e.g. for workers) you can also pass your args directly to synapse using `run` mode. For example like this:
......@@ -150,10 +154,10 @@ is suitable for local testing, but for any practical use, you will either need
to use a reverse proxy, or configure Synapse to expose an HTTPS port.
For documentation on using a reverse proxy, see
https://github.com/matrix-org/synapse/blob/master/docs/reverse_proxy.md.
https://github.com/element-hq/synapse/blob/master/docs/reverse_proxy.md.
For more information on enabling TLS support in synapse itself, see
https://matrix-org.github.io/synapse/latest/setup/installation.html#tls-certificates. Of
https://element-hq.github.io/synapse/latest/setup/installation.html#tls-certificates. Of
course, you will need to expose the TLS port from the container with a `-p`
argument to `docker run`.
......@@ -241,4 +245,4 @@ healthcheck:
Jemalloc is embedded in the image and will be used instead of the default allocator.
You can read about jemalloc by reading the Synapse
[Admin FAQ](https://matrix-org.github.io/synapse/latest/usage/administration/admin_faq.html#help-synapse-is-slow-and-eats-all-my-ramcpu).
[Admin FAQ](https://element-hq.github.io/synapse/latest/usage/administration/admin_faq.html#help-synapse-is-slow-and-eats-all-my-ramcpu).
......@@ -11,6 +11,9 @@ DIST=$(cut -d ':' -f2 <<< "${distro:?}")
cp -aT /synapse/source /synapse/build
cd /synapse/build
# Delete any existing `.so` files to ensure a clean build.
rm -f /synapse/build/synapse/*.so
# if this is a prerelease, set the Section accordingly.
#
# When the package is later added to the package repo, reprepro will use the
......
# syntax=docker/dockerfile:1
# This dockerfile builds on top of 'docker/Dockerfile-workers' in matrix-org/synapse
# This dockerfile builds on top of 'docker/Dockerfile-workers' in element-hq/synapse
# by including a built-in postgres instance, as well as setting up the homeserver so
# that it is ready for testing via Complement.
#
# Instructions for building this image from those it depends on is detailed in this guide:
# https://github.com/matrix-org/synapse/blob/develop/docker/README-testing.md#testing-with-postgresql-and-single-or-multi-process-synapse
# https://github.com/element-hq/synapse/blob/develop/docker/README-testing.md#testing-with-postgresql-and-single-or-multi-process-synapse
ARG SYNAPSE_VERSION=latest
# This is an intermediate image, to be built locally (not pulled from a registry).
ARG FROM=matrixdotorg/synapse-workers:$SYNAPSE_VERSION
FROM $FROM
# First of all, we copy postgres server from the official postgres image,
# since for repeated rebuilds, this is much faster than apt installing
# postgres each time.
# This trick only works because (a) the Synapse image happens to have all the
# shared libraries that postgres wants, (b) we use a postgres image based on
# the same debian version as Synapse's docker image (so the versions of the
# shared libraries match).
RUN adduser --system --uid 999 postgres --home /var/lib/postgresql
COPY --from=postgres:13-bullseye /usr/lib/postgresql /usr/lib/postgresql
COPY --from=postgres:13-bullseye /usr/share/postgresql /usr/share/postgresql
RUN mkdir /var/run/postgresql && chown postgres /var/run/postgresql
ENV PATH="${PATH}:/usr/lib/postgresql/13/bin"
ENV PGDATA=/var/lib/postgresql/data
# We also initialize the database at build time, rather than runtime, so that it's faster to spin up the image.
RUN gosu postgres initdb --locale=C --encoding=UTF-8 --auth-host password
# Configure a password and create a database for Synapse
RUN echo "ALTER USER postgres PASSWORD 'somesecret'" | gosu postgres postgres --single
RUN echo "CREATE DATABASE synapse" | gosu postgres postgres --single
# Extend the shared homeserver config to disable rate-limiting,
# set Complement's static shared secret, enable registration, amongst other
# tweaks to get Synapse ready for testing.
# To do this, we copy the old template out of the way and then include it
# with Jinja2.
RUN mv /conf/shared.yaml.j2 /conf/shared-orig.yaml.j2
COPY conf/workers-shared-extra.yaml.j2 /conf/shared.yaml.j2
WORKDIR /data
COPY conf/postgres.supervisord.conf /etc/supervisor/conf.d/postgres.conf
# Copy the entrypoint
COPY conf/start_for_complement.sh /
# Expose nginx's listener ports
EXPOSE 8008 8448
ENTRYPOINT ["/start_for_complement.sh"]
# Update the healthcheck to have a shorter check interval
HEALTHCHECK --start-period=5s --interval=1s --timeout=1s \
CMD /bin/sh /healthcheck.sh
# First of all, we copy postgres server from the official postgres image,
# since for repeated rebuilds, this is much faster than apt installing
# postgres each time.
# This trick only works because (a) the Synapse image happens to have all the
# shared libraries that postgres wants, (b) we use a postgres image based on
# the same debian version as Synapse's docker image (so the versions of the
# shared libraries match).
RUN adduser --system --uid 999 postgres --home /var/lib/postgresql
COPY --from=docker.io/library/postgres:13-bookworm /usr/lib/postgresql /usr/lib/postgresql
COPY --from=docker.io/library/postgres:13-bookworm /usr/share/postgresql /usr/share/postgresql
RUN mkdir /var/run/postgresql && chown postgres /var/run/postgresql
ENV PATH="${PATH}:/usr/lib/postgresql/13/bin"
ENV PGDATA=/var/lib/postgresql/data
# We also initialize the database at build time, rather than runtime, so that it's faster to spin up the image.
RUN gosu postgres initdb --locale=C --encoding=UTF-8 --auth-host password
# Configure a password and create a database for Synapse
RUN echo "ALTER USER postgres PASSWORD 'somesecret'" | gosu postgres postgres --single
RUN echo "CREATE DATABASE synapse" | gosu postgres postgres --single
# Extend the shared homeserver config to disable rate-limiting,
# set Complement's static shared secret, enable registration, amongst other
# tweaks to get Synapse ready for testing.
# To do this, we copy the old template out of the way and then include it
# with Jinja2.
RUN mv /conf/shared.yaml.j2 /conf/shared-orig.yaml.j2
COPY conf/workers-shared-extra.yaml.j2 /conf/shared.yaml.j2
WORKDIR /data
COPY conf/postgres.supervisord.conf /etc/supervisor/conf.d/postgres.conf
# Copy the entrypoint
COPY conf/start_for_complement.sh /
# Expose nginx's listener ports
EXPOSE 8008 8448
ENTRYPOINT ["/start_for_complement.sh"]
# Update the healthcheck to have a shorter check interval
HEALTHCHECK --start-period=5s --interval=1s --timeout=1s \
CMD /bin/sh /healthcheck.sh
......@@ -30,3 +30,14 @@ Consult `scripts-dev/complement.sh` in the repository root for a real example.
[complement]: https://github.com/matrix-org/complement
[complementEnv]: https://github.com/matrix-org/complement/pull/382
## How to modify homeserver.yaml for Complement tests
It's common for MSCs to be gated behind a feature flag like this:
```yaml
experimental_features:
faster_joins: true
```
To modify this for the Complement image, modify `./conf/workers-shared-extra.yaml.j2`. Despite the name,
this will affect non-worker mode as well. Remember to _rebuild_ the image (so don't use `-e` if using
`complement.sh`).
[program:postgres]
command=/usr/local/bin/prefix-log gosu postgres postgres
# Only start if START_POSTGRES=1
# Only start if START_POSTGRES=true
autostart=%(ENV_START_POSTGRES)s
# Lower priority number = starts first
......
......@@ -32,8 +32,9 @@ case "$SYNAPSE_COMPLEMENT_DATABASE" in
;;
sqlite|"")
# Configure supervisord not to start Postgres, as we don't need it
export START_POSTGRES=false
# Set START_POSTGRES to false unless it has already been set
# (i.e. by another container image inheriting our own).
export START_POSTGRES=${START_POSTGRES:-false}
;;
*)
......@@ -51,8 +52,7 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
# -z True if the length of string is zero.
if [[ -z "$SYNAPSE_WORKER_TYPES" ]]; then
export SYNAPSE_WORKER_TYPES="\
event_persister, \
event_persister, \
event_persister:2, \
background_worker, \
frontend_proxy, \
event_creator, \
......@@ -64,10 +64,16 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
synchrotron, \
client_reader, \
appservice, \
pusher"
pusher, \
stream_writers=account_data+presence+receipts+to_device+typing"
fi
log "Workers requested: $SYNAPSE_WORKER_TYPES"
# adjust connection pool limits on worker mode as otherwise running lots of worker synapses
# can make docker unhappy (in GHA)
export POSTGRES_CP_MIN=1
export POSTGRES_CP_MAX=3
echo "using reduced connection pool limits for worker mode"
# Improve startup times by using a launcher based on fork()
export SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER=1
else
......
......@@ -7,6 +7,7 @@
#}
## Server ##
public_baseurl: http://127.0.0.1:8008/
report_stats: False
trusted_key_servers: []
enable_registration: true
......@@ -84,6 +85,18 @@ rc_invites:
per_user:
per_second: 1000
burst_count: 1000
per_issuer:
per_second: 1000
burst_count: 1000
rc_presence:
per_user:
per_second: 9999
burst_count: 9999
rc_delayed_event_mgmt:
per_second: 9999
burst_count: 9999
federation_rr_transactions_per_room_per_second: 9999
......@@ -92,10 +105,6 @@ allow_device_name_lookup_over_federation: true
## Experimental Features ##
experimental_features:
# Enable history backfilling support
msc2716_enabled: true
# client-side support for partial state in /send_join responses
faster_joins: true
# Enable support for polls
msc3381_polls_enabled: true
# Enable deleting device-specific notification settings stored in account data
......@@ -104,6 +113,20 @@ experimental_features:
msc3391_enabled: true
# Filtering /messages by relation type.
msc3874_enabled: true
# no UIA for x-signing upload for the first time
msc3967_enabled: true
# Expose a room summary for public rooms
msc3266_enabled: true
# Send to-device messages to application services
msc2409_to_device_messages_enabled: true
# Allow application services to masquerade devices
msc3202_device_masquerading: true
# Sending device list changes, one-time key counts and fallback key usage to application services
msc3202_transaction_extensions: true
# Proxy OTK claim requests to exclusive ASes
msc3983_appservice_otk_claims: true
# Proxy key queries to exclusive ASes
msc3984_appservice_key_query: true
server_notices:
system_mxid_localpart: _server
......@@ -111,10 +134,18 @@ server_notices:
system_mxid_avatar_url: ""
room_name: "Server Alert"
# Enable delayed events (msc4140)
max_event_delay_duration: 24h
# Disable sync cache so that initial `/sync` requests are up-to-date.
caches:
sync_response_cache_duration: 0
# Complement assumes that it can publish to the room list by default.
room_list_publication_rules:
- action: allow
{% include "shared-orig.yaml.j2" %}
......@@ -35,9 +35,16 @@ server {
# Send all other traffic to the main process
location ~* ^(\\/_matrix|\\/_synapse) {
{% if using_unix_sockets %}
proxy_pass http://unix:/run/main_public.sock;
{% else %}
# note: do not add a path (even a single /) after the port in `proxy_pass`,
# otherwise nginx will canonicalise the URI and cause signature verification
# errors.
proxy_pass http://localhost:8080;
{% endif %}
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Host $host;
proxy_set_header Host $host:$server_port;
}
}
......@@ -6,6 +6,9 @@
{% if enable_redis %}
redis:
enabled: true
{% if using_unix_sockets %}
path: /tmp/redis.sock
{% endif %}
{% endif %}
{% if appservice_registrations is not none %}
......
......@@ -19,7 +19,11 @@ username=www-data
autorestart=true
[program:redis]
{% if using_unix_sockets %}
command=/usr/local/bin/prefix-log /usr/local/bin/redis-server --unixsocket /tmp/redis.sock
{% else %}
command=/usr/local/bin/prefix-log /usr/local/bin/redis-server
{% endif %}
priority=1
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
......
{% if use_forking_launcher %}
[program:synapse_fork]
environment=http_proxy="%(ENV_SYNAPSE_HTTP_PROXY)s",https_proxy="%(ENV_SYNAPSE_HTTPS_PROXY)s",no_proxy="%(ENV_SYNAPSE_NO_PROXY)s"
command=/usr/local/bin/python -m synapse.app.complement_fork_starter
{{ main_config_path }}
synapse.app.homeserver
......@@ -20,6 +21,7 @@ exitcodes=0
{% else %}
[program:synapse_main]
environment=http_proxy="%(ENV_SYNAPSE_HTTP_PROXY)s",https_proxy="%(ENV_SYNAPSE_HTTPS_PROXY)s",no_proxy="%(ENV_SYNAPSE_NO_PROXY)s"
command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
......@@ -36,6 +38,7 @@ exitcodes=0
{% for worker in workers %}
[program:synapse_{{ worker.name }}]
environment=http_proxy="%(ENV_SYNAPSE_HTTP_PROXY)s",https_proxy="%(ENV_SYNAPSE_HTTPS_PROXY)s",no_proxy="%(ENV_SYNAPSE_NO_PROXY)s"
command=/usr/local/bin/prefix-log /usr/local/bin/python -m {{ worker.app }}
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
......
......@@ -6,13 +6,13 @@
worker_app: "{{ app }}"
worker_name: "{{ name }}"
# The replication listener on the main synapse process.
worker_replication_host: 127.0.0.1
worker_replication_http_port: 9093
worker_listeners:
- type: http
{% if using_unix_sockets %}
path: "/run/worker.{{ port }}"
{% else %}
port: {{ port }}
{% endif %}
{% if listener_resources %}
resources:
- names:
......
......@@ -36,12 +36,17 @@ listeners:
# Allow configuring in case we want to reverse proxy 8008
# using another process in the same container
{% if SYNAPSE_USE_UNIX_SOCKET %}
# Unix sockets don't care about TLS or IP addresses or ports
- path: '/run/main_public.sock'
type: http
{% else %}
- port: {{ SYNAPSE_HTTP_PORT or 8008 }}
tls: false
bind_addresses: ['::']
type: http
x_forwarded: false
{% endif %}
resources:
- names: [client]
compress: true
......@@ -57,10 +62,13 @@ database:
user: "{{ POSTGRES_USER or "synapse" }}"
password: "{{ POSTGRES_PASSWORD }}"
database: "{{ POSTGRES_DB or "synapse" }}"
{% if not SYNAPSE_USE_UNIX_SOCKET %}
{# Synapse will use a default unix socket for Postgres when host/port is not specified (behavior from `psycopg2`). #}
host: "{{ POSTGRES_HOST or "db" }}"
port: "{{ POSTGRES_PORT or "5432" }}"
cp_min: 5
cp_max: 10
{% endif %}
cp_min: {{ POSTGRES_CP_MIN or 5 }}
cp_max: {{ POSTGRES_CP_MAX or 10 }}
{% else %}
database:
name: "sqlite3"
......@@ -168,7 +176,6 @@ app_service_config_files:
{% endif %}
macaroon_secret_key: "{{ SYNAPSE_MACAROON_SECRET_KEY }}"
expire_access_token: False
## Signing Keys ##
......
......@@ -49,17 +49,35 @@ handlers:
class: logging.StreamHandler
formatter: precise
{% if not SYNAPSE_LOG_SENSITIVE %}
{#
If SYNAPSE_LOG_SENSITIVE is unset, then override synapse.storage.SQL to INFO
so that DEBUG entries (containing sensitive information) are not emitted.
#}
loggers:
# This is just here so we can leave `loggers` in the config regardless of whether
# we configure other loggers below (avoid empty yaml dict error).
_placeholder:
level: "INFO"
{% if not SYNAPSE_LOG_SENSITIVE %}
{#
If SYNAPSE_LOG_SENSITIVE is unset, then override synapse.storage.SQL to INFO
so that DEBUG entries (containing sensitive information) are not emitted.
#}
synapse.storage.SQL:
# beware: increasing this to DEBUG will make synapse log sensitive
# information such as access tokens.
level: INFO
{% endif %}
{% endif %}
{% if SYNAPSE_LOG_TESTING %}
{#
If Synapse is under test, log a few more useful things for a developer
attempting to debug something particularly tricky.
With `synapse.visibility.filtered_event_debug`, it logs when events are (maybe
unexpectedly) filtered out of responses in tests. It's just nice to be able to
look at the CI log and figure out why an event isn't being returned.
#}
synapse.visibility.filtered_event_debug:
level: DEBUG
{% endif %}
root:
level: {{ SYNAPSE_LOG_LEVEL or "INFO" }}
......
#!/usr/bin/env python
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2021 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This script reads environment variables and generates a shared Synapse worker,
# nginx and supervisord configs depending on the workers requested.
......@@ -19,8 +26,15 @@
# The environment variables it reads are:
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
# * SYNAPSE_REPORT_STATS: Whether to report stats.
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
# below. Leave empty for no workers.
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG
# below. Leave empty for no workers. Add a ':' and a number at the end to
# multiply that worker. Append multiple worker types with '+' to merge the
# worker types into a single worker. Add a name and a '=' to the front of a
# worker type to give this instance a name in logs and nginx.
# Examples:
# SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader'
# SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader'
# SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing'
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
# will be treated as Application Service registration files.
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
......@@ -33,6 +47,8 @@
# log level. INFO is the default.
# * SYNAPSE_LOG_SENSITIVE: If unset, SQL and SQL values won't be logged,
# regardless of the SYNAPSE_LOG_LEVEL setting.
# * SYNAPSE_LOG_TESTING: if set, Synapse will log additional information useful
# for testing.
#
# NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
# in the project's README), this script may be run multiple times, and functionality should
......@@ -40,15 +56,39 @@
import os
import platform
import re
import subprocess
import sys
from argparse import ArgumentParser
from collections import defaultdict
from itertools import chain
from pathlib import Path
from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Optional, Set
from typing import (
Any,
Dict,
List,
Mapping,
MutableMapping,
NoReturn,
Optional,
Set,
SupportsIndex,
)
import yaml
from jinja2 import Environment, FileSystemLoader
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
MAIN_PROCESS_INSTANCE_NAME = "main"
MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
MAIN_PROCESS_REPLICATION_PORT = 9093
# Obviously, these would only be used with the UNIX socket option
MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
# during processing with the name of the worker.
WORKER_PLACEHOLDER_NAME = "placeholder_name"
# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
# Watching /_matrix/client needs a "client" listener
......@@ -70,12 +110,14 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
],
"shared_extra_conf": {"update_user_directory_from_worker": "user_dir1"},
"shared_extra_conf": {
"update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
},
"worker_extra_conf": "",
},
"media_repository": {
"app": "synapse.app.media_repository",
"listener_resources": ["media"],
"app": "synapse.app.generic_worker",
"listener_resources": ["media", "client"],
"endpoint_patterns": [
"^/_matrix/media/",
"^/_synapse/admin/v1/purge_media_cache$",
......@@ -83,11 +125,13 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_synapse/admin/v1/user/.*/media.*$",
"^/_synapse/admin/v1/media/.*$",
"^/_synapse/admin/v1/quarantine_media/.*$",
"^/_matrix/client/v1/media/.*$",
"^/_matrix/federation/v1/media/.*$",
],
# The first configured media worker will run the media background jobs
"shared_extra_conf": {
"enable_media_repo": False,
"media_instance_running_background_jobs": "media_repository1",
"media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
},
"worker_extra_conf": "enable_media_repo: true",
},
......@@ -95,7 +139,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {"notify_appservices_from_worker": "appservice1"},
"shared_extra_conf": {
"notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
},
"worker_extra_conf": "",
},
"federation_sender": {
......@@ -135,6 +181,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/versions$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$",
"^/_matrix/client/(r0|v3|unstable)/register$",
"^/_matrix/client/(r0|v3|unstable)/register/available$",
"^/_matrix/client/(r0|v3|unstable)/auth/.*/fallback/web$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/messages$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event",
......@@ -143,6 +190,10 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/v1/rooms/.*/timestamp_to_event$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/search",
"^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$)",
"^/_matrix/client/(r0|v3|unstable)/password_policy$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
"^/_matrix/client/(r0|v3|unstable)/capabilities$",
"^/_matrix/client/(r0|v3|unstable)/notifications$",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
......@@ -162,6 +213,8 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/federation/(v1|v2)/make_leave/",
"^/_matrix/federation/(v1|v2)/send_join/",
"^/_matrix/federation/(v1|v2)/send_leave/",
"^/_matrix/federation/v1/make_knock/",
"^/_matrix/federation/v1/send_knock/",
"^/_matrix/federation/(v1|v2)/invite/",
"^/_matrix/federation/(v1|v2)/query_auth/",
"^/_matrix/federation/(v1|v2)/event_auth/",
......@@ -192,9 +245,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
# This worker cannot be sharded. Therefore there should only ever be one background
# worker, and it should be named background_worker1
"shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
# This worker cannot be sharded. Therefore, there should only ever be one
# background worker. This is enforced for the safety of your database.
"shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
"worker_extra_conf": "",
},
"event_creator": {
......@@ -207,7 +260,6 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
"^/_matrix/client/(v1|unstable/org.matrix.msc2716)/rooms/.*/batch_send",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
......@@ -262,6 +314,13 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"push_rules": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
}
# Templates for sections that may be inserted multiple times in config files
......@@ -275,7 +334,7 @@ NGINX_LOCATION_CONFIG_BLOCK = """
"""
NGINX_UPSTREAM_CONFIG_BLOCK = """
upstream {upstream_worker_type} {{
upstream {upstream_worker_base_name} {{
{body}
}}
"""
......@@ -326,7 +385,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
def add_worker_roles_to_shared_config(
shared_config: dict,
worker_type: str,
worker_types_set: Set[str],
worker_name: str,
worker_port: int,
) -> None:
......@@ -334,22 +393,37 @@ def add_worker_roles_to_shared_config(
append appropriate worker information to it for the current worker_type instance.
Args:
shared_config: The config dict that all worker instances share (after being converted to YAML)
worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
shared_config: The config dict that all worker instances share (after being
converted to YAML)
worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
This list can be a single worker type or multiple.
worker_name: The name of the worker instance.
worker_port: The HTTP replication port that the worker instance is listening on.
"""
# The instance_map config field marks the workers that write to various replication streams
# The instance_map config field marks the workers that write to various replication
# streams
instance_map = shared_config.setdefault("instance_map", {})
# Worker-type specific sharding config
if worker_type == "pusher":
# This is a list of the stream_writers that there can be only one of. Events can be
# sharded, and therefore doesn't belong here.
singular_stream_writers = [
"account_data",
"presence",
"receipts",
"to_device",
"typing",
"push_rules",
]
# Worker-type specific sharding config. Now a single worker can fulfill multiple
# roles, check each.
if "pusher" in worker_types_set:
shared_config.setdefault("pusher_instances", []).append(worker_name)
elif worker_type == "federation_sender":
if "federation_sender" in worker_types_set:
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
elif worker_type == "event_persister":
if "event_persister" in worker_types_set:
# Event persisters write to the events stream, so we need to update
# the list of event stream writers
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
......@@ -357,24 +431,168 @@ def add_worker_roles_to_shared_config(
)
# Map of stream writer instance names to host/ports combos
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
instance_map[worker_name] = {
"path": f"/run/worker.{worker_port}",
}
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
# Update the list of stream writers. It's convenient that the name of the worker
# type is the same as the stream to write. Iterate over the whole list in case there
# is more than one.
for worker in worker_types_set:
if worker in singular_stream_writers:
shared_config.setdefault("stream_writers", {}).setdefault(
worker, []
).append(worker_name)
# Map of stream writer instance names to host/ports combos
# For now, all stream writers need http replication ports
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
instance_map[worker_name] = {
"path": f"/run/worker.{worker_port}",
}
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
# Update the list of stream writers
# It's convenient that the name of the worker type is the same as the stream to write
shared_config.setdefault("stream_writers", {}).setdefault(
worker_type, []
).append(worker_name)
# Map of stream writer instance names to host/ports combos
# For now, all stream writers need http replication ports
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
def merge_worker_template_configs(
existing_dict: Optional[Dict[str, Any]],
to_be_merged_dict: Dict[str, Any],
) -> Dict[str, Any]:
"""When given an existing dict of worker template configuration consisting with both
dicts and lists, merge new template data from WORKERS_CONFIG(or create) and
return new dict.
Args:
existing_dict: Either an existing worker template or a fresh blank one.
to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into
existing_dict.
Returns: The newly merged together dict values.
"""
new_dict: Dict[str, Any] = {}
if not existing_dict:
# It doesn't exist yet, just use the new dict(but take a copy not a reference)
new_dict = to_be_merged_dict.copy()
else:
for i in to_be_merged_dict.keys():
if (i == "endpoint_patterns") or (i == "listener_resources"):
# merge the two lists, remove duplicates
new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
elif i == "shared_extra_conf":
# merge dictionary's, the worker name will be replaced later
new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
elif i == "worker_extra_conf":
# There is only one worker type that has a 'worker_extra_conf' and it is
# the media_repo. Since duplicate worker types on the same worker don't
# work, this is fine.
new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
else:
# Everything else should be identical, like "app", which only works
# because all apps are now generic_workers.
new_dict[i] = to_be_merged_dict[i]
return new_dict
def insert_worker_name_for_worker_config(
existing_dict: Dict[str, Any], worker_name: str
) -> Dict[str, Any]:
"""Insert a given worker name into the worker's configuration dict.
Args:
existing_dict: The worker_config dict that is imported into shared_config.
worker_name: The name of the worker to insert.
Returns: Copy of the dict with newly inserted worker name
"""
dict_to_edit = existing_dict.copy()
for k, v in dict_to_edit["shared_extra_conf"].items():
# Only proceed if it's the placeholder name string
if v == WORKER_PLACEHOLDER_NAME:
dict_to_edit["shared_extra_conf"][k] = worker_name
return dict_to_edit
def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
"""
Apply multiplier(if found) by returning a new expanded list with some basic error
checking.
Args:
worker_types: The unprocessed List of requested workers
Returns:
A new list with all requested workers expanded.
"""
# Checking performed:
# 1. if worker:2 or more is declared, it will create additional workers up to number
# 2. if worker:1, it will create a single copy of this worker as if no number was
# given
# 3. if worker:0 is declared, this worker will be ignored. This is to allow for
# scripting and automated expansion and is intended behaviour.
# 4. if worker:NaN or is a negative number, it will error and log it.
new_worker_types = []
for worker_type in worker_types:
if ":" in worker_type:
worker_type_components = split_and_strip_string(worker_type, ":", 1)
worker_count = 0
# Should only be 2 components, a type of worker(s) and an integer as a
# string. Cast the number as an int then it can be used as a counter.
try:
worker_count = int(worker_type_components[1])
except ValueError:
error(
f"Bad number in worker count for '{worker_type}': "
f"'{worker_type_components[1]}' is not an integer"
)
# As long as there are more than 0, we add one to the list to make below.
for _ in range(worker_count):
new_worker_types.append(worker_type_components[0])
else:
# If it's not a real worker_type, it will error out later.
new_worker_types.append(worker_type)
return new_worker_types
def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
"""Helper to check to make sure worker types that cannot have multiples do not.
Args:
worker_type: The type of worker to check against.
Returns: True if allowed, False if not
"""
return worker_type not in [
"background_worker",
"account_data",
"presence",
"receipts",
"typing",
"to_device",
]
def split_and_strip_string(
given_string: str, split_char: str, max_split: SupportsIndex = -1
) -> List[str]:
"""
Helper to split a string on split_char and strip whitespace from each end of each
element.
Args:
given_string: The string to split
split_char: The character to split the string on
max_split: kwarg for split() to limit how many times the split() happens
Returns:
A List of strings
"""
# Removes whitespace from ends of result strings before adding to list. Allow for
# overriding 'maxsplit' kwarg, default being -1 to signify no maximum.
return [x.strip() for x in given_string.split(split_char, maxsplit=max_split)]
def generate_base_homeserver_config() -> None:
......@@ -389,37 +607,173 @@ def generate_base_homeserver_config() -> None:
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
def parse_worker_types(
requested_worker_types: List[str],
) -> Dict[str, Set[str]]:
"""Read the desired list of requested workers and prepare the data for use in
generating worker config files while also checking for potential gotchas.
Args:
requested_worker_types: The list formed from the split environment variable
containing the unprocessed requests for workers.
Returns: A dict of worker names to set of worker types. Format:
{'worker_name':
{'worker_type', 'worker_type2'}
}
"""
# A counter of worker_base_name -> int. Used for determining the name for a given
# worker when generating its config file, as each worker's name is just
# worker_base_name followed by instance number
worker_base_name_counter: Dict[str, int] = defaultdict(int)
# Similar to above, but more finely grained. This is used to determine we don't have
# more than a single worker for cases where multiples would be bad(e.g. presence).
worker_type_shard_counter: Dict[str, int] = defaultdict(int)
# The final result of all this processing
dict_to_return: Dict[str, Set[str]] = {}
# Handle any multipliers requested for given workers.
multiple_processed_worker_types = apply_requested_multiplier_for_worker(
requested_worker_types
)
# Process each worker_type_string
# Examples of expected formats:
# - requested_name=type1+type2+type3
# - synchrotron
# - event_creator+event_persister
for worker_type_string in multiple_processed_worker_types:
# First, if a name is requested, use that — otherwise generate one.
worker_base_name: str = ""
if "=" in worker_type_string:
# Split on "=", remove extra whitespace from ends then make list
worker_type_split = split_and_strip_string(worker_type_string, "=")
if len(worker_type_split) > 2:
error(
"There should only be one '=' in the worker type string. "
f"Please fix: {worker_type_string}"
)
# Assign the name
worker_base_name = worker_type_split[0]
if not re.match(r"^[a-zA-Z0-9_+-]*[a-zA-Z_+-]$", worker_base_name):
# Apply a fairly narrow regex to the worker names. Some characters
# aren't safe for use in file paths or nginx configurations.
# Don't allow to end with a number because we'll add a number
# ourselves in a moment.
error(
"Invalid worker name; please choose a name consisting of "
"alphanumeric letters, _ + -, but not ending with a digit: "
f"{worker_base_name!r}"
)
# Continue processing the remainder of the worker_type string
# with the name override removed.
worker_type_string = worker_type_split[1]
# Split the worker_type_string on "+", remove whitespace from ends then make
# the list a set so it's deduplicated.
worker_types_set: Set[str] = set(
split_and_strip_string(worker_type_string, "+")
)
if not worker_base_name:
# No base name specified: generate one deterministically from set of
# types
worker_base_name = "+".join(sorted(worker_types_set))
# At this point, we have:
# worker_base_name which is the name for the worker, without counter.
# worker_types_set which is the set of worker types for this worker.
# Validate worker_type and make sure we don't allow sharding for a worker type
# that doesn't support it. Will error and stop if it is a problem,
# e.g. 'background_worker'.
for worker_type in worker_types_set:
# Verify this is a real defined worker type. If it's not, stop everything so
# it can be fixed.
if worker_type not in WORKERS_CONFIG:
error(
f"{worker_type} is an unknown worker type! Was found in "
f"'{worker_type_string}'. Please fix!"
)
if worker_type in worker_type_shard_counter:
if not is_sharding_allowed_for_worker_type(worker_type):
error(
f"There can be only a single worker with {worker_type} "
"type. Please recount and remove."
)
# Not in shard counter, must not have seen it yet, add it.
worker_type_shard_counter[worker_type] += 1
# Generate the number for the worker using incrementing counter
worker_base_name_counter[worker_base_name] += 1
worker_number = worker_base_name_counter[worker_base_name]
worker_name = f"{worker_base_name}{worker_number}"
if worker_number > 1:
# If this isn't the first worker, check that we don't have a confusing
# mixture of worker types with the same base name.
first_worker_with_base_name = dict_to_return[f"{worker_base_name}1"]
if first_worker_with_base_name != worker_types_set:
error(
f"Can not use worker_name: '{worker_name}' for worker_type(s): "
f"{worker_types_set!r}. It is already in use by "
f"worker_type(s): {first_worker_with_base_name!r}"
)
dict_to_return[worker_name] = worker_types_set
return dict_to_return
def generate_worker_files(
environ: Mapping[str, str], config_path: str, data_dir: str
environ: Mapping[str, str],
config_path: str,
data_dir: str,
requested_worker_types: Dict[str, Set[str]],
) -> None:
"""Read the desired list of workers from environment variables and generate
shared homeserver, nginx and supervisord configs.
"""Read the desired workers(if any) that is passed in and generate shared
homeserver, nginx and supervisord configs.
Args:
environ: os.environ instance.
config_path: The location of the generated Synapse main worker config file.
data_dir: The location of the synapse data directory. Where log and
user-facing config files live.
requested_worker_types: A Dict containing requested workers in the format of
{'worker_name1': {'worker_type', ...}}
"""
# Note that yaml cares about indentation, so care should be taken to insert lines
# into files at the correct indentation below.
# shared_config is the contents of a Synapse config file that will be shared amongst
# the main Synapse process as well as all workers.
# It is intended mainly for disabling functionality when certain workers are spun up,
# and adding a replication listener.
# First read the original config file and extract the listeners block. Then we'll add
# another listener for replication. Later we'll write out the result to the shared
# config file.
listeners = [
{
"port": 9093,
"bind_address": "127.0.0.1",
"type": "http",
"resources": [{"names": ["replication"]}],
}
]
# Convenience helper for if using unix sockets instead of host:port
using_unix_sockets = environ.get("SYNAPSE_USE_UNIX_SOCKET", False)
# First read the original config file and extract the listeners block. Then we'll
# add another listener for replication. Later we'll write out the result to the
# shared config file.
listeners: List[Any]
if using_unix_sockets:
listeners = [
{
"path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH,
"type": "http",
"resources": [{"names": ["replication"]}],
}
]
else:
listeners = [
{
"port": MAIN_PROCESS_REPLICATION_PORT,
"bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
"type": "http",
"resources": [{"names": ["replication"]}],
}
]
with open(config_path) as file_stream:
original_config = yaml.safe_load(file_stream)
original_listeners = original_config.get("listeners")
......@@ -427,9 +781,9 @@ def generate_worker_files(
listeners += original_listeners
# The shared homeserver config. The contents of which will be inserted into the
# base shared worker jinja2 template.
#
# This config file will be passed to all workers, included Synapse's main process.
# base shared worker jinja2 template. This config file will be passed to all
# workers, included Synapse's main process. It is intended mainly for disabling
# functionality when certain workers are spun up, and adding a replication listener.
shared_config: Dict[str, Any] = {"listeners": listeners}
# List of dicts that describe workers.
......@@ -437,31 +791,20 @@ def generate_worker_files(
# program blocks.
worker_descriptors: List[Dict[str, Any]] = []
# Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
# ports of each worker. For example:
# Upstreams for load-balancing purposes. This dict takes the form of the worker
# type to the ports of each worker. For example:
# {
# worker_type: {1234, 1235, ...}}
# }
# and will be used to construct 'upstream' nginx directives.
nginx_upstreams: Dict[str, Set[int]] = {}
# A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
# placed after the proxy_pass directive. The main benefit to representing this data as a
# dict over a str is that we can easily deduplicate endpoints across multiple instances
# of the same worker.
#
# An nginx site config that will be amended to depending on the workers that are
# spun up. To be placed in /etc/nginx/conf.d.
nginx_locations = {}
# Read the desired worker configuration from the environment
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
if not worker_types_env:
# No workers, just the main process
worker_types = []
else:
# Split type names by comma, ignoring whitespace.
worker_types = [x.strip() for x in worker_types_env.split(",")]
# A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what
# will be placed after the proxy_pass directive. The main benefit to representing
# this data as a dict over a str is that we can easily deduplicate endpoints
# across multiple instances of the same worker. The final rendering will be combined
# with nginx_upstreams and placed in /etc/nginx/conf.d.
nginx_locations: Dict[str, str] = {}
# Create the worker configuration directory if it doesn't already exist
os.makedirs("/conf/workers", exist_ok=True)
......@@ -469,76 +812,86 @@ def generate_worker_files(
# Start worker ports from this arbitrary port
worker_port = 18009
# A counter of worker_type -> int. Used for determining the name for a given
# worker type when generating its config file, as each worker's name is just
# worker_type + instance #
worker_type_counter: Dict[str, int] = {}
# A list of internal endpoints to healthcheck, starting with the main process
# which exists even if no workers do.
healthcheck_urls = ["http://localhost:8080/health"]
# For each worker type specified by the user, create config values
for worker_type in worker_types:
worker_config = WORKERS_CONFIG.get(worker_type)
if worker_config:
worker_config = worker_config.copy()
else:
error(worker_type + " is an unknown worker type! Please fix!")
new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
worker_type_counter[worker_type] = new_worker_count
# This list ends up being part of the command line to curl, (curl added support for
# Unix sockets in version 7.40).
if using_unix_sockets:
healthcheck_urls = [
f"--unix-socket {MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH} "
# The scheme and hostname from the following URL are ignored.
# The only thing that matters is the path `/health`
"http://localhost/health"
]
else:
healthcheck_urls = ["http://localhost:8080/health"]
# Get the set of all worker types that we have configured
all_worker_types_in_use = set(chain(*requested_worker_types.values()))
# Map locations to upstreams (corresponding to worker types) in Nginx
# but only if we use the appropriate worker type
for worker_type in all_worker_types_in_use:
for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
nginx_locations[endpoint_pattern] = f"http://{worker_type}"
# For each worker type specified by the user, create config values and write it's
# yaml config file
for worker_name, worker_types_set in requested_worker_types.items():
# The collected and processed data will live here.
worker_config: Dict[str, Any] = {}
# Merge all worker config templates for this worker into a single config
for worker_type in worker_types_set:
copy_of_template_config = WORKERS_CONFIG[worker_type].copy()
# Merge worker type template configuration data. It's a combination of lists
# and dicts, so use this helper.
worker_config = merge_worker_template_configs(
worker_config, copy_of_template_config
)
# Replace placeholder names in the config template with the actual worker name.
worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
# Name workers by their type concatenated with an incrementing number
# e.g. federation_reader1
worker_name = worker_type + str(new_worker_count)
worker_config.update(
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
)
# Update the shared config with any worker-type specific options
shared_config.update(worker_config["shared_extra_conf"])
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
# Check if more than one instance of this worker type has been specified
worker_type_total_count = worker_types.count(worker_type)
# Update the shared config with any worker_type specific options. The first of a
# given worker_type needs to stay assigned and not be replaced.
worker_config["shared_extra_conf"].update(shared_config)
shared_config = worker_config["shared_extra_conf"]
if using_unix_sockets:
healthcheck_urls.append(
f"--unix-socket /run/worker.{worker_port} http://localhost/health"
)
else:
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
# Update the shared config with sharding-related options if necessary
add_worker_roles_to_shared_config(
shared_config, worker_type, worker_name, worker_port
shared_config, worker_types_set, worker_name, worker_port
)
# Enable the worker in supervisord
worker_descriptors.append(worker_config)
# Add nginx location blocks for this worker's endpoints (if any are defined)
for pattern in worker_config["endpoint_patterns"]:
# Determine whether we need to load-balance this worker
if worker_type_total_count > 1:
# Create or add to a load-balanced upstream for this worker
nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
# Upstreams are named after the worker_type
upstream = "http://" + worker_type
else:
upstream = "http://localhost:%d" % (worker_port,)
# Note that this endpoint should proxy to this upstream
nginx_locations[pattern] = upstream
# Write out the worker's logging config file
log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
# Then a worker config file
convert(
"/conf/worker.yaml.j2",
"/conf/workers/{name}.yaml".format(name=worker_name),
f"/conf/workers/{worker_name}.yaml",
**worker_config,
worker_log_config_filepath=log_config_filepath,
using_unix_sockets=using_unix_sockets,
)
# Save this worker's port number to the correct nginx upstreams
for worker_type in worker_types_set:
nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
worker_port += 1
# Build the nginx location config blocks
......@@ -551,15 +904,19 @@ def generate_worker_files(
# Determine the load-balancing upstreams to configure
nginx_upstream_config = ""
for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items():
body = ""
for port in upstream_worker_ports:
body += " server localhost:%d;\n" % (port,)
if using_unix_sockets:
for port in upstream_worker_ports:
body += f" server unix:/run/worker.{port};\n"
else:
for port in upstream_worker_ports:
body += f" server localhost:{port};\n"
# Add to the list of configured upstreams
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
upstream_worker_type=upstream_worker_type,
upstream_worker_base_name=upstream_worker_base_name,
body=body,
)
......@@ -580,7 +937,20 @@ def generate_worker_files(
if reg_path.suffix.lower() in (".yaml", ".yml")
]
workers_in_use = len(worker_types) > 0
workers_in_use = len(requested_worker_types) > 0
# If there are workers, add the main process to the instance_map too.
if workers_in_use:
instance_map = shared_config.setdefault("instance_map", {})
if using_unix_sockets:
instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
"path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH,
}
else:
instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
"host": MAIN_PROCESS_LOCALHOST_ADDRESS,
"port": MAIN_PROCESS_REPLICATION_PORT,
}
# Shared homeserver config
convert(
......@@ -590,6 +960,7 @@ def generate_worker_files(
appservice_registrations=appservice_registrations,
enable_redis=workers_in_use,
workers_in_use=workers_in_use,
using_unix_sockets=using_unix_sockets,
)
# Nginx config
......@@ -600,6 +971,7 @@ def generate_worker_files(
upstream_directives=nginx_upstream_config,
tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"),
tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"),
using_unix_sockets=using_unix_sockets,
)
# Supervisord config
......@@ -609,6 +981,7 @@ def generate_worker_files(
"/etc/supervisor/supervisord.conf",
main_config_path=config_path,
enable_redis=workers_in_use,
using_unix_sockets=using_unix_sockets,
)
convert(
......@@ -648,6 +1021,7 @@ def generate_worker_log_config(
extra_log_template_args["SYNAPSE_LOG_SENSITIVE"] = environ.get(
"SYNAPSE_LOG_SENSITIVE"
)
extra_log_template_args["SYNAPSE_LOG_TESTING"] = environ.get("SYNAPSE_LOG_TESTING")
# Render and write the file
log_config_filepath = f"/conf/workers/{worker_name}.log.config"
......@@ -664,6 +1038,14 @@ def generate_worker_log_config(
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
parser = ArgumentParser()
parser.add_argument(
"--generate-only",
action="store_true",
help="Only generate configuration; don't run Synapse.",
)
opts = parser.parse_args(args)
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
......@@ -678,13 +1060,26 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
generate_base_homeserver_config()
else:
log("Base homeserver config exists—not regenerating")
# This script may be run multiple times (mostly by Complement, see note at top of file).
# Don't re-configure workers in this instance.
# This script may be run multiple times (mostly by Complement, see note at top of
# file). Don't re-configure workers in this instance.
mark_filepath = "/conf/workers_have_been_configured"
if not os.path.exists(mark_filepath):
# Collect and validate worker_type requests
# Read the desired worker configuration from the environment
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
# Only process worker_types if they exist
if not worker_types_env:
# No workers, just the main process
worker_types = []
requested_worker_types: Dict[str, Any] = {}
else:
# Split type names by comma, ignoring whitespace.
worker_types = split_and_strip_string(worker_types_env, ",")
requested_worker_types = parse_worker_types(worker_types)
# Always regenerate all other config files
log("Generating worker config files")
generate_worker_files(environ, config_path, data_dir)
generate_worker_files(environ, config_path, data_dir, requested_worker_types)
# Mark workers as being configured
with open(mark_filepath, "w") as f:
......@@ -692,6 +1087,10 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
else:
log("Worker config exists—not regenerating")
if opts.generate_only:
log("--generate-only: won't run Synapse")
return
# Lifted right out of start.py
jemallocpath = "/usr/lib/%s-linux-gnu/libjemalloc.so.2" % (platform.machine(),)
......@@ -700,6 +1099,13 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
else:
log("Could not find %s, will not use" % (jemallocpath,))
# Empty strings are falsy in Python so this default is fine. We just can't have these
# be undefined because supervisord will complain about our
# `%(ENV_SYNAPSE_HTTP_PROXY)s` usage.
environ.setdefault("SYNAPSE_HTTP_PROXY", "")
environ.setdefault("SYNAPSE_HTTPS_PROXY", "")
environ.setdefault("SYNAPSE_NO_PROXY", "")
# Start supervisord, which will start Synapse, all of the configured worker
# processes, redis, nginx etc. according to the config we created above.
log("Starting supervisord")
......@@ -714,4 +1120,4 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
if __name__ == "__main__":
main(sys.argv, os.environ)
main(sys.argv[1:], os.environ)
......@@ -8,9 +8,9 @@ ARG PYTHON_VERSION=3.9
###
### Stage 0: generate requirements.txt
###
# We hardcode the use of Debian bullseye here because this could change upstream
# and other Dockerfiles used for testing are expecting bullseye.
FROM docker.io/python:${PYTHON_VERSION}-slim-bullseye
# We hardcode the use of Debian bookworm here because this could change upstream
# and other Dockerfiles used for testing are expecting bookworm.
FROM docker.io/library/python:${PYTHON_VERSION}-slim-bookworm
# Install Rust and other dependencies (stolen from normal Dockerfile)
# install the OS build deps
......@@ -33,7 +33,7 @@ RUN \
gosu \
libjpeg62-turbo \
libpq5 \
libwebp6 \
libwebp7 \
xmlsec1 \
libjemalloc2 \
&& rm -rf /var/lib/apt/lists/*
......
......@@ -7,6 +7,9 @@
# prefix-log command [args...]
#
exec 1> >(awk '{print "'"${SUPERVISOR_PROCESS_NAME}"' | "$0}' >&1)
exec 2> >(awk '{print "'"${SUPERVISOR_PROCESS_NAME}"' | "$0}' >&2)
# '-W interactive' is a `mawk` extension which disables buffering on stdout and sets line-buffered reads on
# stdin. The effect is that the output is flushed after each line, rather than being batched, which helps reduce
# confusion due to to interleaving of the different processes.
exec 1> >(awk -W interactive '{print "'"${SUPERVISOR_PROCESS_NAME}"' | "$0 }' >&1)
exec 2> >(awk -W interactive '{print "'"${SUPERVISOR_PROCESS_NAME}"' | "$0 }' >&2)
exec "$@"
......@@ -82,7 +82,7 @@ def generate_config_from_template(
with open(filename) as handle:
value = handle.read()
else:
log("Generating a random secret for {}".format(secret))
log(f"Generating a random secret for {secret}")
value = codecs.encode(os.urandom(32), "hex").decode()
with open(filename, "w") as handle:
handle.write(value)
......@@ -160,11 +160,6 @@ def run_generate_config(environ: Mapping[str, str], ownership: Optional[str]) ->
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
if ownership is not None:
# make sure that synapse has perms to write to the data dir.
log(f"Setting ownership on {data_dir} to {ownership}")
subprocess.run(["chown", ownership, data_dir], check=True)
# create a suitable log config from our template
log_config_file = "%s/%s.log.config" % (config_dir, server_name)
if not os.path.exists(log_config_file):
......@@ -189,9 +184,15 @@ def run_generate_config(environ: Mapping[str, str], ownership: Optional[str]) ->
"--generate-config",
"--open-private-ports",
]
if ownership is not None:
# make sure that synapse has perms to write to the data dir.
log(f"Setting ownership on {data_dir} to {ownership}")
subprocess.run(["chown", ownership, data_dir], check=True)
args = ["gosu", ownership] + args
# log("running %s" % (args, ))
flush_buffers()
os.execv(sys.executable, args)
subprocess.run(args, check=True)
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
......@@ -239,7 +240,7 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
log("Could not find %s, will not use" % (jemallocpath,))
# if there are no config files passed to synapse, try adding the default file
if not any(p.startswith("--config-path") or p.startswith("-c") for p in args):
if not any(p.startswith(("--config-path", "-c")) for p in args):
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get(
"SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml"
......