188 lines
6.2 KiB
Python
188 lines
6.2 KiB
Python
from http.client import HTTPException
|
|
from typing import Dict, Optional
|
|
import requests
|
|
from requests import Request
|
|
import structlog
|
|
|
|
from app.authentik.settings import AuthentikSettings
|
|
from .models import User
|
|
|
|
# Module-level structlog logger used by this file.
# NOTE(review): the name matches the stdlib ``logging`` module, which is not
# imported here; in this file ``logging`` always refers to this structlog logger.
logging = structlog.get_logger()
|
|
|
|
|
|
class Authentik:
    """Small client for the authentik REST API (``api/v3``).

    Every request carries the bearer token taken from the settings object.
    The class offers helpers to register a notification webhook
    (``create_web_hook``) and to look up, create and delete users.
    """

    def __init__(self, settings: "AuthentikSettings"):
        """Store connection details and verify that the API answers.

        Args:
            settings: Object providing ``baseurl`` and ``token``.

        Raises:
            AssertionError: If the version endpoint does not answer with 200.
        """
        self.base = self._api_base(settings.baseurl)
        self.token = settings.token
        self.headers = {"Authorization": f"Bearer {self.token}"}
        self.hook_endpoint = None
        self.property_mapping = None
        self.event_matcher_policy = None
        self.event_transport = None
        self.admin_group = None
        self.event_rule = None
        self.event_rule_link = None

        # Check the connection. The failure is raised explicitly (instead of
        # via an ``assert`` statement) so the check is not stripped by
        # ``python -O``; the exception type stays AssertionError for callers.
        r = requests.get(url=self._url("admin/version"), headers=self.headers)
        if r.status_code != 200:
            raise AssertionError(r.status_code, r.url, r.text)

    @staticmethod
    def _api_base(baseurl) -> str:
        """Return the ``api/v3/`` root for *baseurl*, with or without a trailing slash."""
        return f"{str(baseurl).rstrip('/')}/api/v3/"

    def _url(self, endpoint: str) -> str:
        """Join ``self.base`` and *endpoint* with exactly one slash between them."""
        return f"{self.base.rstrip('/')}/{endpoint.lstrip('/')}"

    def _create(self, url: str, data: Dict) -> Dict:
        """POST *data* to *url* and return the decoded body of a 201 response.

        Raises:
            Exception: With ``(status_code, url, text)`` for any other status.
        """
        r = self.post(url, data)
        if r.status_code == 201:
            return r.json()
        raise Exception(r.status_code, r.url, r.text)

    def _name_taken(self, url: str, name: str) -> bool:
        """Return True if an entry called *name* is in the ``results`` listed at *url*.

        NOTE(review): only the ``results`` of a single response are inspected;
        confirm whether the listing can be paginated.
        """
        results = self.get(url).json()["results"]
        return any(result["name"] == name for result in results)

    def get(self, endpoint: str, params: Optional[Dict] = None) -> "requests.Response":
        """Send an authenticated GET to *endpoint* (relative to the API root)."""
        query = {} if params is None else params
        return requests.get(url=self._url(endpoint), params=query, headers=self.headers)

    def post(self, endpoint: str, data: Optional[Dict] = None) -> "requests.Response":
        """Send an authenticated POST with *data* as the JSON body."""
        # An empty dict (not None) is sent so a JSON body is always present,
        # matching the previous ``data={}`` default.
        payload = {} if data is None else data
        return requests.post(url=self._url(endpoint), json=payload, headers=self.headers)

    def delete(self, endpoint: str) -> "requests.Response":
        """Send an authenticated DELETE to *endpoint*."""
        return requests.delete(url=self._url(endpoint), headers=self.headers)

    def create_web_hook(self, hook_endpoint):
        """Create all objects needed to deliver events to *hook_endpoint*.

        In order: an event matcher policy, a notification property mapping,
        a webhook transport, the admin group lookup, an event rule, and the
        binding of the policy to the rule. Each result is stored on the
        instance.

        Returns:
            This instance.
        """
        self.hook_endpoint = hook_endpoint
        self.event_matcher_policy = self.create_event_matcher_policy()
        self.property_mapping = self.create_property_mapping()
        self.event_transport = self.create_event_transport(
            self.hook_endpoint, self.property_mapping["pk"])
        self.admin_group = self.get_admin_group()
        self.event_rule = self.create_event_rule(
            self.admin_group, self.event_transport["pk"])
        self.event_rule_link = self.add_event_rule_link(
            self.event_matcher_policy["pk"], self.event_rule["pk"])
        return self

    def create_event_transport(self, hook_endpoint, property_mapping_pk):
        """Create the webhook transport named ``my hook``.

        Args:
            hook_endpoint: URL the webhook is sent to.
            property_mapping_pk: Primary key of the notification mapping.

        Returns:
            The decoded JSON body of the created transport.

        Raises:
            Exception: If a transport with that name is already listed, or
                the API does not answer with 201.
        """
        url = "events/transports/"
        data = {
            "mode": "webhook",
            "name": "my hook",
            "send_once": False,
            "webhook_mapping": property_mapping_pk,
            "webhook_url": hook_endpoint,
        }

        if self._name_taken(url, data["name"]):
            raise Exception("Event Transport already exists")

        return self._create(url, data)

    def create_event_rule(self, admin_group, event_transport):
        """Create the event rule named ``event-rule``.

        Args:
            admin_group: Group dict; its ``pk`` becomes the rule's group.
            event_transport: Primary key of the transport the rule uses.

        Returns:
            The decoded JSON body of the created rule.

        Raises:
            Exception: If a rule with that name is already listed, or the
                API does not answer with 201.
        """
        url = "events/rules/"
        data = {
            "group": admin_group["pk"],
            "name": "event-rule",
            "severity": "notice",
            "transports": [event_transport],
        }

        if self._name_taken(url, data["name"]):
            raise Exception("Event Rule already exists")

        return self._create(url, data)

    def add_event_rule_link(self, policy_pk, target_pk):
        """Bind the policy *policy_pk* to the target *target_pk*.

        Returns:
            The decoded JSON body of the created binding.

        Raises:
            Exception: If the API does not answer with 201.
        """
        data = {
            "enabled": True,
            "group": "",
            "negate": False,
            "order": "0",
            "policy": policy_pk,
            "target": target_pk,
            "timeout": "1",
            "user": "",
        }
        return self._create("policies/bindings/", data)

    def get_admin_group(self):
        """Fetch the superuser group named ``authentik Admins``.

        The group is also stored in ``self.admin_group``.

        Raises:
            Exception: If the request fails or does not yield exactly one group.
        """
        args = {
            "is_superuser": True,
            "name": "authentik Admins",
        }
        r = self.get("core/groups/", args)
        if r.status_code == 200:
            groups = r.json()["results"]
            if len(groups) == 1:
                self.admin_group = groups[0]
                return self.admin_group
        raise Exception(r.status_code, r.url, r.text)

    def create_event_matcher_policy(self):
        """Create the event matcher policy named ``model created``.

        The created policy is also stored in ``self.event_matcher_policy``.

        Raises:
            Exception: If the API does not answer with 201.
        """
        data = {
            "action": "model_created",
            "app": "authentik.core",
            "client_ip": "",
            "execution_logging": True,
            "name": "model created",
        }
        self.event_matcher_policy = self._create("policies/event_matcher/", data)
        return self.event_matcher_policy

    def create_property_mapping(self):
        """Create the notification property mapping named ``new-mapper``.

        The mapping's expression copies every item of the notification
        event's context into the returned ``fields`` dict.

        Raises:
            Exception: If the API does not answer with 201.
        """
        # Written line by line so the nesting of the expression stays valid
        # Python and is readable here.
        expression = (
            "fields = {}\n"
            "if notification:\n"
            "    if notification.event:\n"
            "        for k, v in notification.event.context.items():\n"
            "            fields[k] = v\n"
            "return fields"
        )
        data = {
            "name": "new-mapper",
            "expression": expression,
        }
        return self._create("propertymappings/notification/", data)

    def get_user_by_pk(self, pk: str) -> Optional["User"]:
        """Return the user with primary key *pk*.

        Raises:
            Exception: With the decoded response if it contains no ``pk``.
        """
        r = self.get(f"core/users/{pk}").json()
        if "pk" in r:
            return User(**r)
        raise Exception(r)

    def get_user(self, user: "User") -> Optional["User"]:
        """Look up *user* by ``pk`` if set, otherwise by searching its username.

        Returns:
            The matching user, or None when the search has no results.

        Raises:
            Exception: If the search yields more than one result, or the
                response contains no ``pk``.
        """
        if user.pk:
            r = self.get(f"core/users/{user.pk}").json()
        else:
            # Passing the term via ``params`` lets requests URL-encode it.
            r = self.get("core/users/", {"search": user.username}).json()
            matches = r["results"]
            if not matches:
                return None
            if len(matches) > 1:
                raise Exception("More than one user with that username", r)
            r = matches[0]

        if "pk" in r:
            logging.debug("authentik user found", pk=r["pk"])
            return User(**r)
        raise Exception(r)

    def create_user(self, user: "User") -> "User":
        """Create *user* and return the user built from the response.

        Raises:
            Exception: With the decoded response if it contains no ``pk``.
        """
        r = self.post("core/users/", user.dict()).json()
        if "pk" in r:
            return User(**r)
        raise Exception(r)

    def delete_user(self, user: "User") -> bool:
        """Delete *user*.

        Returns:
            True when the API answers 204, otherwise False (including the
            ``Not found.`` answer).

        Raises:
            Exception: If *user* is None or has no ``pk``.
        """
        if user is None or user.pk is None:
            raise Exception("Not a valid user")

        r = self.delete(f"core/users/{user.pk}")
        if r.status_code == 204:
            return True

        body = r.json()
        if "detail" in body and body["detail"] == "Not found.":
            return False

        # Any other answer previously fell off the end of the function and
        # returned None; keep a falsy result but make it an explicit bool and
        # leave a trace of what happened.
        logging.warning(
            "authentik user deletion failed",
            status_code=r.status_code,
            response=body,
        )
        return False
|