"""Flask application which provides the interface of a login panel. The application interacts with different backend, like the Kratos backend for users, Hydra for OIDC sessions and MariaDB for application and role specifications. The application provides also several command line options to interact with the user entries in the database(s)""" # Basic system imports import logging import os import urllib.parse import urllib.request import click # Hydra, OIDC Identity Provider import hydra_client # Kratos, Identity manager import ory_kratos_client #from exceptions import BackendError # Flask from flask import Flask, abort, redirect, render_template, request from flask.cli import AppGroup from flask_migrate import Migrate from flask_sqlalchemy import SQLAlchemy from ory_kratos_client.api import v0alpha2_api as kratos_api from areas import cli from config import * from flask import current_app from helpers import ( BadRequest, KratosError, HydraError, bad_request_error, validation_error, kratos_error, global_error, hydra_error, KratosUser, App, AppRole ) from database import db # APIs # Create HYDRA & KRATOS API interfaces HYDRA = hydra_client.HydraAdmin(HYDRA_ADMIN_URL) # Kratos has an admin and public end-point. We create an API for them # both. The kratos implementation has bugs, which forces us to set # the discard_unknown_keys to True. 
# Client for the Kratos admin end-point
tmp = ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL,
                                      discard_unknown_keys=True)
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))

# Client for the Kratos public end-point
tmp = ory_kratos_client.Configuration(host=KRATOS_PUBLIC_URL,
                                      discard_unknown_keys=True)
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))

##############################################################################
# CLI INTERFACE                                                              #
##############################################################################
# Define Flask CLI command groups and commands
user_cli = AppGroup('user')
app_cli = AppGroup('app')


## CLI APP COMMANDS

@app_cli.command('create')
@click.argument('slug')
@click.argument('name')
def create_app(slug, name):
    """Adds an app into the database

    :param slug: str short name of the app
    :param name: str name of the application
    """
    current_app.logger.info(f"Creating app definition: {name} ({slug})")

    obj = App()
    obj.name = name
    obj.slug = slug

    db.session.add(obj)
    db.session.commit()


@app_cli.command('list')
def list_app():
    """List all apps found in the database"""
    current_app.logger.info("Listing configured apps")
    apps = App.query.all()

    for obj in apps:
        print(f"App name: {obj.name} \t Slug: {obj.slug}")


@app_cli.command('delete')
@click.argument('slug')
def delete_app(slug):
    """Removes app from database

    :param slug: str Slug of app to remove
    """
    current_app.logger.info(f"Trying to delete app: {slug}")
    obj = App.query.filter_by(slug=slug).first()

    if not obj:
        current_app.logger.info("Not found")
        return

    # Deleting will (probably) fail if there are still roles attached. This
    # is a PoC implementation only. Actual management of apps and roles will
    # be done by the backend application.
    db.session.delete(obj)
    db.session.commit()
    current_app.logger.info("Success")


cli.cli.add_command(app_cli)


## CLI USER COMMANDS

@user_cli.command("setrole")
@click.argument("email")
@click.argument("app_slug")
@click.argument("role")
def setrole(email, app_slug, role):
    """Set role for a user

    Any role the user already has for the app is replaced by the new one.

    :param email: Email address of user to assign role
    :param app_slug: Slug name of the app, for example 'nextcloud'
    :param role: Role to assign. currently only 'admin', 'user'
    """
    current_app.logger.info(f"Assiging role {role} to {email} for app {app_slug}")

    # Validate the cheap, local argument first so that an invalid role does
    # not cost a round trip to the Kratos backend.
    if role not in ("admin", "user"):
        print("At this point only the roles 'admin' and 'user' are accepted")
        return

    # Find user
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if not user:
        print("User not found. Abort")
        return

    app_obj = db.session.query(App).filter(App.slug == app_slug).first()
    if not app_obj:
        print("App not found. Abort.")
        return

    role_obj = (
        db.session.query(AppRole)
        .filter(AppRole.app_id == app_obj.id)
        .filter(AppRole.user_id == user.uuid)
        .first()
    )

    # Drop the existing role (if any); the delete and the insert below are
    # committed together.
    if role_obj:
        db.session.delete(role_obj)

    obj = AppRole()
    obj.user_id = user.uuid
    obj.app_id = app_obj.id
    obj.role = role

    db.session.add(obj)
    db.session.commit()


