From 9c75d36b71ae920ef5f527f7db0b2780cb17ffca Mon Sep 17 00:00:00 2001 From: Davor Date: Tue, 28 Jun 2022 12:23:41 +0200 Subject: [PATCH] if user has admin dashboard role allow admin access --- areas/auth/auth.py | 2 +- web/login/login.py | 33 +++++++++++++++++++++++++++++---- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/areas/auth/auth.py b/areas/auth/auth.py index a119ffa..c972752 100644 --- a/areas/auth/auth.py +++ b/areas/auth/auth.py @@ -4,7 +4,7 @@ from flask_cors import cross_origin from datetime import timedelta from areas import api_v1 -from areas.apps import AppRole, App +from areas.apps import App, AppRole from config import * from helpers import HydraOauth, BadRequest, KratosApi diff --git a/web/login/login.py b/web/login/login.py index ef54a18..e5d2cc0 100644 --- a/web/login/login.py +++ b/web/login/login.py @@ -73,6 +73,7 @@ def settings(): return render_template("settings.html", api_url=KRATOS_PUBLIC_URL) + @web.route("/error", methods=["GET"]) def error(): """Show error messages from Kratos @@ -85,7 +86,7 @@ def error(): """ error_id = request.args.get("id") - api_response="" + api_response = "" try: # Get Self-Service Errors api_response = KRATOS_ADMIN.get_self_service_error(error_id) @@ -96,6 +97,7 @@ def error(): return render_template("error.html", error_message=api_response) + @web.route("/login", methods=["GET", "POST"]) def login(): """Start login flow @@ -231,8 +233,8 @@ def consent(): app_id = consent_client.get("client_id") # False positive: pylint: disable=no-member kratos_id = consent_request.subject - current_app.logger.error(f"Info: Found kratos_id {kratos_id}") - current_app.logger.error(f"Info: Found app_id {app_id}") + current_app.logger.info(f"Info: Found kratos_id {kratos_id}") + current_app.logger.info(f"Info: Found app_id {app_id}") except Exception as ex: current_app.logger.error( @@ -244,12 +246,34 @@ def consent(): abort(501, description="Internal error occured") # Get the related user object - current_app.logger.error(f"Info: Getting user from admin {kratos_id}") + current_app.logger.info(f"Info: Getting user from admin {kratos_id}") user = KratosUser(KRATOS_ADMIN, kratos_id) if not user: current_app.logger.error(f"User not found in database: {kratos_id}") abort(401, description="User not found. Please try again.") + # Get role on dashboard + dashboard_app = db.session.query(App).filter( + App.slug == 'dashboard').first() + if dashboard_app: + role_object = ( + db.session.query(AppRole) + .filter(AppRole.app_id == dashboard_app.id) + .filter(AppRole.user_id == user.uuid) + .first() + ) + # If the user is dashboard admin admin is for all + if role_object is not None and role_object.role_id == 1: + # Get claims for this user, provided the current app + claims = user.get_claims(app_id, ['admin']) + return redirect( + consent_request.accept( + grant_scope=consent_request.requested_scope, + grant_access_token_audience=consent_request.requested_access_token_audience, + session=claims, + ) + ) + # Get role on this app app_obj = db.session.query(App).filter(App.slug == app_id).first() @@ -337,6 +361,7 @@ def get_auth(): return False + def get_kratos_cookie(): """Retrieves the Kratos cookie from the session.