Added CLI commands

This commit is contained in:
Mart van Santen 2022-04-01 15:15:30 +08:00
parent 2d877d91cc
commit a4981c8c52
7 changed files with 350 additions and 5 deletions

3
app.py
View file

@ -8,11 +8,13 @@ from flask_sqlalchemy import SQLAlchemy
# These imports are required
from areas import api_v1
from areas import web
from areas import cli
from areas import users
from areas import apps
from areas import auth
from areas import login
from areas import cliapp
from database import db
@ -52,6 +54,7 @@ app.logger.setLevel(logging.INFO)
app.register_blueprint(api_v1)
app.register_blueprint(web)
app.register_blueprint(cli)
# Error handlers
app.register_error_handler(Exception, global_error)

View file

@ -2,7 +2,7 @@ from flask import Blueprint
api_v1 = Blueprint("api_v1", __name__, url_prefix="/api/v1")
web = Blueprint("web", __name__, url_prefix="/web")
# cli = Blueprint('cli', __name__)
cli = Blueprint('cli', __name__)
@api_v1.route("/")
@api_v1.route("/health")

2
areas/cliapp/__init__.py Normal file
View file

@ -0,0 +1,2 @@
from .cli import *

341
areas/cliapp/cli.py Normal file
View file

@ -0,0 +1,341 @@
"""Flask application which provides the interface of a login panel. The
application interacts with different backend, like the Kratos backend for users,
Hydra for OIDC sessions and MariaDB for application and role specifications.
The application provides also several command line options to interact with
the user entries in the database(s)"""
# Basic system imports
import logging
import os
import urllib.parse
import urllib.request
import click
# Hydra, OIDC Identity Provider
import hydra_client
# Kratos, Identity manager
import ory_kratos_client
#from exceptions import BackendError
# Flask
from flask import Flask, abort, redirect, render_template, request
from flask.cli import AppGroup
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from ory_kratos_client.api import v0alpha2_api as kratos_api
from areas import cli
from config import *
from flask import current_app
from helpers import (
BadRequest,
KratosError,
HydraError,
bad_request_error,
validation_error,
kratos_error,
global_error,
hydra_error,
KratosUser,
App,
AppRole
)
from database import db
# APIs
# Create HYDRA & KRATOS API interfaces
HYDRA = hydra_client.HydraAdmin(HYDRA_ADMIN_URL)
# Kratos has an admin and public end-point. We create an API for them
# both. The kratos implementation has bugs, which forces us to set
# the discard_unknown_keys to True.
tmp = ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL,
discard_unknown_keys= True)
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
tmp = ory_kratos_client.Configuration(host=KRATOS_PUBLIC_URL,
discard_unknown_keys = True)
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
##############################################################################
# CLI INTERFACE #
##############################################################################
# Define Flask CLI command groups and commands
user_cli = AppGroup('user')
app_cli = AppGroup('app')
## CLI APP COMMANDS
@app_cli.command('create')
@click.argument('slug')
@click.argument('name')
def create_app(slug, name):
"""Adds an app into the database
:param slug: str short name of the app
:param name: str name of the application
"""
current_app.logger.info(f"Creating app definition: {name} ({slug})")
obj = App()
obj.name = name
obj.slug = slug
db.session.add(obj)
db.session.commit()
@app_cli.command('list')
def list_app():
"""List all apps found in the database"""
current_app.logger.info("Listing configured apps")
apps = App.query.all()
for obj in apps:
print(f"App name: {obj.name} \t Slug: {obj.slug}")
@app_cli.command('delete',)
@click.argument('slug')
def delete_app(slug):
"""Removes app from database
:param slug: str Slug of app to remove
"""
current_app.logger.info(f"Trying to delete app: {slug}")
obj = App.query.filter_by(slug=slug).first()
if not obj:
current_app.logger.info("Not found")
return
# Deleting will (probably) fail if there are still roles attached. This is a
# PoC implementation only. Actually management of apps and roles will be
# done by the backend application
db.session.delete(obj)
db.session.commit()
current_app.logger.info("Success")
return
cli.cli.add_command(app_cli)
## CLI USER COMMANDS
@user_cli.command("setrole")
@click.argument("email")
@click.argument("app_slug")
@click.argument("role")
def setrole(email, app_slug, role):
"""Set role for a sure
:param email: Email address of user to assign role
:param app_slug: Slug name of the app, for example 'nextcloud'
:param role: Role to assign. currently only 'admin', 'user'
"""
current_app.logger.info(f"Assiging role {role} to {email} for app {app_slug}")
# Find user
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
if role not in ("admin", "user"):
print("At this point only the roles 'admin' and 'user' are accepted")
return
if not user:
print("User not found. Abort")
return
app_obj = db.session.query(App).filter(App.slug == app_slug).first()
if not app_obj:
print("App not found. Abort.")
return
role_obj = (
db.session.query(AppRole)
.filter(AppRole.app_id == app_obj.id)
.filter(AppRole.user_id == user.uuid)
.first()
)
if role_obj:
db.session.delete(role_obj)
obj = AppRole()
obj.user_id = user.uuid
obj.app_id = app_obj.id
obj.role = role
db.session.add(obj)
db.session.commit()
@user_cli.command("show")
@click.argument("email")
def show_user(email):
"""Show user details. Output a table with the user and details about the
internal state/values of the user object
:param email: Email address of the user to show
"""
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
print(user)
print("")
print(f"UUID: {user.uuid}")
print(f"Username: {user.username}")
print(f"Updated: {user.updated_at}")
print(f"Created: {user.created_at}")
print(f"State: {user.state}")
@user_cli.command('update')
@click.argument('email')
@click.argument('field')
@click.argument('value')
def update_user(email, field, value):
"""Update an user object. It can modify email and name currently
:param email: Email address of user to update
:param field: Field to update, supported [name|email]
:param value: The value to set the field with
"""
current_app.logger.info(f"Looking for user with email: {email}")
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
if not user:
current_app.logger.error(f"User with email {email} not found.")
return
if field == 'name':
user.name = value
elif field == 'email':
user.email = value
else:
current_app.logger.error(f"Field not found: {field}")
user.save()
@user_cli.command('delete')
@click.argument('email')
def delete_user(email):
"""Delete an user from the database
:param email: Email address of user to delete
"""
current_app.logger.info(f"Looking for user with email: {email}")
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
if not user:
current_app.logger.error(f"User with email {email} not found.")
return
user.delete()
@user_cli.command('create')
@click.argument('email')
def create_user(email):
"""Create a user in the kratos database. The argument must be an unique
email address
:param email: string Email address of user to add
"""
current_app.logger.info(f"Creating user with email: ({email})")
# Create a user
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
if user:
current_app.logger.info("User already exists. Not recreating")
return
user = KratosUser(KRATOS_ADMIN)
user.email = email
user.save()
@user_cli.command('setpassword')
@click.argument('email')
@click.argument('password')
def setpassword_user(email, password):
"""Set a password for an account
:param email: email address of account to set a password for
:param password: password to be set
:return: true on success, false if not set (too weak)
:rtype: boolean
:raise: exception if unexepted error happens
"""
current_app.logger.info(f"Setting password for: ({email})")
# Kratos does not provide an interface to set a password directly. However
# we still want to be able to set a password. So we have to hack our way
# a bit around this. We do this by creating a recovery link though the
# admin interface (which is not e-mailed) and then follow the recovery
# flow in the public facing pages of kratos
try:
# Get the ID of the user
kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)
if kratos_user is None:
current_app.logger.error(f"User with email '{email}' not found")
return False
# Get a recovery URL
url = kratos_user.get_recovery_link()
# Execute UI sequence to set password, given we have a recovery URL
result = kratos_user.ui_set_password(KRATOS_PUBLIC_URL, url, password)
except Exception as error:
current_app.logger.error(f"Error while setting password: {error}")
return False
if result:
current_app.logger.info("Success setting password")
else:
current_app.logger.error("Failed to set password. Password too weak?")
return result
@user_cli.command('list')
def list_user():
"""Show a list of users in the database"""
current_app.logger.info("Listing users")
users = KratosUser.find_all(KRATOS_ADMIN)
for obj in users:
print(obj)
@user_cli.command('recover')
@click.argument('email')
def recover_user(email):
"""Get recovery link for a user, to manual update the user/use
:param email: Email address of the user
"""
current_app.logger.info(f"Trying to send recover email for user: {email}")
try:
# Get the ID of the user
kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)
# Get a recovery URL
url = kratos_user.get_recovery_link()
print(url)
except BackendError as error:
current_app.logger.error(f"Error while getting reset link: {error}")
cli.cli.add_command(user_cli)

View file

@ -17,7 +17,6 @@ import hydra_client
# Kratos, Identity manager
import ory_kratos_client
#from exceptions import BackendError
# Flask
from flask import Flask, abort, redirect, render_template, request

View file

@ -14,7 +14,7 @@ from flask_sqlalchemy import SQLAlchemy
# from sqlalchemy.orm import relationship
from sqlalchemy import ForeignKey, Integer, String
db = SQLAlchemy()
from database import db
# Pylint complains about too-few-public-methods. Methods will be added once
# this is implemented.

View file

@ -33,7 +33,7 @@ export KRATOS_PUBLIC_URL=http://localhost/kratos
export KRATOS_ADMIN_URL=http://localhost:8000
export HYDRA_ADMIN_URL=http://localhost:4445
export PUBLIC_URL=http://localhost/web/
export DATABASE_URL="mysql+pymysql://stackspin:stackspin@localhost/stackspin?charset=utf8mb4"
#export DATABASE_URL="mysql+pymysql://stackspin:stackspin@localhost/stackspin?charset=utf8mb4"
export DATABASE_URL="mysql+pymysql://stackspin:IRvqAzhKMEdIBUUAWulIfZJLQgclLQDm@localhost/stackspin"
flask run