"""Flask application which provides the interface of a login panel.

The application interacts with different backends: the Kratos backend for
users, Hydra for OIDC sessions and MariaDB for application and role
specifications. The application also provides several command line options
to interact with the user entries in the database(s).
"""

import urllib.parse
import urllib.request
import ast

import hydra_client
import ory_kratos_client
from ory_kratos_client.api import v0alpha2_api as kratos_api
from flask import abort, redirect, render_template, request, current_app

from database import db
from helpers import KratosUser
from config import *
from web import web
from areas.apps import AppRole, App
from areas.roles import RoleService

# This is a circular import and should be solved differently
# from app import db

# APIs
# Create HYDRA & KRATOS API interfaces
HYDRA = hydra_client.HydraAdmin(HYDRA_ADMIN_URL)

# Kratos has an admin and public end-point. We create an API for them
# both. The kratos implementation has bugs, which forces us to set
# the discard_unknown_keys to True.
tmp = ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL, discard_unknown_keys=True)
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))

tmp = ory_kratos_client.Configuration(host=KRATOS_PUBLIC_URL, discard_unknown_keys=True)
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))


##############################################################################
# WEB ROUTES                                                                 #
##############################################################################


@web.route("/recovery", methods=["GET", "POST"])
def recovery():
    """Start recovery flow

    If no active flow, redirect to kratos to create a flow, otherwise render
    the recovery template.

    :param flow: flow as given by Kratos (query argument)
    :return: redirect or recovery page
    """
    flow = request.args.get("flow")
    if not flow:
        return redirect(KRATOS_PUBLIC_URL + "self-service/recovery/browser")

    return render_template("recover.html", api_url=KRATOS_PUBLIC_URL)


@web.route("/settings", methods=["GET", "POST"])
def settings():
    """Start settings flow

    If no active flow, redirect to kratos to create a flow, otherwise render
    the settings template.

    :param flow: flow as given by Kratos (query argument)
    :return: redirect or settings page
    """
    flow = request.args.get("flow")
    if not flow:
        return redirect(KRATOS_PUBLIC_URL + "self-service/settings/browser")

    return render_template("settings.html", api_url=KRATOS_PUBLIC_URL)


@web.route("/error", methods=["GET"])
def error():
    """Show error messages from Kratos

    Implements user-facing errors as described in
    https://www.ory.sh/docs/kratos/self-service/flows/user-facing-errors

    :param id: error ID as given by Kratos (query argument)
    :return: rendered error page; the message is empty when the lookup fails
    """
    error_id = request.args.get("id")
    # Fallback shown when Kratos cannot be queried for the error details.
    api_response = ""
    try:
        # Get Self-Service Errors
        api_response = KRATOS_ADMIN.get_self_service_error(error_id)
    except ory_kratos_client.ApiException as ex:
        current_app.logger.error(
            "Exception when calling V0alpha2Api->get_self_service_error: %s\n", ex)

    return render_template("error.html", error_message=api_response)


@web.route("/login", methods=["GET", "POST"])
def login():
    """Start login flow

    If already logged in, shows the loggedin template. Otherwise creates a
    login flow; if there is no active flow, redirects to kratos to create one.

    :param flow: flow as given by Kratos (query argument)
    :return: redirect or login page
    """
    # Check if we are logged in:
    identity = get_auth()
    if identity:
        # Pass the identity's id to the template (not the `id` builtin).
        return render_template(
            "loggedin.html", api_url=KRATOS_PUBLIC_URL, id=identity.id
        )

    flow = request.args.get("flow")

    # If we do not have a flow, get one.
    if not flow:
        return redirect(KRATOS_PUBLIC_URL + "self-service/login/browser")

    return render_template("login.html", api_url=KRATOS_PUBLIC_URL)


