rework-output-and-test-logic (#3)

* fix flakey tests in authentik / wordpress

* make it possible to rerun tests partially -> passed will be skipped, failed will be repeated

* improve organization of all outputs (moving, renaming, keeping multiple versions etc.)

* add html reports, replace .txt tracebacks

* combine all html reports into one

* add demo runner with comments for documentation purposes

Reviewed-on: local-it-infrastructure/e2e_tests#3
Co-authored-by: Daniel <d.brummerloh@gmail.com>
Co-committed-by: Daniel <d.brummerloh@gmail.com>
This commit is contained in:
Daniel 2023-11-29 14:14:46 +01:00 committed by dan
parent d2cd6ba47f
commit 8172f685de
24 changed files with 588 additions and 418 deletions

View file

@ -9,4 +9,4 @@ RUN playwright install-deps
COPY ./requirements.txt ./ COPY ./requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
WORKDIR /code/src WORKDIR /code

View file

@ -1,4 +1,14 @@
# Readme # Clone
To clone with submodules, use these git commands:
```bash
git clone --recurse-submodules <repository>
git submodule update --init // add submodule after normal cloning
git submodule update --remote // update submodules
```
# Run
```bash ```bash
docker compose build docker compose build

57
main.py
View file

@ -2,27 +2,42 @@ import json
import os import os
from pathlib import Path from pathlib import Path
from src.utils import get_session_id from loguru import logger
from src.wrapper import Wrapper
from src.coordinator import Coordinator
from src.dirmanager import DirManager
from src.utils import get_session_id
# ----------------------------- lookup env files ----------------------------- #
# This list of env files is the input to testing framework. each env file
# triggers the execution of one test Runner and provides configuration to the
# tests inside the runner. There can be dependencies, for example wordpress
# requires that authentik ran first to create the admin session and the user
# session. At the moment, wrong ordering results in unsuccessful test
# (wrong ordering would be wordpress env file is before authentik env file).
# The env file list is the input to testing framework. each env file triggers
# the execution of one test Runner and provides configuration to the tests
# inside the runner. There can be dependencies, for example wordpress requires
# that authentik ran first to create the admin session and the user session.
# At the moment, wrong ordering results in unsuccessful test (wrong ordering
# would be wordpress env file is before authentik env file).
ENV_FILES = [ ENV_FILES = [
Path("envfiles/login.test.dev.local-it.cloud.env"), # authentik Path("envfiles/login.test.dev.local-it.cloud.env"), # authentik
Path("envfiles/blog.test.dev.local-it.cloud.env"), # wordpress Path("envfiles/blog.test.dev.local-it.cloud.env"), # wordpress
] ]
# ----------------------------- define ouptut dir ---------------------------- #
OUTPUT_DIR = Path("./test-output").resolve() OUTPUT_DIR = Path("./test-output").resolve()
# Set environment variables # -------------------------- enable playwright debug ------------------------- #
# os.environ["PWDEBUG"] = "1" # os.environ["PWDEBUG"] = "1"
# --------------------- load credentials to env variables -------------------- #
cred_file = Path("credentials.json") cred_file = Path("credentials.json")
with open(cred_file, "r") as f: with open(cred_file, "r") as f:
CREDENTIALS = json.load(f) CREDENTIALS = json.load(f)
@ -31,7 +46,25 @@ os.environ["ADMIN_USER"] = CREDENTIALS["admin_user"]
os.environ["ADMIN_PASS"] = CREDENTIALS["admin_pass"] os.environ["ADMIN_PASS"] = CREDENTIALS["admin_pass"]
# ----------------------------- define session_id ---------------------------- #
session_id = get_session_id() session_id = get_session_id()
wrapper = Wrapper(ENV_FILES, output_dir=OUTPUT_DIR, session_id=session_id) # session_id = "abc"
wrapper.setup_test()
wrapper.run_test()
# ------------------------------- setup logging ------------------------------ #
DIR = DirManager(output_dir=OUTPUT_DIR, session_id=session_id)
log_file = DIR.RESULTS / "full.log"
logger.add(log_file)
# ---------------------------- initialize and run ---------------------------- #
coordinator = Coordinator(ENV_FILES, output_dir=OUTPUT_DIR, session_id=session_id)
coordinator.setup_test()
coordinator.run_test()
coordinator.combine_html()
coordinator.collect_traces()

View file

@ -1,14 +1,14 @@
from conftest import CONFIG, check_for, RECORDS from playwright.sync_api import BrowserContext, expect
""" Test Wordpress """ from src.dirmanager import DirManager
def test_wordpress(admin_session):
context, page = admin_session
with page.expect_popup() as info:
page.get_by_role("link", name="Wordpress").click()
wordpress = info.value
check_for(wordpress.locator("#wpcontent"))
if CONFIG['locale'] == 'de':
check_for(wordpress.get_by_role("heading", name="Willkommen bei WordPress!"))
context.tracing.stop(path=f"{RECORDS}/wordpress.zip")
def test_wordpress(admin_session: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager):
page_authentik = admin_session.new_page()
with page_authentik.expect_popup() as event_context:
page_authentik.get_by_role("link", name="Wordpress").click()
page_wordpress = event_context.value
expect(page_wordpress.locator("#wpcontent")).to_be_visible()
if "locale" in dotenv_config and "de" in dotenv_config["locale"]:
expect(page_wordpress.get_by_role("heading")).to_have_text("Willkommen bei WordPress!")

View file

@ -2,3 +2,5 @@ pytest
pytest-playwright pytest-playwright
python-dotenv python-dotenv
icecream icecream
loguru
beautifulsoup4

View file

@ -9,24 +9,28 @@ from pathlib import Path
import pytest import pytest
from dotenv import dotenv_values from dotenv import dotenv_values
from pytest import Parser
from src.dirmanager import DirManager from src.dirmanager import DirManager
TIMEOUT = 5000 TIMEOUT = 5000
def pytest_addoption(parser): def pytest_addoption(parser: Parser):
parser.addoption( parser.addoption(
"--env_file", "--env_file",
action="store", action="store",
required=True,
) )
parser.addoption( parser.addoption(
"--output_dir", "--output_dir",
action="store", action="store",
required=True,
) )
parser.addoption( parser.addoption(
"--session_id", "--session_id",
action="store", action="store",
required=True,
) )
@ -38,14 +42,11 @@ def DIR(request) -> DirManager:
DIR.SESSION DIR.SESSION
DIR.RECORDS DIR.RECORDS
DIR.STATES DIR.STATES
DIR.RESULTS DIR.RESULTS"""
DIR.PROGRESS"""
output_dir = request.config.getoption("--output_dir") output_dir = request.config.getoption("--output_dir")
assert output_dir is not None, "required pytest command line argument not given"
output_dir = Path(output_dir) output_dir = Path(output_dir)
session_id = request.config.getoption("--session_id") session_id = request.config.getoption("--session_id")
assert session_id is not None, "required pytest command line argument not given"
dirmanager = DirManager(output_dir=output_dir, session_id=session_id) dirmanager = DirManager(output_dir=output_dir, session_id=session_id)
dirmanager.create_all_dirs() dirmanager.create_all_dirs()
return dirmanager return dirmanager
@ -54,26 +55,6 @@ def DIR(request) -> DirManager:
@pytest.fixture(scope="session", autouse=True) @pytest.fixture(scope="session", autouse=True)
def dotenv_config(request) -> dict[str, str]: def dotenv_config(request) -> dict[str, str]:
dotenv_path = request.config.getoption("--env_file") dotenv_path = request.config.getoption("--env_file")
assert dotenv_path is not None, "required pytest command line argument not given"
dotenv_path = Path(dotenv_path) dotenv_path = Path(dotenv_path)
assert dotenv_path.is_file() assert dotenv_path.is_file()
return dotenv_values(dotenv_path) # type: ignore return dotenv_values(dotenv_path) # type: ignore
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
"""saves traceback when test fails"""
# execute all other hooks to obtain the report object
outcome = yield
rep = outcome.get_result()
# we only look at actual failing test calls, not setup/teardown
if rep.when == "call" and rep.failed:
# saves traceback as .txt for failed test
filename = f"failed-{item.nodeid}.txt"
filename = filename.replace("/", "-")
filename = filename.replace("::", "-")
filepath = item.funcargs["DIR"].RESULTS / filename
with open(filepath, "a") as f:
f.write(rep.longreprtext + "\n")

99
src/coordinator.py Normal file
View file

@ -0,0 +1,99 @@
import shutil
from pathlib import Path
from dotenv import dotenv_values
from loguru import logger
from src.dirmanager import DirManager
from src.html_helper import merge_html_files
from src.runner import Runner
from src.tests_authentik.runner_authentik import RunnerAuthentik
from src.tests_wordpress.runner_wordpress import RunnerWordpress
from src.utils import rmtree
# Register all runners here. Each .env file with TYPE=authentik will be ran with RunnerAuthentik
RUNNER_DICT: dict[str, type[Runner]] = {
"authentik": RunnerAuthentik,
"wordpress": RunnerWordpress,
}
class Coordinator:
def __init__(self, env_paths_list: list[Path], output_dir: Path, session_id: str):
out_string = "".join([e.name + "\n" for e in env_paths_list])
out_string += f"output_dir = {output_dir}\n"
out_string += f"session_id = {session_id}"
logger.info(f"initialize Coordinator instance with\nenv_paths_list =\n{out_string}")
self.DIR = DirManager(output_dir=output_dir, session_id=session_id)
self.output_dir = output_dir
self.session_id = session_id
self.env_paths: dict[str, Path] = dict()
self.env_configs: dict[str, dict[str, str]] = dict() # todo: needed?
self._parse_env_files(env_paths_list)
def _parse_env_files(self, env_paths: list[Path]):
for env_path in env_paths:
assert env_path.is_file(), f"the env file {env_path} does not exist"
config: dict[str, str] = dotenv_values(env_path) # type: ignore
assert "TYPE" in config, f"the env file {env_path} does not specify the required TYPE key."
env_type = config["TYPE"]
self.env_paths[env_type] = env_path
self.env_configs[env_type] = config # todo: needed?
def setup_test(self):
logger.info("calling setup_test()")
self.DIR.create_all_dirs()
self._copy_env_files()
def _copy_env_files(self):
"""Copies all env filesto STATES/env_files. Files will be renamed to their own TYPE value."""
env_files_dir = self.DIR.STATES / "env_files"
env_files_dir.mkdir(exist_ok=True)
for type_key, env_path in self.env_paths.items():
shutil.copy(env_path, env_files_dir / type_key)
def run_test(self):
logger.info("calling run_test()")
self.runners: list[Runner] = self._load_runners(self.env_paths.values())
for runner in self.runners:
runner.run_tests()
logger.info("run_test() finished")
def _load_runners(self, env_files: list[Path]) -> list[Runner]:
runners = []
for env_file in env_files:
config: dict[str, str] = dotenv_values(env_file) # type: ignore
RunnerClass = RUNNER_DICT[config["TYPE"]]
runners.append(RunnerClass(dotenv_path=env_file, output_dir=self.output_dir, session_id=self.session_id))
return runners
def combine_html(self):
in_path = str(self.DIR.RECORDS / "html")
out_path = str(self.DIR.RECORDS / "full-report.html")
title = "combined.html"
merge_html_files(in_path, out_path, title)
def collect_traces(self):
"""moves all traces into SESSION/RECORDS dir
if tests are rerun and generate another trace, the new trace will get a unique name such as
tracename-0
tracename-1
...
"""
def get_new_path(root_dir: Path, base_name: str, index=0) -> Path:
new_name_alt = base_name + f"-{index}"
if not (root_dir / new_name_alt).is_dir():
return root_dir / new_name_alt
else:
index += 1
return get_new_path(root_dir, base_name, index=index)
trace_root_dir = self.DIR.RECORDS / "traces"
for f in trace_root_dir.rglob("*/trace.zip"):
new_path = get_new_path(self.DIR.RECORDS, f.parent.name)
f.parent.rename(new_path)
rmtree(trace_root_dir)

View file

@ -8,7 +8,6 @@ class DirManager:
The structures is as follows: The structures is as follows:
tests dir/ tests dir/
session_dir-1/ session_dir-1/
progress
records records
results results
states states
@ -26,7 +25,7 @@ class DirManager:
def create_all_dirs(self): def create_all_dirs(self):
self.create_dirs(self._output_dir, exist_ok=True) self.create_dirs(self._output_dir, exist_ok=True)
self.create_dirs([self.SESSION, self.RECORDS, self.STATES, self.RESULTS, self.PROGRESS], exist_ok=True) self.create_dirs([self.SESSION, self.RECORDS, self.RECORDS / "html", self.STATES, self.RESULTS], exist_ok=True)
@property @property
def OUTPUT(self): def OUTPUT(self):
@ -48,10 +47,6 @@ class DirManager:
def RESULTS(self): def RESULTS(self):
return self.SESSION / Path("results") return self.SESSION / Path("results")
@property
def PROGRESS(self):
return self.SESSION / Path("progress")
@staticmethod @staticmethod
def create_dirs(dirs: Path | list[Path] | dict[str, Path], exist_ok=False): def create_dirs(dirs: Path | list[Path] | dict[str, Path], exist_ok=False):
match dirs: match dirs:

196
src/html_helper.py Normal file
View file

@ -0,0 +1,196 @@
# code from
# https://github.com/akavbathen/pytest_html_merger/tree/main
import json
import os
import pathlib
import re
from bs4 import BeautifulSoup
from packaging import version
CHECKBOX_REGEX = r"^(?P<num>0|[1-9]\d*) (?P<txt1>.*)"
def merge_html_files(in_path: str, out_path: str, title: str):
paths = get_html_files(in_path, out_path)
if not paths:
raise RuntimeError(f"Unable to find html files in {in_path}")
assets_dir_path = get_assets_path(in_path)
first_file = BeautifulSoup("".join(open(paths[0])), features="html.parser")
paths.pop(0)
try:
first_file.find("link").decompose()
except:
pass
if assets_dir_path is None:
print(
f"Will assume css is embedded in the reports. If this is not the case, "
f"Please make sure that you have 'assets' directory inside {in_path} "
f"which contains css files generated by pytest-html."
)
else:
with open(os.path.join(assets_dir_path, "style.css"), "r") as f:
content = f.read()
head = first_file.head
head.append(first_file.new_tag("style", type="text/css"))
head.style.append(content)
h = first_file.find("h1")
h.string = title or os.path.basename(out_path)
ps = first_file.find_all("p")
pytest_version = ps[0].text.split(" ")[-1]
ps.pop(0)
cb_types = {
"passed": [0, ""],
"skipped": [0, ""],
"failed": [0, ""],
"error": [0, ""],
"xfailed": [0, ""],
"xpassed": [0, ""],
}
html_ver = version.parse(pytest_version)
if html_ver >= version.parse("4.0.0rc"):
cb_types["rerun"] = [0, ""]
for cb_type in cb_types:
cb_val = get_checkbox_value(first_file, cb_type)
cb_types[cb_type][0] = cb_val[0]
cb_types[cb_type][1] = cb_val[1]
dur, test_count, fp = get_test_count_and_duration(ps, html_ver)
if html_ver < version.parse("4.0.0rc"):
t = first_file.find("table", {"id": "results-table"})
else:
f_json_blob = first_file.find("div", {"id": "data-container"}).get("data-jsonblob")
# Convert the JSON string into a dictionary
f_data_dict = json.loads(f_json_blob)
for path in paths:
cur_file = BeautifulSoup("".join(open(path)), features="html.parser")
if html_ver < version.parse("4.0.0rc"):
tbody_res = cur_file.find_all("tbody", {"class": "results-table-row"})
for elm in tbody_res:
t.append(elm)
else:
f_json_blob = cur_file.find("div", {"id": "data-container"}).get("data-jsonblob")
# Convert the JSON string into a dictionary
c_data_dict = json.loads(f_json_blob)
f_data_dict["tests"].update(c_data_dict["tests"])
p_res = cur_file.find_all("p")
_dur, _test_count, _ = get_test_count_and_duration(p_res, html_ver)
dur += _dur
test_count += _test_count
for cb_type in cb_types:
tmp = get_checkbox_value(cur_file, cb_type)
cb_types[cb_type][0] += tmp[0]
fp.string = f"{test_count} tests ran in {dur} seconds"
if html_ver >= version.parse("4.0.0rc"):
first_file.find("div", {"id": "data-container"})["data-jsonblob"] = json.dumps(f_data_dict)
for cb_type in cb_types:
set_checkbox_value(first_file, cb_type, cb_types[cb_type])
with open(out_path, "w") as f:
f.write(str(first_file))
def get_test_count_and_duration(ps, html_ver):
test_count = 0
dur = 0
fp = None
for p in ps:
if html_ver >= version.parse("4.0.0"):
match = re.search(r"test.* took ", p.text)
if match:
tmp = p.text.split(" ")
test_count = int(tmp[0])
if "ms." in tmp:
dur = int(tmp[3]) / 1000
else:
hours, minutes, seconds = map(int, tmp[3][:-1].split(":"))
dur = hours * 3600 + minutes * 60 + seconds
fp = p
break
if html_ver < version.parse("4.0.0"):
if " tests ran" in p.text:
tmp = p.text.split(" ")
test_count = int(tmp[0])
dur = float(tmp[4])
fp = p
break
return dur, test_count, fp
def set_checkbox_value(root_soap, cb_type, val):
elem = root_soap.find("span", {"class": cb_type})
match = re.search(CHECKBOX_REGEX, elem.text)
if match is None:
raise RuntimeError(f"{cb_type} <span> not found")
elem.string = f"{val[0]} {val[1]}"
elem = root_soap.find("input", {"data-test-result": cb_type})
if val[0] != 0:
del elem["disabled"]
del elem["hidden"]
def get_checkbox_value(root_soap, cb_type):
elem = root_soap.find("span", {"class": cb_type})
match = re.search(CHECKBOX_REGEX, elem.text)
if match is None:
raise RuntimeError(f"{cb_type} <span> not found")
gdict = match.groupdict()
return int(gdict["num"]), gdict["txt1"]
def get_html_files(path, output_file_path):
onlyfiles = []
output_file_path = os.path.abspath(output_file_path)
for p in pathlib.Path(path).rglob("*.html"):
print(p)
res = str(p.absolute())
if output_file_path in res:
print("damn")
continue
tmp = BeautifulSoup("".join(open(res)), features="html.parser")
p = tmp.find("p")
if p and "Report generated on " in p.text:
onlyfiles.append(res)
return sorted(onlyfiles, reverse=True)
def get_assets_path(path):
res = None
for p in pathlib.Path(path).rglob("assets"):
return str(p.absolute())
return res

View file

@ -3,23 +3,24 @@ from typing import Callable, Optional, TypedDict
import pytest import pytest
from dotenv import dotenv_values from dotenv import dotenv_values
from icecream import ic from loguru import logger
from src.dirmanager import DirManager from src.dirmanager import DirManager
class SubTest(TypedDict): class SubTest(TypedDict):
condition: Callable[[Path], bool] condition: Callable[[dict[str, str]], bool]
test_file: str test_file: str
class Runner: class Runner:
name: Optional[str] = None name: str = ""
test_dir_name: Optional[str] = None test_dir_name: str = ""
main_setup_name: Optional[str] = None main_setup_name: Optional[str] = None
main_test_name: Optional[str] = None main_test_name: Optional[str] = None
dependencies: list[type["Runner"]] = []
sub_tests: list[SubTest] = [] sub_tests: list[SubTest] = []
dependencies: list[str] = [] prevent_skip = False
def __init__(self, dotenv_path: Path, output_dir: Path, session_id: str): def __init__(self, dotenv_path: Path, output_dir: Path, session_id: str):
self.dotenv_path = dotenv_path self.dotenv_path = dotenv_path
@ -28,24 +29,54 @@ class Runner:
self.session_id = session_id self.session_id = session_id
self.DIRS = DirManager(output_dir, session_id) self.DIRS = DirManager(output_dir, session_id)
ic(f"creating instance of {self.__class__.__name__}") logger.info(f"creating instance of {self.__class__.__name__}")
assert self.test_dir_name is not None assert self.test_dir_name
self.root_dir = Path(__file__).parent self.root_dir = Path(__file__).parent
def _run_main_test(self): def _run_main_setup_and_test(self):
if isinstance(self.main_setup_name, str): if isinstance(self.main_setup_name, str):
full_path = self.root_dir / self.test_dir_name / self.main_setup_name self._run_test_if_required(
self._run_pytest(full_path) identifier_string=self.combine_names(self.name, self.main_setup_name),
test_path=self.root_dir / self.test_dir_name / self.main_setup_name,
)
if isinstance(self.main_test_name, str): if isinstance(self.main_test_name, str):
full_path = self.root_dir / self.test_dir_name / self.main_test_name self._run_test_if_required(
self._run_pytest(full_path) identifier_string=self.combine_names(self.name, self.main_test_name),
test_path=self.root_dir / self.test_dir_name / self.main_test_name,
)
def _run_pytest(self, full_test_path: Path): def _run_test_if_required(self, identifier_string: str, test_path: Path):
"""runs pytest programmatically if not self.prevent_skip and self._test_already_passed(identifier_string, remove_existing=True):
logger.info(f"skipping {identifier_string}")
else:
logger.info(f"running {identifier_string}")
result = self._call_pytest(test_path)
self._create_result_file(result=result, identifier_string=identifier_string)
will run all tests in the file at full_test_path with some command line arguments""" def _test_already_passed(self, identifier_string: str, remove_existing: bool = False) -> bool:
"""returns True if the selected test (matching test_name + sub_test_name) already passed
ic(f"running test: {full_test_path}") This is determined by the presence of a specific output file in the RESULTS folder that
matches identifier_string
remove_existing: If True, result files matching test_name + sub_test_name with a status
other than 'passed' will be deleted"""
already_passed = False
for result in self.DIRS.RESULTS.glob("*"):
if identifier_string in result.name:
# process any result file (passed / failed / skipped) if it exists
if "passed" in result.name:
already_passed = True
elif remove_existing:
result.unlink()
return already_passed
def _call_pytest(self, full_test_path: Path) -> int:
"""runs pytest programmatically on a specific file
all tests in the file [full_test_path] will be run along with command line arguments"""
command_arguments = [] command_arguments = []
@ -66,9 +97,8 @@ class Runner:
# warning: https://github.com/microsoft/playwright-pytest/issues/111 # warning: https://github.com/microsoft/playwright-pytest/issues/111
# --output only works with the given context and page fixture # --output only works with the given context and page fixture
# folder needs to be unique! traces will not appear, if every pytest run has same output dir # folder needs to be unique! traces will not appear, if every pytest run has same output dir
output = self.DIRS.RESULTS / full_test_path.stem
command_arguments.append("--output") command_arguments.append("--output")
command_arguments.append(str(output)) command_arguments.append(str(self.DIRS.RECORDS / "traces" / full_test_path.stem))
# tracing # tracing
command_arguments.append("--tracing") command_arguments.append("--tracing")
@ -81,27 +111,56 @@ class Runner:
# headed # headed
# command_arguments.append("--headed") # command_arguments.append("--headed")
pytest.main(command_arguments) # html report. Will be combined into one file later.
command_arguments.append(f"--html={self.DIRS.RECORDS / 'html' / full_test_path.with_suffix('.html').name}")
return pytest.main(command_arguments)
def run_tests(self): def run_tests(self):
self._check_dependencies_finished() self._assert_dependencies_passed()
self._run_main_test() self._run_main_setup_and_test()
for sub_test in self.sub_tests: for sub_test in self.sub_tests:
condition_function = sub_test["condition"] condition_function = sub_test["condition"]
if condition_function(self.dotenv_path): sub_test_name = sub_test["test_file"]
test_name = sub_test["test_file"] identifier_string = self.combine_names(self.name, sub_test_name)
full_test_path = self.root_dir / self.test_dir_name / test_name if condition_function(self.config):
self._run_pytest(full_test_path) test_path = self.root_dir / self.test_dir_name / sub_test_name
self._create_progress_file() self._run_test_if_required(identifier_string=identifier_string, test_path=test_path)
else:
self._create_result_file(result=-1, identifier_string=identifier_string)
def _create_progress_file(self): def _create_result_file(
"""create progress file to indicated finished test""" self,
file_path = self.DIRS.PROGRESS / self.name result: int,
identifier_string: str,
):
"""create result file to indicated passed/failed or skipped test"""
full_name = self.combine_names(self.result_int_to_str(result), identifier_string)
file_path = self.DIRS.RESULTS / full_name
with open(file_path, "w") as _: with open(file_path, "w") as _:
pass # create empty file pass # create empty file
def _check_dependencies_finished(self): @staticmethod
"""look for progress file of dependencies to confirm they have ran""" def result_int_to_str(result_int: int) -> str:
finished_tests = [result.name for result in self.DIRS.PROGRESS.glob("*")] match result_int:
case -1:
return "skipped"
case 0:
return "passed"
case _:
return "failed"
@staticmethod
def combine_names(*names: str) -> str:
return "-".join(names)
def _assert_dependencies_passed(self):
"""assert that all dependencie setups passed before"""
passed_tests = [r.name for r in self.DIRS.RESULTS.glob("*") if "passed" in r.name]
for dependencie in self.dependencies: for dependencie in self.dependencies:
assert dependencie in finished_tests dependencie_identifier = self.combine_names(dependencie.name, dependencie.main_setup_name)
assert any(
dependencie_identifier in f for f in passed_tests
), f"could not run {self.name} because {dependencie} did not run before"

View file

@ -5,18 +5,22 @@ from playwright.sync_api import BrowserContext
from src.dirmanager import DirManager from src.dirmanager import DirManager
TIMEOUT = 5000
@pytest.fixture @pytest.fixture
def admin_session(context: BrowserContext, DIR: DirManager) -> BrowserContext: def admin_context(context: BrowserContext, DIR: DirManager) -> BrowserContext:
state_file = DIR.STATES / "admin_state.json" state_file = DIR.STATES / "admin_state.json"
storage_state = json.loads(state_file.read_bytes()) storage_state = json.loads(state_file.read_bytes())
context.add_cookies(storage_state["cookies"]) context.add_cookies(storage_state["cookies"])
context.set_default_timeout(TIMEOUT)
return context return context
@pytest.fixture @pytest.fixture
def user_session(context: BrowserContext, DIR: DirManager) -> BrowserContext: def user_context(context: BrowserContext, DIR: DirManager) -> BrowserContext:
state_file = DIR.STATES / "user_state.json" state_file = DIR.STATES / "user_state.json"
storage_state = json.loads(state_file.read_bytes()) storage_state = json.loads(state_file.read_bytes())
context.add_cookies(storage_state["cookies"]) context.add_cookies(storage_state["cookies"])
context.set_default_timeout(TIMEOUT)
return context return context

View file

@ -1,15 +1,11 @@
from pathlib import Path
from src.runner import Runner, SubTest from src.runner import Runner, SubTest
# from src.tests_authentik.setup_authentik import setup_authentik
def condition_always_true(dotenv_config: dict[str, str]) -> bool:
def condition_always_true(dotenv_path: Path) -> bool:
return True return True
def condition_always_false(dotenv_path: Path) -> bool: def condition_always_false(dotenv_config: dict[str, str]) -> bool:
return False return False

View file

@ -2,7 +2,6 @@ import json
import os import os
import re import re
from icecream import ic
from playwright.sync_api import BrowserContext, expect from playwright.sync_api import BrowserContext, expect
from src.dirmanager import DirManager from src.dirmanager import DirManager
@ -10,13 +9,15 @@ from src.dirmanager import DirManager
ADMIN_USER = os.environ["ADMIN_USER"] ADMIN_USER = os.environ["ADMIN_USER"]
ADMIN_PASS = os.environ["ADMIN_PASS"] ADMIN_PASS = os.environ["ADMIN_PASS"]
LOCALE = {"Accept-Language": "de_DE"}
TESTUSER = {"username": "testuser", "name": "Test User", "password": "test123", "email": "test@example.com"} TESTUSER = {"username": "testuser", "name": "Test User", "password": "test123", "email": "test@example.com"}
TIMEOUT = 10000 TIMEOUT = 6000
def test_create_admin_login(context: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager): def test_create_admin_login(context: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager):
# go to page # go to page
context.set_extra_http_headers(LOCALE)
context.set_default_timeout(TIMEOUT)
page = context.new_page() page = context.new_page()
url = "https://" + dotenv_config["DOMAIN"] url = "https://" + dotenv_config["DOMAIN"]
page.goto(url) page.goto(url)
@ -45,8 +46,10 @@ def check_if_user_exists(admin_context: BrowserContext, dotenv_config: dict[str,
nav = page.locator("ak-sidebar-item", has_text=re.compile(r"Directory|Verzeichnis")) nav = page.locator("ak-sidebar-item", has_text=re.compile(r"Directory|Verzeichnis"))
nav.click() nav.click()
nav.get_by_role("link", name=re.compile(r"Users|Benutzer")).click() nav.get_by_role("link", name=re.compile(r"Users|Benutzer")).click()
result = page.get_by_text(TESTUSER["username"]).is_visible(timeout=TIMEOUT)
return result user = page.get_by_text(TESTUSER["username"])
user.wait_for(state="visible")
return user.is_visible()
def create_invite_link(admin_context: BrowserContext, dotenv_config: dict[str, str]): def create_invite_link(admin_context: BrowserContext, dotenv_config: dict[str, str]):
@ -100,6 +103,7 @@ def create_user(user_context: BrowserContext, invitelink):
def test_create_user_session(context: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager): def test_create_user_session(context: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager):
context.set_extra_http_headers(LOCALE)
context.set_default_timeout(TIMEOUT) context.set_default_timeout(TIMEOUT)
# load admin cookies # load admin cookies

View file

@ -1,179 +0,0 @@
## this file will not be used later
## split into
# -> setup.setup_authentic.py
# and
# -> tests
import pytest
from icecream import ic
from playwright.sync_api import Browser, Locator, expect
# playwright = sync_playwright().start()
# browser = playwright.chromium.launch(headless=False)
testuser = {"username": "testuser", "name": "Test User", "password": "test123", "email": "test@example.com"}
TIMEOUT = 5000
def check_for(locator: Locator):
expect(locator).to_be_visible(timeout=TIMEOUT)
def setup_context(browser, state_file=None):
if state_file:
context = browser.new_context(storage_state=state_file)
else:
context = browser.new_context()
context.set_default_timeout(TIMEOUT)
return context
""" Test Authentik Login and DE Locale """
@pytest.fixture(scope="session", autouse=True)
def admin_login(browser: Browser, dotenv_config, STATES):
# ic(dotenv_config)
CONFIG = dotenv_config
context = setup_context(browser)
page = context.new_page()
url = "https://" + CONFIG["DOMAIN"]
ic(url)
page.goto(url)
welcome_message = CONFIG.get("welcome_message")
if welcome_message:
check_for(page.get_by_text(welcome_message))
if CONFIG["locale"] == "de":
check_for(page.get_by_text("Benutzername oder Passwort vergessen?"))
check_for(page.get_by_text("E-Mail or Anmeldename"))
check_for(page.get_by_text("Passwort", exact=True))
page.locator('input[name="uidField"]').fill(CONFIG["admin"])
page.locator('ak-stage-identification input[name="password"]').fill(CONFIG["admin_pw"])
page.get_by_role("button", name="Log In").click()
check_for(page.locator("ak-library"))
if CONFIG["locale"] == "de":
check_for(page.get_by_text("Meine Anwendungen"))
context.storage_state(path=f"{STATES}/admin_state.json")
page.close()
context.close()
""" Create User """
@pytest.fixture(scope="session", autouse=True)
def init_create_user(browser: Browser, dotenv_config, STATES):
admin_context = setup_context(browser, f"{STATES}/admin_state.json")
admin_page = admin_context.new_page()
invitelink = create_invite_link(admin_page, dotenv_config)
admin_context.close()
user_context = setup_context(browser)
create_user(user_context, invitelink)
user_context.close()
""" Delete User """
@pytest.fixture(scope="session", autouse=True)
def post_delete_user(browser: Browser, dotenv_config, RECORDS, STATES):
yield
context = browser.new_context(storage_state=f"{STATES}/admin_state.json")
context.tracing.start(screenshots=True, snapshots=True, sources=True)
context.set_default_timeout(TIMEOUT)
page = context.new_page()
# delete_nextcloud_user(page)
delete_authentik_user(page, dotenv_config)
context.tracing.stop(path=f"{RECORDS}/delete_user.zip")
""" Create Invite Link """
def create_invite_link(page, dotenv_config):
CONFIG = dotenv_config
page.goto(CONFIG["domain"])
page.get_by_role("link", name="Admin Interface").click()
page.get_by_text("Verzeichnis").click()
page.get_by_text("Benutzer").nth(2).click()
page.get_by_text("Einladungen").click()
page.get_by_role("button", name="Erstellen").first.click()
page.locator('input[name="name"]').click()
linkname = "testlink9433"
page.locator('input[name="name"]').fill(linkname)
page.get_by_placeholder("Wählen Sie ein Objekt aus.").click()
page.get_by_role("option", name="invitation-enrollment-flow invitation-enrollment-flow").click()
page.get_by_text("Erstellen", exact=True).first.click()
linklocator = page.get_by_role("rowgroup").filter(has=page.get_by_text(linkname))
linklocator.locator(".fa-angle-down").click()
invitelink = linklocator.get_by_role("textbox").get_attribute(name="value")
return invitelink
""" Create User from invitelink """
def create_user(context, invitelink, STATES):
page = context.new_page()
page.goto(invitelink)
page.get_by_placeholder("Benutzername").click()
page.get_by_placeholder("Benutzername").fill(testuser["username"])
page.locator('input[name="name"]').click()
page.locator('input[name="name"]').fill(testuser["name"])
page.locator('input[name="email"]').click()
page.locator('input[name="email"]').fill(testuser["email"])
page.get_by_placeholder("Passwort", exact=True).click()
page.get_by_placeholder("Passwort", exact=True).fill(testuser["password"])
page.get_by_placeholder("Passwort (wiederholen)").click()
page.get_by_placeholder("Passwort (wiederholen)").fill(testuser["password"])
page.get_by_role("button", name="Weiter").click()
check_for(page.locator("ak-library"))
context.storage_state(path=f"{STATES}/user_state.json")
""" Delete Authentik Account """
def delete_authentik_user(page, dotenv_config):
CONFIG = dotenv_config
page.goto(CONFIG["domain"])
page.get_by_role("link", name="Admin Interface").click()
page.get_by_text("Verzeichnis").click()
page.get_by_text("Benutzer").nth(2).click()
page.get_by_role("row").filter(has=page.get_by_text(testuser["username"])).get_by_role("checkbox").click()
page.get_by_role("button", name="Löschen").click()
page.get_by_role("dialog").get_by_role("button", name="Löschen").click()
check_for(page.get_by_text("1 Benutzer erfolgreich gelöscht"))
""" Reuse Authentik Admin Session """
@pytest.fixture
def admin_session(browser: Browser, dotenv_config, STATES):
CONFIG = dotenv_config
context = setup_context(browser, f"{STATES}/admin_state.json")
page = context.new_page()
page.goto(CONFIG["domain"])
yield context, page
context.close()
""" Reuse Authentik User Session """
@pytest.fixture
def user_session(browser: Browser, dotenv_config, STATES):
CONFIG = dotenv_config
context = setup_context(browser, f"{STATES}/user_state.json")
page = context.new_page()
page.goto(CONFIG["domain"])
yield context, page
context.close()
def test_true():
assert 1 + 1 == 2

View file

@ -0,0 +1,26 @@
"""
This file can be used to define fixtures thate are then used by other tests which
depend on [demo]. For this to work
1. the Runner class of the other test needs to define the depencency as seen
by referencing RunnerDemo in the dependencies list:
from src.tests_demo.runner_demo import RunnerDemo
class RunnerOther(Runner):
dependencies = [RunnerDemo]
2. the specific tests that rely on these fixtures need to import the fixtures.
To globally import for all tests in 'other', the import should be done in conftest:
in 'conftest.py' in 'test_other' dir:
from src.tests_demo.fixtures_demo import demo_fixture
"""
import pytest
@pytest.fixture
def demo_fixture():
return ""

View file

@ -0,0 +1,29 @@
from typing import Optional
from src.runner import Runner, SubTest
from src.tests_authentik.runner_authentik import RunnerAuthentik
class RunnerDemo(Runner):
"""Every env file has a corresponding runner class"""
name: str = "demo" # name of the test, used for logging / output naming
test_dir_name: str = "tests_demo" # dir name holding all tests related to RunnerDemo
# Filename of Demo setup. If defined, it will run 1st by executing pytest
main_setup_name: Optional[str] = "setup_demo.py"
# Filename of Demo test. This file contains unconditional tests that will be run in any
# case. If defined, it will run 2nd by executing pytest
main_test_name: Optional[str] = None
# this indicates that tests from RunnerDemo depend on the setup from RunnerAuthentik.
# RunnerDemo will only execute, when setup_authentik.py has finished successfully.
# For example, setup_authentik.py generates session states, that can be used as fixtures
# that can be loaded from fixtures_authentik.py
dependencies: list[type["Runner"]] = [RunnerAuthentik]
# this list can hold many more tests from RunnerDemo that run conditional. The condition
# and the test file can be defined by creating a SubTest instance:
# SubTest(condition: Callable, test_file: str)
sub_tests: list[SubTest] = []

View file

@ -0,0 +1,3 @@
# Define functions here that are specifically meant for setup, not for testing. This means
# all actions that simply are required for other tests from 'demo' to run. Runs before all
# tests from 'demo'.

View file

@ -1 +1 @@
from src.tests_authentik.fixtures_authentik import admin_session, user_session from src.tests_authentik.fixtures_authentik import admin_context, user_context

View file

@ -1,21 +1,28 @@
from pathlib import Path
from src.runner import Runner, SubTest from src.runner import Runner, SubTest
from src.tests_authentik.runner_authentik import RunnerAuthentik
def condition_always_true(dotenv_path: Path) -> bool: def condition_always_true(dotenv_config: dict[str, str]) -> bool:
return True return True
def condition_always_false(dotenv_path: Path) -> bool: def condition_always_false(dotenv_config: dict[str, str]) -> bool:
return False
def condition_has_locale(dotenv_config: dict[str, str]) -> bool:
if "LOCALE" in dotenv_config:
if "de" in dotenv_config["LOCALE"]:
return True
return False return False
class RunnerWordpress(Runner): class RunnerWordpress(Runner):
name = "wordpress" name = "wordpress"
test_dir_name = "tests_wordpress" test_dir_name = "tests_wordpress"
# main_test_name = "test_wordpress.py" main_test_name = "test_wordpress.py"
dependencies: list[type[Runner]] = [RunnerAuthentik]
sub_tests = [ sub_tests = [
SubTest(condition=condition_always_true, test_file="test_wordpress_feature1.py"), SubTest(condition=condition_has_locale, test_file="test_wordpress_localization.py"),
] ]
dependencies: list[str] = ["authentik"] prevent_skip = True

View file

@ -1,29 +1,14 @@
import re import re
import pytest from playwright.sync_api import BrowserContext, expect
from playwright.sync_api import Page, expect
from src.dirmanager import DirManager
def test_one(): def test_visit_from_authentik(admin_context: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager):
assert 1 + 1 == 2 page_authentik = admin_context.new_page()
with page_authentik.expect_popup() as event_context:
page_authentik.get_by_role("link", name="Wordpress").click()
page_wordpress = event_context.value
expect(page_wordpress.locator("#wpcontent")).to_be_visible()
def test_two():
assert 2 + 1 == 3
def test_has_title(page: Page):
page.goto("https://playwright.dev/")
# Expect a title "to contain" a substring.
expect(page).to_have_title(re.compile("Playwright"))
def test_get_started_link(page: Page):
page.goto("https://playwright.dev/")
# Click the get started link.
page.get_by_role("link", name="Get started").click()
# Expects page to have a heading with the name of Installation.
expect(page.get_by_role("heading", name="Installation")).to_be_visible()

View file

@ -1,31 +0,0 @@
import re
from icecream import ic
from playwright.sync_api import BrowserContext, Page, expect
def test_demo(admin_session: BrowserContext):
admin_session.new_page()
assert 1 + 1 == 2
# def test_one(config):
# ic(config)
# assert 1 + 1 == 2
# def test_has_title(page: Page):
# page.goto("https://playwright.dev/")
# # Expect a title "to contain" a substring.
# expect(page).to_have_title(re.compile("Playwright"))
# def test_get_started_link(page: Page):
# page.goto("https://playwright.dev/")
# # Click the get started link.
# page.get_by_role("link", name="Get started").click()
# # Expects page to have a heading with the name of Installation.
# expect(page.get_by_role("heading", name="Installation")).to_be_visible()

View file

@ -1,22 +1,15 @@
# WIP localization # WIP localization
from playwright.sync_api import Page, expect from playwright.sync_api import BrowserContext, expect
from src.dirmanager import DirManager
def test_has_title(page: Page): def test_welcome_message(user_context: BrowserContext, dotenv_config: dict[str, str], DIR: DirManager):
page.goto("https://playwright.dev/") page = user_context.new_page()
url = "https://" + dotenv_config["DOMAIN"]
page.goto(url)
# Expect a title "to contain" a substring. expect(page.locator("#wpcontent")).to_be_visible()
expect(page).to_have_title(re.compile("Playwright")) if "locale" in dotenv_config and "de" in dotenv_config["locale"]:
expect(page.get_by_role("heading")).to_have_text("Willkommen bei WordPress!")
def test_wordpress(admin_session):
context, page = admin_session
with page.expect_popup() as info:
page.get_by_role("link", name="Wordpress").click()
wordpress = info.value
check_for(wordpress.locator("#wpcontent"))
if CONFIG["locale"] == "de":
check_for(wordpress.get_by_role("heading", name="Willkommen bei WordPress!"))
context.tracing.stop(path=f"{RECORDS}/wordpress.zip")

View file

@ -1,7 +1,20 @@
from datetime import datetime from datetime import datetime
from pathlib import Path
@staticmethod
def get_session_id() -> str: def get_session_id() -> str:
current_datetime = datetime.now() current_datetime = datetime.now()
return current_datetime.strftime("%Y-%m-%d-%H-%M-%S") return current_datetime.strftime("%Y-%m-%d-%H-%M-%S")
def rmtree(root_dir: Path):
"""removes a folder with content recursively"""
if not root_dir.is_dir():
return
for child in root_dir.iterdir():
if child.is_dir():
rmtree(child)
else:
child.unlink()
root_dir.rmdir()

View file

@ -1,55 +0,0 @@
import os
from pathlib import Path
from typing import Protocol
from dotenv import dotenv_values
from src.dirmanager import DirManager
from src.tests_authentik.runner_authentik import RunnerAuthentik
from src.tests_wordpress.runner_wordpress import RunnerWordpress
class TestRunner(Protocol):
def __init__(self, dotenv_path: Path, output_dir: Path, session_id: str):
...
def run_tests(self):
...
# Register all runners here. A .env file with TYPE=authentik will be ran with RunnerAuthentik
RUNNER_DICT: dict[str, type[TestRunner]] = {
"authentik": RunnerAuthentik,
"wordpress": RunnerWordpress,
}
class Wrapper:
def __init__(self, env_files: list[Path], output_dir: Path, session_id: str):
self.env_files = env_files
self.check_env_files(self.env_files)
self.output_dir = output_dir
self.session_id = session_id
def setup_test(self):
self.dir_manager = DirManager(output_dir=self.output_dir, session_id=self.session_id)
self.dir_manager.create_all_dirs()
def run_test(self):
self.runners: list[TestRunner] = self._load_runners(self.env_files)
for runner in self.runners:
runner.run_tests()
def _load_runners(self, env_files: list[Path]) -> list[TestRunner]:
runners = []
for env_file in env_files:
config: dict[str, str] = dotenv_values(env_file) # type: ignore
RunnerClass = RUNNER_DICT[config["TYPE"]]
runners.append(RunnerClass(dotenv_path=env_file, output_dir=self.output_dir, session_id=self.session_id))
return runners
@staticmethod
def check_env_files(env_files: list[Path]):
"""checks if file exist for every file in list"""
for env_file in env_files:
assert env_file.is_file()