Refactor integrations of sso
This commit is contained in:
parent
ce5a7d05ac
commit
f377b4ce45
46 changed files with 154 additions and 249 deletions
|
|
@ -1,8 +1,7 @@
|
|||
from flask import Blueprint
|
||||
|
||||
api_v1 = Blueprint("api_v1", __name__, url_prefix="/api/v1")
|
||||
web = Blueprint("web", __name__, url_prefix="/web")
|
||||
cli = Blueprint('cli', __name__)
|
||||
|
||||
|
||||
@api_v1.route("/")
|
||||
@api_v1.route("/health")
|
||||
|
|
|
|||
|
|
@ -1 +1,2 @@
|
|||
from .apps import *
|
||||
from .apps import *
|
||||
from .models import *
|
||||
|
|
|
|||
29
areas/apps/models.py
Normal file
29
areas/apps/models.py
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
from sqlalchemy import ForeignKey, Integer, String
|
||||
from database import db
|
||||
|
||||
|
||||
class App(db.Model):
|
||||
"""
|
||||
The App object, interact with the App database object. Data is stored in
|
||||
the local database.
|
||||
"""
|
||||
|
||||
id = db.Column(Integer, primary_key=True)
|
||||
name = db.Column(String(length=64))
|
||||
slug = db.Column(String(length=64), unique=True)
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.id} <{self.name}>"
|
||||
|
||||
|
||||
class AppRole(db.Model):
|
||||
"""
|
||||
The AppRole object, stores the roles Users have on Apps
|
||||
"""
|
||||
|
||||
user_id = db.Column(String(length=64), primary_key=True)
|
||||
app_id = db.Column(Integer, ForeignKey("app.id"), primary_key=True)
|
||||
role = db.Column(String(length=64))
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.role} for {self.user_id} on {self.app_id}"
|
||||
|
|
@ -1,2 +0,0 @@
|
|||
|
||||
from .cli import *
|
||||
|
|
@ -1,339 +0,0 @@
|
|||
|
||||
"""Flask application which provides the interface of a login panel. The
|
||||
application interacts with different backend, like the Kratos backend for users,
|
||||
Hydra for OIDC sessions and MariaDB for application and role specifications.
|
||||
The application provides also several command line options to interact with
|
||||
the user entries in the database(s)"""
|
||||
|
||||
|
||||
# Basic system imports
|
||||
import logging
|
||||
import os
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
|
||||
import click
|
||||
|
||||
# Hydra, OIDC Identity Provider
|
||||
import hydra_client
|
||||
|
||||
# Kratos, Identity manager
|
||||
import ory_kratos_client
|
||||
#from exceptions import BackendError
|
||||
|
||||
# Flask
|
||||
from flask import Flask, abort, redirect, render_template, request
|
||||
from flask.cli import AppGroup
|
||||
from ory_kratos_client.api import v0alpha2_api as kratos_api
|
||||
|
||||
from areas import cli
|
||||
from config import *
|
||||
from flask import current_app
|
||||
|
||||
from helpers import (
|
||||
BadRequest,
|
||||
KratosError,
|
||||
HydraError,
|
||||
bad_request_error,
|
||||
validation_error,
|
||||
kratos_error,
|
||||
global_error,
|
||||
hydra_error,
|
||||
KratosUser,
|
||||
App,
|
||||
AppRole
|
||||
)
|
||||
|
||||
from database import db
|
||||
|
||||
# APIs
|
||||
# Create HYDRA & KRATOS API interfaces
|
||||
HYDRA = hydra_client.HydraAdmin(HYDRA_ADMIN_URL)
|
||||
|
||||
# Kratos has an admin and public end-point. We create an API for them
|
||||
# both. The kratos implementation has bugs, which forces us to set
|
||||
# the discard_unknown_keys to True.
|
||||
tmp = ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL,
|
||||
discard_unknown_keys= True)
|
||||
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
|
||||
|
||||
tmp = ory_kratos_client.Configuration(host=KRATOS_PUBLIC_URL,
|
||||
discard_unknown_keys = True)
|
||||
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
|
||||
|
||||
##############################################################################
|
||||
# CLI INTERFACE #
|
||||
##############################################################################
|
||||
# Define Flask CLI command groups and commands
|
||||
user_cli = AppGroup('user')
|
||||
app_cli = AppGroup('app')
|
||||
|
||||
## CLI APP COMMANDS
|
||||
|
||||
@app_cli.command('create')
|
||||
@click.argument('slug')
|
||||
@click.argument('name')
|
||||
def create_app(slug, name):
|
||||
"""Adds an app into the database
|
||||
:param slug: str short name of the app
|
||||
:param name: str name of the application
|
||||
"""
|
||||
current_app.logger.info(f"Creating app definition: {name} ({slug})")
|
||||
|
||||
obj = App()
|
||||
obj.name = name
|
||||
obj.slug = slug
|
||||
|
||||
db.session.add(obj)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
|
||||
@app_cli.command('list')
|
||||
def list_app():
|
||||
"""List all apps found in the database"""
|
||||
current_app.logger.info("Listing configured apps")
|
||||
apps = App.query.all()
|
||||
|
||||
for obj in apps:
|
||||
print(f"App name: {obj.name} \t Slug: {obj.slug}")
|
||||
|
||||
|
||||
@app_cli.command('delete',)
|
||||
@click.argument('slug')
|
||||
def delete_app(slug):
|
||||
"""Removes app from database
|
||||
:param slug: str Slug of app to remove
|
||||
"""
|
||||
current_app.logger.info(f"Trying to delete app: {slug}")
|
||||
obj = App.query.filter_by(slug=slug).first()
|
||||
|
||||
if not obj:
|
||||
current_app.logger.info("Not found")
|
||||
return
|
||||
|
||||
|
||||
# Deleting will (probably) fail if there are still roles attached. This is a
|
||||
# PoC implementation only. Actually management of apps and roles will be
|
||||
# done by the backend application
|
||||
db.session.delete(obj)
|
||||
db.session.commit()
|
||||
current_app.logger.info("Success")
|
||||
return
|
||||
|
||||
|
||||
cli.cli.add_command(app_cli)
|
||||
|
||||
|
||||
## CLI USER COMMANDS
|
||||
@user_cli.command("setrole")
|
||||
@click.argument("email")
|
||||
@click.argument("app_slug")
|
||||
@click.argument("role")
|
||||
def setrole(email, app_slug, role):
|
||||
"""Set role for a sure
|
||||
:param email: Email address of user to assign role
|
||||
:param app_slug: Slug name of the app, for example 'nextcloud'
|
||||
:param role: Role to assign. currently only 'admin', 'user'
|
||||
"""
|
||||
|
||||
current_app.logger.info(f"Assiging role {role} to {email} for app {app_slug}")
|
||||
|
||||
# Find user
|
||||
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
|
||||
|
||||
if role not in ("admin", "user"):
|
||||
print("At this point only the roles 'admin' and 'user' are accepted")
|
||||
return
|
||||
|
||||
if not user:
|
||||
print("User not found. Abort")
|
||||
return
|
||||
|
||||
app_obj = db.session.query(App).filter(App.slug == app_slug).first()
|
||||
if not app_obj:
|
||||
print("App not found. Abort.")
|
||||
return
|
||||
|
||||
role_obj = (
|
||||
db.session.query(AppRole)
|
||||
.filter(AppRole.app_id == app_obj.id)
|
||||
.filter(AppRole.user_id == user.uuid)
|
||||
.first()
|
||||
)
|
||||
|
||||
if role_obj:
|
||||
db.session.delete(role_obj)
|
||||
|
||||
obj = AppRole()
|
||||
obj.user_id = user.uuid
|
||||
obj.app_id = app_obj.id
|
||||
obj.role = role
|
||||
|
||||
db.session.add(obj)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
@user_cli.command("show")
|
||||
@click.argument("email")
|
||||
def show_user(email):
|
||||
"""Show user details. Output a table with the user and details about the
|
||||
internal state/values of the user object
|
||||
:param email: Email address of the user to show
|
||||
"""
|
||||
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
|
||||
print(user)
|
||||
print("")
|
||||
print(f"UUID: {user.uuid}")
|
||||
print(f"Username: {user.username}")
|
||||
print(f"Updated: {user.updated_at}")
|
||||
print(f"Created: {user.created_at}")
|
||||
print(f"State: {user.state}")
|
||||
|
||||
@user_cli.command('update')
|
||||
@click.argument('email')
|
||||
@click.argument('field')
|
||||
@click.argument('value')
|
||||
def update_user(email, field, value):
|
||||
"""Update an user object. It can modify email and name currently
|
||||
:param email: Email address of user to update
|
||||
:param field: Field to update, supported [name|email]
|
||||
:param value: The value to set the field with
|
||||
"""
|
||||
current_app.logger.info(f"Looking for user with email: {email}")
|
||||
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
|
||||
if not user:
|
||||
current_app.logger.error(f"User with email {email} not found.")
|
||||
return
|
||||
|
||||
if field == 'name':
|
||||
user.name = value
|
||||
elif field == 'email':
|
||||
user.email = value
|
||||
else:
|
||||
current_app.logger.error(f"Field not found: {field}")
|
||||
|
||||
user.save()
|
||||
|
||||
|
||||
@user_cli.command('delete')
|
||||
@click.argument('email')
|
||||
def delete_user(email):
|
||||
"""Delete an user from the database
|
||||
:param email: Email address of user to delete
|
||||
"""
|
||||
current_app.logger.info(f"Looking for user with email: {email}")
|
||||
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
|
||||
if not user:
|
||||
current_app.logger.error(f"User with email {email} not found.")
|
||||
return
|
||||
user.delete()
|
||||
|
||||
|
||||
|
||||
@user_cli.command('create')
|
||||
@click.argument('email')
|
||||
def create_user(email):
|
||||
"""Create a user in the kratos database. The argument must be an unique
|
||||
email address
|
||||
:param email: string Email address of user to add
|
||||
"""
|
||||
current_app.logger.info(f"Creating user with email: ({email})")
|
||||
|
||||
# Create a user
|
||||
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
|
||||
if user:
|
||||
current_app.logger.info("User already exists. Not recreating")
|
||||
return
|
||||
|
||||
user = KratosUser(KRATOS_ADMIN)
|
||||
user.email = email
|
||||
user.save()
|
||||
|
||||
@user_cli.command('setpassword')
|
||||
@click.argument('email')
|
||||
@click.argument('password')
|
||||
def setpassword_user(email, password):
|
||||
"""Set a password for an account
|
||||
:param email: email address of account to set a password for
|
||||
:param password: password to be set
|
||||
:return: true on success, false if not set (too weak)
|
||||
:rtype: boolean
|
||||
:raise: exception if unexepted error happens
|
||||
"""
|
||||
|
||||
current_app.logger.info(f"Setting password for: ({email})")
|
||||
|
||||
# Kratos does not provide an interface to set a password directly. However
|
||||
# we still want to be able to set a password. So we have to hack our way
|
||||
# a bit around this. We do this by creating a recovery link though the
|
||||
# admin interface (which is not e-mailed) and then follow the recovery
|
||||
# flow in the public facing pages of kratos
|
||||
|
||||
try:
|
||||
# Get the ID of the user
|
||||
kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)
|
||||
if kratos_user is None:
|
||||
current_app.logger.error(f"User with email '{email}' not found")
|
||||
return False
|
||||
|
||||
|
||||
# Get a recovery URL
|
||||
url = kratos_user.get_recovery_link()
|
||||
|
||||
# Execute UI sequence to set password, given we have a recovery URL
|
||||
result = kratos_user.ui_set_password(KRATOS_PUBLIC_URL, url, password)
|
||||
|
||||
except Exception as error:
|
||||
current_app.logger.error(f"Error while setting password: {error}")
|
||||
return False
|
||||
|
||||
if result:
|
||||
current_app.logger.info("Success setting password")
|
||||
else:
|
||||
current_app.logger.error("Failed to set password. Password too weak?")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
|
||||
@user_cli.command('list')
|
||||
def list_user():
|
||||
"""Show a list of users in the database"""
|
||||
current_app.logger.info("Listing users")
|
||||
users = KratosUser.find_all(KRATOS_ADMIN)
|
||||
|
||||
for obj in users:
|
||||
print(obj)
|
||||
|
||||
|
||||
@user_cli.command('recover')
|
||||
@click.argument('email')
|
||||
def recover_user(email):
|
||||
"""Get recovery link for a user, to manual update the user/use
|
||||
:param email: Email address of the user
|
||||
"""
|
||||
|
||||
current_app.logger.info(f"Trying to send recover email for user: {email}")
|
||||
|
||||
try:
|
||||
# Get the ID of the user
|
||||
kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)
|
||||
|
||||
# Get a recovery URL
|
||||
url = kratos_user.get_recovery_link()
|
||||
|
||||
print(url)
|
||||
except BackendError as error:
|
||||
current_app.logger.error(f"Error while getting reset link: {error}")
|
||||
|
||||
|
||||
|
||||
|
||||
cli.cli.add_command(user_cli)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -1 +0,0 @@
|
|||
from .login import *
|
||||
|
|
@ -1,341 +0,0 @@
|
|||
|
||||
"""Flask application which provides the interface of a login panel. The
|
||||
application interacts with different backend, like the Kratos backend for users,
|
||||
Hydra for OIDC sessions and MariaDB for application and role specifications.
|
||||
The application provides also several command line options to interact with
|
||||
the user entries in the database(s)"""
|
||||
|
||||
|
||||
# Basic system imports
|
||||
import logging
|
||||
import os
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
|
||||
# Hydra, OIDC Identity Provider
|
||||
import hydra_client
|
||||
|
||||
# Kratos, Identity manager
|
||||
import ory_kratos_client
|
||||
|
||||
# Flask
|
||||
from flask import Flask, abort, redirect, render_template, request
|
||||
from flask_migrate import Migrate
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
#from kratos import KratosUser
|
||||
from ory_kratos_client.api import v0alpha2_api as kratos_api
|
||||
|
||||
# Import modules for external APIs
|
||||
|
||||
from areas import web
|
||||
from config import *
|
||||
from flask import current_app
|
||||
|
||||
from helpers import (
|
||||
BadRequest,
|
||||
KratosError,
|
||||
HydraError,
|
||||
bad_request_error,
|
||||
validation_error,
|
||||
kratos_error,
|
||||
global_error,
|
||||
hydra_error,
|
||||
KratosUser,
|
||||
App,
|
||||
AppRole
|
||||
)
|
||||
|
||||
import ast
|
||||
# This is a circular import and should be solved differently
|
||||
#from app import db
|
||||
from database import db
|
||||
|
||||
# APIs
|
||||
# Create HYDRA & KRATOS API interfaces
|
||||
HYDRA = hydra_client.HydraAdmin(HYDRA_ADMIN_URL)
|
||||
|
||||
# Kratos has an admin and public end-point. We create an API for them
|
||||
# both. The kratos implementation has bugs, which forces us to set
|
||||
# the discard_unknown_keys to True.
|
||||
tmp = ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL,
|
||||
discard_unknown_keys= True)
|
||||
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
|
||||
|
||||
tmp = ory_kratos_client.Configuration(host=KRATOS_PUBLIC_URL,
|
||||
discard_unknown_keys = True)
|
||||
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
|
||||
|
||||
##############################################################################
|
||||
# WEB ROUTES #
|
||||
##############################################################################
|
||||
|
||||
@web.route('/recovery', methods=['GET', 'POST'])
|
||||
def recovery():
|
||||
"""Start recovery flow
|
||||
If no active flow, redirect to kratos to create a flow, otherwise render the
|
||||
recovery template.
|
||||
:param flow: flow as given by Kratos
|
||||
:return: redirect or recovery page
|
||||
"""
|
||||
|
||||
flow = request.args.get("flow")
|
||||
if not flow:
|
||||
return redirect(KRATOS_PUBLIC_URL + "self-service/recovery/browser")
|
||||
|
||||
return render_template(
|
||||
'recover.html',
|
||||
api_url = KRATOS_PUBLIC_URL
|
||||
)
|
||||
|
||||
|
||||
@web.route('/settings', methods=['GET', 'POST'])
|
||||
def settings():
|
||||
"""Start settings flow
|
||||
If no active flow, redirect to kratos to create a flow, otherwise render the
|
||||
settings template.
|
||||
:param flow: flow as given by Kratos
|
||||
:return: redirect or settings page
|
||||
"""
|
||||
|
||||
flow = request.args.get("flow")
|
||||
if not flow:
|
||||
return redirect(KRATOS_PUBLIC_URL + "self-service/settings/browser")
|
||||
|
||||
return render_template(
|
||||
'settings.html',
|
||||
api_url = KRATOS_PUBLIC_URL
|
||||
)
|
||||
|
||||
|
||||
@web.route('/login', methods=['GET', 'POST'])
|
||||
def login():
|
||||
"""Start login flow
|
||||
If already logged in, shows the loggedin template. Otherwise creates a login
|
||||
flow, if no active flow will redirect to kratos to create a flow.
|
||||
|
||||
:param flow: flow as given by Kratos
|
||||
:return: redirect or login page
|
||||
"""
|
||||
|
||||
# Check if we are logged in:
|
||||
identity = get_auth()
|
||||
|
||||
if identity:
|
||||
return render_template(
|
||||
'loggedin.html',
|
||||
api_url = KRATOS_PUBLIC_URL,
|
||||
id = id)
|
||||
|
||||
flow = request.args.get("flow")
|
||||
|
||||
# If we do not have a flow, get one.
|
||||
if not flow:
|
||||
return redirect(KRATOS_PUBLIC_URL + "self-service/login/browser")
|
||||
|
||||
return render_template(
|
||||
'login.html',
|
||||
api_url = KRATOS_PUBLIC_URL
|
||||
)
|
||||
|
||||
|
||||
@web.route('/auth', methods=['GET', 'POST'])
|
||||
def auth():
|
||||
"""Authorize an user for an application
|
||||
If an application authenticated against the IdP (Idenitity Provider), if
|
||||
there are no active session, the user is forwarded to the login page.
|
||||
This is the entry point for those authorization requests. The challenge
|
||||
as provided, is verified. If an active user is logged in, the request
|
||||
is accepted and the user is returned to the application. If the user is not
|
||||
logged in yet, it redirects to the login page
|
||||
:param challenge: challenge as given by Hydra
|
||||
:return: redirect to login or application/idp
|
||||
"""
|
||||
|
||||
challenge = None
|
||||
|
||||
# Retrieve the challenge id from the request. Depending on the method it is
|
||||
# saved in the form (POST) or in a GET variable. If this variable is not set
|
||||
# we can not continue.
|
||||
if request.method == 'GET':
|
||||
challenge = request.args.get("login_challenge")
|
||||
if request.method == 'POST':
|
||||
challenge = request.args.post("login_challenge")
|
||||
|
||||
if not challenge:
|
||||
current_app.logger.error("No challenge given. Error in request")
|
||||
abort(400, description="Challenge required when requesting authorization")
|
||||
|
||||
|
||||
# Check if we are logged in:
|
||||
identity = get_auth()
|
||||
|
||||
|
||||
# If the user is not logged in yet, we redirect to the login page
|
||||
# but before we do that, we set the "flow_state" cookie to auth.
|
||||
# so the UI knows it has to redirect after a successful login.
|
||||
# The redirect URL is back to this page (auth) with the same challenge
|
||||
# so we can pickup the flow where we left off.
|
||||
if not identity:
|
||||
url = LOGIN_PANEL_URL + "/auth?login_challenge=" + challenge
|
||||
url = urllib.parse.quote_plus(url)
|
||||
|
||||
current_app.logger.info("Redirecting to login. Setting flow_state cookies")
|
||||
current_app.logger.info("auth_url: " + url)
|
||||
|
||||
response = redirect(LOGIN_PANEL_URL + "/login")
|
||||
response.set_cookie('flow_state', 'auth')
|
||||
response.set_cookie('auth_url', url)
|
||||
return response
|
||||
|
||||
|
||||
|
||||
current_app.logger.info("User is logged in. We can authorize the user")
|
||||
|
||||
try:
|
||||
login_request = HYDRA.login_request(challenge)
|
||||
except hydra_client.exceptions.NotFound:
|
||||
current_app.logger.error(f"Not Found. Login request not found. challenge={challenge}")
|
||||
abort(404, description="Login request not found. Please try again.")
|
||||
except hydra_client.exceptions.HTTPError:
|
||||
current_app.logger.error(f"Conflict. Login request has been used already. challenge={challenge}")
|
||||
abort(503, description="Login request already used. Please try again.")
|
||||
|
||||
# Authorize the user
|
||||
# False positive: pylint: disable=no-member
|
||||
redirect_to = login_request.accept(
|
||||
identity.id,
|
||||
remember=True,
|
||||
# Remember session for 7d
|
||||
remember_for=60*60*24*7)
|
||||
|
||||
return redirect(redirect_to)
|
||||
|
||||
|
||||
@web.route('/consent', methods=['GET', 'POST'])
|
||||
def consent():
|
||||
"""Get consent
|
||||
For now, it just allows every user. Eventually this function should check
|
||||
the roles and settings of a user and provide that information to the
|
||||
application.
|
||||
:param consent_challenge: challenge as given by Hydra
|
||||
:return: redirect to login or render error
|
||||
"""
|
||||
|
||||
challenge = request.args.get("consent_challenge")
|
||||
if not challenge:
|
||||
abort(403, description="Consent request required. Do not call this page directly")
|
||||
try:
|
||||
consent_request = HYDRA.consent_request(challenge)
|
||||
except hydra_client.exceptions.NotFound:
|
||||
current_app.logger.error(f"Not Found. Consent request {challenge} not found")
|
||||
abort(404, description="Consent request does not exist. Please try again")
|
||||
except hydra_client.exceptions.HTTPError:
|
||||
current_app.logger.error(f"Conflict. Consent request {challenge} already used")
|
||||
abort(503, description="Consent request already used. Please try again")
|
||||
|
||||
# Get information about this consent request:
|
||||
# False positive: pylint: disable=no-member
|
||||
try:
|
||||
consent_client = consent_request.client
|
||||
|
||||
# Some versions of Hydra module return a string object and need to be decoded
|
||||
if isinstance(consent_client, str):
|
||||
consent_client = ast.literal_eval(consent_client)
|
||||
|
||||
app_id = consent_client.get('client_id')
|
||||
# False positive: pylint: disable=no-member
|
||||
kratos_id = consent_request.subject
|
||||
current_app.logger.error(f"Info: Found kratos_id {kratos_id}")
|
||||
current_app.logger.error(f"Info: Found app_id {app_id}")
|
||||
|
||||
except Exception as error:
|
||||
current_app.logger.error(f"Error: Unable to extract information from consent request")
|
||||
current_app.logger.error(f"Error: {error}")
|
||||
current_app.logger.error(f"Client: {consent_request.client}")
|
||||
current_app.logger.error(f"Subject: {consent_request.subject}")
|
||||
abort(501, description="Internal error occured")
|
||||
|
||||
# Get the related user object
|
||||
current_app.logger.error(f"Info: Getting user from admin {kratos_id}")
|
||||
user = KratosUser(KRATOS_ADMIN, kratos_id)
|
||||
if not user:
|
||||
current_app.logger.error(f"User not found in database: {kratos_id}")
|
||||
abort(401, description="User not found. Please try again.")
|
||||
|
||||
# Get role on this app
|
||||
app_obj = db.session.query(App).filter(App.slug == app_id).first()
|
||||
|
||||
# Default access level
|
||||
roles = []
|
||||
if app_obj:
|
||||
role_objects = (
|
||||
db.session.query(AppRole)
|
||||
.filter(AppRole.app_id == app_obj.id)
|
||||
.filter(AppRole.user_id == user.uuid)
|
||||
)
|
||||
for role_obj in role_objects:
|
||||
roles.append(role_obj.role)
|
||||
|
||||
current_app.logger.info(f"Using '{roles}' when applying consent for {kratos_id}")
|
||||
|
||||
# Get claims for this user, provided the current app
|
||||
claims = user.get_claims(app_id, roles)
|
||||
|
||||
# pylint: disable=fixme
|
||||
# TODO: Need to implement checking claims here, once the backend for that is
|
||||
# developed
|
||||
current_app.logger.info(f"Providing consent to {app_id} for {kratos_id}")
|
||||
current_app.logger.info(f"{kratos_id} was granted access to {app_id}")
|
||||
|
||||
# False positive: pylint: disable=no-member
|
||||
return redirect(consent_request.accept(
|
||||
grant_scope=consent_request.requested_scope,
|
||||
grant_access_token_audience=consent_request.requested_access_token_audience,
|
||||
session=claims,
|
||||
))
|
||||
|
||||
|
||||
|
||||
@web.route('/status', methods=['GET', 'POST'])
|
||||
def status():
|
||||
"""Get status of current session
|
||||
Show if there is an user is logged in. If not shows: not-auth
|
||||
"""
|
||||
|
||||
auth_status = get_auth()
|
||||
|
||||
if auth_status:
|
||||
return auth_status.id
|
||||
return "not-auth"
|
||||
|
||||
|
||||
|
||||
def get_auth():
|
||||
"""Checks if user is logged in
|
||||
Queries the cookies. If an authentication cookie is found, it
|
||||
checks with Kratos if the cookie is still valid. If so,
|
||||
the profile is returned. Otherwise False is returned.
|
||||
:return: Profile or False if not logged in
|
||||
"""
|
||||
|
||||
try:
|
||||
cookie = request.cookies.get('ory_kratos_session')
|
||||
cookie = "ory_kratos_session=" + cookie
|
||||
except TypeError:
|
||||
current_app.logger.info("User not logged in or cookie corrupted")
|
||||
return False
|
||||
|
||||
# Given a cookie, check if it is valid and get the profile
|
||||
try:
|
||||
api_response = KRATOS_PUBLIC.to_session(
|
||||
cookie=cookie)
|
||||
|
||||
# Get all traits from ID
|
||||
return api_response.identity
|
||||
|
||||
except ory_kratos_client.ApiException as error:
|
||||
current_app.logger.error(f"Exception when calling V0alpha2Api->to_session(): {error}\n")
|
||||
|
||||
return False
|
||||
|
||||
|
||||
Reference in a new issue