2022-04-29 14:13:54 +02:00
|
|
|
import logging
|
|
|
|
from time import sleep
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
import requests
|
2022-04-29 15:50:33 +02:00
|
|
|
from fastapi.testclient import TestClient
|
2022-04-29 14:13:54 +02:00
|
|
|
|
2022-04-29 15:50:33 +02:00
|
|
|
from .main import app
|
2022-04-29 14:13:54 +02:00
|
|
|
from app.authentik.api import Authentik
|
|
|
|
from app.authentik.models import User
|
|
|
|
from app.authentik.settings import AuthentikSettings
|
2022-04-29 14:56:51 +02:00
|
|
|
from app.consumer.wekan.models import User as WekanUser
|
|
|
|
from app.consumer.wekan.api import WekanApi
|
2022-04-29 14:13:54 +02:00
|
|
|
|
2022-04-29 15:50:33 +02:00
|
|
|
from app.authentik.test_authentik import settings
|
|
|
|
from app.consumer.wekan.test_wekan import settings as wekan_settings
|
|
|
|
# Module-level FastAPI ``TestClient`` wrapping ``app``; the tests in this
# module use it to send requests to the application's routes.
client = TestClient(app)
|
|
|
|
|
2022-04-29 14:13:54 +02:00
|
|
|
|
|
|
|
@pytest.fixture()
def wekan(wekan_settings):
    """Provide a ``WekanApi`` client for the local Wekan instance.

    Posts a registration request for the ``api`` user to the Wekan instance
    on ``localhost:3000`` and then builds a ``WekanApi`` from
    ``wekan_settings``. Setup is best-effort: any exception raised along the
    way is logged at error level and the fixture returns ``None`` instead of
    propagating it.

    Args:
        wekan_settings: Settings object handed to ``WekanApi``; provided by
            the ``settings`` fixture imported from the Wekan test module.

    Returns:
        The constructed ``WekanApi``, or ``None`` when setup failed.
    """
    w = None
    try:
        # The registration response is not inspected.
        # NOTE(review): presumably this makes sure the account referenced by
        # ``wekan_settings`` exists -- confirm.
        # The timeout keeps an unresponsive Wekan instance from hanging the
        # whole test session; a timeout is handled like any other failure.
        requests.post(
            "http://localhost:3000/users/register",
            json={
                "username": "api",
                "password": "foobar123",
                "email": "foo@example.org",
            },
            timeout=10,
        )
        w = WekanApi(wekan_settings)
    except Exception as e:
        logging.error(e)
    return w
|
|
|
|
|
|
|
|
|
2022-04-29 14:56:51 +02:00
|
|
|
@pytest.fixture()
def authentik(settings: AuthentikSettings):
    """Provide an ``Authentik`` API client with the test web hook requested.

    Builds the client from ``settings`` and asks it to create a web hook
    pointing at this application's ``/authentik/hook/`` endpoint. A failure
    while creating the hook is only logged at info level; the client is
    returned either way.

    Args:
        settings: Authentik settings fixture used to construct the client.

    Returns:
        The ``Authentik`` client instance.
    """
    api = Authentik(settings)
    try:
        # 172.17.0.1 is used as the "docker localhost" address here.
        api.create_web_hook(
            hook_endpoint="http://172.17.0.1:8000/authentik/hook/",
        )
    except Exception as e:
        logging.info(e)
    return api
|
|
|
|
|
|
|
|
|
2022-04-29 15:50:33 +02:00
|
|
|
@pytest.fixture()
def authentik_user(authentik):
    """Yield a freshly created Authentik user and delete it after the test.

    Args:
        authentik: The ``Authentik`` client fixture used to create and
            delete the user.

    Yields:
        Whatever ``authentik.create_user`` returns for the ``foobar`` user.
    """
    template = User(username="foobar", name="Foo Bar", email="foo@bar.com")
    created = authentik.create_user(template)

    yield created

    # Teardown: runs once the requesting test has finished.
    authentik.delete_user(created)
    print("DELETING USER")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_create_user(mocker, authentik: Authentik, authentik_user: User, wekan: WekanApi):
    """Post a user-created hook event and check the user shows up in Wekan.

    The test sends a hand-built "model created" payload for
    ``authentik_user`` to ``/authentik/hook/`` and then looks the user up
    through the Wekan API, comparing username and e-mail address.

    Args:
        mocker: pytest-mock fixture; only needed by the patch that is
            currently commented out below.
        authentik: Authentik client fixture (not used directly in the body).
        authentik_user: User created by the ``authentik_user`` fixture.
            NOTE(review): annotated as the Authentik ``User`` model because
            the fixture builds it from that model and ``.pk`` / ``.email``
            are read here -- confirm against ``Authentik.create_user``.
        wekan: Wekan API client fixture used to verify and clean up.
    """
    # Creating the user should already trigger the hook, but Authentik does
    # not fire it when the user is created through the API, so the event is
    # posted manually instead.
    # mock = mocker.patch("app.event_controller.EventController.handle_model_created_event")
    authentik_message = {
        "model": {
            "pk": authentik_user.pk,
            "app": "authentik_core",
            "name": authentik_user.name,
            "model_name": "user",
        },
        "http_request": {
            "args": {},
            "path": "/api/v3/core/users/",
            "method": "POST",
        },
    }

    response = client.post("/authentik/hook/", json=authentik_message)
    print(response.text)
    assert response.status_code == 200

    wu = wekan.get_user(authentik_user.username)
    # Identity check: ``== None`` would go through the model's ``__eq__``.
    assert wu is not None
    assert wu.username == authentik_user.username
    assert authentik_user.email in [i.address for i in wu.emails]

    # TODO: deleting the Wekan user does not seem to work here -- investigate.
    wekan.delete_user(authentik_user.username)
|
2022-04-29 14:13:54 +02:00
|
|
|
|
2022-04-29 14:56:51 +02:00
|
|
|
|
2022-04-29 14:13:54 +02:00
|
|
|
@pytest.mark.skip()
def test_create_user_with_same_email(wekan_api, authentik_api):
    """Skipped placeholder for the duplicate e-mail scenario.

    The body only logs a reminder of what still has to be set up and then
    fails unconditionally, so the test must stay skipped until it is
    actually implemented.
    """
    reminder = "authentik notifcation rule doesn't work with api?? , create two user with identical email in authentik"
    logging.error(reminder)
    assert False
|
|
|
|
|
2022-04-29 14:56:51 +02:00
|
|
|
|
2022-04-29 14:13:54 +02:00
|
|
|
@pytest.mark.skip()
def test_user_already_exists_excepts():
    """Skipped placeholder; fails unconditionally until it is implemented.

    NOTE(review): going by the name, this is presumably meant to check that
    creating an already existing user raises -- confirm the intended
    scenario before filling it in.
    """
    assert False
|