dashboard/backend/cliapp/cliapp/cli.py

417 lines
12 KiB
Python

"""Flask application which provides the interface of a login panel. The
application interacts with different backends, like the Kratos backend for
users, Hydra for OIDC sessions and MariaDB for application and role
specifications. The application also provides several command line options to
interact with the user entries in the database(s)."""
import click
import hydra_client
import ory_kratos_client
from flask import current_app
from flask.cli import AppGroup
from ory_kratos_client.api import v0alpha2_api as kratos_api
from sqlalchemy import func
from config import HYDRA_ADMIN_URL, KRATOS_ADMIN_URL, KRATOS_PUBLIC_URL
from helpers import KratosUser
from cliapp import cli
from areas.roles import Role
from areas.apps import AppRole, App
from database import db
# API clients
# Admin interface for Hydra
HYDRA = hydra_client.HydraAdmin(HYDRA_ADMIN_URL)

# Kratos exposes both an admin and a public end-point, so one API object is
# built for each of them. discard_unknown_keys is set to True to work around
# bugs in the Kratos client implementation.
kratos_admin_api_configuration = ory_kratos_client.Configuration(
    host=KRATOS_ADMIN_URL,
    discard_unknown_keys=True,
)
KRATOS_ADMIN = kratos_api.V0alpha2Api(
    ory_kratos_client.ApiClient(kratos_admin_api_configuration)
)
kratos_public_api_configuration = ory_kratos_client.Configuration(
    host=KRATOS_PUBLIC_URL,
    discard_unknown_keys=True,
)
KRATOS_PUBLIC = kratos_api.V0alpha2Api(
    ory_kratos_client.ApiClient(kratos_public_api_configuration)
)
##############################################################################
# CLI INTERFACE #
##############################################################################
# Define the Flask CLI command groups. Commands are attached to a group with
# the @user_cli.command / @app_cli.command decorators further down, and each
# group is registered through cli.cli.add_command after its commands.
user_cli = AppGroup("user")
app_cli = AppGroup("app")
## CLI APP COMMANDS
@app_cli.command("create")
@click.argument("slug")
@click.argument("name")
@click.argument("external-url", required=False)
def create_app(slug, name, external_url=None):
    """Adds an app into the database

    When an app with the same slug already exists, this is logged and nothing
    is changed.

    :param slug: str short name of the app
    :param name: str name of the application
    :param external-url: if set, it marks this as an external app and
        configures the url
    """
    current_app.logger.info(f"Creating app definition: {name} ({slug})")

    # Look for an existing definition before building a new object, so nothing
    # is constructed when there is nothing to create.
    if App.query.filter_by(slug=slug).first():
        current_app.logger.info(
            f"App definition: {name} ({slug}) already exists in database")
        return

    obj = App()
    obj.name = name
    obj.slug = slug
    if external_url:
        # A URL on the command line marks the app as external
        obj.external = True
        obj.url = external_url

    db.session.add(obj)
    db.session.commit()
    current_app.logger.info(f"App definition: {name} ({slug}) created")
@app_cli.command("list")
def list_app():
    """Print one line for every app found in the database"""
    current_app.logger.info("Listing configured apps")
    for entry in App.query.all():
        print(f"App name: {entry.name}\tSlug: {entry.slug},\tURL: {entry.get_url()}\tStatus: {entry.get_status()}")
@app_cli.command("delete")
@click.argument("slug")
def delete_app(slug):
    """Remove an app definition from the database

    An app that is reported as installed and is not external is left
    untouched; it has to be uninstalled first.

    :param slug: str Slug of app to remove
    """
    current_app.logger.info(f"Trying to delete app: {slug}")

    target = App.query.filter_by(slug=slug).first()
    if not target:
        current_app.logger.info("Not found")
        return

    status = target.get_status()
    still_installed = status.installed and not target.external
    if still_installed:
        current_app.logger.info(
            "Can not delete installed application, run 'uninstall' first")
        return

    target.delete()
    current_app.logger.info("Success.")
@app_cli.command("uninstall")
@click.argument("slug")
def uninstall_app(slug):
    """Uninstall an app from the cluster

    :param slug: str Slug of app to uninstall
    """
    current_app.logger.info(f"Trying to uninstall app: {slug}")

    target = App.query.filter_by(slug=slug).first()
    if not target:
        current_app.logger.info("Not found")
        return

    target.uninstall()
    current_app.logger.info("Success.")
