49 lines
1.3 KiB
Python
49 lines
1.3 KiB
Python
from pydantic import BaseModel
|
|
from fastapi import FastAPI
|
|
from app.authentik.api import Authentik
|
|
from app.authentik.models import User
|
|
from app.wekan.api import Wekan
|
|
from app.settings import AuthentikSettings, WekanSettings
|
|
|
|
|
|
class Authentik_Hook_Model(BaseModel):
    """Description of the model instance an incoming hook refers to.

    All four fields are required strings. Within this file only ``pk`` is
    consumed: ``EventController.handle_model_created_event`` passes it to
    ``Authentik.get_user_by_pk``.
    """

    # Primary key of the referenced object, used to look the user up.
    pk: str
    # NOTE(review): presumably the app label of the referenced model --
    # confirm against the hook payload.
    app: str
    # NOTE(review): presumably a human-readable name of the object -- confirm.
    name: str
    # NOTE(review): presumably the model's class name (e.g. "user") -- confirm.
    model_name: str
|
|
|
|
|
|
class Http_request(BaseModel):
    """Validated shape of an HTTP request description.

    Not referenced elsewhere in the visible part of this file.
    NOTE(review): presumably mirrors the ``http_request`` section of a hook
    payload -- confirm against the sender.
    """

    # Request arguments as an arbitrary mapping (key/value types unchecked).
    args: dict
    # Request path.
    path: str
    # HTTP method name.
    method: str
|
|
|
|
|
|
# Settings objects are instantiated once, at import time.
# EventController.__init__ reads ``token``/``baseurl`` from the Authentik
# settings and ``baseurl``/``user``/``password`` from the Wekan settings.
authentikSettings = AuthentikSettings()
wekanSettings = WekanSettings()
|
|
|
|
|
|
class EventController:
    """Apply Authentik model events to Wekan.

    Holds one Authentik API client and one Wekan API client. The clients are
    built from the module-level settings objects in ``__init__`` and can be
    swapped afterwards with ``register_api``.
    """

    def __init__(self):
        """Create both API clients from the module-level settings.

        Raises:
            Exception: if constructing either client fails. The message is
                ``"Failed to init Api"``, the original error is the second
                element of ``args`` and is also chained as ``__cause__``.
        """
        try:
            self.authentik = Authentik(
                authentikSettings.token, authentikSettings.baseurl)
            self.wekan = Wekan(
                wekanSettings.baseurl,
                wekanSettings.user,
                wekanSettings.password)
        except Exception as e:
            # Same exception type and args as before; ``from e`` makes the
            # root cause explicit in the traceback.
            raise Exception("Failed to init Api", e) from e
        # Initialised empty; not read by any method of this class.
        self.jobs = []

    def register_api(self, authentik: Authentik, wekan: Wekan):
        """Replace both API clients with the given instances."""
        self.authentik = authentik
        self.wekan = wekan

    def handle_model_created_event(self, model: Authentik_Hook_Model):
        """Ensure the Authentik user referenced by ``model`` exists in Wekan.

        The user is fetched from Authentik by ``model.pk``. If Wekan has no
        user with the same username, one is created with the Authentik
        username and email.

        Args:
            model: hook payload describing the created object; only ``pk``
                is used.

        Returns:
            Always ``True``.
        """
        user: User = self.authentik.get_user_by_pk(model.pk)
        # Look up by the same identifier used for creation. The previous
        # check used ``user.name`` while creating with ``user.username``, so
        # an existing account was missed whenever the two differed.
        if not self.wekan.get_user(user.username):
            # NOTE(review): the account is created with an empty password --
            # confirm that Wekan logins for these users go through an
            # external identity provider and that this is intentional.
            self.wekan.create_user(
                username=user.username, email=user.email, password="")
        return True
|