new-features (#5)

* refactoring and rework: runner now has setups / tests / cleanups as lists
* add nextcloud runner
* add email testing prototype with imap fixture
* add dependency resolution (sort env files in input so that test order is correct)

Reviewed-on: local-it-infrastructure/e2e_tests#5
Co-authored-by: Daniel <d.brummerloh@gmail.com>
Co-committed-by: Daniel <d.brummerloh@gmail.com>
This commit is contained in:
Daniel 2023-12-04 12:46:30 +01:00 committed by dan
parent 2e33f8f014
commit d3dc0f942a
32 changed files with 573 additions and 247 deletions

View file

@ -1,4 +1,4 @@
# Clone
# GIT Clone
To clone with submodules, use these git commands:
@ -8,7 +8,24 @@ git submodule update --init // add submodule after normal cloning
git submodule update --remote // update submodules
```
# Run
# Run without Docker
### Installation
Create a python environment and install all dependencies via
```bash
pip install -r requirements.txt
playwright install
```
Run the script with
```bash
python main.py
```
# Run with Docker
```bash
docker compose build
@ -28,3 +45,11 @@ Force rebuild wtihtout cache
```bash
docker-compose build --no-cache
```
# Codegen
Use playwright codegen to create code for new testes easily https://playwright.dev/python/docs/codegen
```bash
playwright codegen demo.playwright.dev/todomvc
```

View file

@ -17,6 +17,8 @@ from src.utils import get_session_id
# requires that authentik ran first to create the admin session and the user
# session. At the moment, wrong ordering results in unsuccessful test
# (wrong ordering would be wordpress env file is before authentik env file).
# At the moment, functionailty is only guaranteed if each env file use
# a unique TYPE var.
ENV_FILES = [
Path("envfiles/login.test.dev.local-it.cloud.env"), # authentik
@ -38,12 +40,13 @@ OUTPUT_DIR = Path("./test-output").resolve()
# --------------------- load credentials to env variables -------------------- #
cred_file = Path("credentials.json")
with open(cred_file, "r") as f:
CREDENTIALS = json.load(f)
os.environ["ADMIN_USER"] = CREDENTIALS["admin_user"]
os.environ["ADMIN_PASS"] = CREDENTIALS["admin_pass"]
for key, value in CREDENTIALS.items():
os.environ[key] = value
# ----------------------------- define session_id ---------------------------- #
@ -56,7 +59,7 @@ session_id = get_session_id()
# ------------------------------- setup logging ------------------------------ #
DIR = DirManager(output_dir=OUTPUT_DIR, session_id=session_id)
log_file = DIR.RESULTS / "full.log"
log_file = DIR.RECORDS / "coordinator.log"
logger.add(log_file)

0
prototyping/__init__.py Normal file
View file

View file

@ -0,0 +1,47 @@
# %%
import email
import json
from email.header import decode_header
from imaplib import IMAP4, IMAP4_SSL
from pathlib import Path
# -------------------------------- credentials ------------------------------- #
cred_file = Path("../credentials.json")
with open(cred_file, "r") as f:
CREDENTIALS = json.load(f)
username = CREDENTIALS["imap_user"]
password = CREDENTIALS["imap_pass"]
# ----------------------------------- imap ----------------------------------- #
host = "mail.local-it.org"
imap_port = 143
imap_ssl_port = 993
with IMAP4_SSL(host=host) as imap_server:
imap_server.login(username, password)
imap_server.select("INBOX")
# Search for all emails in the folder
status, email_ids = imap_server.search(None, "ALL")
email_ids = email_ids[0].split()
# Fetch email details using the retrieved IDs
for email_id in email_ids:
result, data = imap_server.fetch(email_id, "(RFC822)")
raw_email = data[0][1] # Raw content of the email
email_message = email.message_from_bytes(raw_email)
# Extract the subject
subject_encoded = email_message.get("Subject")
decoded_subject = decode_header(subject_encoded)[0][0]
if isinstance(decoded_subject, bytes):
decoded_subject = decoded_subject.decode()
# Print or use the subject as needed
print("Subject:", decoded_subject)

View file

@ -0,0 +1,10 @@
# %%
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
string = "blog.dev.local-it.cloud"
parsed_url = urlparse(string, scheme="https")
print(parsed_url)
print(urlunparse(parsed_url))

View file

@ -19,4 +19,5 @@ line-length = 120
target-version = "py311"
[tool.pytest.ini_options]
python_files = "test_*.py setup*.py"
python_functions = "test_* setup_*"
norecursedirs = "previous-work src"

View file

@ -1,4 +1,5 @@
pytest
pytest-html
pytest-playwright
python-dotenv
icecream

View file

@ -5,15 +5,28 @@
# sys.path. It is thus good practise for projects to either put conftest.py under
# a package scope or to never import anything from a conftest.py file.
import os
from imaplib import IMAP4_SSL
from pathlib import Path
import pytest
from dotenv import dotenv_values
from playwright.sync_api import BrowserContext, expect
from pytest import Parser
from src.dirmanager import DirManager
TIMEOUT = 5000
# global timeout and LOCALE
LOCALE = {"Accept-Language": "de_DE"}
TIMEOUT = 7_000
expect.set_options(timeout=TIMEOUT)
@pytest.fixture
def context(context: BrowserContext) -> BrowserContext:
context.set_default_timeout(TIMEOUT)
context.set_extra_http_headers(LOCALE)
return context
def pytest_addoption(parser: Parser):
@ -58,3 +71,18 @@ def dotenv_config(request) -> dict[str, str]:
dotenv_path = Path(dotenv_path)
assert dotenv_path.is_file()
return dotenv_values(dotenv_path) # type: ignore
@pytest.fixture(scope="session")
def imap_ssl_email_client() -> None:
assert os.environ["IMAP_HOST"]
assert os.environ["IMAP_PORT"]
assert os.environ["IMAP_USER"]
assert os.environ["IMAP_PASS"]
port = int(os.environ["IMAP_PORT"])
imap_client = IMAP4_SSL(host=os.environ["IMAP_HOST"], port=port)
imap_client.login(os.environ["IMAP_USER"], os.environ["IMAP_PASS"])
imap_client.select("INBOX")
yield imap_client
imap_client.close()
imap_client.logout()

View file

@ -5,21 +5,24 @@ from dotenv import dotenv_values
from loguru import logger
from src.dirmanager import DirManager
from src.env_file_helper import DependencyRule, EnvFile, sort_env_files_by_rule
from src.html_helper import merge_html_files
from src.runner import Runner
from src.tests_authentik.runner_authentik import RunnerAuthentik
from src.tests_nextcloud.runner_nextcloud import RunnerNextcloud
from src.tests_wordpress.runner_wordpress import RunnerWordpress
from src.utils import rmtree
# Register all runners here. Each .env file with TYPE=authentik will be ran with RunnerAuthentik
# Register all runners here. Each .env file with TYPE=authentik will be run with RunnerAuthentik
RUNNER_DICT: dict[str, type[Runner]] = {
"authentik": RunnerAuthentik,
"wordpress": RunnerWordpress,
"nextcloud": RunnerNextcloud,
}
class Coordinator:
def __init__(self, env_paths_list: list[Path], output_dir: Path, session_id: str):
def __init__(self, env_paths_list: list[Path], output_dir: Path, session_id: str) -> None:
out_string = "".join([e.name + "\n" for e in env_paths_list])
out_string += f"output_dir = {output_dir}\n"
out_string += f"session_id = {session_id}"
@ -29,47 +32,67 @@ class Coordinator:
self.output_dir = output_dir
self.session_id = session_id
self.env_paths: dict[str, Path] = dict()
self.env_configs: dict[str, dict[str, str]] = dict() # todo: needed?
self._parse_env_files(env_paths_list)
# parse env files
self.env_files: list[EnvFile] = self._getn_env_files_list(env_paths_list)
self.dependency_rules: list[DependencyRule] = self._get_dependency_rules(self.env_files)
def _parse_env_files(self, env_paths: list[Path]):
@staticmethod
def _getn_env_files_list(env_paths: list[Path]) -> list[EnvFile]:
"""Returns a list of EnvFile objects created from the given env files"""
env_files: list[EnvFile] = []
for env_path in env_paths:
assert env_path.is_file(), f"the env file {env_path} does not exist"
config: dict[str, str] = dotenv_values(env_path) # type: ignore
assert "TYPE" in config, f"the env file {env_path} does not specify the required TYPE key."
env_type = config["TYPE"]
self.env_paths[env_type] = env_path
self.env_configs[env_type] = config # todo: needed?
env_files.append(EnvFile(env_path=env_path, config=config, env_type=env_type))
return env_files
def setup_test(self):
@staticmethod
def _get_dependency_rules(env_files: list[EnvFile]) -> list[DependencyRule]:
dependency_rules: list[DependencyRule] = []
for env_file in env_files:
child_runner_class = RUNNER_DICT[env_file.env_type]
for dependency in child_runner_class.dependencies:
dependency_rule = DependencyRule(child=child_runner_class.name, dependency=dependency.name)
dependency_rules.append(dependency_rule)
return dependency_rules
def setup_test(self) -> None:
logger.info("calling setup_test()")
self.DIR.create_all_dirs()
self._copy_env_files()
def _copy_env_files(self):
def _copy_env_files(self) -> None:
"""Copies all env files to STATES/env_files. Files will be renamed to their own TYPE value."""
env_files_dir = self.DIR.STATES / "env_files"
env_files_dir.mkdir(exist_ok=True)
for type_key, env_path in self.env_paths.items():
shutil.copy(env_path, env_files_dir / type_key)
for env_file in self.env_files:
shutil.copy(env_file.env_path, env_files_dir / env_file.env_type)
def run_test(self):
def run_test(self) -> None:
logger.info("calling run_test()")
self.runners: list[Runner] = self._load_runners(self.env_paths.values())
self.runners: list[Runner] = self._load_runners(self.env_files)
for runner in self.runners:
runner.run_setups()
for runner in self.runners:
runner.run_tests()
for runner in self.runners:
runner.run_cleanups()
logger.info("run_test() finished")
def _load_runners(self, env_files: list[Path]) -> list[Runner]:
def _load_runners(self, env_files: list[EnvFile]) -> list[Runner]:
"""Creates an instance of the correct Runner class for each given env file"""
runners = []
for env_file in env_files:
config: dict[str, str] = dotenv_values(env_file) # type: ignore
RunnerClass = RUNNER_DICT[config["TYPE"]]
runners.append(RunnerClass(dotenv_path=env_file, output_dir=self.output_dir, session_id=self.session_id))
RunnerClass = RUNNER_DICT[env_file.config["TYPE"]]
runners.append(
RunnerClass(dotenv_path=env_file.env_path, output_dir=self.output_dir, session_id=self.session_id)
)
return runners
def combine_html(self):
def combine_html(self) -> None:
"""combines all generated pytest html reports into one"""
in_path = str(self.DIR.RECORDS / "html")
out_path = str(self.DIR.RECORDS / "full-report.html")
title = "combined.html"

View file

@ -30,12 +30,12 @@ class DirManager:
)
@property
def OUTPUT(self):
def OUTPUT_DIR(self):
return self._output_dir
@property
def SESSION(self):
return self._output_dir / f"test-{self.session_id}"
return self.OUTPUT_DIR / f"test-{self.session_id}"
@property
def RECORDS(self):

52
src/env_file_helper.py Normal file
View file

@ -0,0 +1,52 @@
from pathlib import Path
from typing import NamedTuple
from loguru import logger
class EnvFile(NamedTuple):
env_path: Path
config: dict[str, str]
env_type: str
def __repr__(self) -> str:
return f"EnvFile(type={self.env_type})"
class DependencyRule(NamedTuple):
child: str
dependency: str
def _is_rule_satisfied(in_list: list, rule: DependencyRule) -> tuple[bool, int]:
child_indices = [index for index, element in enumerate(in_list) if element.env_type == rule.child]
child_index = min(child_indices)
# child_index = in_list.index(rule.child)
parent_indices = [index for index, element in enumerate(in_list) if element.env_type == rule.dependency]
parent_index = max(parent_indices)
# parent_index = in_list.index(rule.dependency)
return parent_index < child_index, parent_index
def sort_env_files_by_rule(env_list: list[EnvFile], rules: list[DependencyRule]) -> list:
in_list = env_list.copy()
def swap_item_with_previous(in_list: list[EnvFile], index: int):
"""swaps item at index N with item at index N-1"""
assert index > 0, "cannot swap with negative index"
in_list[index], in_list[index - 1] = in_list[index - 1], in_list[index]
for _ in range(10_000):
rule_satisfied: list[bool] = []
for rule in rules:
is_rule_satisfied, parent_index = _is_rule_satisfied(in_list, rule)
if is_rule_satisfied:
rule_satisfied.append(True)
else:
rule_satisfied.append(False)
# parent_index = in_list.index(rule.dependency)
swap_item_with_previous(in_list, parent_index)
if all(rule_satisfied):
return in_list
logger.error("could not find order that satisfys all rules")
raise ValueError

View file

@ -174,10 +174,8 @@ def get_html_files(path, output_file_path):
output_file_path = os.path.abspath(output_file_path)
for p in pathlib.Path(path).rglob("*.html"):
print(p)
res = str(p.absolute())
if output_file_path in res:
print("damn")
continue
tmp = BeautifulSoup("".join(open(res)), features="html.parser")

View file

@ -1,5 +1,6 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Optional, TypedDict
from typing import Callable
import pytest
from dotenv import dotenv_values
@ -8,18 +9,20 @@ from loguru import logger
from src.dirmanager import DirManager
class SubTest(TypedDict):
condition: Callable[[dict[str, str]], bool]
@dataclass
class Test:
test_file: str
condition: Callable[[dict[str, str]], bool] | None = None
prevent_skip: bool = False
class Runner:
name: str = ""
test_dir_name: str = ""
main_setup_name: Optional[str] = None
main_test_name: Optional[str] = None
setups: list[Test] = []
tests: list[Test] = []
cleanups: list[Test] = []
dependencies: list[type["Runner"]] = []
sub_tests: list[SubTest] = []
prevent_skip = False
def __init__(self, dotenv_path: Path, output_dir: Path, session_id: str):
@ -33,50 +36,63 @@ class Runner:
assert self.test_dir_name
self.root_dir = Path(__file__).parent
def run_setups(self):
"""runs the setup scripts if available"""
self._execute_test_list(self.setups)
def run_tests(self):
"""runs the test scripts if available"""
self._execute_test_list(self.tests)
def run_cleanups(self):
"""runs the cleanup scripts if available"""
self._execute_test_list(self.cleanups)
def _execute_test_list(self, test_list: list[Test]):
"""runs the main test script and if available and sub test scripts if their running condition is met"""
# check if required dependencies have passed
self._assert_dependencies_passed()
if not self._dependencies_passed():
logger.warning(f"skipping run_tests() of {self.name}, because some dependencies have not passed")
return
# run main setup if available
if isinstance(self.main_setup_name, str):
self._run_or_skip_test(
identifier_string=self.combine_names(self.name, self.main_setup_name),
test_path=self.root_dir / self.test_dir_name / self.main_setup_name,
)
for test in test_list:
self._run_test_with_checks(test)
# run main test if available
if isinstance(self.main_test_name, str):
self._run_or_skip_test(
identifier_string=self.combine_names(self.name, self.main_test_name),
test_path=self.root_dir / self.test_dir_name / self.main_test_name,
)
def _run_test_with_checks(self, test: Test):
# dependency passed: true / false
# already_passed: true / false
# prevent_skip: true / false
# condition_available: true / pass
# condition_met: true / false
# run sub tests if conditions are met
for sub_test in self.sub_tests:
condition_function = sub_test["condition"]
sub_test_name = sub_test["test_file"]
identifier_string = self.combine_names(self.name, sub_test_name)
if condition_function(self.config):
test_path = self.root_dir / self.test_dir_name / sub_test_name
self._run_or_skip_test(identifier_string=identifier_string, test_path=test_path)
identifier_string = self.combine_names(self.name, test.test_file)
test_path = self.root_dir / self.test_dir_name / test.test_file
# check if test aleady passed
if self._is_test_passed(identifier_string, remove_existing=True):
if test.prevent_skip:
logger.info(f"continuing , test {identifier_string} has passed but prevent_skip=True")
else:
self._create_result_file(result=-1, identifier_string=identifier_string)
logger.info(f"skipping {identifier_string}, test has passed")
return
def _run_or_skip_test(self, identifier_string: str, test_path: Path):
if not self.prevent_skip and self._is_test_passed(identifier_string, remove_existing=True):
logger.info(f"skipping {identifier_string}")
else:
if test.condition and not test.condition(self.config):
# test condition is defined but not met
logger.info(f"skipping {identifier_string}, test condition is not met")
return
# test condition is undefined or not met
logger.info(f"running {identifier_string}")
result = self._call_pytest(test_path)
self._create_result_file(result=result, identifier_string=identifier_string)
def _is_test_passed(self, identifier_string: str, remove_existing: bool = False) -> bool:
"""returns True if the selected test (matching test_name + sub_test_name) already passed
"""returns True if the selected test matching identifier_string already passed
This is determined by the presence of a specific output file in the RESULTS folder that
matches identifier_string
remove_existing: If True, result files matching test_name + sub_test_name with a status
remove_existing: If True, result files matching identifier_string with a status
other than 'passed' will be deleted"""
already_passed = False
@ -96,20 +112,21 @@ class Runner:
command_arguments = []
# command_arguments.append("-v")
command_arguments.append("-v")
# command_arguments.append("-rx")
command_arguments.append(str(full_test_path))
command_arguments.append("--env_file")
command_arguments.append(str(self.dotenv_path))
# set root dir for tests output (used in DirManager). this is our custom argument
command_arguments.append("--output_dir")
command_arguments.append(str(self.DIRS.OUTPUT))
command_arguments.append(str(self.DIRS.OUTPUT_DIR))
command_arguments.append("--session_id")
command_arguments.append(self.session_id)
# artifacts dir
# artifacts dir from pytest
# warning: https://github.com/microsoft/playwright-pytest/issues/111
# --output only works with the given context and page fixture
# folder needs to be unique! traces will not appear, if every pytest run has same output dir
@ -144,18 +161,22 @@ class Runner:
with open(file_path, "w") as _:
pass # create empty file
def _assert_dependencies_passed(self):
"""assert that all dependencie setups passed before"""
def _dependencies_passed(self):
"""returns true if all setups of each dependency have passed"""
# todo: what about conditional setups?
passed_tests = [r.name for r in self.DIRS.RESULTS.glob("*") if "passed" in r.name]
for dependencie in self.dependencies:
dependencie_identifier = self.combine_names(dependencie.name, dependencie.main_setup_name)
assert any(
dependencie_identifier in f for f in passed_tests
), f"could not run {self.name} because {dependencie} did not run before"
results = []
for dependencie_runner in self.dependencies:
for setup_name in dependencie_runner.setups:
dependencie_identifier = self.combine_names(dependencie_runner.name, setup_name.test_file)
results.append(any(dependencie_identifier in f for f in passed_tests))
return all(results)
@staticmethod
def result_int_to_str(result_int: int) -> str:
"""converts the pytest exit code (int) into a meaningful string"""
match result_int:
case -1:
return "skipped"

View file

@ -1,100 +0,0 @@
import json
from pathlib import Path
import pytest
from icecream import ic
from playwright.sync_api import Browser, Locator, expect
cred_file = Path("credentials.json")
with open(cred_file, "r") as f:
CREDENTIALS = json.load(f)
print(CREDENTIALS)
TESTUSER = {"username": "testuser", "name": "Test User", "password": "test123", "email": "test@example.com"}
TIMEOUT = 5000
def check_for(locator: Locator):
expect(locator).to_be_visible(timeout=TIMEOUT)
# todo: why is this a fixture? to get dotenv_config
@pytest.fixture(scope="session", autouse=True)
def create_admin_login(browser: Browser, dotenv_config, STATES):
# ic(dotenv_config)
# go to page
context = browser.new_context()
context.set_default_timeout(TIMEOUT)
page = context.new_page()
url = "https://" + dotenv_config["DOMAIN"]
page.goto(url)
# check welcome message
welcome_message = dotenv_config.get("welcome_message")
if welcome_message:
check_for(page.get_by_text(welcome_message))
# login
page.locator('input[name="uidField"]').fill(CREDENTIALS["admin"])
page.locator('ak-stage-identification input[name="password"]').fill(CREDENTIALS["admin_pw"])
page.get_by_role("button", name="Log In").click()
check_for(page.locator("ak-library"))
# save state
context.storage_state(path=f"{STATES}/admin_state.json")
page.close()
context.close()
def create_invite_link(page):
url = "https://" + dotenv_config["DOMAIN"]
page.goto(url)
page.get_by_role("link", name="Admin Interface").click()
page.get_by_text("Verzeichnis").click()
page.get_by_text("Benutzer").nth(2).click()
page.get_by_text("Einladungen").click()
page.get_by_role("button", name="Erstellen").first.click()
page.locator('input[name="name"]').click()
linkname = "testlink9433"
page.locator('input[name="name"]').fill(linkname)
page.get_by_placeholder("Wählen Sie ein Objekt aus.").click()
page.get_by_role("option", name="invitation-enrollment-flow invitation-enrollment-flow").click()
page.get_by_text("Erstellen", exact=True).first.click()
linklocator = page.get_by_role("rowgroup").filter(has=page.get_by_text(linkname))
linklocator.locator(".fa-angle-down").click()
invitelink = linklocator.get_by_role("textbox").get_attribute(name="value")
return invitelink
def create_user(context, invitelink):
page = context.new_page()
page.goto(invitelink)
page.get_by_placeholder("Benutzername").click()
page.get_by_placeholder("Benutzername").fill(testuser["username"])
page.locator('input[name="name"]').click()
page.locator('input[name="name"]').fill(testuser["name"])
page.locator('input[name="email"]').click()
page.locator('input[name="email"]').fill(testuser["email"])
page.get_by_placeholder("Passwort", exact=True).click()
page.get_by_placeholder("Passwort", exact=True).fill(testuser["password"])
page.get_by_placeholder("Passwort (wiederholen)").click()
page.get_by_placeholder("Passwort (wiederholen)").fill(testuser["password"])
page.get_by_role("button", name="Weiter").click()
check_for(page.locator("ak-library"))
context.storage_state(path=f"{STATES}/user_state.json")
@pytest.fixture(scope="session", autouse=True)
def create_user_session(browser: Browser, admin_login):
admin_context = browser.new_context(storage_state=f"{STATES}/admin_state.json")
# admin_context = setup_context(browser, f"{STATES}/admin_state.json")
admin_page = admin_context.new_page()
invitelink = create_invite_link(admin_page)
admin_context.tracing.stop(path=f"{RECORDS}/create_invite_link.zip")
admin_context.close()
user_context = setup_context(browser)
create_user(user_context, invitelink)
user_context.tracing.stop(path=f"{RECORDS}/create_user.zip")
user_context.close()

View file

@ -6,33 +6,38 @@ from playwright.sync_api import BrowserContext, Page
from src.dirmanager import DirManager
TIMEOUT = 5000
@pytest.fixture
def admin_context(context: BrowserContext, DIR: DirManager) -> BrowserContext:
state_file = DIR.STATES / "admin_state.json"
def authentik_admin_context(context: BrowserContext, DIR: DirManager) -> BrowserContext:
state_file = DIR.STATES / "authentik_admin_state.json"
storage_state = json.loads(state_file.read_bytes())
context.add_cookies(storage_state["cookies"])
context.set_default_timeout(TIMEOUT)
return context
@pytest.fixture
def authentik_admin_page(admin_context: BrowserContext, DIR: DirManager) -> Page:
page = admin_context.new_page()
page.pause()
authentik_env_file = DIR.ENV_FILES / "authentik"
authentik_config: dict[str, str] = dotenv_values(authentik_env_file) # type: ignore
url = "https://" + authentik_config["DOMAIN"]
def authentik_admin_page(authentik_admin_context: BrowserContext, DIR: DirManager) -> Page:
page = authentik_admin_context.new_page()
env_file = DIR.ENV_FILES / "authentik"
config: dict[str, str] = dotenv_values(env_file) # type: ignore
url = "https://" + config["DOMAIN"]
page.goto(url)
return page
@pytest.fixture
def user_context(context: BrowserContext, DIR: DirManager) -> BrowserContext:
state_file = DIR.STATES / "user_state.json"
def authentik_user_context(context: BrowserContext, DIR: DirManager) -> BrowserContext:
state_file = DIR.STATES / "authentik_user_state.json"
storage_state = json.loads(state_file.read_bytes())
context.add_cookies(storage_state["cookies"])
context.set_default_timeout(TIMEOUT)
return context
@pytest.fixture
def authentik_user_page(authentik_user_context: BrowserContext, DIR: DirManager) -> Page:
page = authentik_user_context.new_page()
env_file = DIR.ENV_FILES / "authentik"
config: dict[str, str] = dotenv_values(env_file) # type: ignore
url = "https://" + config["DOMAIN"]
page.goto(url)
return page

View file

@ -1,3 +0,0 @@
# will be loaded in conftest.py
# will provide context for other tests (wordpress etc.)
# that depend on authentik (which is all of them)

View file

@ -1,4 +1,4 @@
from src.runner import Runner, SubTest
from src.runner import Runner, Test
def condition_always_true(dotenv_config: dict[str, str]) -> bool:
@ -12,5 +12,5 @@ def condition_always_false(dotenv_config: dict[str, str]) -> bool:
class RunnerAuthentik(Runner):
name = "authentik"
test_dir_name = "tests_authentik"
main_setup_name = "setup_authentik.py"
# main_test_name = "test_authentik_dummy.py"
setups = [Test(test_file="setup_authentik.py")]
# tests = [Test(test_file="test_authentik_dummy.py")]

View file

@ -9,15 +9,12 @@ from src.dirmanager import DirManager
ADMIN_USER = os.environ["ADMIN_USER"]
ADMIN_PASS = os.environ["ADMIN_PASS"]
LOCALE = {"Accept-Language": "de_DE"}
TESTUSER = {"username": "testuser", "name": "Test User", "password": "test123", "email": "test@example.com"}
TIMEOUT = 6000
def test_create_admin_login(context: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager):
def setup_admin_state(context: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager):
# go to page
context.set_extra_http_headers(LOCALE)
context.set_default_timeout(TIMEOUT)
page = context.new_page()
url = "https://" + dotenv_config["DOMAIN"]
page.goto(url)
@ -34,7 +31,7 @@ def test_create_admin_login(context: BrowserContext, dotenv_config: dict[str, st
expect(page.locator("ak-library")).to_be_visible()
# save state
context.storage_state(path=f"{DIR.STATES}/admin_state.json")
context.storage_state(path=f"{DIR.STATES}/authentik_admin_state.json")
def check_if_user_exists(admin_context: BrowserContext, dotenv_config: dict[str, str]):
@ -102,12 +99,9 @@ def create_user(user_context: BrowserContext, invitelink):
expect(page.locator("ak-library")).to_be_visible()
def test_create_user_session(context: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager):
context.set_extra_http_headers(LOCALE)
context.set_default_timeout(TIMEOUT)
def setup_user_state(context: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager):
# load admin cookies
state_file = DIR.STATES / "admin_state.json"
state_file = DIR.STATES / "authentik_admin_state.json"
storage_state = json.loads(state_file.read_bytes())
context.add_cookies(storage_state["cookies"])
@ -116,11 +110,10 @@ def test_create_user_session(context: BrowserContext, dotenv_config: dict[str, s
pass
context.clear_cookies()
else:
## create user
# create invite_link
# get invite_link
invite_link = create_invite_link(context, dotenv_config)
# create user
context.clear_cookies()
create_user(context, invite_link)
context.storage_state(path=f"{DIR.STATES}/user_state.json")
context.storage_state(path=f"{DIR.STATES}/authentik_user_state.json")

View file

@ -1,6 +1,4 @@
from typing import Optional
from src.runner import Runner, SubTest
from src.runner import Runner, Test
from src.tests_authentik.runner_authentik import RunnerAuthentik
@ -10,20 +8,19 @@ class RunnerDemo(Runner):
name: str = "demo" # name of the test, used for logging / output naming
test_dir_name: str = "tests_demo" # dir name holding all tests related to RunnerDemo
# Filename of Demo setup. If defined, it will run 1st by executing pytest
main_setup_name: Optional[str] = "setup_demo.py"
# Filename of Demo test. This file contains unconditional tests that will be run in any
# case. If defined, it will run 2nd by executing pytest
main_test_name: Optional[str] = None
# this indicates that tests from RunnerDemo depend on the setup from RunnerAuthentik.
# RunnerDemo will only execute, when setup_authentik.py has finished successfully.
# For example, setup_authentik.py generates session states, that can be used as fixtures
# that can be loaded from fixtures_authentik.py
dependencies: list[type["Runner"]] = [RunnerAuthentik]
# todo: update these comments
# Filename of Demo setup. If defined, it will run 1st by executing pytest
# Filename of Demo test. This file contains unconditional tests that will be run in any
# case. If defined, it will run 2nd by executing pytest
# this list can hold many more tests from RunnerDemo that run conditional. The condition
# and the test file can be defined by creating a SubTest instance:
# SubTest(condition: Callable, test_file: str)
sub_tests: list[SubTest] = []
# and the test file can be defined by creating a ConditionalTest instance:
# ConditionalTest(condition: Callable, test_file: str)
setups: list[Test] = []
tests: list[Test] = []
cleanups: list[Test] = []

View file

@ -0,0 +1,25 @@
import pytest
from playwright.sync_api import BrowserContext, expect
# todo: what is this test for, why is it a fixture? -> ignore for now
@pytest.fixture(scope="session", autouse=True)
def delete_nextcloud_user(admin_context: BrowserContext):
"""Delete Nextcloud User"""
yield
context = setup_context(browser, f"{STATES}/admin_state.json")
page = context.new_page()
page.goto(CONFIG["domain"])
with page.expect_popup() as nextcloud_info:
page.get_by_role("link", name="Nextcloud").click()
nextcloud = nextcloud_info.value
nextcloud.get_by_role("link", name="Open settings menu").click()
nextcloud.get_by_role("link", name="Users").click()
nextcloud.locator("#app-content div").filter(has_text=testuser["username"]).get_by_role(
"button", name="Toggle user actions menu"
).click()
nextcloud.get_by_role("button", name="Delete user").click()
nextcloud.get_by_role("button", name=f"Delete authentik-{testuser['username']}'s account").click()
context.tracing.stop(path=f"{RECORDS}/nextcloud_delete_user.zip")
context.close()

View file

@ -0,0 +1 @@
from src.tests_authentik.fixtures_authentik import admin_context, authentik_admin_page, user_context

View file

@ -0,0 +1,18 @@
from src.runner import Runner, Test
from src.tests_authentik.runner_authentik import RunnerAuthentik
def condition_always_false(dotenv_config: dict[str, str]) -> bool:
return False
class RunnerNextcloud(Runner):
name: str = "nextcloud"
test_dir_name: str = "tests_nextcloud"
dependencies = [RunnerAuthentik]
setups = [Test(test_file="setup_nextcloud.py")]
tests = [
Test(test_file="tests_nextcloud.py"),
Test(condition=condition_always_false, test_file="tests_nextcloud_onlyoffice.py"),
]
# cleanups = [Test(test_file="cleanup_nextcloud.py")]

View file

@ -0,0 +1,35 @@
import pytest
@pytest.fixture(scope="session", autouse=True)
def nc_login(browser: Browser):
"""Nextcloud Login"""
context = setup_context(browser, f"{STATES}/user_state.json")
page = context.new_page()
page.goto(CONFIG["domain"])
with page.expect_popup() as nextcloud_info:
link = page.get_by_role("link", name="Nextcloud")
CONFIG["nc_domain"] = link.get_attribute("href")
link.click()
nextcloud = nextcloud_info.value
check_for(nextcloud.get_by_role("link", name="Name"))
if nextcloud.query_selector(".close-icon"):
close_button = nextcloud.get_by_role("button", name="Close modal")
close_button.click()
expect(close_button).to_be_hidden()
nextcloud.wait_for_timeout(2000)
context.storage_state(path=f"{STATES}/nc_user_state.json")
context.tracing.stop(path=f"{RECORDS}/nextcloud_login_user.zip")
context.close()
@pytest.fixture
def nc_session(browser: Browser):
"""Reuse Nextcloud User Session"""
context = setup_context(browser, f"{STATES}/nc_user_state.json")
page = context.new_page()
page.goto(CONFIG["nc_domain"])
if page.query_selector(".close-icon"):
page.get_by_role("button", name="Close modal").click()
yield context, page
context.close()

View file

@ -0,0 +1,13 @@
def test_nextcloud(nc_session):
"""Test Nextcloud"""
context, page = nc_session
# if page.query_selector('.close-icon'):
# page.get_by_role("button", name="Close modal").click()
if CONFIG.get("default_quota"):
quota = int(
page.get_by_role("listitem", name="Storage informations").get_by_role("link").inner_text().split()[3]
)
assert quota == CONFIG["default_quota"]
for app in CONFIG["nc_apps"]:
check_for(page.get_by_role("link", name=app))
context.tracing.stop(path=f"{RECORDS}/nextcloud.zip")

View file

@ -0,0 +1,19 @@
def test_onlyoffice(nc_session):
"""Test Onlyoffice in Nextcloud"""
context, page = nc_session
# if page.query_selector('.close-icon'):
# page.get_by_role("button", name="Close modal").click()
page.get_by_role("link", name="New file/folder menu").click()
page.get_by_role("link", name="New document").click()
page.locator("#view9-input-file").fill("test.docx")
page.get_by_role("button", name="Submit").click()
outer_frame = page.frame_locator("#onlyofficeFrame")
check_for(outer_frame.locator("body"))
inner_frame = outer_frame.frame_locator("#app > iframe")
check_for(inner_frame.locator("body"))
onlyoffice = page.frame("frameEditor")
check_for(onlyoffice.locator('//*[@id="area_id"]'))
onlyoffice.locator("#btn-goback").click()
page.get_by_role("link", name="Not favorited test .docx Share Actions").get_by_role("link", name="Actions").click()
page.get_by_role("link", name="Delete file").click()
context.tracing.stop(path=f"{RECORDS}/onlyoffice.zip")

View file

@ -1 +1,34 @@
from src.tests_authentik.fixtures_authentik import admin_context, authentik_admin_page, user_context
import json
import pytest
from dotenv import dotenv_values
from playwright.sync_api import BrowserContext, Page
from src.dirmanager import DirManager
# from src.tests_authentik.fixtures_authentik import (
# authentik_admin_context,
# authentik_admin_page,
# authentik_user_context,
# authentik_user_page,
# )
pytest_plugins = "src.tests_authentik.fixtures_authentik"
@pytest.fixture
def wordpress_admin_context(context: BrowserContext, DIR: DirManager) -> BrowserContext:
state_file = DIR.STATES / "wordpress_admin_state.json"
storage_state = json.loads(state_file.read_bytes())
context.add_cookies(storage_state["cookies"])
return context
@pytest.fixture
def wordpress_admin_page(wordpress_admin_context: BrowserContext, DIR: DirManager) -> Page:
page = wordpress_admin_context.new_page()
env_file = DIR.ENV_FILES / "wordpress"
config: dict[str, str] = dotenv_values(env_file) # type: ignore
url = "https://" + config["DOMAIN"]
page.goto(url)
return page

View file

@ -1,4 +1,4 @@
from src.runner import Runner, SubTest
from src.runner import Runner, Test
from src.tests_authentik.runner_authentik import RunnerAuthentik
@ -20,9 +20,9 @@ def condition_has_locale(dotenv_config: dict[str, str]) -> bool:
class RunnerWordpress(Runner):
name = "wordpress"
test_dir_name = "tests_wordpress"
main_test_name = "test_wordpress.py"
dependencies: list[type[Runner]] = [RunnerAuthentik]
sub_tests = [
SubTest(condition=condition_has_locale, test_file="test_wordpress_localization.py"),
setups = [Test(test_file="setup_wordpress.py")]
tests = [
Test(test_file="test_wordpress.py"),
Test(condition=condition_has_locale, test_file="test_wordpress_localization.py"),
]
prevent_skip = True

View file

@ -0,0 +1,30 @@
import pytest
from playwright.sync_api import BrowserContext, Page, expect
from src.dirmanager import DirManager
@pytest.mark.xfail(reason="wordpress sso login has not been generated")
def test_visit_from_domain(authentik_admin_context: BrowserContext, dotenv_config: dict[str, str]):
"""visit wordpress directly with admin_session, expect not to be logged in"""
page = authentik_admin_context.new_page()
url = "https://" + dotenv_config["DOMAIN"]
page.goto(url)
# look for content wrapper
expect(page.locator("#wpcontent")).to_be_visible(timeout=3_000)
# look for admin bar
expect(page.locator("#wpadminbar")).to_be_visible(timeout=3_000)
def setup_wordpress_admin_session(authentik_admin_page: Page, DIR: DirManager):
"""visit wordpress from authentik with admin_session to create wordpress_admin_session"""
with authentik_admin_page.expect_popup() as event_context:
authentik_admin_page.get_by_role("link", name="Wordpress").click()
page_wordpress = event_context.value
# look for content wrapper
expect(page_wordpress.locator("#wpcontent")).to_be_visible()
# look for admin bar
expect(page_wordpress.locator("#wpadminbar")).to_be_visible()
# save session
context = page_wordpress.context
context.storage_state(path=f"{DIR.STATES}/wordpress_admin_state.json")

View file

@ -1,13 +0,0 @@
from playwright.sync_api import Page, expect
def test_visit_from_authentik(authentik_admin_page: Page):
with authentik_admin_page.expect_popup() as event_context:
authentik_admin_page.get_by_role("link", name="Wordpress").click()
page_wordpress = event_context.value
# look for content wrapper
expect(page_wordpress.locator("#wpcontent")).to_be_visible()
# look for admin bar
expect(page_wordpress.locator("#wpadminbar")).to_be_visible()

View file

@ -5,11 +5,11 @@ from playwright.sync_api import BrowserContext, expect
from src.dirmanager import DirManager
def test_welcome_message(user_context: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager):
page = user_context.new_page()
def test_welcome_message(context: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager):
page = context.new_page()
url = "https://" + dotenv_config["DOMAIN"]
page.goto(url)
expect(page.locator("#wpcontent")).to_be_visible()
expect(page.locator(".wp-block-heading")).to_be_visible()
if "locale" in dotenv_config and "de" in dotenv_config["locale"]:
expect(page.get_by_role("heading")).to_have_text("Willkommen bei WordPress!")

View file

@ -18,3 +18,8 @@ def rmtree(root_dir: Path):
child.unlink()
root_dir.rmdir()
def make_url(domain: str) -> str:
"""adds 'http://' at the beginning of a string"""
return "https://" + domain

View file

@ -0,0 +1,59 @@
import sys
from pathlib import Path
from icecream import ic
sys.path.append(Path(__file__).parent.parent.resolve().__str__())
# import pytest
# from prototyping.sorting_algo import Rule, is_rule_satisfied, sort_by_rules
from src.coordinator import Coordinator
from src.env_file_helper import DependencyRule, EnvFile, sort_env_files_by_rule
# @pytest.fixture
# def in_list():
# return ["a", "b", "c", "d", "e", "f", "g"]
# @pytest.fixture
# def rules() -> list[Rule]:
# return [ # X depends on Y
# Rule("a", "e"),
# Rule("b", "e"),
# Rule("b", "f"),
# Rule("c", "e"),
# Rule("d", "e"),
# Rule("f", "e"),
# ]
# def has_rules_satisfied(in_list, rules):
# rule_satisfied: list[bool] = []
# for rule in rules:
# if is_rule_satisfied(in_list, rule):
# rule_satisfied.append(True)
# else:
# rule_satisfied.append(False)
# return all(rule_satisfied)
# def test_stuff(in_list, rules):
# sort_by_rules(in_list, rules)
# assert has_unique_elements(in_list)
# assert has_rules_satisfied(in_list, rules)
ENV_FILES = [
Path("envfiles/blog.test.dev.local-it.cloud.env"), # wordpress
Path("envfiles/login.test.dev.local-it.cloud.env"), # authentik
]
env_files: list[EnvFile] = Coordinator._getn_env_files_list(ENV_FILES)
dependency_rules: list[DependencyRule] = Coordinator._get_dependency_rules(env_files)
ic(env_files)
sorted_env_files = sort_env_files_by_rule(env_files, dependency_rules)
ic(env_files)
ic(sorted_env_files)