dashboard/cliapp/cliapp/cli.py

313 lines
9.2 KiB
Python

"""Flask application which provides the interface of a login panel. The
application interacts with different backends, like the Kratos backend for users,
Hydra for OIDC sessions and MariaDB for application and role specifications.
The application also provides several command line options to interact with
the user entries in the database(s)"""
import click
import hydra_client
import ory_kratos_client
from flask import current_app
from flask.cli import AppGroup
from ory_kratos_client.api import v0alpha2_api as kratos_api
from sqlalchemy import func
from config import *
from helpers import KratosUser
from cliapp import cli
from areas.roles import Role
from areas.apps import AppRole, App
from database import db
# APIs
# Create the Hydra and Kratos API interfaces. The *_URL names used here are
# not defined in this file; presumably they come from the star-import of
# `config` above -- TODO confirm.
HYDRA = hydra_client.HydraAdmin(HYDRA_ADMIN_URL)
# Kratos has an admin and a public endpoint. We create an API client for
# each of them. According to the original author, bugs in the Kratos client
# implementation force us to set discard_unknown_keys to True.
# `tmp` is a scratch name: it holds the configuration for the admin client
# first and is then rebound to the configuration for the public client.
tmp = ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL, discard_unknown_keys=True)
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
tmp = ory_kratos_client.Configuration(host=KRATOS_PUBLIC_URL, discard_unknown_keys=True)
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
##############################################################################
# CLI INTERFACE #
##############################################################################
# Define Flask CLI command groups and commands.
# The commands below attach themselves to one of these two groups through
# the `@user_cli.command(...)` / `@app_cli.command(...)` decorators.
user_cli = AppGroup("user")
app_cli = AppGroup("app")
## CLI APP COMMANDS
@app_cli.command("create")
@click.argument("slug")
@click.argument("name")
def create_app(slug, name):
    """Store a new app definition in the database.

    When an app with the given slug is already stored, this is logged and
    nothing is written.

    :param slug: str short name of the app
    :param name: str name of the application
    """
    current_app.logger.info(f"Creating app definition: {name} ({slug})")

    # Look for an existing row with this slug before building a new one.
    existing = App.query.filter_by(slug=slug).first()
    if existing:
        current_app.logger.info(f"App definition: {name} ({slug}) already exists in database")
        return

    new_app = App()
    new_app.name = name
    new_app.slug = slug

    db.session.add(new_app)
    db.session.commit()
    current_app.logger.info(f"App definition: {name} ({slug}) created")
@app_cli.command("list")
def list_app():
    """Print the name and slug of every app stored in the database."""
    current_app.logger.info("Listing configured apps")
    for entry in App.query.all():
        print(f"App name: {entry.name} \t Slug: {entry.slug}")
@app_cli.command("delete")
@click.argument("slug")
def delete_app(slug):
    """Delete the app with the given slug from the database.

    Logs "Not found" when no app with that slug is stored.

    :param slug: str Slug of app to remove
    """
    current_app.logger.info(f"Trying to delete app: {slug}")

    target = App.query.filter_by(slug=slug).first()
    if target:
        # Note from the original author: deleting will (probably) fail while
        # roles are still attached to the app. This is a proof-of-concept
        # only; real management of apps and roles is meant to be done by the
        # backend application.
        db.session.delete(target)
        db.session.commit()
        current_app.logger.info("Success")
    else:
        current_app.logger.info("Not found")
# Attach the "app" command group to the CLI object exposed by `cliapp.cli`.
cli.cli.add_command(app_cli)
## CLI USER COMMANDS
@user_cli.command("setrole")
@click.argument("email")
@click.argument("app_slug")
@click.argument("role")
def setrole(email, app_slug, role):
    """Set the role of a user for one app.

    An existing role assignment of the user for that app is deleted and a
    new assignment is stored in its place. The command prints a message and
    returns without changing anything when the role name is not accepted,
    or when the user or the app cannot be found.

    :param email: Email address of user to assign role
    :param app_slug: Slug name of the app, for example 'nextcloud'
    :param role: Role to assign. currently only 'admin', 'user'
    """
    current_app.logger.info(f"Assiging role {role} to {email} for app {app_slug}")

    # Check the argument first: it is a purely local test, so a mistyped role
    # name is rejected without a lookup against Kratos.
    if role not in ("admin", "user"):
        print("At this point only the roles 'admin' and 'user' are accepted")
        return

    # Find user
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if not user:
        print("User not found. Abort")
        return

    app_obj = db.session.query(App).filter(App.slug == app_slug).first()
    if not app_obj:
        print("App not found. Abort.")
        return

    # Drop the assignment the user currently has for this app, if any.
    current_assignment = (
        db.session.query(AppRole)
        .filter(AppRole.app_id == app_obj.id)
        .filter(AppRole.user_id == user.uuid)
        .first()
    )
    if current_assignment:
        db.session.delete(current_assignment)

    # Role names are matched case-insensitively. The result is kept in its
    # own variable so the `role` argument keeps meaning "the requested name".
    role_row = Role.query.filter(func.lower(Role.name) == func.lower(role)).first()
    if not role_row:
        # The assignment is still stored (with an empty role id), as before,
        # but no longer silently.
        current_app.logger.warning(
            f"Role '{role}' not found in database; storing assignment without role id"
        )

    assignment = AppRole()
    assignment.user_id = user.uuid
    assignment.app_id = app_obj.id
    assignment.role_id = role_row.id if role_row else None

    db.session.add(assignment)
    db.session.commit()
