This commit is contained in:
Philipp Rothmann 2022-04-29 14:56:51 +02:00
parent 104a576908
commit 73c5440454
23 changed files with 94 additions and 78 deletions

View file

@ -1,2 +0,0 @@
from .wekan import main
from .nextcloud import main

View file

@ -4,6 +4,8 @@ from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from app.consumer.baseConsumer import BaseUser
class UsersObjItem(BaseModel):
pk: int
@ -26,7 +28,7 @@ class GroupsObjItem(BaseModel):
attributes: Dict[str, Any]
users_obj: List[UsersObjItem]
class User(BaseModel):
class User(BaseUser):
pk: Optional[str]
username: str
name: str

View file

@ -1,17 +1,19 @@
import email
import logging
from app.authentik.settings import AuthentikSettings
from .api import Authentik
from .models import User
import pytest
@pytest.fixture()
def settings() -> AuthentikSettings:
return AuthentikSettings(
baseurl="http://localhost:9000/", token="foobar123",)
@pytest.fixture
def api() -> Authentik:
try:
return Authentik(base="http://localhost:9000/", token="foobar123", )
except:
pytest.skip("API not reachable? Did you start docker-compose?")
def api(settings: AuthentikSettings) -> Authentik:
return Authentik(settings)
def test_create_event_transport_already_exists(api: Authentik):

2
app/consumer/__init__.py Normal file
View file

@ -0,0 +1,2 @@
from .wekan import main
from .nextcloud import main

View file

@ -4,7 +4,6 @@ from pydantic import BaseModel
class BaseUser(BaseModel):
email: str
id: str
name: str
username: str
@ -12,7 +11,7 @@ class BaseGroup(BaseModel):
name: str
class Sink(ABC):
class Consumer(ABC):
@abstractproperty
def api(self):

View file

@ -1,11 +1,11 @@
from unittest.mock import MagicMock
from app.sink import BaseGroup, BaseUser, Sink
from app.consumer.baseConsumer import BaseGroup, BaseUser, Consumer
class Api:
def create_user(self):
pass
class NextcloudSink(Sink):
class NextcloudConsumer(Consumer):
def __init__(self):
self._api: Api = Api()

View file

@ -0,0 +1,16 @@
from .nextcloud.main import BaseUser, Consumer
from .wekan import main
from .nextcloud import main
from pytest_mock import MockerFixture
import pytest
@pytest.fixture
def testUser():
return BaseUser(email="foo", id="asd", name="sd", username="sdfsfd")
def test_get_subclasses():
l = []
for sc in Consumer.__subclasses__():
l.append(sc.__name__)
assert "NextcloudConsumer" in l
assert "WekanConsumer" in l

View file

@ -1,12 +1,12 @@
from unittest.mock import MagicMock
from .settings import WekanSettings
from app.sink import BaseGroup, BaseUser, Sink
from app.consumer.baseConsumer import BaseGroup, BaseUser, Consumer
from .api import WekanApi
from .models import User
class WekanSink(Sink):
class WekanConsumer(Consumer):
def __init__(self):
self._settings = WekanSettings()

View file

@ -1,15 +1,17 @@
import requests
from app.wekan.models import User, UserBase
from app.consumer.wekan.models import User, UserBase
from app.consumer.wekan.settings import WekanSettings
from .api import WekanApi
import pytest
@pytest.fixture()
def settings() -> WekanSettings:
return WekanSettings(baseurl="http://localhost:3000", user="api", password="foobar123")
@pytest.fixture
def api() -> WekanApi:
try:
r = requests.post("http://localhost:3000/users/register", json={"username": "api", "password": "foobar123", "email": "foo@example.org"})
return WekanApi("http://localhost:3000", "api", "foobar123")
except:
pytest.skip("API not reachable? Did you start docker-compose?")
def api(settings: WekanSettings) -> WekanApi:
r = requests.post("http://localhost:3000/users/register", json={"username": "api", "password": "foobar123", "email": "foo@example.org"})
return WekanApi(settings)
def test_get_user(api: WekanApi):

