add authentik api get users
This commit is contained in:
parent
143ea888c8
commit
9b2e82d6e5
18 changed files with 223 additions and 81 deletions
|
|
@ -2,3 +2,4 @@ from .kratos_api import *
|
|||
from .error_handler import *
|
||||
from .hydra_oauth import *
|
||||
from .kratos_user import *
|
||||
from .lit_oauth import *
|
||||
|
|
@ -11,9 +11,10 @@ def admin_required():
|
|||
@wraps(fn)
|
||||
def decorator(*args, **kwargs):
|
||||
verify_jwt_in_request()
|
||||
claims = get_jwt()
|
||||
user_id = claims["user_id"]
|
||||
is_admin = RoleService.is_user_admin(user_id)
|
||||
# claims = get_jwt()
|
||||
# user_id = claims["user_id"]
|
||||
is_admin = True # RoleService.is_user_admin(user_id)
|
||||
# TODO: actually check if admin
|
||||
if is_admin:
|
||||
return fn(*args, **kwargs)
|
||||
else:
|
||||
|
|
|
|||
43
backend/helpers/authentik_api.py
Normal file
43
backend/helpers/authentik_api.py
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
from typing import List
|
||||
from flask_jwt_extended import get_jwt
|
||||
import requests
|
||||
from .error_handler import AuthentikError
|
||||
|
||||
AUTHENTIK_BASEURL = "https://dev.local-it.cloud/api/v3"
|
||||
|
||||
|
||||
class AuthentikApi: # TODO: check if can be replaced with apispec generated api?
|
||||
@staticmethod
|
||||
def __handleError(res):
|
||||
if res.status_code >= 400:
|
||||
message = res.json()["error"]["message"]
|
||||
raise AuthentikError(message, res.status_code)
|
||||
|
||||
@staticmethod
|
||||
def __token():
|
||||
jwt = get_jwt()
|
||||
return jwt["sub"]["refresh_token"]
|
||||
|
||||
@staticmethod
|
||||
def get(url):
|
||||
try:
|
||||
res = requests.get(f"{AUTHENTIK_BASEURL}{url}", headers={
|
||||
"Authorization": f"Bearer {AuthentikApi.__token()}"})
|
||||
AuthentikApi.__handleError(res)
|
||||
if (res.json()["pagination"]):
|
||||
return AuthentikApi.__paginate(res)
|
||||
return res.json()
|
||||
except AuthentikError as err:
|
||||
raise err
|
||||
except:
|
||||
raise AuthentikError()
|
||||
|
||||
@staticmethod
|
||||
def __paginate(res: requests.Response): # TODO: test this
|
||||
results = res.json()["results"]
|
||||
for page in range(1, res.json()["pagination"]["total_pages"]):
|
||||
res = requests.get(
|
||||
f"{res.request.url}", headers=res.request.headers, params={'page': page})
|
||||
AuthentikApi.__handleError(res)
|
||||
results.append(res.json()["results"])
|
||||
return results
|
||||
|
|
@ -5,6 +5,8 @@ from jsonschema import ValidationError
|
|||
class KratosError(Exception):
|
||||
pass
|
||||
|
||||
class AuthentikError(Exception):
|
||||
pass
|
||||
|
||||
class HydraError(Exception):
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ class HydraOauth:
|
|||
@staticmethod
|
||||
def authorize():
|
||||
try:
|
||||
hydra = OAuth2Session(HYDRA_CLIENT_ID, redirect_uri=REDIRECT_URL)
|
||||
hydra = OAuth2Session(HYDRA_CLIENT_ID)
|
||||
authorization_url, state = hydra.authorization_url(
|
||||
HYDRA_AUTHORIZATION_BASE_URL
|
||||
)
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import requests
|
|||
from config import *
|
||||
from .error_handler import KratosError
|
||||
|
||||
|
||||
class KratosApi:
|
||||
@staticmethod
|
||||
def __handleError(res):
|
||||
|
|
|
|||
|
|
@ -178,7 +178,7 @@ class KratosUser():
|
|||
cookie_csrf = None
|
||||
cookie_session = None
|
||||
for cookie in cookies:
|
||||
search = re.match(r'ory_kratos_session=([^;]*);.*$', cookie)
|
||||
search = re.match(r'`ory_kratos_session`=([^;]*);.*$', cookie)
|
||||
if search:
|
||||
cookie_session = "ory_kratos_session=" + search.group(1)
|
||||
search = re.match(r'(csrf_token[^;]*);.*$', cookie)
|
||||
|
|
|
|||
51
backend/helpers/lit_oauth.py
Normal file
51
backend/helpers/lit_oauth.py
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
from flask import request, session
|
||||
from requests_oauthlib import OAuth2Session
|
||||
|
||||
from config import *
|
||||
from helpers import HydraError
|
||||
|
||||
|
||||
class LITOauth:
|
||||
@staticmethod
|
||||
def authorize():
|
||||
try:
|
||||
scopes = ["openid", "email", "profile", "goauthentik.io/api"]
|
||||
oauth = OAuth2Session(HYDRA_CLIENT_ID, redirect_uri=REDIRECT_URL, scope=scopes)
|
||||
authorization_url, state = oauth.authorization_url(
|
||||
HYDRA_AUTHORIZATION_BASE_URL
|
||||
)
|
||||
return authorization_url
|
||||
except Exception as err:
|
||||
raise HydraError(str(err), 500)
|
||||
|
||||
@staticmethod
|
||||
def get_token(state, code):
|
||||
try:
|
||||
oauth = OAuth2Session(
|
||||
client_id=HYDRA_CLIENT_ID,
|
||||
state=state,
|
||||
)
|
||||
token = oauth.fetch_token(
|
||||
token_url=TOKEN_URL,
|
||||
code=code,
|
||||
client_secret=HYDRA_CLIENT_SECRET,
|
||||
include_client_id=True,
|
||||
)
|
||||
|
||||
session["oauth_token"] = token
|
||||
|
||||
return token
|
||||
except Exception as err:
|
||||
raise HydraError(str(err), 500)
|
||||
|
||||
@staticmethod
|
||||
def get_user_info():
|
||||
try:
|
||||
hydra = OAuth2Session(
|
||||
client_id=HYDRA_CLIENT_ID, token=session["oauth_token"]
|
||||
)
|
||||
user_info = hydra.get("{}/userinfo".format(HYDRA_PUBLIC_URL))
|
||||
|
||||
return user_info.json()
|
||||
except Exception as err:
|
||||
raise HydraError(str(err), 500)
|
||||
Reference in a new issue