340 lines
9.3 KiB
Python
340 lines
9.3 KiB
Python
|
|
||
|
"""Flask application which provides the interface of a login panel. The
application interacts with different backends: the Kratos backend for users,
Hydra for OIDC sessions and MariaDB for application and role specifications.
The application also provides several command line options to interact with
the user entries in the database(s)."""
|
||
|
|
||
|
|
||
|
# Basic system imports
|
||
|
import logging
|
||
|
import os
|
||
|
import urllib.parse
|
||
|
import urllib.request
|
||
|
|
||
|
import click
|
||
|
|
||
|
# Hydra, OIDC Identity Provider
|
||
|
import hydra_client
|
||
|
|
||
|
# Kratos, Identity manager
|
||
|
import ory_kratos_client
|
||
|
#from exceptions import BackendError
|
||
|
|
||
|
# Flask
|
||
|
from flask import Flask, abort, redirect, render_template, request
|
||
|
from flask.cli import AppGroup
|
||
|
from ory_kratos_client.api import v0alpha2_api as kratos_api
|
||
|
|
||
|
from areas import cli
|
||
|
from config import *
|
||
|
from flask import current_app
|
||
|
|
||
|
from helpers import (
|
||
|
BadRequest,
|
||
|
KratosError,
|
||
|
HydraError,
|
||
|
bad_request_error,
|
||
|
validation_error,
|
||
|
kratos_error,
|
||
|
global_error,
|
||
|
hydra_error,
|
||
|
KratosUser,
|
||
|
App,
|
||
|
AppRole
|
||
|
)
|
||
|
|
||
|
from database import db
|
||
|
|
||
|
# Backend API handles
# Admin interface of Hydra.
HYDRA = hydra_client.HydraAdmin(HYDRA_ADMIN_URL)

# Kratos exposes both an admin and a public endpoint, so one API object is
# built for each. Bugs in the kratos implementation force us to enable
# discard_unknown_keys on both configurations.
tmp = ory_kratos_client.Configuration(
    host=KRATOS_ADMIN_URL,
    discard_unknown_keys=True,
)
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))

tmp = ory_kratos_client.Configuration(
    host=KRATOS_PUBLIC_URL,
    discard_unknown_keys=True,
)
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
|
||
|
|
||
|
##############################################################################
|
||
|
# CLI INTERFACE #
|
||
|
##############################################################################
|
||
|
# Flask CLI command groups; the individual commands are attached to these
# groups by the decorated functions further down.
user_cli, app_cli = AppGroup('user'), AppGroup('app')
|
||
|
|
||
|
## CLI APP COMMANDS
|
||
|
|
||
|
@app_cli.command('create')
@click.argument('slug')
@click.argument('name')
def create_app(slug, name):
    """Store a new app definition in the database

    :param slug: str short name of the app
    :param name: str name of the application
    """
    current_app.logger.info(f"Creating app definition: {name} ({slug})")

    # Fill in a fresh App row and persist it with a single commit.
    new_app = App()
    new_app.name = name
    new_app.slug = slug

    session = db.session
    session.add(new_app)
    session.commit()
|
||
|
|
||
|
|
||
|
|
||
|
@app_cli.command('list')
def list_app():
    """Print name and slug of every app found in the database"""
    current_app.logger.info("Listing configured apps")

    for entry in App.query.all():
        print(f"App name: {entry.name} \t Slug: {entry.slug}")
|
||
|
|
||
|
|
||
|
@app_cli.command('delete')
@click.argument('slug')
def delete_app(slug):
    """Delete the app with the given slug from the database

    :param slug: str Slug of app to remove
    """
    current_app.logger.info(f"Trying to delete app: {slug}")
    target = App.query.filter_by(slug=slug).first()

    if target:
        # The delete will (probably) fail while roles are still attached to
        # the app. This is a PoC implementation only; real management of apps
        # and roles is meant to be done by the backend application.
        db.session.delete(target)
        db.session.commit()
        current_app.logger.info("Success")
    else:
        current_app.logger.info("Not found")
|
||
|
|
||
|
|
||
|
# Register the 'app' command group on the `cli` object exposed by `areas.cli`.
cli.cli.add_command(app_cli)
|
||
|
|
||
|
|
||
|
## CLI USER COMMANDS
|
||
|
@user_cli.command("setrole")
@click.argument("email")
@click.argument("app_slug")
@click.argument("role")
def setrole(email, app_slug, role):
    """Set role for a user

    Any role the user already holds for the app is removed and replaced by
    the new one.

    :param email: Email address of user to assign role
    :param app_slug: Slug name of the app, for example 'nextcloud'
    :param role: Role to assign. currently only 'admin', 'user'
    """

    current_app.logger.info(f"Assiging role {role} to {email} for app {app_slug}")

    # Validate the cheap, local input first so an invalid role never costs a
    # round trip to the Kratos backend.
    if role not in ("admin", "user"):
        print("At this point only the roles 'admin' and 'user' are accepted")
        return

    # Find user
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if not user:
        print("User not found. Abort")
        return

    app_obj = db.session.query(App).filter(App.slug == app_slug).first()
    if not app_obj:
        print("App not found. Abort.")
        return

    # Look up an existing role assignment for this user/app combination.
    role_obj = (
        db.session.query(AppRole)
        .filter(AppRole.app_id == app_obj.id)
        .filter(AppRole.user_id == user.uuid)
        .first()
    )

    # Drop the old assignment (if any) before adding the new one; both
    # changes are committed together below.
    if role_obj:
        db.session.delete(role_obj)

    obj = AppRole()
    obj.user_id = user.uuid
    obj.app_id = app_obj.id
    obj.role = role

    db.session.add(obj)
    db.session.commit()
