62 lines
1.7 KiB
Python
62 lines
1.7 KiB
Python
from multiprocessing import current_process
|
|
from flask import jsonify, request
|
|
from flask_jwt_extended import create_access_token, jwt_required
|
|
from flask_cors import cross_origin
|
|
from datetime import timedelta
|
|
from helpers.authentik_api import AuthentikApi
|
|
|
|
from areas import api_v1
|
|
from config import *
|
|
from helpers import LITOauth, BadRequest
|
|
|
|
|
|
@api_v1.route("/logout", methods=["POST"])
@cross_origin()
@jwt_required()
def logout():
    """Post to the Authentik default invalidation flow and return empty JSON.

    The route requires a valid JWT. Whatever ``AuthentikApi.post`` returns is
    printed to stdout and otherwise ignored; the HTTP response body is always
    an empty JSON object.
    """
    invalidation_response = AuthentikApi.post(
        "/flows/executor/default-invalidation-flow/"
    )
    # NOTE(review): the result is only printed, never inspected — confirm
    # whether a failed invalidation should be surfaced to the caller.
    print(invalidation_response)
    return jsonify({})
|
|
|
|
@api_v1.route("/login", methods=["POST"])
@cross_origin()
def login():
    """Return the value of ``LITOauth.authorize()`` as JSON.

    The response has a single key, ``authorizationUrl``.
    """
    return jsonify({"authorizationUrl": LITOauth.authorize()})
|
|
|
|
|
|
@api_v1.route("/hydra/callback")
@cross_origin()
def hydra_callback():
    """Complete the OAuth callback and return an access token with user info.

    Reads the ``state`` and ``code`` query parameters, passes them to
    ``LITOauth.get_token``, fetches the user info, and issues a JWT whose
    identity is the token returned by ``LITOauth.get_token``.

    Returns:
        A JSON response with ``accessToken`` and a ``userInfo`` object
        containing ``id``, ``email``, ``name``, ``preferredUsername`` and
        ``app_roles``.

    Raises:
        BadRequest: If the ``state`` or ``code`` query parameter is missing.
    """
    state = request.args.get("state")
    code = request.args.get("code")

    # Validate both parameters before contacting the OAuth provider.
    if state is None:
        raise BadRequest("Missing state query param")
    if code is None:
        raise BadRequest("Missing code query param")

    token = LITOauth.get_token(state, code)
    user_info = LITOauth.get_user_info()

    # NOTE(review): the issued JWT is valid for a full year — confirm this
    # lifetime is intentional.
    access_token = create_access_token(
        identity=token,
        expires_delta=timedelta(days=365),
    )

    # Members of the "admin" group get role_id 1 on the dashboard app;
    # everyone else gets role_id 2.
    is_admin = "admin" in user_info["groups"]
    app_roles = [
        {
            "name": "dashboard",
            "role_id": 1 if is_admin else 2,
        },
    ]

    return jsonify(
        {
            "accessToken": access_token,
            "userInfo": {
                # The email address doubles as the user id in the response.
                "id": user_info["email"],
                "email": user_info["email"],
                "name": user_info["name"],
                "preferredUsername": user_info["preferred_username"],
                "app_roles": app_roles,
            },
        }
    )
|