from multiprocessing import current_process from flask import jsonify, request from flask_jwt_extended import create_access_token, jwt_required from flask_cors import cross_origin from datetime import timedelta from helpers.authentik_api import AuthentikApi from areas import api_v1 from config import * from helpers import LITOauth, BadRequest @api_v1.route("/logout", methods=["POST"]) @cross_origin() @jwt_required() def logout(): res = AuthentikApi.post("/flows/executor/default-invalidation-flow/") print(res) return jsonify({}) @api_v1.route("/login", methods=["POST"]) @cross_origin() def login(): authorization_url = LITOauth.authorize() return jsonify({"authorizationUrl": authorization_url}) @api_v1.route("/hydra/callback") @cross_origin() def hydra_callback(): state = request.args.get("state") code = request.args.get("code") if state == None: raise BadRequest("Missing state query param") if code == None: raise BadRequest("Missing code query param") token = LITOauth.get_token(state, code) user_info = LITOauth.get_user_info() access_token = create_access_token( identity=token, expires_delta=timedelta(days=365)) isAdmin = "admin" in user_info["groups"] app_roles = [ { "name": "dashboard", "role_id": 1 if isAdmin else 2 }, ] return jsonify( { "accessToken": access_token, "userInfo": { "id": user_info["email"], "email": user_info["email"], "name": user_info["name"], "preferredUsername": user_info["preferred_username"], "app_roles": app_roles } } )