import shutil
from pathlib import Path

from dotenv import dotenv_values
from loguru import logger

from src.dirmanager import DirManager
from src.env_file_helper import DependencyRule, EnvFile, sort_env_files_by_rule
from src.html_helper import merge_html_files
from src.runner import Runner
from src.tests_authentik.runner_authentik import RunnerAuthentik
from src.tests_nextcloud.runner_nextcloud import RunnerNextcloud
from src.tests_wordpress.runner_wordpress import RunnerWordpress
from src.utils import rmtree

# Register all runners here. Each .env file with TYPE=authentik will be run with RunnerAuthentik
RUNNER_DICT: dict[str, type[Runner]] = {
    "authentik": RunnerAuthentik,
    "wordpress": RunnerWordpress,
    "nextcloud": RunnerNextcloud,
}


class Coordinator:
    """Drives one test session: parses env files, runs the matching runners, collects records."""

    def __init__(self, env_paths_list: list[Path], output_dir: Path, session_id: str):
        """Log the configuration, create the DirManager and parse all env files.

        Args:
            env_paths_list: paths of the .env files; each one must exist and define TYPE.
            output_dir: passed on to DirManager and to every runner.
            session_id: passed on to DirManager and to every runner.

        Raises:
            AssertionError: if an env file does not exist or has no TYPE key.
        """
        out_string = "".join(f"{e.name}\n" for e in env_paths_list)
        out_string += f"output_dir = {output_dir}\n"
        out_string += f"session_id = {session_id}"
        logger.info(f"initialize Coordinator instance with\nenv_paths_list =\n{out_string}")

        self.DIR = DirManager(output_dir=output_dir, session_id=session_id)
        self.output_dir = output_dir
        self.session_id = session_id
        # Filled by run_test(); defined here so the attribute always exists.
        self.runners: list[Runner] = []
        self.env_files: list[EnvFile] = self._parse_env_files(env_paths_list)

    def _parse_env_files(self, env_paths: list[Path]) -> list[EnvFile]:
        """Return a list of EnvFile objects created from the given env files.

        Raises:
            AssertionError: if a path is not an existing file, or its content
                does not contain the required TYPE key.
        """
        env_files: list[EnvFile] = []
        for env_path in env_paths:
            # Raised explicitly instead of using `assert` so the validation is
            # not stripped under `python -O`; AssertionError is kept so callers
            # see the same exception type as before.
            if not env_path.is_file():
                raise AssertionError(f"the env file {env_path} does not exist")
            config: dict[str, str] = dotenv_values(env_path)  # type: ignore
            if "TYPE" not in config:
                raise AssertionError(f"the env file {env_path} does not specify the required TYPE key.")
            env_type = config["TYPE"]
            env_files.append(EnvFile(env_path=env_path, config=config, env_type=env_type))
        return env_files

    def setup_test(self) -> None:
        """Create all session directories and copy the env files into them."""
        logger.info("calling setup_test()")
        self.DIR.create_all_dirs()
        self._copy_env_files()

    def _copy_env_files(self) -> None:
        """Copy all env files to STATES/env_files.

        Each copy is named after the file's own TYPE value, so two env files
        with the same TYPE end up at the same destination path.
        """
        env_files_dir = self.DIR.STATES / "env_files"
        env_files_dir.mkdir(exist_ok=True)
        for env_file in self.env_files:
            shutil.copy(env_file.env_path, env_files_dir / env_file.env_type)

    def run_test(self) -> None:
        """Instantiate one runner per env file, run all tests, then run all cleanups."""
        logger.info("calling run_test()")
        self.runners = self._load_runners(self.env_files)
        # NOTE(review): if any run_tests() raises, no run_cleanup() is called.
        # Confirm whether that is intended before wrapping this in try/finally.
        for runner in self.runners:
            runner.run_tests()
        for runner in self.runners:
            runner.run_cleanup()
        logger.info("run_test() finished")

    def _load_runners(self, env_files: list[EnvFile]) -> list[Runner]:
        """Return one runner instance per env file, chosen via RUNNER_DICT by TYPE.

        Raises:
            KeyError: if an env file's TYPE has no entry in RUNNER_DICT.
        """
        runners: list[Runner] = []
        for env_file in env_files:
            runner_type = env_file.config["TYPE"]
            try:
                runner_class = RUNNER_DICT[runner_type]
            except KeyError:
                known_types = ", ".join(sorted(RUNNER_DICT))
                # Same exception type as a plain dict lookup, but the message
                # names the offending file and the registered types.
                raise KeyError(
                    f"the env file {env_file.env_path} has unknown TYPE {runner_type!r}; "
                    f"registered types: {known_types}"
                ) from None
            runners.append(
                runner_class(dotenv_path=env_file.env_path, output_dir=self.output_dir, session_id=self.session_id)
            )
        return runners

    def combine_html(self) -> None:
        """Merge the files under RECORDS/html into RECORDS/full-report.html."""
        in_path = str(self.DIR.RECORDS / "html")
        out_path = str(self.DIR.RECORDS / "full-report.html")
        title = "combined.html"
        merge_html_files(in_path, out_path, title)

    def collect_traces(self) -> None:
        """Move all trace directories from RECORDS/traces into RECORDS.

        Every directory below RECORDS/traces that directly contains a
        ``trace.zip`` is moved to ``RECORDS/<dirname>-<index>``, using the
        lowest index that is not taken yet. If tests are rerun and generate
        another trace, the new trace therefore gets a unique name such as
            tracename-0
            tracename-1
            ...
        Afterwards RECORDS/traces is removed.
        """

        def get_new_path(root_dir: Path, base_name: str, index: int = 0) -> Path:
            """Return the first ``root_dir/base_name-<index>`` that does not exist yet."""
            # exists() rather than is_dir(): a regular file with the candidate
            # name is also a collision the rename below must avoid.
            while (root_dir / f"{base_name}-{index}").exists():
                index += 1
            return root_dir / f"{base_name}-{index}"

        trace_root_dir = self.DIR.RECORDS / "traces"
        # Snapshot the matches first: the loop moves directories out of the
        # tree that rglob() would otherwise still be walking lazily.
        trace_dirs = [trace_zip.parent for trace_zip in trace_root_dir.rglob("*/trace.zip")]
        for trace_dir in trace_dirs:
            trace_dir.rename(get_new_path(self.DIR.RECORDS, trace_dir.name))
        rmtree(trace_root_dir)