import shutil
from pathlib import Path

from dotenv import dotenv_values
from loguru import logger

from src.dirmanager import DirManager
from src.html_helper import merge_html_files
from src.runner import Runner
from src.tests_authentik.runner_authentik import RunnerAuthentik
from src.tests_nextcloud.runner_nextcloud import RunnerNextcloud
from src.tests_wordpress.runner_wordpress import RunnerWordpress
from src.utils import rmtree

# Register all runners here. Each .env file with TYPE=authentik will be run with RunnerAuthentik
RUNNER_DICT: dict[str, type[Runner]] = {
    "authentik": RunnerAuthentik,
    "wordpress": RunnerWordpress,
    "nextcloud": RunnerNextcloud,
}


class Coordinator:
    """Drive one test session: parse env files, run the matching runners, collect records.

    Each env file must define a ``TYPE`` key. That value selects the runner
    class from ``RUNNER_DICT`` and is also used as the key under which the
    env file is stored (and as the file name of its copy in STATES/env_files).
    """

    def __init__(self, env_paths_list: list[Path], output_dir: Path, session_id: str):
        """Create the session's DirManager and parse all given env files.

        Args:
            env_paths_list: Paths of the env files to run; each must exist
                and define a non-empty ``TYPE`` key.
            output_dir: Passed to DirManager and to every runner.
            session_id: Passed to DirManager and to every runner.

        Raises:
            AssertionError: If an env file is missing or has no ``TYPE``.
        """
        out_string = "".join([e.name + "\n" for e in env_paths_list])
        out_string += f"output_dir = {output_dir}\n"
        out_string += f"session_id = {session_id}"
        logger.info(f"initialize Coordinator instance with\nenv_paths_list =\n{out_string}")

        self.DIR = DirManager(output_dir=output_dir, session_id=session_id)
        self.output_dir = output_dir
        self.session_id = session_id
        self.env_paths: dict[str, Path] = dict()
        self.env_configs: dict[str, dict[str, str]] = dict()  # todo: needed?
        # Populated by run_test(); defined here so the attribute always exists.
        self.runners: list[Runner] = []
        self._parse_env_files(env_paths_list)

    def _parse_env_files(self, env_paths: list[Path]):
        """Read every env file and index its path and parsed config by TYPE.

        Raises:
            AssertionError: If a path is not an existing file, or if the file
                has no (non-empty) ``TYPE`` entry.
        """
        for env_path in env_paths:
            # Explicit raises instead of ``assert`` so the validation is not
            # stripped under ``python -O``; the exception type is kept.
            if not env_path.is_file():
                raise AssertionError(f"the env file {env_path} does not exist")
            config: dict[str, str] = dotenv_values(env_path)  # type: ignore
            env_type = config.get("TYPE")
            # A bare ``TYPE`` line parses to None and ``TYPE=`` to "";
            # neither can select a runner, so reject them here.
            if not env_type:
                raise AssertionError(f"the env file {env_path} does not specify the required TYPE key.")
            if env_type in self.env_paths:
                # Entries are keyed by TYPE, so a second file of the same
                # TYPE replaces the first one.
                logger.warning(
                    f"TYPE={env_type} is defined by both {self.env_paths[env_type]} and {env_path}; "
                    f"using {env_path}"
                )
            self.env_paths[env_type] = env_path
            self.env_configs[env_type] = config  # todo: needed?

    def setup_test(self):
        """Create the session directories and copy the env files into them."""
        logger.info("calling setup_test()")
        self.DIR.create_all_dirs()
        self._copy_env_files()

    def _copy_env_files(self):
        """Copies all env files to STATES/env_files.

        Files will be renamed to their own TYPE value.
        """
        env_files_dir = self.DIR.STATES / "env_files"
        env_files_dir.mkdir(exist_ok=True)
        for type_key, env_path in self.env_paths.items():
            shutil.copy(env_path, env_files_dir / type_key)

    def run_test(self):
        """Instantiate one runner per env file, run all tests, then clean up.

        ``run_cleanup()`` is called on every loaded runner even when a
        ``run_tests()`` call raises; the exception still propagates afterwards.
        """
        logger.info("calling run_test()")
        self.runners = self._load_runners(list(self.env_paths.values()))
        try:
            for runner in self.runners:
                runner.run_tests()
        finally:
            for runner in self.runners:
                runner.run_cleanup()
        logger.info("run_test() finished")

    def _load_runners(self, env_files: list[Path]) -> list[Runner]:
        """Build a runner for each env file, chosen by the file's TYPE value.

        Raises:
            KeyError: If an env file has no ``TYPE`` or its ``TYPE`` is not
                registered in ``RUNNER_DICT``.
        """
        runners = []
        for env_file in env_files:
            config: dict[str, str] = dotenv_values(env_file)  # type: ignore
            env_type = config.get("TYPE")
            try:
                runner_cls = RUNNER_DICT[env_type]
            except KeyError:
                raise KeyError(
                    f"the env file {env_file} has TYPE={env_type!r}, "
                    f"which is not one of the registered runners {sorted(RUNNER_DICT)}"
                ) from None
            runners.append(
                runner_cls(dotenv_path=env_file, output_dir=self.output_dir, session_id=self.session_id)
            )
        return runners

    def combine_html(self):
        """Call merge_html_files on RECORDS/html, writing RECORDS/full-report.html."""
        in_path = str(self.DIR.RECORDS / "html")
        out_path = str(self.DIR.RECORDS / "full-report.html")
        title = "combined.html"
        merge_html_files(in_path, out_path, title)

    def collect_traces(self):
        """Move all trace directories from RECORDS/traces into RECORDS.

        Every directory below RECORDS/traces that contains a ``trace.zip`` is
        moved to RECORDS under the name ``<dirname>-<index>``, using the first
        index that is not taken yet. If tests are rerun and generate another
        trace, the new trace therefore gets a unique name such as
            tracename-0
            tracename-1
            ...
        Afterwards RECORDS/traces is removed.
        """

        def get_new_path(root_dir: Path, base_name: str, index: int = 0) -> Path:
            # ``exists()`` rather than ``is_dir()``: a regular file with the
            # candidate name must count as taken as well.
            candidate = root_dir / f"{base_name}-{index}"
            while candidate.exists():
                index += 1
                candidate = root_dir / f"{base_name}-{index}"
            return candidate

        trace_root_dir = self.DIR.RECORDS / "traces"
        # Collect the matches before renaming anything: moving directories out
        # of the tree while the lazy rglob generator is still walking it is
        # unsafe. Sorting makes the index assignment deterministic.
        trace_dirs = sorted(f.parent for f in trace_root_dir.rglob("*/trace.zip"))
        for trace_dir in trace_dirs:
            if not trace_dir.is_dir():
                # Already moved together with an ancestor trace directory.
                continue
            trace_dir.rename(get_new_path(self.DIR.RECORDS, trace_dir.name))
        rmtree(trace_root_dir)