@user_cli.command("show")
@click.argument("email")
def show_user(email):
    """Show user details. Output a table with the user and details about
    the internal state/values of the user object

    :param email: Email address of the user to show
    """
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    # Without this guard a missing user ends in an AttributeError on the
    # attribute accesses below.
    if not user:
        current_app.logger.error(f"User with email {email} not found.")
        return

    print(user)
    print("")
    print(f"UUID:     {user.uuid}")
    print(f"Username: {user.username}")
    print(f"Updated:  {user.updated_at}")
    print(f"Created:  {user.created_at}")
    print(f"State:    {user.state}")


@user_cli.command('update')
@click.argument('email')
@click.argument('field')
@click.argument('value')
def update_user(email, field, value):
    """Update a user object. It can modify email and name currently

    :param email: Email address of user to update
    :param field: Field to update, supported [name|email]
    :param value: The value to set the field with
    """
    current_app.logger.info(f"Looking for user with email: {email}")
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if not user:
        current_app.logger.error(f"User with email {email} not found.")
        return

    if field == 'name':
        user.name = value
    elif field == 'email':
        user.email = value
    else:
        current_app.logger.error(f"Field not found: {field}")
        # Nothing was changed, so do not write the user back.
        return

    user.save()


@user_cli.command('delete')
@click.argument('email')
def delete_user(email):
    """Delete a user from the database

    :param email: Email address of user to delete
    """
    current_app.logger.info(f"Looking for user with email: {email}")
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if not user:
        current_app.logger.error(f"User with email {email} not found.")
        return
    user.delete()


@user_cli.command('create')
@click.argument('email')
def create_user(email):
    """Create a user in the kratos database. The argument must be a unique
    email address

    :param email: string Email address of user to add
    """
    current_app.logger.info(f"Creating user with email: ({email})")

    # Refuse to create a second user with the same email address
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if user:
        current_app.logger.info("User already exists. Not recreating")
        return

    user = KratosUser(KRATOS_ADMIN)
    user.email = email
    user.save()


@user_cli.command('setpassword')
@click.argument('email')
@click.argument('password')
def setpassword_user(email, password):
    """Set a password for an account

    Errors are logged rather than raised: any exception during the
    procedure results in a return value of False.

    :param email: email address of account to set a password for
    :param password: password to be set
    :return: true on success, false if not set (user not found, too weak
             or an error occurred)
    :rtype: boolean
    """
    current_app.logger.info(f"Setting password for: ({email})")

    # Kratos does not provide an interface to set a password directly.
    # However we still want to be able to set a password. So we have to hack
    # our way a bit around this. We do this by creating a recovery link
    # through the admin interface (which is not e-mailed) and then follow the
    # recovery flow in the public facing pages of kratos
    try:
        # Get the ID of the user
        kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)
        if kratos_user is None:
            current_app.logger.error(f"User with email '{email}' not found")
            return False

        # Get a recovery URL
        url = kratos_user.get_recovery_link()

        # Execute UI sequence to set password, given we have a recovery URL
        result = kratos_user.ui_set_password(KRATOS_PUBLIC_URL, url, password)

    except Exception as error:
        current_app.logger.error(f"Error while setting password: {error}")
        return False

    if result:
        current_app.logger.info("Success setting password")
    else:
        current_app.logger.error("Failed to set password. Password too weak?")

    return result


@user_cli.command('list')
def list_user():
    """Show a list of users in the database"""
    current_app.logger.info("Listing users")
    users = KratosUser.find_all(KRATOS_ADMIN)

    for obj in users:
        print(obj)


@user_cli.command('recover')
@click.argument('email')
def recover_user(email):
    """Get recovery link for a user, to manually update the user/use

    The link is printed to standard output. Errors are logged, not raised.

    :param email: Email address of the user
    """
    current_app.logger.info(f"Trying to send recover email for user: {email}")

    try:
        # Get the ID of the user
        kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)
        if not kratos_user:
            current_app.logger.error(f"User with email {email} not found.")
            return

        # Get a recovery URL
        url = kratos_user.get_recovery_link()

        print(url)
    # BackendError is not imported in this module (its import is commented
    # out), so naming it here raised NameError whenever an error occurred.
    # Catch Exception instead, as setpassword_user does.
    except Exception as error:
        current_app.logger.error(f"Error while getting reset link: {error}")


cli.cli.add_command(user_cli)