@app_cli.command("status")
@click.argument("slug")
def status_app(slug):
    """Log the current status of an app, as reported by its get_status()

    :param slug: str Slug of app to get the status for
    """
    current_app.logger.info(f"Getting status for app: {slug}")

    app_obj = App.query.filter_by(slug=slug).first()
    if not app_obj:
        current_app.logger.error(f"App {slug} does not exist")
        return

    status = app_obj.get_status()
    current_app.logger.info(status)
@app_cli.command("install")
@click.argument("slug")
def install_app(slug):
    """Install an app, unless it is external or already installed

    :param slug: str Slug of app to install
    """
    current_app.logger.info(f"Installing app: {slug}")

    app_obj = App.query.filter_by(slug=slug).first()
    if not app_obj:
        current_app.logger.error(f"App {slug} does not exist")
        return

    # External apps are only registered here, never provisioned
    if app_obj.external:
        current_app.logger.info(
            f"App {slug} is an external app and can not be provisioned automatically")
        return

    if app_obj.get_status().installed:
        current_app.logger.error(f"App {slug} is already installed")
        return

    app_obj.install()
    current_app.logger.info(
        f"App {slug} installing... use `status` to see status")
@app_cli.command("roles")
@click.argument("slug")
def roles_app(slug):
    """Log every role attached to an app

    :param slug: str Slug of app queried
    """
    current_app.logger.info(f"Getting roles for app: {slug}")

    app_obj = App.query.filter_by(slug=slug).first()
    if not app_obj:
        current_app.logger.error(f"App {slug} does not exist")
        return

    current_app.logger.info("Roles: ")
    for app_role in app_obj.roles:
        current_app.logger.info(app_role)
# Attach the "app" command group, with all commands defined above, to cli.cli
cli.cli.add_command(app_cli)
## CLI USER COMMANDS
@user_cli.command("setrole")
@click.argument("email")
@click.argument("app_slug")
@click.argument("role")
def setrole(email, app_slug, role):
    """Set role for a user

    An existing role assignment of the user for the app is replaced by the
    new one.

    :param email: Email address of user to assign role
    :param app_slug: Slug name of the app, for example 'nextcloud'
    :param role: Role to assign. currently only 'admin', 'user'
    """
    current_app.logger.info(f"Assigning role {role} to {email} for app {app_slug}")

    # Validate the cheap, local argument first, so an unsupported role does
    # not trigger a lookup against the Kratos admin API.
    if role not in ("admin", "user"):
        print("At this point only the roles 'admin' and 'user' are accepted")
        return

    # Find user
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if not user:
        print("User not found. Abort")
        return

    app_obj = db.session.query(App).filter(App.slug == app_slug).first()
    if not app_obj:
        print("App not found. Abort.")
        return

    # Drop the current assignment of this user for this app, if there is one
    existing = (
        db.session.query(AppRole)
        .filter(AppRole.app_id == app_obj.id)
        .filter(AppRole.user_id == user.uuid)
        .first()
    )
    if existing:
        db.session.delete(existing)

    # Role names are matched case-insensitively. The result gets its own name
    # instead of overwriting the `role` argument.
    role_obj = Role.query.filter(func.lower(Role.name) == func.lower(role)).first()
    if not role_obj:
        # The assignment is still stored, without a role id; make that
        # visible instead of doing it silently.
        current_app.logger.warning(
            f"Role {role} not found in database, storing assignment without role")

    obj = AppRole()
    obj.user_id = user.uuid
    obj.app_id = app_obj.id
    obj.role_id = role_obj.id if role_obj else None
    db.session.add(obj)
    db.session.commit()
@user_cli.command("show")
@click.argument("email")
def show_user(email):
    """Print the details of a user, followed by the roles the user has on apps

    :param email: Email address of the user to show
    """
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if user is None:
        print(f"User with email address '{email}' was not found")
        return

    print(user)
    print("")
    print(f"UUID: {user.uuid}")
    print(f"Username: {user.username}")
    print(f"Updated: {user.updated_at}")
    print(f"Created: {user.created_at}")
    print(f"State: {user.state}")
    print("Roles:")

    results = (
        db.session.query(AppRole, Role)
        .join(App, Role)
        .add_entity(App)
        .add_entity(Role)
        .filter(AppRole.user_id == user.uuid)
    )
    for row in results:
        # The two entities added last, App and Role, close every row
        *_, app_entry, role_entry = row
        print(f" {role_entry.name: >9} on {app_entry.name}")
