Skip to content
Snippets Groups Projects
register_new_matrix_user 4.39 KiB
Newer Older
  • Learn to ignore specific revisions
  • #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # Copyright 2015 OpenMarket Ltd
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import argparse
    import getpass
    import hashlib
    import hmac
    import json
    import sys
    import urllib2
    import yaml
    
    
    def request_registration(user, password, server_location, shared_secret):
        """Send a shared-secret registration request for a new user.

        An HMAC-SHA1 of ``user`` keyed with ``shared_secret`` is computed and
        POSTed as JSON, together with the username and password, to
        ``<server_location>/_matrix/client/v2_alpha/register``.

        Args:
            user: Localpart of the user to create; also the HMAC message.
            password: Password for the new user.
            server_location: Base URL of the home server. Trailing slashes
                are stripped before the endpoint path is appended.
            shared_secret: Key used for the HMAC.

        Returns:
            None. "Success." is printed once the response has been read.

        Raises:
            SystemExit: With status 1 if the server answers with an HTTP
                error or cannot be reached at all.
        """
        mac = hmac.new(
            key=shared_secret,
            msg=user,
            digestmod=hashlib.sha1,
        ).hexdigest()

        data = {
            "username": user,
            "password": password,
            "mac": mac,
        }

        server_location = server_location.rstrip("/")

        print("Sending registration request...")

        req = urllib2.Request(
            "%s/_matrix/client/v2_alpha/register" % (server_location,),
            data=json.dumps(data),
            headers={'Content-Type': 'application/json'}
        )
        try:
            if sys.version_info[:3] >= (2, 7, 9):
                # As of version 2.7.9, urllib2 now checks SSL certs
                # NOTE(review): a directly constructed SSLContext does not
                # enable certificate verification by default, so this opts
                # out of that check -- presumably intentional for servers
                # with self-signed certificates; confirm.
                import ssl
                f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23))
            else:
                f = urllib2.urlopen(req)
            try:
                # The body is not used; it is only drained.
                f.read()
            finally:
                # Release the connection even if reading the body fails.
                f.close()
            print("Success.")
        except urllib2.HTTPError as e:
            print("ERROR! Received %d %s" % (e.code, e.reason,))
            if 400 <= e.code < 500:
                if e.info().type == "application/json":
                    try:
                        resp = json.load(e)
                    except ValueError:
                        # Body claimed to be JSON but did not parse; the
                        # status line above is all we can report.
                        resp = None
                    # Only a JSON object can carry an "error" member.
                    if isinstance(resp, dict) and "error" in resp:
                        print(resp["error"])
            sys.exit(1)
        except urllib2.URLError as e:
            # Must come after HTTPError, which is a subclass of URLError.
            # Covers failures with no HTTP response (e.g. connection refused).
            print("ERROR! Could not reach server: %s" % (e.reason,))
            sys.exit(1)
    
    
    def register_new_user(user, password, server_location, shared_secret):
        """Prompt for any missing credentials, then request the registration.

        Args:
            user: Localpart of the new user. If falsy, the user is prompted
                for one, with the current login name offered as the default
                when it can be determined.
            password: Password for the new user. If falsy, the user is
                prompted for it twice (entry and confirmation).
            server_location: Passed through to ``request_registration``.
            shared_secret: Passed through to ``request_registration``.

        Raises:
            SystemExit: With status 1 if no user name is given, the entered
                password is blank, or the two password entries differ.
        """
        if not user:
            try:
                default_user = getpass.getuser()
            except Exception:
                # Not being able to determine the login name only means
                # there is no default to offer. Catching Exception (rather
                # than everything) lets Ctrl-C and SystemExit propagate.
                default_user = None

            if default_user:
                user = raw_input("New user localpart [%s]: " % (default_user,))
                if not user:
                    user = default_user
            else:
                user = raw_input("New user localpart: ")

        if not user:
            print("Invalid user name")
            sys.exit(1)

        if not password:
            password = getpass.getpass("Password: ")

            if not password:
                print("Password cannot be blank.")
                sys.exit(1)

            # Confirmation is only asked for interactively entered passwords.
            confirm_password = getpass.getpass("Confirm password: ")

            if password != confirm_password:
                print("Passwords do not match")
                sys.exit(1)

        request_registration(user, password, server_location, shared_secret)
    
    
    if __name__ == "__main__":
        # Command-line entry point: parse the arguments, obtain the shared
        # secret (from a server config file or directly from the command
        # line), then register the user.
        parser = argparse.ArgumentParser(
            description="Used to register new users with a given home server when"
                        " registration has been disabled. The home server must be"
                        " configured with the 'registration_shared_secret' option"
                        " set.",
        )
        parser.add_argument(
            "-u", "--user",
            default=None,
            help="Local part of the new user. Will prompt if omitted.",
        )
        parser.add_argument(
            "-p", "--password",
            default=None,
            help="New password for user. Will prompt if omitted.",
        )

        # Exactly one source for the shared secret must be supplied.
        group = parser.add_mutually_exclusive_group(required=True)
        group.add_argument(
            "-c", "--config",
            type=argparse.FileType('r'),
            help="Path to server config file. Used to read in shared secret.",
        )

        group.add_argument(
            "-k", "--shared-secret",
            help="Shared secret as defined in server config file.",
        )

        parser.add_argument(
            "server_url",
            default="https://localhost:8448",
            nargs='?',
            help="URL to use to talk to the home server. Defaults to "
                 " 'https://localhost:8448'.",
        )

        args = parser.parse_args()

        if args.config:
            config = yaml.safe_load(args.config)
            # An empty file loads as None and odd YAML may load as a
            # non-mapping; treat both as "no secret configured" instead of
            # failing on the .get() call.
            secret = None
            if isinstance(config, dict):
                secret = config.get("registration_shared_secret", None)
            if not secret:
                print("No 'registration_shared_secret' defined in config.")
                sys.exit(1)
        else:
            secret = args.shared_secret

        register_new_user(args.user, args.password, args.server_url, secret)