from http.client import HTTPException from typing import Dict, Optional import requests from requests import Request import structlog from app.authentik.settings import AuthentikSettings from .models import User logging = structlog.get_logger() class Authentik: def __init__(self, settings: AuthentikSettings): self.base = f"{settings.baseurl}api/v3/" self.token = settings.token self.headers = {"Authorization": f"Bearer {self.token}"} self.hook_endpoint = None self.property_mapping = None self.event_matcher_policy = None self.event_transport = None self.admin_group = None self.event_rule = None self.event_rule_link = None # check connection r = requests.get( url=f"{self.base}/admin/version", headers=self.headers) assert r.status_code == 200 def get(self, endpoint: str, params: Dict = {}) -> Request: return requests.get(url=f"{self.base}{endpoint}", params=params, headers=self.headers) def post(self, endpoint: str, data: Dict = {}) -> Request: return requests.post(url=f"{self.base}{endpoint}", json=data, headers=self.headers) def delete(self, endpoint: str) -> Request: return requests.delete(url=f"{self.base}{endpoint}", headers=self.headers) def create_web_hook(self, hook_endpoint): self.hook_endpoint = hook_endpoint self.event_matcher_policy = self.create_event_matcher_policy() self.property_mapping = self.create_property_mapping() self.event_transport = self.create_event_transport( self.hook_endpoint, self.property_mapping["pk"]) self.admin_group = self.get_admin_group() self.event_rule = self.create_event_rule( self.admin_group, self.event_transport["pk"]) self.event_rule_link = self.add_event_rule_link( self.event_matcher_policy["pk"], self.event_rule["pk"]) return self def create_event_transport(self, hook_endpoint, property_mapping_pk): url = "events/transports/" data = { "mode": "webhook", "name": "my hook", "send_once": False, "webhook_mapping": property_mapping_pk, "webhook_url": hook_endpoint } for result in self.get(url).json()["results"]: if result["name"] == "my hook": raise Exception("Event Rule already exists") r: Request = self.post(url, data) if r.status_code == 201: return r.json() raise Exception(r.status_code, r.url, r.text) def create_event_rule(self, admin_group, event_transport): url = "events/rules/" data = { "group": admin_group["pk"], "name": "event-rule", "severity": "notice", "transports": [ event_transport ] } for result in self.get(url).json()["results"]: if result["name"] == "event-rule": raise Exception("Event Rule already exists") r = self.post(url, data) if r.status_code == 201: return r.json() raise Exception(r.status_code, r.url, r.text) def add_event_rule_link(self, policy_pk, target_pk): url = "policies/bindings/" data = { "enabled": True, "group": "", "negate": False, "order": "0", "policy": policy_pk, "target": target_pk, "timeout": "1", "user": "" } r = self.post(url, data) if r.status_code == 201: return r.json() raise Exception(r.status_code, r.url, r.text) def get_admin_group(self): url = "core/groups/" args = { "is_superuser": True, "name": "authentik Admins" } r = self.get(url, args) if r.status_code == 200: groups = r.json()["results"] if len(groups) == 1: self.admin_group = groups[0] return self.admin_group raise Exception(r.status_code, r.url, r.text) def create_event_matcher_policy(self): url = "policies/event_matcher/" data = { "action": "model_created", "app": "authentik.core", "client_ip": "", "execution_logging": True, "name": "model created" } r = self.post(url, data) if r.status_code == 201: self.event_matcher_policy = r.json() return r.json() raise Exception(r.status_code, r.url, r.text) def create_property_mapping(self): url = "propertymappings/notification/" data = { "name": "new-mapper", "expression": "fields = {}\nif notification:\n if notification.event:\n for k, v in notification.event.context.items():\n fields[k] = v\nreturn fields" } r = self.post(url, data) if r.status_code == 201: return r.json() raise Exception(r.status_code, r.url, r.text) def get_user_by_pk(self, pk: str) -> Optional[User]: r = self.get(f"core/users/{pk}").json() if "pk" in r: return User(**r) raise Exception(r) def get_user(self, user: User) -> Optional[User]: if user.pk: r = self.get(f"core/users/{user.pk}").json() else: r = self.get(f"core/users/?search={user.username}").json() if len(r["results"]) == 0: return None if len(r["results"]) > 1: raise Exception("More than one user with that username", r) r = r["results"][0] if "pk" in r: print(r) return User(**r) raise Exception(r) def create_user(self, user: User) -> User: r = self.post("core/users/", user.dict()).json() if "pk" in r: return User(**r) raise Exception(r) def delete_user(self, user: User) -> bool: if user == None or user.pk == None: raise Exception("Not a valid user") r = self.delete(f"core/users/{user.pk}") if r.status_code == 204: return True r = r.json() if("detail" in r): if r["detail"] == "Not found.": return False