@web.route("/auth", methods=["GET", "POST"])
def auth():
    """Authorize a user for an application

    This is the entry point for authorization requests from applications
    that authenticate against the IdP (Identity Provider). The provided
    challenge is verified. If a user is logged in, the request is accepted
    and the user is returned to the application. If the user is not logged
    in yet, the user is redirected to the login page.

    :param login_challenge: challenge as given by Hydra (GET argument or
        POST form field)
    :return: redirect to login or application/idp
    """
    challenge = None

    # Retrieve the challenge id from the request. Depending on the method it is
    # saved in the form (POST) or in a GET variable. If this variable is not set
    # we can not continue.
    if request.method == "GET":
        challenge = request.args.get("login_challenge")
    if request.method == "POST":
        # POST data lives in request.form; request.args only holds the
        # query string and has no `post` method.
        challenge = request.form.get("login_challenge")

    if not challenge:
        current_app.logger.error("No challenge given. Error in request")
        abort(400, description="Challenge required when requesting authorization")

    # Check if we are logged in:
    identity = get_auth()

    # If the user is not logged in yet, we redirect to the login page
    # but before we do that, we set the "flow_state" cookie to auth.
    # so the UI knows it has to redirect after a successful login.
    # The redirect URL is back to this page (auth) with the same challenge
    # so we can pickup the flow where we left off.
    if not identity:
        url = LOGIN_PANEL_URL + "/auth?login_challenge=" + challenge
        url = urllib.parse.quote_plus(url)

        current_app.logger.info("Redirecting to login. Setting flow_state cookies")
        current_app.logger.info("auth_url: " + url)

        response = redirect(LOGIN_PANEL_URL + "/login")
        response.set_cookie("flow_state", "auth")
        response.set_cookie("auth_url", url)
        return response

    current_app.logger.info("User is logged in. We can authorize the user")

    try:
        login_request = HYDRA.login_request(challenge)
    except hydra_client.exceptions.NotFound:
        current_app.logger.error(
            f"Not Found. Login request not found. challenge={challenge}"
        )
        abort(404, description="Login request not found. Please try again.")
    except hydra_client.exceptions.HTTPError:
        current_app.logger.error(
            f"Conflict. Login request has been used already. challenge={challenge}"
        )
        abort(503, description="Login request already used. Please try again.")

    # Authorize the user
    # False positive: pylint: disable=no-member
    redirect_to = login_request.accept(
        identity.id,
        remember=True,
        # Remember session for 7d
        remember_for=60 * 60 * 24 * 7,
    )

    return redirect(redirect_to)


@web.route("/consent", methods=["GET", "POST"])
def consent():
    """Get consent

    For now, it just allows every user. Eventually this function should check
    the roles and settings of a user and provide that information to the
    application.

    :param consent_challenge: challenge as given by Hydra (query argument)
    :return: redirect to login or render error
    """
    challenge = request.args.get("consent_challenge")
    if not challenge:
        abort(
            403, description="Consent request required. Do not call this page directly"
        )
    try:
        consent_request = HYDRA.consent_request(challenge)
    except hydra_client.exceptions.NotFound:
        current_app.logger.error(f"Not Found. Consent request {challenge} not found")
        abort(404, description="Consent request does not exist. Please try again")
    except hydra_client.exceptions.HTTPError:
        current_app.logger.error(f"Conflict. Consent request {challenge} already used")
        abort(503, description="Consent request already used. Please try again")

    # Get information about this consent request:
    # False positive: pylint: disable=no-member
    try:
        consent_client = consent_request.client

        # Some versions of Hydra module return a string object and need to be decoded
        if isinstance(consent_client, str):
            consent_client = ast.literal_eval(consent_client)

        app_id = consent_client.get("client_id")
        # False positive: pylint: disable=no-member
        kratos_id = consent_request.subject
        # NOTE(review): informational messages are logged at error level here
        # and below -- presumably so they show up regardless of the configured
        # log level; confirm before lowering them to info.
        current_app.logger.error(f"Info: Found kratos_id {kratos_id}")
        current_app.logger.error(f"Info: Found app_id {app_id}")
    except Exception as ex:
        current_app.logger.error(
            "Error: Unable to extract information from consent request"
        )
        current_app.logger.error(f"Error: {ex}")
        current_app.logger.error(f"Client: {consent_request.client}")
        current_app.logger.error(f"Subject: {consent_request.subject}")
        abort(501, description="Internal error occured")

    # Get the related user object
    current_app.logger.error(f"Info: Getting user from admin {kratos_id}")
    user = KratosUser(KRATOS_ADMIN, kratos_id)
    if not user:
        current_app.logger.error(f"User not found in database: {kratos_id}")
        abort(401, description="User not found. Please try again.")

    # Get role on this app
    app_obj = db.session.query(App).filter(App.slug == app_id).first()

    # Default access level: no roles when the app is unknown
    roles = []
    if app_obj:
        role_objects = (
            db.session.query(AppRole)
            .filter(AppRole.app_id == app_obj.id)
            .filter(AppRole.user_id == user.uuid)
        )
        for role_obj in role_objects:
            app_role = RoleService.get_role_by_id(role_obj.role_id)
            # An assignment whose role cannot be resolved falls back to 'user'
            roles.append('user' if app_role is None else app_role.name)

    current_app.logger.info(f"Using '{roles}' when applying consent for {kratos_id}")

    # Get claims for this user, provided the current app
    claims = user.get_claims(app_id, roles)

    # pylint: disable=fixme
    # TODO: Need to implement checking claims here, once the backend for that is
    #       developed
    current_app.logger.info(f"Providing consent to {app_id} for {kratos_id}")
    current_app.logger.info(f"{kratos_id} was granted access to {app_id}")

    # False positive: pylint: disable=no-member
    return redirect(
        consent_request.accept(
            grant_scope=consent_request.requested_scope,
            grant_access_token_audience=consent_request.requested_access_token_audience,
            session=claims,
        )
    )