@user_cli.command("update")
@click.argument("email")
@click.argument("field")
@click.argument("value")
def update_user(email, field, value):
    """Update an user object. It can modify email and name currently

    Nothing is saved when the user is not found or the field is not
    supported.

    :param email: Email address of user to update
    :param field: Field to update, supported [name|email]
    :param value: The value to set the field with
    """
    current_app.logger.info(f"Looking for user with email: {email}")
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if not user:
        current_app.logger.error(f"User with email {email} not found.")
        return

    if field == "name":
        user.name = value
    elif field == "email":
        user.email = value
    else:
        current_app.logger.error(f"Field not found: {field}")
        # Nothing was modified, so do not save the unchanged user
        return

    user.save()
@user_cli.command("delete")
@click.argument("email")
def delete_user(email):
    """Look up a user by email address and delete it

    :param email: Email address of user to delete
    """
    current_app.logger.info(f"Looking for user with email: {email}")

    target = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if target:
        target.delete()
        return

    current_app.logger.error(f"User with email {email} not found.")
@user_cli.command("create")
@click.argument("email")
def create_user(email):
    """Create a user with the given email address, unless one already exists

    :param email: string Email address of user to add
    """
    current_app.logger.info(f"Creating user with email: ({email})")

    # An existing user with this address is never recreated
    if KratosUser.find_by_email(KRATOS_ADMIN, email):
        current_app.logger.info("User already exists. Not recreating")
        return

    new_user = KratosUser(KRATOS_ADMIN)
    new_user.email = email
    new_user.save()
@user_cli.command("setpassword")
@click.argument("email")
@click.argument("password")
def setpassword_user(email, password):
    """Set a password for an account

    :param email: email address of account to set a password for
    :param password: password to be set
    :return: the result of the password flow on completion; False when the
        user is not found or an error occurred
    """
    current_app.logger.info(f"Setting password for: ({email})")

    # Kratos offers no interface to set a password directly. As a workaround,
    # a recovery link is created through the admin interface (it is not
    # e-mailed) and the recovery flow is then followed on the public facing
    # pages of Kratos.
    try:
        account = KratosUser.find_by_email(KRATOS_ADMIN, email)
        if account is None:
            current_app.logger.error(f"User with email '{email}' not found")
            return False

        recovery_url = account.get_recovery_link()
        # Run the UI sequence that sets the password via the recovery URL
        outcome = account.ui_set_password(
            KRATOS_PUBLIC_URL, recovery_url, password)
    except Exception as error:  # pylint: disable=broad-except
        current_app.logger.error(f"Error while setting password: {error}")
        return False

    if not outcome:
        current_app.logger.error("Failed to set password. Password too weak?")
    else:
        current_app.logger.info("Success setting password")
    return outcome
@user_cli.command("list")
def list_user():
    """Print every user returned by KratosUser.find_all"""
    current_app.logger.info("Listing users")
    for entry in KratosUser.find_all(KRATOS_ADMIN):
        print(entry)
@user_cli.command("recover")
@click.argument("email")
def recover_user(email):
    """Get recovery link for a user, to manually update the user

    The link is printed; when the user does not exist or the link can not be
    obtained, an error is logged instead.

    :param email: Email address of the user
    """
    current_app.logger.info(f"Trying to send recover email for user: {email}")
    try:
        # Get the ID of the user
        kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)
        if kratos_user is None:
            # Report the real cause, rather than letting the attribute access
            # on None surface as a confusing "reset link" error below.
            current_app.logger.error(f"User with email '{email}' not found")
            return

        # Get a recovery URL
        url = kratos_user.get_recovery_link()
        print(url)
    except Exception as error:  # pylint: disable=broad-except
        current_app.logger.error(f"Error while getting reset link: {error}")
# Attach the "user" command group, with all commands defined above, to cli.cli
cli.cli.add_command(user_cli)