Skip to content
Snippets Groups Projects
synctl 12.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # Copyright 2014-2016 OpenMarket Ltd
    
    # Copyright 2018 New Vector Ltd
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import argparse
    import collections
    import errno
    import glob
    import os
    import os.path
    import signal
    import subprocess
    import sys
    import time
    
    import yaml
    
    
    from synapse.config import find_config_files
    
    
    SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
    
    GREEN = "\x1b[1;32m"
    YELLOW = "\x1b[1;33m"
    RED = "\x1b[1;31m"
    NORMAL = "\x1b[m"
    
    
    def pid_running(pid):
        try:
            os.kill(pid, 0)
            return True
        except OSError as err:
            if err.errno == errno.EPERM:
                return True
            return False
    
    
    def write(message, colour=NORMAL, stream=sys.stdout):
    
        # Lets check if we're writing to a TTY before colouring
        should_colour = False
    
            should_colour = stream.isatty()
    
        except AttributeError:
            # Just in case `isatty` isn't defined on everything. The python
            # docs are incredibly vague.
            pass
    
            stream.write(message + "\n")
        else:
            stream.write(colour + message + NORMAL + "\n")
    
    
    def abort(message, colour=RED, stream=sys.stderr):
        write(message, colour, stream)
        sys.exit(1)
    
    
    
    def start(configfile: str, daemonize: bool = True) -> bool:
        """Attempts to start synapse.
        Args:
            configfile: path to a yaml synapse config file
            daemonize: whether to daemonize synapse or keep it attached to the current
                session
    
        Returns:
            True if the process started successfully
            False if there was an error starting the process
    
            If deamonize is False it will only return once synapse exits.
        """
    
    
        write("Starting ...")
        args = SYNAPSE
    
    
        if daemonize:
            args.extend(["--daemonize", "-c", configfile])
        else:
            args.extend(["-c", configfile])
    
    
        try:
            subprocess.check_call(args)
    
            write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
    
        except subprocess.CalledProcessError as e:
            write(
                "error starting (exit code: %d); see above for logs" % e.returncode,
                colour=RED,
            )
    
    def start_worker(app: str, configfile: str, worker_configfile: str) -> bool:
        """Attempts to start a synapse worker.
        Args:
            app: name of the worker's appservice
            configfile: path to a yaml synapse config file
            worker_configfile: path to worker specific yaml synapse file
    
        Returns:
            True if the process started successfully
            False if there was an error starting the process
        """
    
        args = [
            sys.executable,
            "-B",
            "-m",
            app,
            "-c",
            configfile,
            "-c",
            worker_configfile,
            "--daemonize",
        ]
    
    
        try:
            subprocess.check_call(args)
            write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
    
        except subprocess.CalledProcessError as e:
            write(
    
                "error starting %s(%r) (exit code: %d); see above for logs"
                % (app, worker_configfile, e.returncode),
    
    def stop(pidfile: str, app: str) -> bool:
        """Attempts to kill a synapse worker from the pidfile.
        Args:
            pidfile: path to file containing worker's pid
            app: name of the worker's appservice
    
        Returns:
            True if the process stopped successfully
            False if process was already stopped or an error occured
        """
    
    
        if os.path.exists(pidfile):
            pid = int(open(pidfile).read())
            try:
                os.kill(pid, signal.SIGTERM)
                write("stopped %s" % (app,), colour=GREEN)
    
            except OSError as err:
                if err.errno == errno.ESRCH:
                    write("%s not running" % (app,), colour=YELLOW)
                elif err.errno == errno.EPERM:
                    abort("Cannot stop %s: Operation not permitted" % (app,))
                else:
                    abort("Cannot stop %s: Unknown error" % (app,))
    
                return False
        else:
            write(
                "No running worker of %s found (from %s)\nThe process might be managed by another controller (e.g. systemd)"
                % (app, pidfile),
                colour=YELLOW,
            )
        return False
    
    Worker = collections.namedtuple(
        "Worker", ["app", "configfile", "pidfile", "cache_factor", "cache_factors"]
    )
    
    
    
    def main():
    
        parser = argparse.ArgumentParser()
    
        parser.add_argument(
            "action",
            choices=["start", "stop", "restart"],
            help="whether to start, stop or restart the synapse",
        )
        parser.add_argument(
            "configfile",
            nargs="?",
            default="homeserver.yaml",
    
            help="the homeserver config file. Defaults to homeserver.yaml. May also be"
            " a directory with *.yaml files",
    
            "-w", "--worker", metavar="WORKERCONFIG", help="start or stop a single worker"
    
            metavar="WORKERCONFIGDIR",
            help="start or stop all the workers in the given directory"
    
        parser.add_argument(
            "--no-daemonize",
            action="store_false",
    
            dest="daemonize",
    
            help="Run synapse in the foreground for debugging. "
    
            "Will work only if the daemonize option is not set in the config.",
    
    
        options = parser.parse_args()
    
        if options.worker and options.all_processes:
    
            write('Cannot use "--worker" with "--all-processes"', stream=sys.stderr)
    
        if not options.daemonize and options.all_processes:
    
            write('Cannot use "--no-daemonize" with "--all-processes"', stream=sys.stderr)
            sys.exit(1)
    
    
        configfile = options.configfile
    
        if not os.path.exists(configfile):
            write(
                "No config file found\n"
                "To generate a config file, run '%s -c %s --generate-config"
    
                " --server-name=<server name> --report-stats=<yes/no>'\n"
                % (" ".join(SYNAPSE), options.configfile),
    
        config_files = find_config_files([configfile])
        config = {}
        for config_file in config_files:
            with open(config_file) as file_stream:
                yaml_config = yaml.safe_load(file_stream)
    
            if yaml_config is not None:
                config.update(yaml_config)
    
    
        pidfile = config["pid_file"]
        cache_factor = config.get("synctl_cache_factor")
        start_stop_synapse = True
    
        if cache_factor:
            os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
    
        cache_factors = config.get("synctl_cache_factors", {})
    
        for cache_name, factor in cache_factors.items():
    
            os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
    
        worker_configfiles = []
        if options.worker:
            start_stop_synapse = False
            worker_configfile = options.worker
            if not os.path.exists(worker_configfile):
                write(
    
                    "No worker config found at %r" % (worker_configfile,), stream=sys.stderr
    
                )
                sys.exit(1)
            worker_configfiles.append(worker_configfile)
    
        if options.all_processes:
            # To start the main synapse with -a you need to add a worker file
            # with worker_app == "synapse.app.homeserver"
            start_stop_synapse = False
            worker_configdir = options.all_processes
            if not os.path.isdir(worker_configdir):
                write(
                    "No worker config directory found at %r" % (worker_configdir,),
                    stream=sys.stderr,
                )
                sys.exit(1)
    
            worker_configfiles.extend(
                sorted(glob.glob(os.path.join(worker_configdir, "*.yaml")))
            )
    
    
        workers = []
        for worker_configfile in worker_configfiles:
            with open(worker_configfile) as stream:
    
    Erik Johnston's avatar
    Erik Johnston committed
                worker_config = yaml.safe_load(stream)
    
            worker_app = worker_config["worker_app"]
            if worker_app == "synapse.app.homeserver":
                # We need to special case all of this to pick up options that may
                # be set in the main config file or in this worker config file.
    
                worker_pidfile = worker_config.get("pid_file") or pidfile
                worker_cache_factor = (
                    worker_config.get("synctl_cache_factor") or cache_factor
    
                    worker_config.get("synctl_cache_factors") or cache_factors
    
                # The master process doesn't support using worker_* config.
                for key in worker_config:
                    if key == "worker_app":  # But we allow worker_app
                        continue
    
                    assert not key.startswith(
                        "worker_"
                    ), "Main process cannot use worker_* config"
    
            else:
                worker_pidfile = worker_config["worker_pid_file"]
                worker_cache_factor = worker_config.get("synctl_cache_factor")
    
                worker_cache_factors = worker_config.get("synctl_cache_factors", {})
    
            workers.append(
                Worker(
                    worker_app,
                    worker_configfile,
                    worker_pidfile,
                    worker_cache_factor,
                    worker_cache_factors,
                )
            )
    
    
        action = options.action
    
        if action == "stop" or action == "restart":
    
            for worker in workers:
    
                if not stop(worker.pidfile, worker.app):
                    # A worker could not be stopped.
                    has_stopped = False
    
                if not stop(pidfile, "synapse.app.homeserver"):
                    has_stopped = False
    
            if not has_stopped and action == "stop":
    
    
        # Wait for synapse to actually shutdown before starting it again
        if action == "restart":
            running_pids = []
            if start_stop_synapse and os.path.exists(pidfile):
                running_pids.append(int(open(pidfile).read()))
            for worker in workers:
                if os.path.exists(worker.pidfile):
                    running_pids.append(int(open(worker.pidfile).read()))
            if len(running_pids) > 0:
                write("Waiting for process to exit before restarting...")
                for running_pid in running_pids:
                    while pid_running(running_pid):
                        time.sleep(0.2)
                write("All processes exited; now restarting...")
    
        if action == "start" or action == "restart":
    
            if start_stop_synapse:
                # Check if synapse is already running
                if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
                    abort("synapse.app.homeserver already running")
    
    
                if not start(configfile, bool(options.daemonize)):
                    error = True
    
                # Skip starting a worker if its already running
                if os.path.exists(worker.pidfile) and pid_running(
                    int(open(worker.pidfile).read())
                ):
                    print(worker.app + " already running")
                    continue
    
    
                if worker.cache_factor:
                    os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
    
    
                for cache_name, factor in worker.cache_factors.items():
    
                    os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
    
    
                if not start_worker(worker.app, configfile, worker.configfile):
                    error = True
    
                # Reset env back to the original
                os.environ.clear()
                os.environ.update(env)
    
    
    if __name__ == "__main__":
        main()