integrate/app/event_controller.py

55 lines
1.4 KiB
Python

import structlog
from typing import List
from pydantic import BaseModel
from fastapi import FastAPI, Depends
from app.authentik.api import Authentik
from app.authentik.models import User
from app.sink import Sink
from app.wekan.api import WekanApi
from app.authentik.settings import AuthentikSettings
from app.wekan.main import WekanSink
logging = structlog.get_logger()
class Authentik_Hook_Model(BaseModel):
pk: str
app: str
name: str
model_name: str
class Http_request(BaseModel):
args: dict
path: str
method: str
authentikSettings = AuthentikSettings()
class EventController:
def __init__(self, authentik_api: Authentik):
# try:
self._authentik = authentik_api
self._sinks = []
for sc in Sink.__subclasses__():
obj = sc()
self._sinks.append(obj)
# except Exception as e:
# raise Exception("Failed to init Api", e)
self.jobs = []
pass
def register_api(self, authentik: Authentik, sinks: List[Sink]):
self._authentik = authentik
self._sinks = sinks
def handle_model_created_event(self, model: Authentik_Hook_Model):
user: User = self._authentik.get_user_by_pk(model.pk)
for sink in self._sinks:
logging.info(f"Creating User {user.username} in {sink.__class__}")
sink.create_user(user)
return True