dashboard/backend/helpers/authentik_api.py
2022-11-08 09:55:05 +01:00

43 lines
No EOL
1.4 KiB
Python

from typing import List
from flask_jwt_extended import get_jwt
import requests
from .error_handler import AuthentikError
AUTHENTIK_BASEURL = "https://dev.local-it.cloud/api/v3"
class AuthentikApi: # TODO: check if can be replaced with apispec generated api?
@staticmethod
def __handleError(res):
if res.status_code >= 400:
message = res.json()["error"]["message"]
raise AuthentikError(message, res.status_code)
@staticmethod
def __token():
jwt = get_jwt()
return jwt["sub"]["refresh_token"]
@staticmethod
def get(url):
try:
res = requests.get(f"{AUTHENTIK_BASEURL}{url}", headers={
"Authorization": f"Bearer {AuthentikApi.__token()}"})
AuthentikApi.__handleError(res)
if (res.json()["pagination"]):
return AuthentikApi.__paginate(res)
return res.json()
except AuthentikError as err:
raise err
except:
raise AuthentikError()
@staticmethod
def __paginate(res: requests.Response): # TODO: test this
results = res.json()["results"]
for page in range(1, res.json()["pagination"]["total_pages"]):
res = requests.get(
f"{res.request.url}", headers=res.request.headers, params={'page': page})
AuthentikApi.__handleError(res)
results.append(res.json()["results"])
return results