Compare commits

...

5 Commits

Author SHA1 Message Date
Philipp Rothmann bbf81e143e fails when user exists 2022-04-29 16:55:23 +02:00
Philipp Rothmann 23bdcc7957 testytest 2022-04-29 16:43:47 +02:00
Philipp Rothmann 3034bb3b69 fix wekan delete 2022-04-29 16:32:10 +02:00
Philipp Rothmann 5b85957012 asd 2022-04-29 15:59:09 +02:00
Philipp Rothmann 4b05414998 add dronefile 2022-04-29 15:58:11 +02:00
13 changed files with 53 additions and 50 deletions

View File

@ -3,8 +3,9 @@ name: default
steps:
- name: test
image: python
image: docker/compose:1.23.2
commands:
- pip install -r requirements.txt
- pytest
- make init
- make up
- make test

View File

@ -1,5 +1,6 @@
init:
cp .env.sample .env
python3 -m venv env
. ./env/bin/activate
pip3 install -r requirements.txt
@ -18,7 +19,7 @@ run:
test:
./env/bin/pytest app
./env/bin/pytest -s --sw app
integration:
./env/bin/uvicorn app.main:app --reload --host 0.0.0.0

View File

@ -5,7 +5,7 @@ from requests import Request
import structlog
from app.authentik.settings import AuthentikSettings
from .models import User
from .models import AuthentikUser
logging = structlog.get_logger()
@ -147,13 +147,13 @@ class Authentik:
return r.json()
raise Exception(r.status_code, r.url, r.text)
def get_user_by_pk(self, pk: str) -> Optional[User]:
def get_user_by_pk(self, pk: str) -> Optional[AuthentikUser]:
r = self.get(f"core/users/{pk}").json()
if "pk" in r:
return User(**r)
return AuthentikUser(**r)
raise Exception(r)
def get_user(self, user: User) -> Optional[User]:
def get_user(self, user: AuthentikUser) -> Optional[AuthentikUser]:
if user.pk:
r = self.get(f"core/users/{user.pk}").json()
else:
@ -165,17 +165,16 @@ class Authentik:
r = r["results"][0]
if "pk" in r:
print(r)
return User(**r)
return AuthentikUser(**r)
raise Exception(r)
def create_user(self, user: User) -> User:
def create_user(self, user: AuthentikUser) -> AuthentikUser:
r = self.post("core/users/", user.dict()).json()
if "pk" in r:
return User(**r)
return AuthentikUser(**r)
raise Exception(r)
def delete_user(self, user: User) -> bool:
def delete_user(self, user: AuthentikUser) -> bool:
if user == None or user.pk == None:
raise Exception("Not a valid user")

View File

@ -28,7 +28,7 @@ class GroupsObjItem(BaseModel):
attributes: Dict[str, Any]
users_obj: List[UsersObjItem]
class User(BaseUser):
class AuthentikUser(BaseUser):
pk: Optional[str]
username: str
name: str

View File

