introduce admin area

introduce admin area
first poc for connecting the authentik api

Co-authored-by: Philipp Rothmann <philipprothmann@posteo.de>
Reviewed-on: #2
This commit is contained in:
philipp 2022-11-08 16:36:16 +01:00
parent 8d760e588f
commit 44e4e4eb42
35 changed files with 367 additions and 133 deletions

View file

@ -2,3 +2,4 @@ from .kratos_api import *
from .error_handler import *
from .hydra_oauth import *
from .kratos_user import *
from .lit_oauth import *

View file

@ -12,6 +12,7 @@ def admin_required():
def decorator(*args, **kwargs):
verify_jwt_in_request()
claims = get_jwt()
user_id = claims["user_id"]
is_admin = RoleService.is_user_admin(user_id)
if is_admin:

View file

@ -0,0 +1,42 @@
from typing import List
from flask_jwt_extended import get_jwt
import requests
from config import AUTHENTIK_BASEURL
from .error_handler import AuthentikError
class AuthentikApi: # TODO: check if can be replaced with apispec generated api?
@staticmethod
def __handleError(res):
if res.status_code >= 400:
message = res.json()["error"]["message"]
raise AuthentikError(message, res.status_code)
@staticmethod
def __token():
jwt = get_jwt()
return jwt["sub"]["refresh_token"]
@staticmethod
def get(url):
try:
res = requests.get(f"{AUTHENTIK_BASEURL}{url}", headers={
"Authorization": f"Bearer {AuthentikApi.__token()}"})
AuthentikApi.__handleError(res)
if ("pagination" in res.json()):
return AuthentikApi.__paginate(res)
return res.json()
except AuthentikError as err:
raise err
except:
raise AuthentikError()
@staticmethod
def __paginate(res: requests.Response): # TODO: test this
results = res.json()["results"]
for page in range(1, res.json()["pagination"]["total_pages"]):
res = requests.get(
f"{res.request.url}", headers=res.request.headers, params={'page': page})
AuthentikApi.__handleError(res)
results.append(res.json()["results"])
return results

View file

@ -5,6 +5,8 @@ from jsonschema import ValidationError
class KratosError(Exception):
pass
class AuthentikError(Exception):
pass
class HydraError(Exception):
pass

View file

@ -9,7 +9,7 @@ class HydraOauth:
@staticmethod
def authorize():
try:
hydra = OAuth2Session(HYDRA_CLIENT_ID, redirect_uri=REDIRECT_URL)
hydra = OAuth2Session(HYDRA_CLIENT_ID)
authorization_url, state = hydra.authorization_url(
HYDRA_AUTHORIZATION_BASE_URL
)

View file

@ -4,7 +4,6 @@ import requests
from config import *
from .error_handler import KratosError
class KratosApi:
@staticmethod
def __handleError(res):

View file

@ -0,0 +1,51 @@
from flask import request, session
from requests_oauthlib import OAuth2Session
from config import *
from helpers import HydraError
class LITOauth:
@staticmethod
def authorize():
try:
scopes = ["openid", "email", "profile", "goauthentik.io/api"]
oauth = OAuth2Session(HYDRA_CLIENT_ID, redirect_uri=REDIRECT_URL, scope=scopes)
authorization_url, state = oauth.authorization_url(
HYDRA_AUTHORIZATION_BASE_URL
)
return authorization_url
except Exception as err:
raise HydraError(str(err), 500)
@staticmethod
def get_token(state, code):
try:
oauth = OAuth2Session(
client_id=HYDRA_CLIENT_ID,
state=state,
)
token = oauth.fetch_token(
token_url=TOKEN_URL,
code=code,
client_secret=HYDRA_CLIENT_SECRET,
include_client_id=True,
)
session["oauth_token"] = token
return token
except Exception as err:
raise HydraError(str(err), 500)
@staticmethod
def get_user_info():
try:
hydra = OAuth2Session(
client_id=HYDRA_CLIENT_ID, token=session["oauth_token"]
)
user_info = hydra.get("{}/userinfo".format(HYDRA_PUBLIC_URL))
return user_info.json()
except Exception as err:
raise HydraError(str(err), 500)