refactor for independent test dirs (#7)
* make it so that the actual tests can be moved anywhere, for example in abra recipe repos -> major refactoring with pytest test discovery magic * create RUNNER_DICT dynamically with importlib -> none of the tests are hardcoded, more tests can be added by placing a folder * autoload fixtures with pytest plugins * add URL fixture to navigate on web pages. Includes url parser based on python urllib to generate correct links * fix nextcloud setups and tests * add email groundwork with imbox Reviewed-on: local-it-infrastructure/e2e_tests#7 Co-authored-by: Daniel <d.brummerloh@gmail.com> Co-committed-by: Daniel <d.brummerloh@gmail.com>
This commit is contained in:
parent
3fa10aaa69
commit
f9c21c6e6b
45 changed files with 373 additions and 228 deletions
0
abratest/__init__.py
Normal file
0
abratest/__init__.py
Normal file
108
abratest/coordinator.py
Normal file
108
abratest/coordinator.py
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
import importlib
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from abratest.dir_manager import DirManager
|
||||
from abratest.env_manager import EnvFile, EnvManager
|
||||
from abratest.html_helper import merge_html_files
|
||||
from abratest.runner import Runner
|
||||
from abratest.utils import rmtree
|
||||
|
||||
|
||||
class Coordinator:
    """Orchestrates a full end-to-end test session.

    Discovers Runner classes below recipes_dir, prepares the session
    directory layout and env files, executes the runners in dependency
    order and finally collects the generated reports and traces.
    """

    def __init__(self, env_paths_list: list[Path], output_dir: Path, session_id: str, recipes_dir: Path) -> None:
        # logging
        out_string = "".join([e.name + "\n" for e in env_paths_list])
        out_string += f"output_dir = {output_dir}\n"
        out_string += f"session_id = {session_id}"
        logger.info(f"initialize Coordinator instance with\nenv_paths_list =\n{out_string}")

        self.RUNNER_DICT = self.create_runner_dict(recipes_dir)
        self.DIR = DirManager(output_dir=output_dir, session_id=session_id, recipes_dir=recipes_dir)
        self.ENV = EnvManager(env_paths_list, self.RUNNER_DICT)

    def setup_test(self) -> None:
        """Creates the session directory tree and copies the env files into it."""
        logger.info("calling setup_test()")
        self.DIR.create_all_dirs()
        self.ENV.copy_env_files(self.DIR)

    def run_test(self) -> None:
        """Runs all setups, then all tests, then all cleanups across all runners."""
        logger.info("calling run_test()")
        self.runners: list[Runner] = self._load_runners(self.ENV.env_files)
        for runner in self.runners:
            runner.run_setups()
        for runner in self.runners:
            runner.run_tests()
        for runner in self.runners:
            runner.run_cleanups()
        logger.info("run_test() finished")

    def _load_runners(self, env_files: list[EnvFile]) -> list[Runner]:
        """Creates an instance of the correct Runner class for each given env file"""
        runners: list[Runner] = []
        for env_file in env_files:
            RunnerClass = self.RUNNER_DICT[env_file.config["TYPE"]]
            # resolve the runner classes this runner depends on so the instance
            # can later check whether its dependencies have passed
            dependency_classes: list[type[Runner]] = [
                self.RUNNER_DICT[dependency] for dependency in RunnerClass.dependencies
            ]
            runner_instance = RunnerClass(dotenv_path=env_file.env_path, DIR=self.DIR)
            runner_instance._dependency_runners = dependency_classes
            runners.append(runner_instance)
        return runners

    def combine_html(self) -> None:
        """combines all generated pytest html reports into one"""
        in_path = str(self.DIR.RECORDS / "html")
        out_path = str(self.DIR.RECORDS / "full-report.html")
        title = "combined.html"
        merge_html_files(in_path, out_path, title)

    def collect_traces(self):
        """moves all traces into SESSION/RECORDS dir

        if tests are rerun and generate another trace, the new trace will get a unique name such as
        tracename-0
        tracename-1
        ...
        """

        def get_new_path(root_dir: Path, base_name: str, index=0) -> Path:
            # find the first "-<index>" suffixed name that is still unused
            candidate = root_dir / (base_name + f"-{index}")
            if not candidate.is_dir():
                return candidate
            return get_new_path(root_dir, base_name, index=index + 1)

        trace_root_dir = self.DIR.RECORDS / "traces"
        for f in trace_root_dir.rglob("*/trace.zip"):
            new_path = get_new_path(self.DIR.RECORDS, f.parent.name)
            f.parent.rename(new_path)
        rmtree(trace_root_dir)

    @staticmethod
    def create_runner_dict(recipes_dir: Path) -> dict[str, type["Runner"]]:
        """Creates a dictionary holding all the RunnerClasses that can be discovered in recipes_dir

        example:
        RUNNER_DICT: dict[str, type["Runner"]] = {
            "authentik": RunnerAuthentik,
            "wordpress": RunnerWordpress,
            "nextcloud": RunnerNextcloud,
        }
        """

        RUNNER_DICT: dict[str, type["Runner"]] = dict()
        # a module's runner class must be named "Runner<Something>"
        runner_discovery_pattern = re.compile("Runner.+")

        for module_path in recipes_dir.rglob("*/runner*.py"):
            # build the dotted module path relative to recipes_dir.
            # with_suffix("") strips only the trailing ".py"; the previous
            # str.replace(".py", "") would also have mangled any ".py"
            # occurring elsewhere in the path.
            rel_path = module_path.relative_to(recipes_dir).with_suffix("").as_posix().replace("/", ".")
            module = importlib.import_module(rel_path)
            runner_class_names = [name for name in dir(module) if runner_discovery_pattern.match(name)]
            assert len(runner_class_names) == 1, (
                f"expected exactly one Runner class in {module_path}, found {runner_class_names}"
            )
            runner_class_name = runner_class_names[0]
            RunnerClass: type[Runner] = getattr(module, runner_class_name)
            RUNNER_DICT[RunnerClass.name] = RunnerClass
        return RUNNER_DICT
|
||||
71
abratest/dir_manager.py
Normal file
71
abratest/dir_manager.py
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
from pathlib import Path
|
||||
|
||||
|
||||
class DirManager:
|
||||
"""Manages directories for the tests and should be used to create and find
|
||||
and use the correct directories.
|
||||
|
||||
The structures is as follows:
|
||||
tests dir/
|
||||
session_dir-1/
|
||||
records
|
||||
results
|
||||
states
|
||||
session_dir-2/
|
||||
records
|
||||
...
|
||||
"""
|
||||
|
||||
def __init__(self, output_dir: Path | str, session_id: str, recipes_dir: Path | str = ""):
|
||||
if isinstance(output_dir, str):
|
||||
output_dir = Path(output_dir)
|
||||
self.output_dir = output_dir.resolve()
|
||||
self.session_id = session_id
|
||||
if isinstance(recipes_dir, str):
|
||||
recipes_dir = Path(recipes_dir)
|
||||
self.recipes_dir = recipes_dir
|
||||
|
||||
def create_all_dirs(self) -> None:
|
||||
dirs: list[Path] = [
|
||||
self.OUTPUT_DIR,
|
||||
self.SESSION,
|
||||
self.RECORDS,
|
||||
self.HTML,
|
||||
self.STATES,
|
||||
self.ENV_FILES,
|
||||
self.RESULTS,
|
||||
]
|
||||
for d in dirs:
|
||||
d.mkdir(exist_ok=True)
|
||||
|
||||
@property
|
||||
def OUTPUT_DIR(self):
|
||||
return self.output_dir
|
||||
|
||||
@property
|
||||
def SESSION(self):
|
||||
return self.OUTPUT_DIR / f"test-{self.session_id}"
|
||||
|
||||
@property
|
||||
def RECORDS(self):
|
||||
return self.SESSION / "records"
|
||||
|
||||
@property
|
||||
def HTML(self):
|
||||
return self.RECORDS / "html"
|
||||
|
||||
@property
|
||||
def STATES(self):
|
||||
return self.SESSION / "states"
|
||||
|
||||
@property
|
||||
def ENV_FILES(self):
|
||||
return self.STATES / "env_files"
|
||||
|
||||
@property
|
||||
def RESULTS(self):
|
||||
return self.SESSION / "results"
|
||||
|
||||
@property
|
||||
def RECIPES(self):
|
||||
return self.recipes_dir
|
||||
100
abratest/env_manager.py
Normal file
100
abratest/env_manager.py
Normal file
|
|
@ -0,0 +1,100 @@
|
|||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import NamedTuple
|
||||
|
||||
from dotenv import dotenv_values
|
||||
|
||||
from abratest.dir_manager import DirManager
|
||||
from abratest.runner import Runner
|
||||
|
||||
|
||||
class EnvFile(NamedTuple):
    """A single env file: its location, its parsed content and its TYPE value."""

    # location of the .env file on disk
    env_path: Path
    # parsed key/value pairs from the file
    config: dict[str, str]
    # value of the file's mandatory TYPE key (selects the Runner class)
    env_type: str

    def __repr__(self) -> str:
        # keep the repr short: the type is the interesting part when logging
        return f"EnvFile(type={self.env_type})"
|
||||
|
||||
|
||||
# A single ordering constraint: the env file of type *child* must be handled
# after the env file of type *dependency*.
DependencyRule = NamedTuple("DependencyRule", [("child", str), ("dependency", str)])
|
||||
|
||||
|
||||
class EnvManager:
    """Parses the given env files and orders them so that every env file comes
    after the env files its runner depends on."""

    def __init__(self, env_paths_list: list[Path], RUNNER_DICT: dict[str, type["Runner"]]):
        self.env_files: list[EnvFile] = self._get_env_files(env_paths_list)
        self.dependency_rules: list[DependencyRule] = self._get_dependency_rules(self.env_files, RUNNER_DICT)
        # reorder in place so dependencies come first
        self.env_files = self.sort_env_files_by_rule(self.env_files, self.dependency_rules)

    @staticmethod
    def _get_env_files(env_paths: list[Path]) -> list[EnvFile]:
        """Returns a list of EnvFile objects created from the given env files"""
        env_files: list[EnvFile] = []
        for env_path in env_paths:
            assert env_path.is_file(), f"the env file {env_path} does not exist"
            config: dict[str, str] = dotenv_values(env_path)  # type: ignore
            # TYPE selects the Runner class responsible for this env file
            assert "TYPE" in config, f"the env file {env_path} does not specify the required TYPE key."
            env_type = config["TYPE"]
            env_files.append(EnvFile(env_path=env_path, config=config, env_type=env_type))
        return env_files

    @staticmethod
    def _get_dependency_rules(env_files: list[EnvFile], RUNNER_DICT: dict[str, type["Runner"]]) -> list[DependencyRule]:
        """Collects one (child, dependency) rule for every dependency declared by
        the runner class of each given env file."""
        dependency_rules: list[DependencyRule] = []
        for env_file in env_files:
            child_runner_class = RUNNER_DICT[env_file.env_type]
            for dependency in child_runner_class.dependencies:
                dependency_rule = DependencyRule(child=child_runner_class.name, dependency=dependency)
                dependency_rules.append(dependency_rule)
        return dependency_rules

    @staticmethod
    def _get_indices_by_string(in_list: list[EnvFile], string: str) -> list[int]:
        """returns all indices of items in in_list, where item.env_type matches string"""
        return [index for index, element in enumerate(in_list) if element.env_type == string]

    @staticmethod
    def _swap_item_with_previous(in_list: list[EnvFile], index: int):
        """swaps item at index N with item at index N-1 (in place)"""
        assert index > 0, "cannot swap with negative index"
        in_list[index], in_list[index - 1] = in_list[index - 1], in_list[index]

    @classmethod
    def is_rule_satisfied(cls, env_list: list[EnvFile], rule: DependencyRule, swap=False) -> bool:
        """returns if the ordering in in_list is compliant with the given rule

        if swap=True, some reordering will happen in case of a violated rule"""

        child_indices = cls._get_indices_by_string(env_list, rule.child)
        parent_indices = cls._get_indices_by_string(env_list, rule.dependency)
        for child_index in child_indices:
            for parent_index in parent_indices:
                if not parent_index < child_index:
                    # the dependency sits at or after its child: bubble it one
                    # position towards the front and report the violation
                    if swap:
                        cls._swap_item_with_previous(env_list, parent_index)
                    return False
        return True

    @classmethod
    def sort_env_files_by_rule(cls, env_list: list[EnvFile], rules: list[DependencyRule]) -> list[EnvFile]:
        """Repeatedly applies all rules (bubbling dependencies forward) until a
        compliant ordering is found.

        The iteration cap guards against unsolvable (circular) rule sets."""
        out_list = env_list.copy()

        for _ in range(10_000):
            rule_satisfied: list[bool] = []
            for rule in rules:
                # swap=True mutates out_list towards a compliant ordering
                rule_satisfied.append(cls.is_rule_satisfied(out_list, rule, swap=True))

            if all(rule_satisfied):
                return out_list
        raise ValueError(
            "Could not resolve test order. This is possibly due to a circular dependency (a on b, b on c, c on a)"
        )

    def copy_env_files(self, DIR: DirManager) -> None:
        """Copies all env files to STATES/env_files. Files will be renamed to their own TYPE value."""
        # NOTE(review): two env files sharing the same TYPE would overwrite each
        # other here — confirm that TYPE values are unique per session
        env_files_dir = DIR.STATES / "env_files"
        env_files_dir.mkdir(exist_ok=True)
        for env_file in self.env_files:
            shutil.copy(env_file.env_path, env_files_dir / env_file.env_type)
|
||||
194
abratest/html_helper.py
Normal file
194
abratest/html_helper.py
Normal file
|
|
@ -0,0 +1,194 @@
|
|||
# code from
|
||||
# https://github.com/akavbathen/pytest_html_merger/tree/main
|
||||
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from packaging import version
|
||||
|
||||
# matches the "<count> <label>" text of the summary checkbox spans, e.g. "3 passed"
CHECKBOX_REGEX = r"^(?P<num>0|[1-9]\d*) (?P<txt1>.*)"
|
||||
|
||||
|
||||
def merge_html_files(in_path: str, out_path: str, title: str):
    """Merges all pytest-html reports found below *in_path* into one report.

    The first report serves as the skeleton; the result rows (pytest-html < 4)
    or the JSON data blob (pytest-html >= 4) of all other reports are merged
    into it, the outcome counters and durations are summed up, and the combined
    document is written to *out_path* with *title* as its headline.

    Raises RuntimeError when no report files are found.
    """
    paths = get_html_files(in_path, out_path)
    if not paths:
        raise RuntimeError(f"Unable to find html files in {in_path}")

    assets_dir_path = get_assets_path(in_path)

    # the first report becomes the merge target (file closed via context manager
    # instead of the previous leaking open() call)
    with open(paths[0]) as first_fh:
        first_file = BeautifulSoup(first_fh.read(), features="html.parser")
    paths.pop(0)

    try:
        # drop the external stylesheet link; css is inlined below (or embedded)
        first_file.find("link").decompose()
    except AttributeError:
        # find() returned None: no <link> tag present — nothing to remove
        pass

    if assets_dir_path is None:
        print(
            f"Will assume css is embedded in the reports. If this is not the case, "
            f"Please make sure that you have 'assets' directory inside {in_path} "
            f"which contains css files generated by pytest-html."
        )
    else:
        # inline the stylesheet so the merged report is self-contained
        with open(os.path.join(assets_dir_path, "style.css"), "r") as f:
            content = f.read()

        head = first_file.head
        head.append(first_file.new_tag("style", type="text/css"))
        head.style.append(content)

    h = first_file.find("h1")
    h.string = title or os.path.basename(out_path)

    ps = first_file.find_all("p")
    # the first paragraph ends with the pytest-html version number
    pytest_version = ps[0].text.split(" ")[-1]
    ps.pop(0)

    # per-outcome accumulators: [count, label text]
    cb_types = {
        "passed": [0, ""],
        "skipped": [0, ""],
        "failed": [0, ""],
        "error": [0, ""],
        "xfailed": [0, ""],
        "xpassed": [0, ""],
    }

    html_ver = version.parse(pytest_version)
    if html_ver >= version.parse("4.0.0rc"):
        # the "rerun" outcome only exists in pytest-html >= 4
        cb_types["rerun"] = [0, ""]

    for cb_type in cb_types:
        cb_val = get_checkbox_value(first_file, cb_type)
        cb_types[cb_type][0] = cb_val[0]
        cb_types[cb_type][1] = cb_val[1]

    dur, test_count, fp = get_test_count_and_duration(ps, html_ver)

    if html_ver < version.parse("4.0.0rc"):
        # pytest-html < 4 keeps the results in a plain table
        t = first_file.find("table", {"id": "results-table"})
    else:
        # pytest-html >= 4 stores the results as a JSON blob in a data attribute
        f_json_blob = first_file.find("div", {"id": "data-container"}).get("data-jsonblob")
        f_data_dict = json.loads(f_json_blob)

    for path in paths:
        with open(path) as cur_fh:
            cur_file = BeautifulSoup(cur_fh.read(), features="html.parser")

        if html_ver < version.parse("4.0.0rc"):
            tbody_res = cur_file.find_all("tbody", {"class": "results-table-row"})
            for elm in tbody_res:
                t.append(elm)
        else:
            # own name for the current file's blob (previously this clobbered
            # the variable holding the first file's blob, which was confusing)
            cur_json_blob = cur_file.find("div", {"id": "data-container"}).get("data-jsonblob")
            c_data_dict = json.loads(cur_json_blob)

            f_data_dict["tests"].update(c_data_dict["tests"])

        p_res = cur_file.find_all("p")
        _dur, _test_count, _ = get_test_count_and_duration(p_res, html_ver)
        dur += _dur
        test_count += _test_count

        for cb_type in cb_types:
            tmp = get_checkbox_value(cur_file, cb_type)
            cb_types[cb_type][0] += tmp[0]

    fp.string = f"{test_count} tests ran in {dur} seconds"

    if html_ver >= version.parse("4.0.0rc"):
        first_file.find("div", {"id": "data-container"})["data-jsonblob"] = json.dumps(f_data_dict)

    for cb_type in cb_types:
        set_checkbox_value(first_file, cb_type, cb_types[cb_type])

    with open(out_path, "w") as f:
        f.write(str(first_file))
|
||||
|
||||
|
||||
def get_test_count_and_duration(ps, html_ver):
    """Extracts the test count and run duration from a report's <p> elements.

    ps: the <p> soup elements of a pytest-html report (minus the version line)
    html_ver: parsed pytest-html version — the summary wording changed in 4.0

    Returns (duration_in_seconds, test_count, paragraph) where *paragraph* is
    the soup element that held the summary text (None when none was found).
    """
    test_count = 0
    dur = 0
    fp = None

    for p in ps:
        if html_ver >= version.parse("4.0.0"):
            # pytest-html >= 4 phrases the summary as "N tests took <duration>"
            match = re.search(r"test.* took ", p.text)
            if match:
                tmp = p.text.split(" ")
                test_count = int(tmp[0])

                if "ms." in tmp:
                    # duration token is in milliseconds
                    dur = int(tmp[3]) / 1000
                else:
                    # duration token looks like "HH:MM:SS." (trailing char stripped)
                    hours, minutes, seconds = map(int, tmp[3][:-1].split(":"))
                    dur = hours * 3600 + minutes * 60 + seconds

                fp = p

                break

        if html_ver < version.parse("4.0.0"):
            # pytest-html < 4 phrases the summary as "N tests ran in S seconds"
            if " tests ran" in p.text:
                tmp = p.text.split(" ")
                test_count = int(tmp[0])
                dur = float(tmp[4])
                fp = p

                break

    return dur, test_count, fp
|
||||
|
||||
|
||||
def set_checkbox_value(root_soap, cb_type, val):
    """Writes the accumulated (count, label) pair *val* into the summary span
    for outcome *cb_type* and re-enables its filter checkbox when count > 0.

    Raises RuntimeError when the span text has an unexpected format.
    """
    elem = root_soap.find("span", {"class": cb_type})
    match = re.search(CHECKBOX_REGEX, elem.text)
    if match is None:
        raise RuntimeError(f"{cb_type} <span> not found")

    elem.string = f"{val[0]} {val[1]}"

    elem = root_soap.find("input", {"data-test-result": cb_type})
    if val[0] != 0:
        # at least one result of this type exists: make the filter usable
        del elem["disabled"]
        del elem["hidden"]
|
||||
|
||||
|
||||
def get_checkbox_value(root_soap, cb_type):
    """Reads the (count, label) pair from the summary span for outcome *cb_type*.

    Raises RuntimeError when the span text has an unexpected format.
    """
    elem = root_soap.find("span", {"class": cb_type})
    match = re.search(CHECKBOX_REGEX, elem.text)
    if match is None:
        raise RuntimeError(f"{cb_type} <span> not found")

    gdict = match.groupdict()

    return int(gdict["num"]), gdict["txt1"]
|
||||
|
||||
|
||||
def get_html_files(path, output_file_path):
    """Collects all pytest-html report files below *path*.

    The merged output file itself is excluded, as is any html file that does
    not look like a pytest-html report (detected via its "Report generated on"
    paragraph). Returns the absolute paths sorted in reverse order.
    """
    report_files = []
    output_file_path = os.path.abspath(output_file_path)

    for candidate in pathlib.Path(path).rglob("*.html"):
        abs_path = str(candidate.absolute())
        if output_file_path in abs_path:
            # never merge an earlier merged report into itself
            continue

        # context manager instead of the previous leaking open() call
        with open(abs_path) as fh:
            soup = BeautifulSoup(fh.read(), features="html.parser")
        # first_p no longer shadows the loop variable (was reusing "p")
        first_p = soup.find("p")
        if first_p and "Report generated on " in first_p.text:
            report_files.append(abs_path)

    return sorted(report_files, reverse=True)
|
||||
|
||||
|
||||
def get_assets_path(path):
    """Locates the pytest-html 'assets' directory somewhere below *path*.

    Returns the absolute path of the first match, or None when no such
    directory (or the search root itself) exists.
    """
    matches = (str(entry.absolute()) for entry in pathlib.Path(path).rglob("assets"))
    return next(matches, None)
|
||||
94
abratest/plugin-abra.py
Normal file
94
abratest/plugin-abra.py
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
# regarding conftest:
|
||||
# If you have conftest.py files which do not reside in a python package directory
|
||||
# (i.e. one containing an __init__.py) then “import conftest” can be ambiguous
|
||||
# because there might be other conftest.py files as well on your PYTHONPATH or
|
||||
# sys.path. It is thus good practise for projects to either put conftest.py under
|
||||
# a package scope or to never import anything from a conftest.py file.
|
||||
|
||||
import os
|
||||
from imaplib import IMAP4_SSL
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from dotenv import dotenv_values
|
||||
from playwright.sync_api import BrowserContext, expect
|
||||
from pytest import Parser
|
||||
|
||||
from abratest.dir_manager import DirManager
|
||||
from abratest.utils import BaseUrl
|
||||
|
||||
# Global browser locale header and timeout (milliseconds) shared by the
# playwright expect() assertions and the context fixture below.
LOCALE = {"Accept-Language": "de_DE"}
TIMEOUT = 20_000
expect.set_options(timeout=TIMEOUT)
|
||||
|
||||
|
||||
@pytest.fixture
def context(context: BrowserContext) -> BrowserContext:
    """Overrides the playwright context fixture: applies the global default
    timeout and the locale request header to every page of the context."""
    context.set_default_timeout(TIMEOUT)
    context.set_extra_http_headers(LOCALE)
    return context
|
||||
|
||||
|
||||
def pytest_addoption(parser: Parser):
    """Registers the custom command line options required by the fixtures below."""
    # path of the env file describing the instance under test
    parser.addoption(
        "--env_file",
        action="store",
        required=True,
    )
    # root directory for all session output (see DirManager)
    parser.addoption(
        "--output_dir",
        action="store",
        required=True,
    )
    # id selecting/creating the session directory inside output_dir
    parser.addoption(
        "--session_id",
        action="store",
        required=True,
    )
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
def DIR(request) -> DirManager:
    """Fixture holding test directories

    DIR.OUTPUT_DIR
    DIR.SESSION
    DIR.RECORDS
    DIR.STATES
    DIR.RESULTS"""

    # both options are registered in pytest_addoption and are required
    output_dir = request.config.getoption("--output_dir")
    output_dir = Path(output_dir)
    session_id = request.config.getoption("--session_id")
    dirmanager = DirManager(output_dir=output_dir, session_id=session_id)
    dirmanager.create_all_dirs()
    return dirmanager
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
def dotenv_config(request) -> dict[str, str]:
    """Key/value configuration parsed from the env file given via --env_file."""
    dotenv_path = request.config.getoption("--env_file")
    dotenv_path = Path(dotenv_path)
    assert dotenv_path.is_file()
    return dotenv_values(dotenv_path)  # type: ignore
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
def URL(dotenv_config: dict[str, str]) -> BaseUrl:
    """Base URL of the instance under test, built from the env file's DOMAIN key.

    Use URL.get("/some/path") inside tests to navigate to absolute links."""
    return BaseUrl(netloc=dotenv_config["DOMAIN"])
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
def imap_ssl_email_client():
    """Yields a logged-in IMAP4_SSL client with the INBOX selected.

    Connection parameters come from the IMAP_HOST / IMAP_PORT / IMAP_USER /
    IMAP_PASS environment variables. The connection is closed and logged out
    after the session, even when a test using the fixture fails.

    (The previous "-> None" annotation was wrong: this is a generator fixture
    that yields the client.)
    """
    # os.environ[...] would raise KeyError before the old asserts could fire;
    # check presence and non-emptiness explicitly with a clear message
    for key in ("IMAP_HOST", "IMAP_PORT", "IMAP_USER", "IMAP_PASS"):
        assert os.environ.get(key), f"required environment variable {key} is not set"
    port = int(os.environ["IMAP_PORT"])
    imap_client = IMAP4_SSL(host=os.environ["IMAP_HOST"], port=port)
    imap_client.login(os.environ["IMAP_USER"], os.environ["IMAP_PASS"])
    imap_client.select("INBOX")
    try:
        yield imap_client
    finally:
        # always release the mailbox and the connection
        imap_client.close()
        imap_client.logout()
|
||||
190
abratest/runner.py
Normal file
190
abratest/runner.py
Normal file
|
|
@ -0,0 +1,190 @@
|
|||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
import pytest
|
||||
from dotenv import dotenv_values
|
||||
from loguru import logger
|
||||
|
||||
from abratest.dir_manager import DirManager
|
||||
|
||||
|
||||
@dataclass
class Test:
    """One pytest file a Runner can execute, plus its run conditions."""

    # file name of the pytest script (looked up inside the recipe's test dir)
    test_file: str
    # optional predicate over the env config; when it returns False the test is skipped
    condition: Callable[[dict[str, str]], bool] | None = None
    # when True, the test runs again even if a "passed" result already exists
    prevent_skip: bool = False
|
||||
|
||||
|
||||
class Runner:
    """Executes the setup, test and cleanup pytest files of one recipe.

    Subclasses override the class attributes below; this base class runs the
    declared files via pytest.main() and records each outcome as an empty
    marker file in the RESULTS directory.
    """

    # overridden by the concrete runner subclasses
    name: str = ""
    # folder inside the recipe dir that holds the pytest files
    test_dir_name: str = ""
    setups: list[Test] = []
    tests: list[Test] = []
    cleanups: list[Test] = []
    # names of runners whose setups must have passed before this one runs
    dependencies: list[str] = []
    # populated externally (by the Coordinator) with the dependency classes
    _dependency_runners: list[type["Runner"]] = []

    def __init__(self, dotenv_path: Path, DIR: DirManager):
        self.dotenv_path = dotenv_path
        self.config: dict[str, str] = dotenv_values(dotenv_path)  # type: ignore
        self.DIR = DIR

        logger.info(f"creating instance of {self.__class__.__name__}")
        # subclasses must define where their pytest files live
        assert self.test_dir_name
        self.root_dir = Path(__file__).parent

    def run_setups(self):
        """runs the setup scripts if available"""
        self._execute_test_list(self.setups)

    def run_tests(self):
        """runs the test scripts if available"""
        self._execute_test_list(self.tests)

    def run_cleanups(self):
        """runs the cleanup scripts if available"""
        self._execute_test_list(self.cleanups)

    def _execute_test_list(self, test_list: list[Test]):
        """runs the main test script and if available and sub test scripts if their running condition is met"""
        # check if required dependencies have passed
        if not self._dependencies_passed():
            logger.warning(f"skipping run_tests() of {self.name}, because some dependencies have not passed")
            return

        for test in test_list:
            self._run_test_with_checks(test)

    def _run_test_with_checks(self, test: Test):
        """Runs a single test file unless an existing result or an unmet
        condition says otherwise. Factors considered:

        - already_passed: true / false
        - prevent_skip: true / false
        - condition_available: true / false
        - condition_met: true / false
        """

        identifier_string = self.combine_names(self.name, test.test_file)
        full_test_path = self.DIR.RECIPES / self.name / self.test_dir_name / test.test_file

        # check if test already passed (stale non-passed results are removed)
        if self._is_test_passed(identifier_string, remove_existing=True):
            if test.prevent_skip:
                logger.info(f"continuing , test {identifier_string} has passed but prevent_skip=True")
            else:
                logger.info(f"skipping {identifier_string}, test has passed")
                return

        if test.condition and not test.condition(self.config):
            # test condition is defined but not met
            logger.info(f"skipping {identifier_string}, test condition is not met")
            return

        # test condition is undefined or met: run the test
        logger.info(f"running {identifier_string}")
        result = self._call_pytest(full_test_path)
        self._create_result_file(result=result, identifier_string=identifier_string)

    def _is_test_passed(self, identifier_string: str, remove_existing: bool = False) -> bool:
        """returns True if the selected test matching identifier_string already passed

        This is determined by the presence of a specific output file in the RESULTS folder that
        matches identifier_string

        remove_existing: If True, result files matching identifier_string with a status
        other than 'passed' will be deleted"""

        already_passed = False
        for result in self.DIR.RESULTS.glob("*"):
            if identifier_string in result.name:
                # process any result file (passed / failed / skipped) if it exists
                if "passed" in result.name:
                    already_passed = True
                elif remove_existing:
                    result.unlink()
        return already_passed

    def _call_pytest(self, full_test_path: Path) -> int:
        """runs pytest programmatically on a specific file

        all tests in the file [full_test_path] will be run along with command line arguments"""

        command_arguments = []

        # command_arguments.append("--traceconfig")

        command_arguments.append("-v")
        # command_arguments.append("-rx")
        command_arguments.append(str(full_test_path))

        # custom option consumed by the plugin's dotenv_config fixture
        command_arguments.append("--env_file")
        command_arguments.append(str(self.dotenv_path))

        # set root dir for tests output (used in DirManager). this is our custom argument
        command_arguments.append("--output_dir")
        command_arguments.append(str(self.DIR.OUTPUT_DIR))

        command_arguments.append("--session_id")
        command_arguments.append(self.DIR.session_id)

        # artifacts dir from pytest
        # warning: https://github.com/microsoft/playwright-pytest/issues/111
        # --output only works with the given context and page fixture
        # folder needs to be unique! traces will not appear, if every pytest run has same output dir
        command_arguments.append("--output")
        command_arguments.append(str(self.DIR.RECORDS / "traces" / full_test_path.stem))

        # tracing
        command_arguments.append("--tracing")
        command_arguments.append("retain-on-failure")
        # command_arguments.append("on")

        # Disable capturing. With -s set, prints will go to console as if pytest is not there.
        # command_arguments.append("-s")

        # headed
        # command_arguments.append("--headed")

        # html report. Will be combined into one file later.
        command_arguments.append(f"--html={self.DIR.RECORDS / 'html' / full_test_path.with_suffix('.html').name}")

        return pytest.main(command_arguments)

    def _create_result_file(
        self,
        result: int,
        identifier_string: str,
    ):
        """create result file to indicated passed/failed or skipped test"""

        # file name encodes the outcome, e.g. "passed-<runner>-<test_file>"
        full_name = self.combine_names(self.result_int_to_str(result), identifier_string)
        file_path = self.DIR.RESULTS / full_name
        with open(file_path, "w") as _:
            pass  # create empty file

    def _dependencies_passed(self):
        """returns true if all setups of each dependency have passed"""

        # todo: what about conditional setups?

        passed_tests = [r.name for r in self.DIR.RESULTS.glob("*") if "passed" in r.name]
        results = []
        for dependency_runner in self._dependency_runners:
            for setup_name in dependency_runner.setups:
                dependencie_identifier = self.combine_names(dependency_runner.name, setup_name.test_file)
                results.append(any(dependencie_identifier in f for f in passed_tests))
        # all() is True for an empty list, so a runner without dependencies passes
        return all(results)

    @staticmethod
    def result_int_to_str(result_int: int) -> str:
        """converts the pytest exit code (int) into a meaningful string"""
        # NOTE(review): pytest.main() returns ExitCode values (0 = all passed);
        # the -1 "skipped" mapping appears project-specific — confirm its origin
        match result_int:
            case -1:
                return "skipped"
            case 0:
                return "passed"
            case _:
                return "failed"

    @staticmethod
    def combine_names(*names: str) -> str:
        """joins the given name parts with '-' into one identifier"""
        return "-".join(names)
|
||||
40
abratest/utils.py
Normal file
40
abratest/utils.py
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlunparse
|
||||
|
||||
|
||||
@dataclass
class BaseUrl:
    """Base address of a deployed service, used to build absolute URLs.

    Only *netloc* is required; the remaining urllib components default to an
    https URL without path, params, query or fragment.
    """

    netloc: str
    scheme: str = "https"
    path: str = ""
    params: str = ""
    query: str = ""
    fragment: str = ""

    def get(self, path: str = ""):
        """Returns the absolute URL for *path* below this base address."""
        components = (self.scheme, self.netloc, path, self.params, self.query, self.fragment)
        return urlunparse(components)
|
||||
|
||||
|
||||
def get_session_id() -> str:
    """Returns a timestamp-based session id, e.g. '2024-01-31-12-05-59'."""
    return datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
|
||||
|
||||
|
||||
def rmtree(root_dir: Path):
    """removes a folder with content recursively

    Silently does nothing when root_dir does not exist or is not a directory.
    """
    # shutil.rmtree replaces the previous hand-rolled recursion with the
    # equivalent stdlib implementation (local import: the module does not
    # otherwise use shutil)
    import shutil

    if root_dir.is_dir():
        shutil.rmtree(root_dir)
|
||||
|
||||
|
||||
def make_url(domain: str) -> str:
    """adds 'https://' at the beginning of a string

    (The previous docstring claimed 'http://' although the code has always
    prepended 'https://'.)
    """
    return "https://" + domain
|
||||
Loading…
Add table
Add a link
Reference in a new issue