@web.route("/status", methods=["GET", "POST"])
def status():
    """Get status of current session

    Shows the id of the logged-in user. If nobody is logged in, shows:
    not-auth
    """
    auth_status = get_auth()

    if auth_status:
        return auth_status.id
    return "not-auth"


def get_auth():
    """Checks if user is logged in

    Queries the cookies. If an authentication cookie is found, it
    checks with Kratos if the cookie is still valid. If so,
    the profile is returned. Otherwise False is returned.

    :return: Profile or False if not logged in
    """
    cookie = get_kratos_cookie()
    if not cookie:
        return False

    # Given a cookie, check if it is valid and get the profile
    try:
        api_response = KRATOS_PUBLIC.to_session(cookie=cookie)
    except ory_kratos_client.ApiException as ex:
        current_app.logger.error(
            f"Exception when calling V0alpha2Api->to_session(): {ex}\n"
        )
        return False

    # Get all traits from ID
    return api_response.identity


def get_kratos_cookie():
    """Retrieves the Kratos cookie from the request.

    :return: the cookie formatted as ``ory_kratos_session=<value>``, or False
        if the request carries no such cookie
    """
    value = request.cookies.get("ory_kratos_session")
    if value is None:
        current_app.logger.info("User not logged in or cookie corrupted")
        return False
    return "ory_kratos_session=" + value


@web.route("/logout", methods=["GET"])
def logout():
    """Handles the Hydra OpenID Connect Logout flow as well as the Kratos
    logout flow

    Steps:
    1. Hydra's /oauth2/sessions/logout endpoint is called by an application
    2. Hydra calls this endpoint with a `logout_challenge` get parameter
    3. We retrieve the logout request using the challenge
    4. We retrieve the Kratos cookie from the browser
    5. We generate a Kratos logout URL
    6. We accept the Hydra logout request
    7. We redirect to the Kratos logout URL

    Args:
        logout_challenge (string): Reference to a Hydra logout challenge object

    Returns:
        Redirect to the logout URL provided by Kratos. Aborts with 403 when
        no challenge is given, 404 when the Hydra request or the Kratos
        cookie is missing, and 503 when Hydra reports an HTTP error or the
        Kratos logout URL cannot be created.
    """
    challenge = request.args.get("logout_challenge")
    current_app.logger.info("Logout request: challenge=%s", challenge)
    if not challenge:
        abort(403)
    try:
        logout_request = HYDRA.logout_request(challenge)
    except hydra_client.exceptions.NotFound:
        current_app.logger.error("Logout request with challenge '%s' not found", challenge)
        abort(404, "Hydra session invalid or not found")
    except hydra_client.exceptions.HTTPError:
        current_app.logger.error(
            "Conflict. Logout request with challenge '%s' has been used already.",
            challenge)
        abort(503)

    kratos_cookie = get_kratos_cookie()
    if not kratos_cookie:
        abort(404, "Kratos session invalid or not found")

    try:
        # Create a Logout URL for Browsers
        kratos_api_response = \
            KRATOS_ADMIN.create_self_service_logout_flow_url_for_browsers(
                cookie=kratos_cookie)
    except ory_kratos_client.ApiException as ex:
        current_app.logger.error("Exception when calling"
            " V0alpha2Api->create_self_service_logout_flow_url_for_browsers: %s\n",
            ex)
        # Without a logout URL there is nothing to redirect to; fail here,
        # before the Hydra logout request is accepted, instead of hitting an
        # unbound variable further down.
        abort(503, "Unable to create Kratos logout URL. Please try again.")
    current_app.logger.info(kratos_api_response)

    hydra_return = logout_request.accept(subject=logout_request.subject)
    current_app.logger.info("Hydra info: %s", hydra_return)

    return redirect(kratos_api_response.logout_url)