View file

@ -4,10 +4,10 @@ from pydantic import BaseModel
from fastapi import FastAPI, Depends
from app.authentik.api import Authentik
from app.authentik.models import User
from app.sink import Sink
from app.wekan.api import WekanApi
from app.consumer.baseConsumer import Consumer
from app.consumer.wekan.api import WekanApi
from app.consumer.wekan.main import WekanConsumer
from app.authentik.settings import AuthentikSettings
from app.wekan.main import WekanSink
logging = structlog.get_logger()
@ -32,7 +32,7 @@ class EventController:
# try:
self._authentik = authentik_api
self._sinks = []
for sc in Sink.__subclasses__():
for sc in Consumer.__subclasses__():
obj = sc()
self._sinks.append(obj)
@ -42,7 +42,7 @@ class EventController:
self.jobs = []
pass
def register_api(self, authentik: Authentik, sinks: List[Sink]):
def register_api(self, authentik: Authentik, sinks: List[Consumer]):
self._authentik = authentik
self._sinks = sinks

View file

@ -3,12 +3,12 @@ from unicodedata import name
from urllib.error import HTTPError
from fastapi import Depends, FastAPI, Request, BackgroundTasks
from pydantic import BaseModel
from app.sink import BaseUser, Sink
from app.consumer.baseConsumer import BaseUser, Consumer
from app.authentik.api import Authentik
from app.authentik.models import User
from app.event_controller import Authentik_Hook_Model, EventController, Http_request
from app.authentik.settings import AuthentikSettings
from .wekan.api import WekanApi
from .consumer.wekan.api import WekanApi
from .routers import identityProvider
import json
@ -22,7 +22,14 @@ app.include_router(identityProvider.router)
async def root():
return {'message': 'Hello World'}
@app.get("/consumer/")
async def get_consumer():
l = []
for sc in Consumer.__subclasses__():
l.append(sc.__name__)
return l
### for testing purposes
@app.get("/authentik/create_hook/")
async def hook(request: Request):

0
app/test_dependencies.py Normal file
View file

View file

@ -3,19 +3,29 @@ from pytest_mock import MockerFixture
from .event_controller import Authentik_Hook_Model, EventController
import pytest
@pytest.fixture()
def mock_user():
return User(pk="5", username="asd", name="asd", email="asd@example.org")
def test_handle_model_created_event(mocker: MockerFixture):
mock_user = User(pk="5", username="asd", name="asd", email="asd@example.org")
def test_handle_model_created_event(mocker: MockerFixture, mock_user: User):
wekan_mock = mocker.MagicMock()
wekan_mock.get_user.return_value = None
nextcloud_mock = mocker.MagicMock()
wekan_mock.get_user.return_value = None
authentik_mock = mocker.MagicMock()
authentik_mock.get_user_by_pk.return_value = mock_user
model = Authentik_Hook_Model(pk=mock_user.pk, app="authentik_core", name=mock_user.name, model_name="user")
ec = EventController(authentik_mock)
ec.register_api(authentik_mock, [wekan_mock])
ec.register_api(authentik_mock, [wekan_mock, nextcloud_mock])
ec.handle_model_created_event(model)
ec._authentik.get_user_by_pk.assert_called()
ec._authentik.get_user_by_pk.assert_called_with("5")
wekan_mock.create_user.assert_called()
wekan_mock.create_user.assert_called_with(mock_user)
nextcloud_mock.create_user.assert_called()
nextcloud_mock.create_user.assert_called_with(mock_user)

View file