@user_cli.command("show")
@click.argument("email")
def show_user(email):
    """Show user details.

    Prints the user object itself, followed by its UUID, username, update
    and creation timestamps and state. Logs an error and prints nothing when
    no user with the given email address is found.

    :param email: Email address of the user to show
    """
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    # Same not-found handling as the other user commands; without it the
    # attribute accesses below fail on a missing user.
    if not user:
        current_app.logger.error(f"User with email {email} not found.")
        return

    print(user)
    print("")
    print(f"UUID: {user.uuid}")
    print(f"Username: {user.username}")
    print(f"Updated: {user.updated_at}")
    print(f"Created: {user.created_at}")
    print(f"State: {user.state}")
@user_cli.command("update")
@click.argument("email")
@click.argument("field")
@click.argument("value")
def update_user(email, field, value):
    """Update a user object. It can currently modify the name and the email.

    Logs an error and saves nothing when the user is not found or when the
    field is not one of the supported ones.

    :param email: Email address of user to update
    :param field: Field to update, supported [name|email]
    :param value: The value to set the field with
    """
    current_app.logger.info(f"Looking for user with email: {email}")
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if not user:
        current_app.logger.error(f"User with email {email} not found.")
        return

    if field == "name":
        user.name = value
    elif field == "email":
        user.email = value
    else:
        current_app.logger.error(f"Field not found: {field}")
        # Nothing was changed, so do not save the untouched user.
        return

    user.save()
@user_cli.command("delete")
@click.argument("email")
def delete_user(email):
    """Look up a user by email address and delete it.

    Logs an error when no user with that address is found.

    :param email: Email address of user to delete
    """
    current_app.logger.info(f"Looking for user with email: {email}")

    target = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if target:
        target.delete()
    else:
        current_app.logger.error(f"User with email {email} not found.")
@user_cli.command("create")
@click.argument("email")
def create_user(email):
    """Create a user with the given email address through the Kratos admin API.

    When a user with that address is already found, this is logged and no
    new user is created.

    :param email: string Email address of user to add
    """
    current_app.logger.info(f"Creating user with email: ({email})")

    # Email addresses must be unique, so stop when the lookup finds a user.
    if KratosUser.find_by_email(KRATOS_ADMIN, email):
        current_app.logger.info("User already exists. Not recreating")
        return

    new_user = KratosUser(KRATOS_ADMIN)
    new_user.email = email
    new_user.save()
@user_cli.command("setpassword")
@click.argument("email")
@click.argument("password")
def setpassword_user(email, password):
    """Set a password for an account.

    :param email: email address of account to set a password for
    :param password: password to be set
    :return: the result of the password-setting flow on completion; False
        when the user is not found or when any exception occurs (exceptions
        are logged here, not propagated)
    """
    current_app.logger.info(f"Setting password for: ({email})")

    # Kratos offers no interface to set a password directly. As a workaround
    # a recovery link is created through the admin interface (it is not
    # e-mailed) and the recovery flow is then followed on the public-facing
    # Kratos pages.
    try:
        account = KratosUser.find_by_email(KRATOS_ADMIN, email)
        if account is None:
            current_app.logger.error(f"User with email '{email}' not found")
            return False

        recovery_url = account.get_recovery_link()
        # Run the UI sequence that sets the password via the recovery URL.
        succeeded = account.ui_set_password(KRATOS_PUBLIC_URL, recovery_url, password)
    except Exception as error:
        current_app.logger.error(f"Error while setting password: {error}")
        return False

    if not succeeded:
        current_app.logger.error("Failed to set password. Password too weak?")
    else:
        current_app.logger.info("Success setting password")
    return succeeded
@user_cli.command("list")
def list_user():
    """Print every user returned by the Kratos admin API, one per line."""
    current_app.logger.info("Listing users")
    for entry in KratosUser.find_all(KRATOS_ADMIN):
        print(entry)
@user_cli.command("recover")
@click.argument("email")
def recover_user(email):
    """Print a recovery link for a user, to manually update the user.

    Logs an error when the user is not found or when fetching the link
    fails; nothing is printed in those cases.

    :param email: Email address of the user
    """
    current_app.logger.info(f"Trying to send recover email for user: {email}")

    try:
        # Get the user
        kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)
        # Report a missing user explicitly; otherwise the failure would only
        # surface as an attribute error on the lookup result.
        if not kratos_user:
            current_app.logger.error(f"User with email '{email}' not found")
            return

        # Get a recovery URL
        url = kratos_user.get_recovery_link()
        print(url)
    except Exception as error:
        current_app.logger.error(f"Error while getting reset link: {error}")
# Attach the "user" command group to the CLI object exposed by `cliapp.cli`.
cli.cli.add_command(user_cli)