|
||
|
|
||
|
|
||
|
@user_cli.command("show")
@click.argument("email")
def show_user(email):
    """Show user details. Output a table with the user and details about the
    internal state/values of the user object

    :param email: Email address of the user to show
    """
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)

    # The sibling commands treat a falsy lookup result as "user not found";
    # without this guard the attribute accesses below would raise
    # AttributeError on None.
    if not user:
        current_app.logger.error(f"User with email {email} not found.")
        return

    print(user)
    print("")
    print(f"UUID: {user.uuid}")
    print(f"Username: {user.username}")
    print(f"Updated: {user.updated_at}")
    print(f"Created: {user.created_at}")
    print(f"State: {user.state}")
|
||
|
|
||
|
@user_cli.command('update')
@click.argument('email')
@click.argument('field')
@click.argument('value')
def update_user(email, field, value):
    """Update an user object. It can modify email and name currently

    Nothing is saved when the user cannot be found or when the field is not
    one of the supported ones.

    :param email: Email address of user to update
    :param field: Field to update, supported [name|email]
    :param value: The value to set the field with
    """
    current_app.logger.info(f"Looking for user with email: {email}")
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if not user:
        current_app.logger.error(f"User with email {email} not found.")
        return

    if field == 'name':
        user.name = value
    elif field == 'email':
        user.email = value
    else:
        current_app.logger.error(f"Field not found: {field}")
        # Nothing was modified, so do not push an unchanged user back to the
        # backend.
        return

    user.save()
|
||
|
|
||
|
|
||
|
@user_cli.command('delete')
@click.argument('email')
def delete_user(email):
    """Remove the user with the given email address

    :param email: Email address of user to delete
    """
    current_app.logger.info(f"Looking for user with email: {email}")
    found = KratosUser.find_by_email(KRATOS_ADMIN, email)

    if found:
        found.delete()
    else:
        current_app.logger.error(f"User with email {email} not found.")
|
||
|
|
||
|
|
||
|
|
||
|
@user_cli.command('create')
@click.argument('email')
def create_user(email):
    """Add a user to the kratos database, identified by an email address
    that is not in use yet. An already existing user is left untouched.

    :param email: string Email address of user to add
    """
    current_app.logger.info(f"Creating user with email: ({email})")

    # Refuse to create a second user for an address that is already known.
    if KratosUser.find_by_email(KRATOS_ADMIN, email):
        current_app.logger.info("User already exists. Not recreating")
        return

    new_user = KratosUser(KRATOS_ADMIN)
    new_user.email = email
    new_user.save()
|
||
|
|
||
|
@user_cli.command('setpassword')
@click.argument('email')
@click.argument('password')
def setpassword_user(email, password):
    """Set a password for an account

    :param email: email address of account to set a password for
    :param password: password to be set
    :return: result of the password UI sequence (truthy on success); False
             when the user is not found or when any error occurred, since
             errors are logged here instead of being propagated
    """

    current_app.logger.info(f"Setting password for: ({email})")

    # Kratos has no interface to set a password directly, so we work around
    # it: a recovery link is created through the admin interface (it is not
    # e-mailed) and the recovery flow is then followed on the public facing
    # pages of kratos.
    try:
        # Look up the user the password is meant for
        account = KratosUser.find_by_email(KRATOS_ADMIN, email)
        if account is None:
            current_app.logger.error(f"User with email '{email}' not found")
            return False

        # Obtain a recovery URL and run the UI sequence with it
        recovery_url = account.get_recovery_link()
        outcome = account.ui_set_password(
            KRATOS_PUBLIC_URL, recovery_url, password)
    except Exception as error:
        current_app.logger.error(f"Error while setting password: {error}")
        return False
    else:
        if outcome:
            current_app.logger.info("Success setting password")
        else:
            current_app.logger.error("Failed to set password. Password too weak?")

        return outcome
|
||
|
|
||
|
|
||
|
|
||
|
@user_cli.command('list')
def list_user():
    """Print every user returned by the Kratos admin backend"""
    current_app.logger.info("Listing users")

    for entry in KratosUser.find_all(KRATOS_ADMIN):
        print(entry)
|
||
|
|
||
|
|
||
|
@user_cli.command('recover')
@click.argument('email')
def recover_user(email):
    """Get recovery link for a user, to manually update the user

    The link is printed to stdout. Errors are logged, not raised.

    :param email: Email address of the user
    """

    current_app.logger.info(f"Trying to send recover email for user: {email}")

    try:
        # Get the ID of the user
        kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)
        if kratos_user is None:
            current_app.logger.error(f"User with email '{email}' not found")
            return

        # Get a recovery URL
        url = kratos_user.get_recovery_link()

        print(url)
    # The original handler named BackendError, which is not imported in this
    # module (its import is commented out), so any failure surfaced as a
    # NameError. Catch Exception like setpassword_user does.
    except Exception as error:
        current_app.logger.error(f"Error while getting reset link: {error}")
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
# Register the 'user' command group on the `cli` object exposed by `areas.cli`.
cli.cli.add_command(user_cli)
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|