diff --git a/Makefile b/Makefile index 7818cc3..6125d1a 100644 --- a/Makefile +++ b/Makefile @@ -8,4 +8,6 @@ run: ./env/bin/uvicorn app.main:app --reload test: - ./env/bin/pytest app \ No newline at end of file + ./env/bin/pytest app + +.PHONY: init run test \ No newline at end of file diff --git a/README.md b/README.md index 9e0193a..5b97531 100644 --- a/README.md +++ b/README.md @@ -10,4 +10,9 @@ does what nobody else want's to do. make init make test make run -``` \ No newline at end of file +``` + + + +https://pydantic-docs.helpmanual.io/ +https://jsontopydantic.com/ \ No newline at end of file diff --git a/app/authentik.py b/app/authentik.py new file mode 100644 index 0000000..bf6060b --- /dev/null +++ b/app/authentik.py @@ -0,0 +1,130 @@ +from typing import Dict +import requests +from requests import Request + + +class Authentik: + + def __init__(self, token, base="https://sso.lit.yksflip.de/"): + self.base = f"{base}api/v3/" + self.hook_endpoint = "https://webhook.site/0caf7e4d-c853-4e33-995e-dc12f82481f0" + self.token = token + self.headers = {"Authorization": f"Bearer {token}"} + self.property_mapping = None + self.event_matcher_policy = None + self.event_transport = None + self.admin_group = None + self.event_rule = None + self.event_rule_link = None + + def post(self, endpoint: str, data: Dict) -> Request: + return requests.post(url= f"{self.base}{endpoint}", json=data, headers=self.headers) + + def get(self, endpoint: str, params: Dict) -> Request: + return requests.get(url=f"{self.base}{endpoint}", params=params, headers=self.headers) + + def create_web_hook(self): + self.event_matcher_policy = self.create_event_matcher_policy() + self.property_mapping = self.create_property_mapping() + self.event_transport = self.create_event_transport(self.hook_endpoint, self.property_mapping["pk"]) + self.admin_group = self.get_admin_group() + self.event_rule = self.create_event_rule(self.admin_group, self.event_transport["pk"]) + self.event_rule_link = self.add_event_rule_link(self.event_matcher_policy["pk"], self.event_rule["pk"]) + return self + + def create_event_transport(self, hook_endpoint, property_mapping_pk): + url = "events/transports/" + data = { + "mode": "webhook", + "name": "my hook", + "send_once": False, + "webhook_mapping": property_mapping_pk, + "webhook_url": hook_endpoint + } + print(data) + # TODO: add check if model with same name already exists + r: Request = self.post(url, data) + if r.status_code == 201: + return r.json() + raise Exception(r.status_code, r.url, r.text) + + def create_event_rule(self, admin_group, event_transport): + url = "events/rules/" + data = { + "group": admin_group["pk"], + "name": "event-rule", + "severity": "notice", + "transports": [ + event_transport + ] + } + # TODO: add check if model with same name already exists + r = self.post(url, data) + if r.status_code == 201: + return r.json() + raise Exception(r.status_code, r.url, r.text) + + def add_event_rule_link(self, policy_pk, target_pk): + url = "policies/bindings/" + data = { + "enabled": True, + "group": "", + "negate": False, + "order": "0", + "policy": policy_pk, + "target": target_pk, + "timeout": "1", + "user": "" + } + r = self.post(url, data) + if r.status_code == 201: + return r.json() + raise Exception(r.status_code, r.url, r.text) + + + def get_admin_group(self): + url = "core/groups/" + args = { + "is_superuser": True, + "name": "authentik Admins" + } + r = self.get(url, args) + if r.status_code == 200: + groups = r.json()["results"] + if len(groups) == 1: + self.admin_group = groups[0] + return self.admin_group + raise Exception(r.status_code, r.url, r.text) + + + def create_event_matcher_policy(self): + url = "policies/event_matcher/" + data = { + "action": "model_created", + "app": "authentik.core", + "client_ip": "", + "execution_logging": True, + "name": "model created" + } + r = self.post(url, data) + if r.status_code == 201: + self.event_matcher_policy = r.json() + return r.json() + raise Exception(r.status_code, r.url, r.text) + + + def create_property_mapping(self): + url = "propertymappings/notification/" + data = { + "name": "new-mapper", + "expression": "fields = {}\nif notification:\n if notification.event:\n for k, v in notification.event.context.items():\n fields[k] = v\nreturn fields" + } + r = self.post(url, data) + if r.status_code == 201: + return r.json() + raise Exception(r.status_code, r.url, r.text) + + + def get_user(self, user_pk): + pass + diff --git a/app/main.py b/app/main.py index d275f0e..0630a09 100644 --- a/app/main.py +++ b/app/main.py @@ -1,8 +1,15 @@ -from fastapi import FastAPI +from fastapi import FastAPI, Request +from .wekan.api import Wekan +import json app = FastAPI() - @app.get("/") async def root(): - return {'message': 'Hello World'} \ No newline at end of file + return {'message': 'Hello World'} + +@app.post("/hook") +async def hook(request: Request): + r = await request.json() + # model_created = json.loads(r['body'].split("model_created: ")[1])["model"] + # return wekan.create_user(model_created["pk"]) diff --git a/app/test_authentik.py b/app/test_authentik.py new file mode 100644 index 0000000..c9945cd --- /dev/null +++ b/app/test_authentik.py @@ -0,0 +1,12 @@ +from .authentik import Authentik + + +def test_connection(): + a = Authentik(token="foobar123") + + # res = a.create_property_mapping() + # res = a.create_event_matcher_policy() + # res = a.create_event_transport(hook_endpoint="http://localhost:8000") + # res = a.get_admin_group() + res = a.create_web_hook() + print(res) diff --git a/app/test_main.py b/app/test_main.py index fd860bb..baaccd3 100644 --- a/app/test_main.py +++ b/app/test_main.py @@ -1,5 +1,7 @@ from fastapi.testclient import TestClient +from app.wekan.api import Wekan + from .main import app client = TestClient(app) @@ -9,3 +11,35 @@ def test_read_main(): response = client.get("/") assert response.status_code == 200 assert response.json() == {"message": "Hello World"} + + +def test_hook(): + d = """{ + "body": "Test Notification from transport hook", + "severity": "notice", + "user_email": "root@localhost", + "user_username": "akadmin" + }""" + response = client.post("/hook", data=d) + assert response.status_code == 200 + + +def test_hook_model_created(mocker): + mock = mocker.patch('app.wekan.api.Wekan.create_user', + return_value='fake user') + print(mock.mock_calls) + d = """ + { + "body": "model_created: {'model': {'pk': 18, 'app': 'authentik_core', 'name': 'bernd', 'model_name': 'user'}, 'http_request': {'args': {}, 'path': '/api/v3/core/users/', 'method': 'POST'}}", + "severity": "alert", + "user_email": "flip@yksflip.de", + "user_username": "akadmin" + } + """ + response = client.post("/hook", data=d, ) + assert response.status_code == 200 + assert len(mock.mock_calls) > 0 + kall = mock.call_args + assert kall.args[0] == "18" + # assert str(response.text) == 'fake user' + diff --git a/app/wekan/__init__.py b/app/wekan/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/wekan/api.py b/app/wekan/api.py new file mode 100644 index 0000000..5665548 --- /dev/null +++ b/app/wekan/api.py @@ -0,0 +1,78 @@ +from optparse import Option +import string +import requests +from requests import Request +from pydantic import BaseModel, Field +from typing import Any, Dict, List, Optional + +from app.wekan.models import User, UserBase + + +class Wekan: + + def __init__(self, base: str, api_user: str, password: str): + self.base = base + r = requests.post(f'{base}/users/login', data={ + "username": api_user, + "password": password + }, ) + if not r.status_code == 200: + raise Exception(r.url, r.text) + + t = r.json() + self.token = { + "token": t["token"], + "id": t["id"], + "expires": t["tokenExpires"], + } + + self.headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'Authorization': f'Bearer { self.token["token"] }', + } + + def get(self, endpoint) -> Request: + return requests.get(f'{self.base}/api/{endpoint}', headers=self.headers) + + def post(self, endpoint: str, data: Dict) -> Request: + return requests.post(url=f"{self.base}/api/{endpoint}", json=data, headers=self.headers) + + def delete(self, endpoint: str) -> Request: + return requests.delete(url=f"{self.base}/api/{endpoint}", headers=self.headers) + + def get_user(self, _id: string) -> Optional[User]: + r = self.get(f"users/{_id}").json() + if "error" in r: + raise Exception(r) + if r == {}: + return None + print(r) + return User(**r) + + def get_all_users(self) -> List[UserBase]: + r = self.get("users") + if r.status_code == 200: + return [UserBase(**u) for u in r.json()] + raise Exception() + + def create_user(self, username: str, email: str, password: str) -> Optional[User]: + data = { + "username": username, + "email": email, + "password": "" + } + r = self.post("users/", data).json() + if "error" in r: + if r["reason"] == "Username already exists.": + return self.get_user(username) + raise Exception(r) + if "_id" in r: + return self.get_user(r["_id"]) + raise Exception() + + def delete_user(self, user: str): + r = self.delete(f"users/{user}").json() + print(r) + if "error" in r: + raise Exception(r) diff --git a/app/wekan/models.py b/app/wekan/models.py new file mode 100644 index 0000000..c436a8f --- /dev/null +++ b/app/wekan/models.py @@ -0,0 +1,79 @@ +from pydantic import BaseModel, Field +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel + +class Org(BaseModel): + orgId: str + orgDisplayName: str + +class Team(BaseModel): + teamId: str + teamDisplayName: str + +class Email(BaseModel): + address: str + verified: bool + +class Notification(BaseModel): + activity: str + read: str + +class Profile(BaseModel): + avatarUrl: Optional[str] + emailBuffer: Optional[List[str]] + fullname: Optional[str] + showDesktopDragHandles: Optional[bool] + hideCheckedItems: Optional[bool] + cardMaximized: Optional[bool] + customFieldsGrid: Optional[bool] + hiddenSystemMessages: Optional[bool] + hiddenMinicardLabelText: Optional[bool] + initials: Optional[str] + invitedBoards: Optional[List[str]] + language: Optional[str] + moveAndCopyDialog: Optional[Dict[str, Any]] + moveChecklistDialog: Optional[Dict[str, Any]] + copyChecklistDialog: Optional[Dict[str, Any]] + notifications: Optional[List[Notification]] + showCardsCountAt: Optional[int] + startDayOfWeek: Optional[int] + starredBoards: Optional[List[str]] + icode: Optional[str] + boardView: str + listSortBy: str + templatesBoardId: str + cardTemplatesSwimlaneId: str + listTemplatesSwimlaneId: str + boardTemplatesSwimlaneId: str + +class SessionData(BaseModel): + totalHits: Optional[int] + +class Member(BaseModel): + id: str = Field(..., alias="userId") + isAdmin: bool = False + isNoComments: bool = False + isCommentOnly: bool = False + isWorker: Optional[bool] = False + +class UserBase(BaseModel): + id: str = Field(..., alias='_id') + username: Optional[str] = None + +class User(UserBase): + username: str + orgs: Optional[List[Org]] + teams: Optional[List[Team]] + emails: List[Email] + createdAt: str + modifiedAt: str + profile: Optional[Profile] + services: Dict[str, Any] + heartbeat: Optional[str] + isAdmin: Optional[bool] + createdThroughApi: Optional[bool] + loginDisabled: Optional[bool] + authenticationMethod: str + sessionData: SessionData + importUsernames: Optional[List[Optional[str]]] diff --git a/app/wekan/test_wekan.py b/app/wekan/test_wekan.py new file mode 100644 index 0000000..b35ecbc --- /dev/null +++ b/app/wekan/test_wekan.py @@ -0,0 +1,28 @@ +from app.wekan.models import User, UserBase +from .api import Wekan +import pytest + +@pytest.fixture +def api() -> Wekan: + return Wekan("https://board.lit.yksflip.de", "api", "foobar123") + +def test_get_user(api: Wekan): + user = api.get_user("api") + assert user.username == "api" + assert type(user) == User + + user = api.get_user("doesnotexist") + assert user == None + +def test_get_users(api: Wekan): + assert True if "api" in [u.username for u in api.get_all_users()] else False + +def test_create_user(api: Wekan): + user = api.create_user("foo", "foo@bar.com", "") + assert api.get_user("foo").username == "foo" + assert type(user) is User + +def test_delete_user(api: Wekan): + api.create_user("foo", "foo@bar.com", "") + api.delete_user("foo") # TODO: doesn't work? + assert api.get_user("foo") == None