# Listing metadata: 159 lines, no EOL, 5.3 KiB, Python
import logging
from typing import Dict, Optional

import requests
from requests import Request

from app.authentik.models import User


class Authentik:
    """Small client for the authentik ``api/v3/`` REST endpoints.

    Wraps ``requests`` with the bearer-token header and offers helpers to
    wire up a webhook notification chain (event matcher policy, property
    mapping, event transport, event rule, policy binding) and to look up,
    create and delete users.
    """

    def __init__(self, token, base, timeout=30):
        """Store credentials and initialise the cached API objects.

        Args:
            token: API token sent as ``Authorization: Bearer <token>``.
            base: Base URL of the server, with or without a trailing slash.
            timeout: Seconds passed to every ``requests`` call so a stalled
                server cannot block the caller forever.
        """
        # Normalise the trailing slash so both "http://host" and
        # "http://host/" produce a well-formed API root.
        self.base = f"{base.rstrip('/')}/api/v3/"
        self.token = token
        self.headers = {"Authorization": f"Bearer {token}"}
        self.timeout = timeout
        # Filled in by create_web_hook() and the individual helpers.
        self.hook_endpoint = None
        self.property_mapping = None
        self.event_matcher_policy = None
        self.event_transport = None
        self.admin_group = None
        self.event_rule = None
        self.event_rule_link = None

    @staticmethod
    def _json_or_raise(response, expected_status):
        """Return the decoded JSON body, or raise if the status differs."""
        if response.status_code == expected_status:
            return response.json()
        raise Exception(response.status_code, response.url, response.text)

    def get(self, endpoint: str, params: Optional[Dict] = None) -> "requests.Response":
        """Send a GET to ``endpoint`` (relative to the API root)."""
        return requests.get(
            url=f"{self.base}{endpoint}",
            params=params,
            headers=self.headers,
            timeout=self.timeout,
        )

    def post(self, endpoint: str, data: Optional[Dict] = None) -> "requests.Response":
        """Send a POST to ``endpoint`` with ``data`` as the JSON body."""
        return requests.post(
            url=f"{self.base}{endpoint}",
            # An omitted payload is still sent as an empty JSON object.
            json={} if data is None else data,
            headers=self.headers,
            timeout=self.timeout,
        )

    def delete(self, endpoint: str) -> "requests.Response":
        """Send a DELETE to ``endpoint`` (relative to the API root)."""
        return requests.delete(
            url=f"{self.base}{endpoint}",
            headers=self.headers,
            timeout=self.timeout,
        )

    def create_web_hook(self, hook_endpoint):
        """Create every object needed to deliver events to ``hook_endpoint``.

        Creates, in order: the event matcher policy, the property mapping,
        the event transport, looks up the admin group, creates the event
        rule and finally binds the policy to the rule. Each result is cached
        on the instance.

        Returns:
            This instance, so calls can be chained.

        Raises:
            Exception: If any of the underlying API calls fails.
        """
        self.hook_endpoint = hook_endpoint
        self.event_matcher_policy = self.create_event_matcher_policy()
        self.property_mapping = self.create_property_mapping()
        self.event_transport = self.create_event_transport(
            self.hook_endpoint, self.property_mapping["pk"]
        )
        self.admin_group = self.get_admin_group()
        self.event_rule = self.create_event_rule(
            self.admin_group, self.event_transport["pk"]
        )
        self.event_rule_link = self.add_event_rule_link(
            self.event_matcher_policy["pk"], self.event_rule["pk"]
        )
        return self

    def create_event_transport(self, hook_endpoint, property_mapping_pk):
        """Create a webhook event transport and return its JSON representation.

        Raises:
            Exception: If the API does not answer with status 201.
        """
        data = {
            "mode": "webhook",
            "name": "my hook",
            "send_once": False,
            "webhook_mapping": property_mapping_pk,
            "webhook_url": hook_endpoint,
        }
        logging.getLogger(__name__).debug("Creating event transport: %s", data)
        # TODO: add check if model with same name already exists
        return self._json_or_raise(self.post("events/transports/", data), 201)

    def create_event_rule(self, admin_group, event_transport):
        """Create an event rule and return its JSON representation.

        Args:
            admin_group: Group object (a dict with a ``"pk"`` key).
            event_transport: Value placed in the rule's ``transports`` list;
                ``create_web_hook`` passes the transport's pk here.

        Raises:
            Exception: If the API does not answer with status 201.
        """
        data = {
            "group": admin_group["pk"],
            "name": "event-rule",
            "severity": "notice",
            "transports": [event_transport],
        }
        # TODO: add check if model with same name already exists
        return self._json_or_raise(self.post("events/rules/", data), 201)

    def add_event_rule_link(self, policy_pk, target_pk):
        """Bind ``policy_pk`` to ``target_pk`` and return the binding JSON.

        Raises:
            Exception: If the API does not answer with status 201.
        """
        data = {
            "enabled": True,
            "group": "",
            "negate": False,
            "order": "0",
            "policy": policy_pk,
            "target": target_pk,
            "timeout": "1",
            "user": "",
        }
        return self._json_or_raise(self.post("policies/bindings/", data), 201)

    def get_admin_group(self):
        """Fetch the single superuser group named "authentik Admins".

        The group is cached on ``self.admin_group`` and returned.

        Raises:
            Exception: If the request fails or the query does not match
                exactly one group.
        """
        args = {
            "is_superuser": True,
            "name": "authentik Admins",
        }
        r = self.get("core/groups/", args)
        groups = self._json_or_raise(r, 200)["results"]
        if len(groups) != 1:
            raise Exception(r.status_code, r.url, r.text)
        self.admin_group = groups[0]
        return self.admin_group

    def create_event_matcher_policy(self):
        """Create the "model created" event matcher policy.

        The created policy is cached on ``self.event_matcher_policy`` and
        returned.

        Raises:
            Exception: If the API does not answer with status 201.
        """
        data = {
            "action": "model_created",
            "app": "authentik.core",
            "client_ip": "",
            "execution_logging": True,
            "name": "model created",
        }
        policy = self._json_or_raise(self.post("policies/event_matcher/", data), 201)
        self.event_matcher_policy = policy
        return policy

    def create_property_mapping(self):
        """Create the notification property mapping and return its JSON.

        The mapping's expression copies every item of the event context into
        the returned ``fields`` dict.

        Raises:
            Exception: If the API does not answer with status 201.
        """
        # The expression is Python source, so its nesting must be expressed
        # with real indentation.
        expression = "\n".join(
            (
                "fields = {}",
                "if notification:",
                "    if notification.event:",
                "        for k, v in notification.event.context.items():",
                "            fields[k] = v",
                "return fields",
            )
        )
        data = {
            "name": "new-mapper",
            "expression": expression,
        }
        return self._json_or_raise(
            self.post("propertymappings/notification/", data), 201
        )

    def get_user(self, user: "User") -> "User":
        """Look up a user by pk, or by username search when no pk is set.

        Raises:
            Exception: If the search matches zero or several users, or the
                response does not describe a user.
        """
        if user.pk:
            found = self.get(f"core/users/{user.pk}/").json()
        else:
            # Pass the username as a query parameter so it is URL-encoded.
            listing = self.get("core/users/", {"search": user.username}).json()
            results = listing.get("results")
            if results is None:
                raise Exception(listing)
            if len(results) > 1:
                raise Exception("More than one user with that username", listing)
            if not results:
                raise Exception("No user with that username", listing)
            found = results[0]

        if "pk" in found:
            return User(**found)
        raise Exception(found)

    def create_user(self, user: "User") -> "User":
        """Create ``user`` on the server and return the stored user.

        Raises:
            Exception: If the response does not contain a ``pk``.
        """
        created = self.post("core/users/", user.dict()).json()
        if "pk" in created:
            return User(**created)
        raise Exception(created)

    def delete_user(self, user: "User") -> None:
        """Delete ``user`` from the server.

        The user is addressed by pk; when ``user.pk`` is not set it is
        resolved through ``get_user`` first.

        Raises:
            Exception: If the user cannot be resolved or the API does not
                answer with status 204.
        """
        pk = user.pk or self.get_user(user).pk
        r = self.delete(f"core/users/{pk}/")
        # A successful delete has no body, so check the status instead of
        # trying to decode JSON.
        if r.status_code != 204:
            raise Exception(r.status_code, r.url, r.text)