dashboard/backend/areas/auth/lit_auth.py


from multiprocessing import current_process
from flask import jsonify, request
from flask_jwt_extended import create_access_token, jwt_required
from flask_cors import cross_origin
from datetime import timedelta
from helpers.authentik_api import AuthentikApi
from areas import api_v1
from config import *
from helpers import LITOauth, BadRequest
@api_v1.route("/logout", methods=["POST"])
@cross_origin()
@jwt_required()
def logout():
    """Trigger the identity provider's invalidation flow for a logged-in caller.

    Requires a valid JWT (``jwt_required``), POSTs to the Authentik
    default invalidation flow executor and returns an empty JSON object.
    The upstream result is not inspected, so the caller always receives
    ``{}`` as long as the POST itself does not raise.

    NOTE(review): nothing in this handler revokes the dashboard JWT that
    authorised the request. The callback handler in this file issues that
    JWT with a 365-day lifetime, so unless a blocklist is checked elsewhere
    the token presumably stays usable after logout -- confirm.
    """
    response = AuthentikApi.post("/flows/executor/default-invalidation-flow/")
    # NOTE(review): the raw upstream response goes to stdout; confirm it
    # carries nothing sensitive before it lands in collected logs.
    print(response)
    return jsonify({})
@api_v1.route("/login", methods=["POST"])
@cross_origin()
def login():
    """Start the OAuth login by handing the client an authorization URL.

    Returns a JSON object whose ``authorizationUrl`` field holds whatever
    ``LITOauth.authorize()`` produced. The endpoint is unauthenticated by
    design: it is the entry point of the login flow.

    NOTE(review): ``cross_origin()`` is applied without arguments, which in
    flask-cors means any origin is allowed unless app-level configuration
    narrows it -- confirm that is intended for the auth endpoints.
    """
    return jsonify({"authorizationUrl": LITOauth.authorize()})
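@api_v1.route("/hydra/callback")
@cross_origin()
def hydra_callback():
    """Complete the OAuth login and issue the dashboard access token.

    Reads ``state`` and ``code`` from the query string, exchanges them for
    a token through ``LITOauth.get_token``, loads the user's profile and
    returns a JSON body with a freshly signed ``accessToken`` plus a
    ``userInfo`` object (id, email, name, preferredUsername, app_roles).

    Raises:
        BadRequest: if ``state`` or ``code`` is absent or empty.
        KeyError: if the profile lacks ``email``, ``name`` or
            ``preferred_username``.
    """
    state = request.args.get("state")
    code = request.args.get("code")
    # An empty value is as unusable as an absent one, so both are rejected
    # before anything is sent to the identity provider.
    if not state:
        raise BadRequest("Missing state query param")
    if not code:
        raise BadRequest("Missing code query param")

    token = LITOauth.get_token(state, code)
    # NOTE(review): get_user_info() takes no argument, so it presumably
    # relies on state LITOauth kept from get_token(); verify that state is
    # per-request and cannot be mixed up between concurrent logins.
    user_info = LITOauth.get_user_info()

    # NOTE(review): the value returned by get_token() becomes the JWT
    # identity. A JWT is signed but not encrypted, so whoever holds the
    # access token can read that value, and the token is valid for a year.
    # Consider storing the upstream token server-side, using a stable user
    # id as identity, and shortening the lifetime. Left unchanged here
    # because other endpoints may read the identity.
    access_token = create_access_token(
        identity=token, expires_delta=timedelta(days=365))

    # Admin is granted only on an exact group-name match. A missing claim
    # means "not admin" instead of a 500, and a claim delivered as a single
    # string is treated as one group name so that `in` cannot fall back to
    # substring matching.
    groups = user_info.get("groups") or []
    if isinstance(groups, str):
        groups = [groups]
    is_admin = "admin" in groups

    # role_id 1 is sent for admins, 2 for everyone else.
    app_roles = [
        {
            "name": "dashboard",
            "role_id": 1 if is_admin else 2,
        },
    ]

    return jsonify(
        {
            "accessToken": access_token,
            "userInfo": {
                "id": user_info["email"],
                "email": user_info["email"],
                "name": user_info["name"],
                "preferredUsername": user_info["preferred_username"],
                "app_roles": app_roles,
            },
        }
    )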