@ -3,7 +3,7 @@ import logging
from app.authentik.settings import AuthentikSettings
from .api import Authentik
from .models import User
from .models import AuthentikUser
import pytest
@pytest.fixture()
@ -34,7 +34,7 @@ def test_create_event_rule_already_exists(api: Authentik):
assert not exception == None # TODO create exception types
def test_get_user_by_username(api: Authentik):
u = User(username="akadmin",
u = AuthentikUser(username="akadmin",
name="",
groups=[],
email="",
@ -47,7 +47,7 @@ def test_get_user_by_username(api: Authentik):
def test_create_user(api: Authentik):
u = User(username="banane",
u = AuthentikUser(username="banane",
name="banane",
groups=[],
email="foo@example.org",

View File

@ -49,7 +49,6 @@ class WekanApi:
raise Exception(r)
if r == {}:
return None
print(r)
return User(**r)
def get_all_users(self) -> List[UserBase]:
@ -73,8 +72,7 @@ class WekanApi:
return self.get_user(r["_id"])
raise Exception()
def delete_user(self, user: str):
r = self.delete(f"users/{user}").json()
print(r)
def delete_user(self, id: str):
r = self.delete(f"users/{id}").json()
if "error" in r:
raise Exception(r)

View File

@ -25,13 +25,9 @@ def test_get_user(api: WekanApi):
def test_get_users(api: WekanApi):
assert True if "api" in [u.username for u in api.get_all_users()] else False
def test_create_user(api: WekanApi):
user = api.create_user("foo", "foo@bar.com", "")
def test_create_and_delete_user(api: WekanApi):
user = api.create_user("foo", "foo42@bar.com", "")
assert api.get_user("foo").username == "foo"
assert type(user) is User
def test_delete_user(api: WekanApi):
api.create_user("foo", "foo@bar.com", "")
api.delete_user("foo") # TODO: doesn't work?
pytest.skip("smth wrong with wekan api")
api.delete_user(user.id)
assert api.get_user("foo") == None

View File

@ -3,7 +3,7 @@ from typing import List
from pydantic import BaseModel
from fastapi import FastAPI, Depends
from app.authentik.api import Authentik
from app.authentik.models import User
from app.authentik.models import AuthentikUser
from app.consumer.baseConsumer import Consumer
from app.consumer.wekan.api import WekanApi
from app.consumer.wekan.main import WekanConsumer
@ -32,6 +32,7 @@ class EventController:
# try:
self._authentik = authentik_api
self._sinks = []
self._exceptions = []
for sc in Consumer.__subclasses__():
obj = sc()
self._sinks.append(obj)
@ -47,11 +48,12 @@ class EventController:
self._sinks = sinks
def handle_model_created_event(self, model: Authentik_Hook_Model):
user: User = self._authentik.get_user_by_pk(model.pk)
user: AuthentikUser = self._authentik.get_user_by_pk(model.pk)
for sink in self._sinks: # TODO this could run async
logging.info(f"Creating User {user.username} in {sink.__class__}")
try:
sink.create_user(user)
except Exception as e:
logging.error("create user", exception=str(e), sink=sink, user=user)
self._exceptions.append(str(e))
return True

View File

@ -5,7 +5,7 @@ from fastapi import Depends, FastAPI, Request, BackgroundTasks
from pydantic import BaseModel
from app.consumer.baseConsumer import BaseUser, Consumer
from app.authentik.api import Authentik
from app.authentik.models import User
from app.authentik.models import AuthentikUser
from app.event_controller import Authentik_Hook_Model, EventController, Http_request
from app.authentik.settings import AuthentikSettings
from .consumer.wekan.api import WekanApi
@ -42,7 +42,7 @@ async def create_demo_user(request: Request):
a = Authentik(base="http://localhost:9000/", token="foobar123")
try:
user = a.create_user(
User(username="demo", name="dmeo", email="foo@example.org"))
AuthentikUser(username="demo", name="dmeo", email="foo@example.org"))
except Exception as e: # TODO
return e
logging.info(user)

View File

@ -7,6 +7,7 @@ from app.authentik.settings import AuthentikSettings
router = APIRouter()
ec = EventController(Authentik(AuthentikSettings()))
@router.post("/authentik/hook/")
async def hook(model: Authentik_Hook_Model,
@ -15,7 +16,10 @@ async def hook(model: Authentik_Hook_Model,
):
logging.info(model)
logging.info(http_request)
ec = EventController(Authentik(AuthentikSettings()))
if http_request.path == "/api/v3/core/users/":
background_tasks.add_task(ec.handle_model_created_event, model)
return 200
@router.get("/authentik/errors/")
async def errors():
return ec._exceptions

View File

@ -1,14 +1,14 @@
from app.authentik.models import User
from app.authentik.models import AuthentikUser
from pytest_mock import MockerFixture
from .event_controller import Authentik_Hook_Model, EventController
import pytest
@pytest.fixture()
def mock_user():
return User(pk="5", username="asd", name="asd", email="asd@example.org")
return AuthentikUser(pk="5", username="asd", name="asd", email="asd@example.org")
def test_handle_model_created_event(mocker: MockerFixture, mock_user: User):
def test_handle_model_created_event(mocker: MockerFixture, mock_user: AuthentikUser):
wekan_mock = mocker.MagicMock()
wekan_mock.get_user.return_value = None
nextcloud_mock = mocker.MagicMock()

View File

@ -7,7 +7,7 @@ from fastapi.testclient import TestClient
from .main import app
from app.authentik.api import Authentik
from app.authentik.models import User
from app.authentik.models import AuthentikUser
from app.authentik.settings import AuthentikSettings
from app.consumer.wekan.models import User as WekanUser
from app.consumer.wekan.api import WekanApi
@ -36,33 +36,30 @@ def authentik(settings: AuthentikSettings):
r = a.create_web_hook(
hook_endpoint="http://172.17.0.1:8000/authentik/hook/") # docker localhost
except Exception as e:
logging.info(e)
logging.error(e)
return a
@pytest.fixture()
def authentik_user(authentik):
user = authentik.create_user(User(username="foobar", name="Foo Bar", email="foo@bar.com"))
user = authentik.create_user(AuthentikUser(username="foobar", name="Foo Bar", email="foo@bar.com"))
yield user
authentik.delete_user(user)
print("DELETING USER")
def test_create_user(mocker, authentik: Authentik, authentik_user: WekanUser, wekan: WekanApi):
# Actually this should already trigger the hook, but in authentik it doesn't trigger when come from api
def test_create_user(mocker, authentik_user: AuthentikUser, wekan: WekanApi):
# Actually authentik user creation should already trigger the hook, but in authentik it doesn't trigger when come from api
# mock = mocker.patch("app.event_controller.EventController.handle_model_created_event")
authentik_message = {"model": {"pk": authentik_user.pk, "app": "authentik_core", "name": authentik_user.name,
"model_name": "user"}, "http_request": {"args": {}, "path": "/api/v3/core/users/", "method": "POST"}}
response = client.post("/authentik/hook/", json=authentik_message)
print(response.text)
assert response.status_code == 200
wu = wekan.get_user(authentik_user.username)
assert not wu == None
assert wu.username == authentik_user.username
assert authentik_user.email in [i.address for i in wu.emails]
wekan.delete_user(authentik_user.username) # TODO WTF THIS NOT WORK?
wekan.delete_user(wu.id)
@pytest.mark.skip()
@ -71,7 +68,13 @@ def test_create_user_with_same_email(wekan_api, authentik_api):
"authentik notifcation rule doesn't work with api?? , create two user with identical email in authentik")
assert False
@pytest.mark.skip()
def test_user_already_exists_excepts():
assert False
def test_user_already_exists_excepts(authentik_user: AuthentikUser, wekan: WekanApi):
authentik_message = {"model": {"pk": authentik_user.pk, "app": "authentik_core", "name": authentik_user.name,
"model_name": "user"}, "http_request": {"args": {}, "path": "/api/v3/core/users/", "method": "POST"}}
response = client.post("/authentik/hook/", json=authentik_message)
response = client.post("/authentik/hook/", json=authentik_message)
assert response.status_code == 200
errors = client.get("/authentik/errors/")
assert errors.status_code == 200
assert errors.json() == ["[Wekan] User already exists"] # TODO introduce error models
wekan.delete_user(wekan.get_user(authentik_user.username).id)

View File

@ -21,7 +21,6 @@ def test_hook_fails_for_wrong_input():
"user_username": "akadmin"
}"""
response = client.post("/authentik/hook/", data=d)
print(response.text)
assert response.status_code == 422
def test_hook_model_created(mocker):