341 lines
11 KiB
Python
341 lines
11 KiB
Python
|
|
"""Flask application which provides the interface of a login panel. The
|
|
application interacts with different backend, like the Kratos backend for users,
|
|
Hydra for OIDC sessions and MariaDB for application and role specifications.
|
|
The application provides also several command line options to interact with
|
|
the user entries in the database(s)"""
|
|
|
|
|
|
# Basic system imports
|
|
import logging
|
|
import os
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
# Hydra, OIDC Identity Provider
|
|
import hydra_client
|
|
|
|
# Kratos, Identity manager
|
|
import ory_kratos_client
|
|
|
|
# Flask
|
|
from flask import Flask, abort, redirect, render_template, request
|
|
from flask_migrate import Migrate
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
#from kratos import KratosUser
|
|
from ory_kratos_client.api import v0alpha2_api as kratos_api
|
|
|
|
# Import modules for external APIs
|
|
|
|
from areas import web
|
|
from config import *
|
|
from flask import current_app
|
|
|
|
from helpers import (
|
|
BadRequest,
|
|
KratosError,
|
|
HydraError,
|
|
bad_request_error,
|
|
validation_error,
|
|
kratos_error,
|
|
global_error,
|
|
hydra_error,
|
|
KratosUser,
|
|
App,
|
|
AppRole
|
|
)
|
|
|
|
import ast
|
|
# This is a circular import and should be solved differently
|
|
#from app import db
|
|
from database import db
|
|
|
|
# APIs
|
|
# Create HYDRA & KRATOS API interfaces
|
|
HYDRA = hydra_client.HydraAdmin(HYDRA_ADMIN_URL)
|
|
|
|
# Kratos has an admin and public end-point. We create an API for them
|
|
# both. The kratos implementation has bugs, which forces us to set
|
|
# the discard_unknown_keys to True.
|
|
tmp = ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL,
|
|
discard_unknown_keys= True)
|
|
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
|
|
|
|
tmp = ory_kratos_client.Configuration(host=KRATOS_PUBLIC_URL,
|
|
discard_unknown_keys = True)
|
|
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
|
|
|
|
##############################################################################
|
|
# WEB ROUTES #
|
|
##############################################################################
|
|
|
|
@web.route('/recovery', methods=['GET', 'POST'])
|
|
def recovery():
|
|
"""Start recovery flow
|
|
If no active flow, redirect to kratos to create a flow, otherwise render the
|
|
recovery template.
|
|
:param flow: flow as given by Kratos
|
|
:return: redirect or recovery page
|
|
"""
|
|
|
|
flow = request.args.get("flow")
|
|
if not flow:
|
|
return redirect(KRATOS_PUBLIC_URL + "self-service/recovery/browser")
|
|
|
|
return render_template(
|
|
'recover.html',
|
|
api_url = KRATOS_PUBLIC_URL
|
|
)
|
|
|
|
|
|
@web.route('/settings', methods=['GET', 'POST'])
|
|
def settings():
|
|
"""Start settings flow
|
|
If no active flow, redirect to kratos to create a flow, otherwise render the
|
|
settings template.
|
|
:param flow: flow as given by Kratos
|
|
:return: redirect or settings page
|
|
"""
|
|
|
|
flow = request.args.get("flow")
|
|
if not flow:
|
|
return redirect(KRATOS_PUBLIC_URL + "self-service/settings/browser")
|
|
|
|
return render_template(
|
|
'settings.html',
|
|
api_url = KRATOS_PUBLIC_URL
|
|
)
|
|
|
|
|
|
@web.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
"""Start login flow
|
|
If already logged in, shows the loggedin template. Otherwise creates a login
|
|
flow, if no active flow will redirect to kratos to create a flow.
|
|
|
|
:param flow: flow as given by Kratos
|
|
:return: redirect or login page
|
|
"""
|
|
|
|
# Check if we are logged in:
|
|
identity = get_auth()
|
|
|
|
if identity:
|
|
return render_template(
|
|
'loggedin.html',
|
|
api_url = KRATOS_PUBLIC_URL,
|
|
id = id)
|
|
|
|
flow = request.args.get("flow")
|
|
|
|
# If we do not have a flow, get one.
|
|
if not flow:
|
|
return redirect(KRATOS_PUBLIC_URL + "self-service/login/browser")
|
|
|
|
return render_template(
|
|
'login.html',
|
|
api_url = KRATOS_PUBLIC_URL
|
|
)
|
|
|
|
|
|
@web.route('/auth', methods=['GET', 'POST'])
|
|
def auth():
|
|
"""Authorize an user for an application
|
|
If an application authenticated against the IdP (Idenitity Provider), if
|
|
there are no active session, the user is forwarded to the login page.
|
|
This is the entry point for those authorization requests. The challenge
|
|
as provided, is verified. If an active user is logged in, the request
|
|
is accepted and the user is returned to the application. If the user is not
|
|
logged in yet, it redirects to the login page
|
|
:param challenge: challenge as given by Hydra
|
|
:return: redirect to login or application/idp
|
|
"""
|
|
|
|
challenge = None
|
|
|
|
# Retrieve the challenge id from the request. Depending on the method it is
|
|
# saved in the form (POST) or in a GET variable. If this variable is not set
|
|
# we can not continue.
|
|
if request.method == 'GET':
|
|
challenge = request.args.get("login_challenge")
|
|
if request.method == 'POST':
|
|
challenge = request.args.post("login_challenge")
|
|
|
|
if not challenge:
|
|
current_app.logger.error("No challenge given. Error in request")
|
|
abort(400, description="Challenge required when requesting authorization")
|
|
|
|
|
|
# Check if we are logged in:
|
|
identity = get_auth()
|
|
|
|
|
|
# If the user is not logged in yet, we redirect to the login page
|
|
# but before we do that, we set the "flow_state" cookie to auth.
|
|
# so the UI knows it has to redirect after a successful login.
|
|
# The redirect URL is back to this page (auth) with the same challenge
|
|
# so we can pickup the flow where we left off.
|
|
if not identity:
|
|
url = PUBLIC_URL + "/auth?login_challenge=" + challenge
|
|
url = urllib.parse.quote_plus(url)
|
|
|
|
current_app.logger.info("Redirecting to login. Setting flow_state cookies")
|
|
current_app.logger.info("auth_url: " + url)
|
|
|
|
response = redirect(PUBLIC_URL + "/login")
|
|
response.set_cookie('flow_state', 'auth')
|
|
response.set_cookie('auth_url', url)
|
|
return response
|
|
|
|
|
|
|
|
current_app.logger.info("User is logged in. We can authorize the user")
|
|
|
|
try:
|
|
login_request = HYDRA.login_request(challenge)
|
|
except hydra_client.exceptions.NotFound:
|
|
current_app.logger.error(f"Not Found. Login request not found. challenge={challenge}")
|
|
abort(404, description="Login request not found. Please try again.")
|
|
except hydra_client.exceptions.HTTPError:
|
|
current_app.logger.error(f"Conflict. Login request has been used already. challenge={challenge}")
|
|
abort(503, description="Login request already used. Please try again.")
|
|
|
|
# Authorize the user
|
|
# False positive: pylint: disable=no-member
|
|
redirect_to = login_request.accept(
|
|
identity.id,
|
|
remember=True,
|
|
# Remember session for 7d
|
|
remember_for=60*60*24*7)
|
|
|
|
return redirect(redirect_to)
|
|
|
|
|
|
@web.route('/consent', methods=['GET', 'POST'])
|
|
def consent():
|
|
"""Get consent
|
|
For now, it just allows every user. Eventually this function should check
|
|
the roles and settings of a user and provide that information to the
|
|
application.
|
|
:param consent_challenge: challenge as given by Hydra
|
|
:return: redirect to login or render error
|
|
"""
|
|
|
|
challenge = request.args.get("consent_challenge")
|
|
if not challenge:
|
|
abort(403, description="Consent request required. Do not call this page directly")
|
|
try:
|
|
consent_request = HYDRA.consent_request(challenge)
|
|
except hydra_client.exceptions.NotFound:
|
|
current_app.logger.error(f"Not Found. Consent request {challenge} not found")
|
|
abort(404, description="Consent request does not exist. Please try again")
|
|
except hydra_client.exceptions.HTTPError:
|
|
current_app.logger.error(f"Conflict. Consent request {challenge} already used")
|
|
abort(503, description="Consent request already used. Please try again")
|
|
|
|
# Get information about this consent request:
|
|
# False positive: pylint: disable=no-member
|
|
try:
|
|
consent_client = consent_request.client
|
|
|
|
# Some versions of Hydra module return a string object and need to be decoded
|
|
if isinstance(consent_client, str):
|
|
consent_client = ast.literal_eval(consent_client)
|
|
|
|
app_id = consent_client.get('client_id')
|
|
# False positive: pylint: disable=no-member
|
|
kratos_id = consent_request.subject
|
|
current_app.logger.error(f"Info: Found kratos_id {kratos_id}")
|
|
current_app.logger.error(f"Info: Found app_id {app_id}")
|
|
|
|
except Exception as error:
|
|
current_app.logger.error(f"Error: Unable to extract information from consent request")
|
|
current_app.logger.error(f"Error: {error}")
|
|
current_app.logger.error(f"Client: {consent_request.client}")
|
|
current_app.logger.error(f"Subject: {consent_request.subject}")
|
|
abort(501, description="Internal error occured")
|
|
|
|
# Get the related user object
|
|
current_app.logger.error(f"Info: Getting user from admin {kratos_id}")
|
|
user = KratosUser(KRATOS_ADMIN, kratos_id)
|
|
if not user:
|
|
current_app.logger.error(f"User not found in database: {kratos_id}")
|
|
abort(401, description="User not found. Please try again.")
|
|
|
|
# Get role on this app
|
|
app_obj = db.session.query(App).filter(App.slug == app_id).first()
|
|
|
|
# Default access level
|
|
roles = []
|
|
if app_obj:
|
|
role_objects = (
|
|
db.session.query(AppRole)
|
|
.filter(AppRole.app_id == app_obj.id)
|
|
.filter(AppRole.user_id == user.uuid)
|
|
)
|
|
for role_obj in role_objects:
|
|
roles.append(role_obj.role)
|
|
|
|
current_app.logger.info(f"Using '{roles}' when applying consent for {kratos_id}")
|
|
|
|
# Get claims for this user, provided the current app
|
|
claims = user.get_claims(app_id, roles)
|
|
|
|
# pylint: disable=fixme
|
|
# TODO: Need to implement checking claims here, once the backend for that is
|
|
# developed
|
|
current_app.logger.info(f"Providing consent to {app_id} for {kratos_id}")
|
|
current_app.logger.info(f"{kratos_id} was granted access to {app_id}")
|
|
|
|
# False positive: pylint: disable=no-member
|
|
return redirect(consent_request.accept(
|
|
grant_scope=consent_request.requested_scope,
|
|
grant_access_token_audience=consent_request.requested_access_token_audience,
|
|
session=claims,
|
|
))
|
|
|
|
|
|
|
|
@web.route('/status', methods=['GET', 'POST'])
|
|
def status():
|
|
"""Get status of current session
|
|
Show if there is an user is logged in. If not shows: not-auth
|
|
"""
|
|
|
|
auth_status = get_auth()
|
|
|
|
if auth_status:
|
|
return auth_status.id
|
|
return "not-auth"
|
|
|
|
|
|
|
|
def get_auth():
|
|
"""Checks if user is logged in
|
|
Queries the cookies. If an authentication cookie is found, it
|
|
checks with Kratos if the cookie is still valid. If so,
|
|
the profile is returned. Otherwise False is returned.
|
|
:return: Profile or False if not logged in
|
|
"""
|
|
|
|
try:
|
|
cookie = request.cookies.get('ory_kratos_session')
|
|
cookie = "ory_kratos_session=" + cookie
|
|
except TypeError:
|
|
current_app.logger.info("User not logged in or cookie corrupted")
|
|
return False
|
|
|
|
# Given a cookie, check if it is valid and get the profile
|
|
try:
|
|
api_response = KRATOS_PUBLIC.to_session(
|
|
cookie=cookie)
|
|
|
|
# Get all traits from ID
|
|
return api_response.identity
|
|
|
|
except ory_kratos_client.ApiException as error:
|
|
current_app.logger.error(f"Exception when calling V0alpha2Api->to_session(): {error}\n")
|
|
|
|
return False
|
|
|
|
|