Create /web router for login panel parts

Integrated helper classes and configuration
Create login "area"
This commit is contained in:
Mart van Santen 2022-03-21 15:02:29 +08:00
parent 78b4ec2e23
commit e8063b1de7
10 changed files with 747 additions and 0 deletions

6
app.py
View file

@ -6,9 +6,13 @@ from werkzeug.exceptions import BadRequest
# These imports are required # These imports are required
from areas import api_v1 from areas import api_v1
from areas import web
from areas import users from areas import users
from areas import apps from areas import apps
from areas import auth from areas import auth
from areas import login
from helpers import ( from helpers import (
BadRequest, BadRequest,
@ -19,6 +23,7 @@ from helpers import (
kratos_error, kratos_error,
global_error, global_error,
hydra_error, hydra_error,
KratosUser
) )
from config import * from config import *
@ -26,6 +31,7 @@ app = Flask(__name__)
cors = CORS(app) cors = CORS(app)
app.config["SECRET_KEY"] = SECRET_KEY app.config["SECRET_KEY"] = SECRET_KEY
app.register_blueprint(api_v1) app.register_blueprint(api_v1)
app.register_blueprint(web)
# Error handlers # Error handlers
app.register_error_handler(Exception, global_error) app.register_error_handler(Exception, global_error)

View file

@ -1,6 +1,7 @@
from flask import Blueprint from flask import Blueprint
api_v1 = Blueprint("api_v1", __name__, url_prefix="/api/v1") api_v1 = Blueprint("api_v1", __name__, url_prefix="/api/v1")
web = Blueprint("web", __name__, url_prefix="/web")
@api_v1.route("/") @api_v1.route("/")

1
areas/login/__init__.py Normal file
View file

@ -0,0 +1 @@
from .login import *

304
areas/login/login.py Normal file
View file

@ -0,0 +1,304 @@
"""Flask application which provides the interface of a login panel. The
application interacts with different backend, like the Kratos backend for users,
Hydra for OIDC sessions and MariaDB for application and role specifications.
The application provides also several command line options to interact with
the user entries in the database(s)"""
# Basic system imports
import logging
import os
import urllib.parse
import urllib.request
# Hydra, OIDC Identity Provider
import hydra_client
# Kratos, Identity manager
import ory_kratos_client
#from exceptions import BackendError
# Flask
from flask import Flask, abort, redirect, render_template, request
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
#from kratos import KratosUser
from ory_kratos_client.api import v0alpha2_api as kratos_api
# Import modules for external APIs
from areas import web
from config import *
# APIs
# Create HYDRA & KRATOS API interfaces
HYDRA = hydra_client.HydraAdmin(HYDRA_ADMIN_URL)
# Kratos has an admin and public end-point. We create an API for them
# both. The kratos implementation has bugs, which forces us to set
# the discard_unknown_keys to True.
tmp = ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL,
discard_unknown_keys= True)
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
tmp = ory_kratos_client.Configuration(host=KRATOS_PUBLIC_URL,
discard_unknown_keys = True)
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
##############################################################################
# WEB ROUTES #
##############################################################################
@web.route('/recovery', methods=['GET', 'POST'])
def recovery():
"""Start recovery flow
If no active flow, redirect to kratos to create a flow, otherwise render the
recovery template.
:param flow: flow as given by Kratos
:return: redirect or recovery page
"""
flow = request.args.get("flow")
if not flow:
return redirect(KRATOS_PUBLIC_URL + "self-service/recovery/browser")
return render_template(
'recover.html',
api_url = KRATOS_PUBLIC_URL
)
@web.route('/settings', methods=['GET', 'POST'])
def settings():
"""Start settings flow
If no active flow, redirect to kratos to create a flow, otherwise render the
settings template.
:param flow: flow as given by Kratos
:return: redirect or settings page
"""
flow = request.args.get("flow")
if not flow:
return redirect(KRATOS_PUBLIC_URL + "self-service/settings/browser")
return render_template(
'settings.html',
api_url = KRATOS_PUBLIC_URL
)
@web.route('/login', methods=['GET', 'POST'])
def login():
"""Start login flow
If already logged in, shows the loggedin template. Otherwise creates a login
flow, if no active flow will redirect to kratos to create a flow.
:param flow: flow as given by Kratos
:return: redirect or login page
"""
# Check if we are logged in:
identity = get_auth()
if identity:
return render_template(
'loggedin.html',
api_url = KRATOS_PUBLIC_URL,
id = id)
flow = request.args.get("flow")
# If we do not have a flow, get one.
if not flow:
return redirect(KRATOS_PUBLIC_URL + "self-service/login/browser")
return render_template(
'login.html',
api_url = KRATOS_PUBLIC_URL
)
@web.route('/auth', methods=['GET', 'POST'])
def auth():
"""Authorize an user for an application
If an application authenticated against the IdP (Idenitity Provider), if
there are no active session, the user is forwarded to the login page.
This is the entry point for those authorization requests. The challenge
as provided, is verified. If an active user is logged in, the request
is accepted and the user is returned to the application. If the user is not
logged in yet, it redirects to the login page
:param challenge: challenge as given by Hydra
:return: redirect to login or application/idp
"""
challenge = None
# Retrieve the challenge id from the request. Depending on the method it is
# saved in the form (POST) or in a GET variable. If this variable is not set
# we can not continue.
if request.method == 'GET':
challenge = request.args.get("login_challenge")
if request.method == 'POST':
challenge = request.args.post("login_challenge")
if not challenge:
app.logger.error("No challenge given. Error in request")
abort(400, description="Challenge required when requesting authorization")
# Check if we are logged in:
identity = get_auth()
# If the user is not logged in yet, we redirect to the login page
# but before we do that, we set the "flow_state" cookie to auth.
# so the UI knows it has to redirect after a successful login.
# The redirect URL is back to this page (auth) with the same challenge
# so we can pickup the flow where we left off.
if not identity:
url = PUBLIC_URL + "/auth?login_challenge=" + challenge
url = urllib.parse.quote_plus(url)
app.logger.info("Redirecting to login. Setting flow_state cookies")
app.logger.info("auth_url: " + url)
response = redirect(app.config["PUBLIC_URL"] + "/login")
response.set_cookie('flow_state', 'auth')
response.set_cookie('auth_url', url)
return response
app.logger.info("User is logged in. We can authorize the user")
try:
login_request = HYDRA.login_request(challenge)
except hydra_client.exceptions.NotFound:
app.logger.error(f"Not Found. Login request not found. challenge={challenge}")
abort(404, description="Login request not found. Please try again.")
except hydra_client.exceptions.HTTPError:
app.logger.error(f"Conflict. Login request has been used already. challenge={challenge}")
abort(503, description="Login request already used. Please try again.")
# Authorize the user
# False positive: pylint: disable=no-member
redirect_to = login_request.accept(
identity.id,
remember=True,
# Remember session for 7d
remember_for=60*60*24*7)
return redirect(redirect_to)
@web.route('/consent', methods=['GET', 'POST'])
def consent():
"""Get consent
For now, it just allows every user. Eventually this function should check
the roles and settings of a user and provide that information to the
application.
:param consent_challenge: challenge as given by Hydra
:return: redirect to login or render error
"""
challenge = request.args.get("consent_challenge")
if not challenge:
abort(403, description="Consent request required. Do not call this page directly")
try:
consent_request = HYDRA.consent_request(challenge)
except hydra_client.exceptions.NotFound:
app.logger.error(f"Not Found. Consent request {challenge} not found")
abort(404, description="Consent request does not exist. Please try again")
except hydra_client.exceptions.HTTPError:
app.logger.error(f"Conflict. Consent request {challenge} already used")
abort(503, description="Consent request already used. Please try again")
# Get information about this consent request:
# False positive: pylint: disable=no-member
app_id = consent_request.client.client_id
# False positive: pylint: disable=no-member
kratos_id = consent_request.subject
# Get the related user object
user = KratosUser(KRATOS_ADMIN, kratos_id)
if not user:
app.logger.error(f"User not found in database: {kratos_id}")
abort(401, description="User not found. Please try again.")
# Get role on this app
app_obj = db.session.query(App).filter(App.slug == app_id).first()
# Default access level
roles = []
if app_obj:
role_objects = (
db.session.query(AppRole)
.filter(AppRole.app_id == app_obj.id)
.filter(AppRole.user_id == user.uuid)
)
for role_obj in role_objects:
roles.append(role_obj.role)
app.logger.info(f"Using '{roles}' when applying consent for {kratos_id}")
# Get claims for this user, provided the current app
claims = user.get_claims(app_id, roles)
# pylint: disable=fixme
# TODO: Need to implement checking claims here, once the backend for that is
# developed
app.logger.info(f"Providing consent to {app_id} for {kratos_id}")
app.logger.info(f"{kratos_id} was granted access to {app_id}")
# False positive: pylint: disable=no-member
return redirect(consent_request.accept(
grant_scope=consent_request.requested_scope,
grant_access_token_audience=consent_request.requested_access_token_audience,
session=claims,
))
@web.route('/status', methods=['GET', 'POST'])
def status():
"""Get status of current session
Show if there is an user is logged in. If not shows: not-auth
"""
auth_status = get_auth()
if auth_status:
return auth_status.id
return "not-auth"
def get_auth():
"""Checks if user is logged in
Queries the cookies. If an authentication cookie is found, it
checks with Kratos if the cookie is still valid. If so,
the profile is returned. Otherwise False is returned.
:return: Profile or False if not logged in
"""
try:
cookie = request.cookies.get('ory_kratos_session')
cookie = "ory_kratos_session=" + cookie
except TypeError:
app.logger.info("User not logged in or cookie corrupted")
return False
# Given a cookie, check if it is valid and get the profile
try:
api_response = KRATOS_PUBLIC.to_session(
cookie=cookie)
# Get all traits from ID
return api_response.identity
except ory_kratos_client.ApiException as error:
app.logger.error(f"Exception when calling V0alpha2Api->to_session(): {error}\n")
return False