@ -1,58 +1,61 @@
from cmath import log
from email.mime import base
import logging
from re import A
from time import sleep
import pytest
import requests
from app import event_controller
from app.authentik.api import Authentik
from app.authentik.models import User
from app.authentik.settings import AuthentikSettings
from app.wekan.models import User as WekanUser
from app.wekan.api import WekanApi
from app.consumer.wekan.models import User as WekanUser
from app.consumer.wekan.api import WekanApi
@pytest.fixture()
def wekan_api():
w = None
try:
r = requests.post("http://localhost:3000/users/register", json={"username": "api", "password": "foobar123", "email": "foo@example.org"})
w = WekanApi("http://localhost:3000","api", "foobar123")
r = requests.post("http://localhost:3000/users/register", json={
"username": "api", "password": "foobar123", "email": "foo@example.org"})
w = WekanApi("http://localhost:3000", "api", "foobar123")
except Exception as e:
logging.error(e)
return w
@pytest.fixture()
def authentik_api():
settings = AuthentikSettings(baseurl="http://localhost:9000/", token="foobar123")
@pytest.fixture()
def authentik_api(settings: AuthentikSettings):
a = Authentik(settings)
try:
r = a.create_web_hook(hook_endpoint="http://172.17.0.1:8000/authentik/hook/") # docker localhost
r = a.create_web_hook(
hook_endpoint="http://172.17.0.1:8000/authentik/hook/") # docker localhost
except Exception as e:
logging.info(e)
return a
@pytest.mark.skip()
def test_create_user(wekan_api: WekanApi, authentik_api: Authentik):
user = authentik_api.create_user(User(username="banane43", email="banane@example.org", name="Herr Banane"))
user = authentik_api.create_user(
User(username="banane43", email="banane@example.org", name="Herr Banane"))
print(user)
sleep(5)
authentik_api.delete_user(user)
# authentik username == wekan username
# user must be created from authentik user and noch api to trigger the notiftication rule?
logging.error("authentik notifcation rule doesn't work with api?? , plz create user in authentik")
logging.error(
"authentik notifcation rule doesn't work with api?? , plz create user in authentik")
assert False
@pytest.mark.skip()
def test_create_user_with_same_email(wekan_api, authentik_api):
logging.error("authentik notifcation rule doesn't work with api?? , create two user with identical email in authentik")
logging.error(
"authentik notifcation rule doesn't work with api?? , create two user with identical email in authentik")
assert False
@pytest.mark.skip()
def test_user_already_exists_excepts():
assert False
assert False

View file

@ -1,7 +1,7 @@
from fastapi.testclient import TestClient
import pytest
from app.wekan.api import WekanApi
from app.consumer.wekan.api import WekanApi
from .main import Authentik_Hook_Model, app

View file

@ -1,28 +0,0 @@
from .nextcloud.main import BaseUser, Sink
from .wekan import main
from .nextcloud import main
from pytest_mock import MockerFixture
import pytest
# TODO how to import all sink from smth like a "addons" folder?
@pytest.fixture
def testUser():
return BaseUser(email="foo", id="asd", name="sd", username="sdfsfd")
def test_get_subclasses():
l = []
for sc in Sink.__subclasses__():
l.append(sc.__name__)
assert "NextcloudSink" in l
assert "WekanSink" in l
@pytest.mark.skip()
def test_create_user(mocker: MockerFixture, testUser: BaseUser):
u = []
for sc in Sink.__subclasses__():
print(sc.__name__)
# app = sc()
# app.api = mocker.MagicMock()
# app.create_user(testUser)
# app.api.create_user.assert_called()

View file

@ -1,6 +1,7 @@
anyio==3.5.0
asgiref==3.5.0
attrs==21.4.0
autopep8==1.6.0
certifi==2021.10.8
charset-normalizer==2.0.12
click==8.1.2
@ -11,6 +12,7 @@ iniconfig==1.1.1
packaging==21.3
pluggy==1.0.0
py==1.11.0
pycodestyle==2.8.0
pydantic==1.9.0
pyparsing==3.0.8
pytest==7.1.2
@ -20,6 +22,7 @@ requests==2.27.1
sniffio==1.2.0
starlette==0.17.1
structlog==21.5.0
toml==0.10.2
tomli==2.0.1
typing_extensions==4.2.0
urllib3==1.26.9