View file

@ -7,3 +7,11 @@ HYDRA_CLIENT_SECRET = os.environ.get("HYDRA_CLIENT_SECRET")
HYDRA_AUTHORIZATION_BASE_URL = os.environ.get("HYDRA_AUTHORIZATION_BASE_URL") HYDRA_AUTHORIZATION_BASE_URL = os.environ.get("HYDRA_AUTHORIZATION_BASE_URL")
HYDRA_URL = os.environ.get("HYDRA_URL") HYDRA_URL = os.environ.get("HYDRA_URL")
TOKEN_URL = os.environ.get("TOKEN_URL") TOKEN_URL = os.environ.get("TOKEN_URL")
PUBLIC_URL = os.environ.get('PUBLIC_URL')
HYDRA_ADMIN_URL = os.environ.get('HYDRA_ADMIN_URL')
KRATOS_ADMIN_URL = os.environ.get('KRATOS_ADMIN_URL')
KRATOS_PUBLIC_URL = str(os.environ.get('KRATOS_PUBLIC_URL')) + "/"
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
SQLALCHEMY_TRACK_MODIFICATIONS = False

View file

@ -1,3 +1,4 @@
from .kratos_api import * from .kratos_api import *
from .error_handler import * from .error_handler import *
from .hydra_oauth import * from .hydra_oauth import *
from .kratos import *

17
helpers/classes.py Normal file
View file

@ -0,0 +1,17 @@
"""Generic classes used by different parts of the application"""
import urllib.request
# Instead of processing the redirect, we return, so the application
# can handle the redirect itself. This is needed to extract cookies
# etc.
class RedirectFilter(urllib.request.HTTPRedirectHandler):
"""Overrides the standard redirect handler so it does not automatically
redirect. This allows for inspecting the return values before redirecting or
override the redirect action"""
# pylint: disable=too-many-arguments
# This amount of arguments is expected by the HTTPRedirectHandler
def redirect_request(self, req, fp, code, msg, headers, newurl):
return None

8
helpers/exceptions.py Normal file
View file

@ -0,0 +1,8 @@
"""Custom exception handler to raise consistent exceptions, as different backend
raise different exceptions"""
class BackendError(Exception):
"""The backend error is raised when interacting with
the backend fails or gives an unexpected result. The
error contains a oneliner description of the problem"""

392
helpers/kratos.py Normal file
View file

@ -0,0 +1,392 @@
"""
Implement the Kratos model to interact with kratos users
"""
import json
import re
import urllib.parse
import urllib.request
from typing import Dict
from urllib.request import Request
# Some imports commented out to satisfy pylint. They will be used once more
# functions are migrated to this model
from ory_kratos_client.model.admin_create_identity_body import AdminCreateIdentityBody
from ory_kratos_client.model.admin_create_self_service_recovery_link_body \
import AdminCreateSelfServiceRecoveryLinkBody
from ory_kratos_client.model.admin_update_identity_body import AdminUpdateIdentityBody
from ory_kratos_client.rest import ApiException as KratosApiException
from .classes import RedirectFilter
from .exceptions import BackendError
# pylint: disable=too-many-instance-attributes
class KratosUser():
"""
The User object, interact with the User. It both calls to Kratos as to
the database for storing and retrieving data.
"""
api = None
__uuid = None
email = None
name = None
username = None
state = None
created_at = None
updated_at = None
def __init__(self, api, uuid = None):
self.api = api
self.state = 'active'
if uuid:
try:
obj = api.admin_get_identity(uuid)
if obj:
self.__uuid = uuid
try:
self.name = obj.traits['name']
except KeyError:
self.name = ""
try:
self.username = obj.traits['username']
except KeyError:
self.username = ""
self.email = obj.traits['email']
self.state = obj.state
self.created_at = obj.created_at
self.updated_at = obj.updated_at
except KratosApiException as error:
raise BackendError(f"Unable to get entry, kratos replied with: {error}") from error
def __repr__(self):
return f"\"{self.name}\" <{self.email}>"
@property
def uuid(self):
"""Gets the protected UUID propery"""
return self.__uuid
def save(self):
"""Saves this object into the kratos backend database. If the object
is new, it will create, otherwise update an entry.
:raise: BackendError is an error with Kratos happened.
"""
# Traits are the "profile" values we will set, kratos will complain on
# empty values, so we check if "name" is set and only add it if so.
traits = {'email':self.email}
if self.name:
traits['name'] = self.name
# If we have a UUID, we are updating
if self.__uuid:
body = AdminUpdateIdentityBody(
schema_id="default",
state=self.state,
traits=traits,
)
try:
api_response = self.api.admin_update_identity(self.__uuid,
admin_update_identity_body=body)
except KratosApiException as error:
raise BackendError(f"Unable to save entry, kratos replied with:{error}") from error
else:
body = AdminCreateIdentityBody(
schema_id="default",
traits=traits,
)
try:
# Create an Identity
api_response = self.api.admin_create_identity(
admin_create_identity_body=body)
if api_response.id:
self.__uuid = api_response.id
except KratosApiException as error:
raise BackendError(f"Unable to save entry, kratos replied with:{error}") from error
def delete(self):
"""Deletes the object from kratos
:raise: BackendError if Krator API call fails
"""
if self.__uuid:
try:
self.api.admin_delete_identity(self.__uuid)
return True
except KratosApiException as error:
raise BackendError(
f"Unable to delete entry, kratos replied with: {error}"
) from error
return False
@staticmethod
def find_by_email(api, email):
"""Queries Kratos to find kratos ID for this given identifier
:param api: Kratos ADMIN API Object
:param email: Identifier to look for
:return: Return none or string with ID
"""
kratos_id = None
# Get out user ID by iterating over all available IDs
data = api.admin_list_identities()
for kratos_obj in data.value:
# Unique identifier we use
if kratos_obj.traits['email'] == email:
kratos_id = str(kratos_obj.id)
return KratosUser(api, kratos_id)
return None
@staticmethod
def find_all(api):
"""Queries Kratos to find all kratos users and return them
as a list of KratosUser objects
:return: Return list
"""
kratos_id = None
return_list = []
# Get out user ID by iterating over all available IDs
data = api.admin_list_identities()
for kratos_obj in data.value:
kratos_id = str(kratos_obj.id)
return_list.append(KratosUser(api, kratos_id))
return return_list
@staticmethod
def extract_cookies(cookies):
"""Extract session and CSRF cookie from a list of cookies.
Iterate over a list of cookies and extract the session
cookies required for Kratos User Panel UI
:param cookies: str[], list of cookies
:return: Cookies concatenated as string
:rtype: str
"""
# Find kratos session cookie & csrf
cookie_csrf = None
cookie_session = None
for cookie in cookies:
search = re.match(r'ory_kratos_session=([^;]*);.*$', cookie)
if search:
cookie_session = "ory_kratos_session=" + search.group(1)
search = re.match(r'(csrf_token[^;]*);.*$', cookie)
if search:
cookie_csrf = search.group(1)
if not cookie_csrf or not cookie_session:
raise BackendError("Flow started, but expected cookies not found")
# Combined the relevant cookies
cookie = cookie_csrf + "; " + cookie_session
return cookie
def get_recovery_link(self):
"""Call the kratos API to create a recovery URL for a kratos ID
:param: api Kratos ADMIN API Object
:param: kratos_id UUID of kratos object
:return: Return none or string with recovery URL
"""
try:
# Create body request to get recovery link with admin API
body = AdminCreateSelfServiceRecoveryLinkBody(
expires_in="15m",
identity_id=self.__uuid
)
# Get recovery link from admin API
call = self.api.admin_create_self_service_recovery_link(
admin_create_self_service_recovery_link_body=body)
url = call.recovery_link
except KratosApiException:
return None
return url
def ui_set_password(self, api_url, recovery_url, password):
"""Follow a Kratos UI sequence to set password
Kratos does not provide an interface to set a password directly. However
we still can set a password by following the UI sequence. To so so we
to follow the steps which are normally done in a browser once someone
clicks the recovery link.
:param: api_url URL to public endpoint of API
:param: recovery_url Recovery URL as generated by Kratos
:param: password Password
:raise: Exception with error message as first argument
:return: boolean True on success, False on failure (usualy password
to simple)
"""
# Step 1: Open the recovery link and extract the cookies, as we need them
# for the next steps
try:
# We override the default Redirect handler with our custom handler to
# be able to catch the cookies.
opener = urllib.request.build_opener(RedirectFilter)
# We rewrite the URL we got. It can be we run this from an enviroment
# with different KRATUS_PUBLIC_URL API endpoint then kratos provide
# itself. For example in the case running as a job to create an admin
# account before TLS is setup/working
search = re.match(r'.*(self-service.recovery.flow.*)$', recovery_url)
if search:
recovery_url = api_url + search.group(1)
else:
raise BackendError('Did not find recovery flow')
opener.open(recovery_url)
# If we do not have a 2xx status, urllib throws an error, as we "stopped"
# at our redirect, we expect a 3xx status
except urllib.error.HTTPError as http_error:
# Kratos pre-0.8 returned 302, kratos 0.8 returns 303
if http_error.status in (302, 303):
# Get the cookie and redirect location from the response
# headers
cookies = http_error.headers.get_all('Set-Cookie')
url = http_error.headers.get('Location')
else:
raise BackendError('Unable to fetch recovery link') from http_error
else:
raise BackendError('Recovery link returned unexpected data')
# Step 2: Extract cookies and data for next step. We expect to have an
# authorized session now. We need the cookies for followup calls
# to make changes to the account (set password)
# Get flow id
search = re.match(r'.*\?flow=(.*)', url)
if search:
flow = search.group(1)
else:
raise BackendError('No Flow ID found for recovery sequence')
# Extract cookies with helper function
cookie = self.extract_cookies(cookies)
# Step 3: Get the "UI", kratos expect us to call the API to get the UI
# elements which contains the CSRF token, which is needed when
# posting the password data
try:
url = api_url + "/self-service/settings/flows?id=" + flow
req = Request(url, headers={'Cookie':cookie})
opener = urllib.request.build_opener()
# Execute the request, read the data, decode the JSON, get the
# right CSRF token out of the decoded JSON
obj = json.loads(opener.open(req).read())
csrf_token = obj['ui']['nodes'][0]['attributes']['value']
except Exception as error:
raise BackendError("Unable to get password reset UI") from error
# Step 4: Post out password
url = api_url + "self-service/settings?flow=" + flow
# Create POST data as form data
data = {
'method': 'password',
'password': password,
'csrf_token': csrf_token
}
data = urllib.parse.urlencode(data)
data = data.encode('ascii')
# POST the new password
try:
req = Request(url, data = data, headers={'Cookie':cookie}, method="POST")
opener = urllib.request.build_opener(RedirectFilter)
opener.open(req)
# If we do not have a 2xx status, urllib throws an error, as we "stopped"
# at our redirect, we expect a 3xx status
except urllib.error.HTTPError as http_error:
# Kratos pre-0.8 returned 302, kratos 0.8 returns 303
if http_error.status in (302, 303):
# Kratos only sends HTTP codes after our submission. We should
# now call the `settings` endpoint to see if our call
# succeeded, or else, if there are any messages about why it
# failed
try:
url = api_url + "/self-service/settings/flows?id=" + flow
req = Request(url, headers={'Cookie':cookie, "Accept": "application/json"})
opener = urllib.request.build_opener()
# Execute the request, read the data, decode the JSON
obj = json.loads(opener.open(req).read())
# If the 'state' has changed to 'success', the password was
# set successfully
if obj['state'] == 'success':
return True
# Failure: we check if there are error messages
for node in obj['ui']['nodes']:
if node['messages']:
print(f"Problems with field '{node['meta']['label']['text']}':")
for message in node['messages']:
print(message['text'])
raise BackendError("Password not set") from http_error
except Exception as error:
raise BackendError("Unable to get password reset UI") from error
return False
raise BackendError("Unable to set password by submitting form")
# Pylint complains about app not used. That is correct, but we will use that
# in the future. Ignore this error
# pylint: disable=unused-argument
def get_claims(self, app, roles, mapping=None) -> Dict[str, Dict[str, str]]:
"""Create openID Connect token
Use the userdata stored in the user object to create an OpenID Connect token.
The token returned by this function can be passed to Hydra,
which will store it and serve it to OpenID Connect Clients to retrieve user information.
If you need to relabel a field pass an array of tuples to mapping.
Example: getClaims('nextcloud', mapping=[("name", "username"),("roles", "groups")])
Attributes:
appname - Name or ID of app to connect to
roles - List of roles to add to the `stackspin_roles` claim
mapping - Mapping of the fields
Returns:
OpenID Connect token of type dict
"""
# Name should be set, however, we do not enforce this yet.
# if somebody does not set it's name, we use the email address
# as name
if self.name:
name = self.name
else:
name = self.email
if self.username:
username = self.username
else:
username = self.email
token = {
"name": name,
"preferred_username": username,
"email": self.email,
"stackspin_roles": roles,
}
# Relabel field names
if mapping:
for old_field_name, new_field_name in mapping:
token[new_field_name] = token[old_field_name]
del token[old_field_name]
return dict(id_token=token)

View file

@ -27,4 +27,13 @@ export HYDRA_CLIENT_SECRET="gDSEuakxzybHBHJocnmtDOLMwlWWEvPh"
export HYDRA_URL="https://sso.init.stackspin.net" export HYDRA_URL="https://sso.init.stackspin.net"
export HYDRA_AUTHORIZATION_BASE_URL="https://sso.init.stackspin.net/oauth2/auth" export HYDRA_AUTHORIZATION_BASE_URL="https://sso.init.stackspin.net/oauth2/auth"
export TOKEN_URL="https://sso.init.stackspin.net/oauth2/token" export TOKEN_URL="https://sso.init.stackspin.net/oauth2/token"
# Login facilitator paths
export KRATOS_PUBLIC_URL=http://localhost/kapi
export KRATOS_ADMIN_URL=http://localhost:8000
export HYDRA_ADMIN_URL=http://localhost:4445
export PUBLIC_URL=http://localhost/login
export DATABASE_URL="mysql+pymysql://stackspin:stackspin@localhost/stackspin?charset=utf8mb4"
flask run flask run