* add full integration test of cli / pytest_abra with all tests

* save path of runner_*.py in runner subclass to improve test discovery -> allows for same test name in two different runners

* reorganize output dir names

* use URL fixture everywhere

* rework coordinator interface

* add --session_id to cli args

* add log results table

* plenty of refactoring

* add assert messages

* add plenty of tests

* add /docs dir with plenty of documentation

* fix authentik setup

* add authentik cleanup, remove test user

* add random test user credential generation and integrate into test routine. random creds are saved to STATES

Reviewed-on: local-it-infrastructure/e2e_tests#16
Co-authored-by: Daniel <d.brummerloh@gmail.com>
Co-committed-by: Daniel <d.brummerloh@gmail.com>
This commit is contained in:
Daniel 2023-12-14 14:03:58 +01:00 committed by dan
parent 016b88a68d
commit 2dd765a974
36 changed files with 1145 additions and 432 deletions

View file

@ -1,6 +1,6 @@
from pytest_abra.coordinator import Coordinator
from pytest_abra.dir_manager import DirManager
from pytest_abra.env_manager import EnvFile
from pytest_abra.env_manager import EnvFile, EnvManager
from pytest_abra.runner import ConditionArgs, Runner, Test
from pytest_abra.utils import BaseUrl
@ -12,4 +12,5 @@ __all__ = [
"DirManager",
"BaseUrl",
"EnvFile",
"EnvManager",
]

View file

@ -2,21 +2,29 @@ import argparse
import os
from pathlib import Path
import pkg_resources # type: ignore
from loguru import logger
from pytest_abra import Coordinator
from pytest_abra.dir_manager import DirManager
from pytest_abra.utils import get_datetime_string
from pytest_abra.utils import get_session_id
def get_version():
return pkg_resources.get_distribution("pytest_abra").version
def run():
parser = argparse.ArgumentParser()
parser.add_argument("--version", "-V", action="version", version=get_version(), help="output the version number")
parser.add_argument("--env_paths", type=str, help="List of loaded env files separated with ;", required=True)
parser.add_argument("--recipes_dir", type=Path, help="List of loaded env files separated with ;", required=True)
parser.add_argument("--output_dir", type=Path, help="List of loaded env files separated with ;", required=True)
parser.add_argument("--recipes_dir", type=Path, help="Dir of abra recipes and respective runners", required=True)
parser.add_argument("--output_dir", type=Path, help="Dir of test outputs", required=True)
parser.add_argument("--timeout", type=int, help="Set Playwright timeout in ms", default=20_000)
parser.add_argument("--debug", action="store_true", help="Enable Playwright debug mode")
parser.add_argument("--resume", action="store_true", help="Re-run the most recent test, skipping passed tests")
parser.add_argument("--session_id", help="Session dir name (inside output_dir). Overwrites --resume")
args = parser.parse_args()
env_paths = [Path(s) for s in args.env_paths.split(";")]
@ -27,17 +35,13 @@ def run():
# ----------------------------- define session_id ---------------------------- #
session_id = "test-" + get_datetime_string()
if args.resume:
latest_session_id = DirManager.get_latest_session_id(args.output_dir)
if latest_session_id:
session_id = DirManager.get_latest_session_id(args.output_dir)
session_id = get_session_id(args.output_dir, args.resume, args.session_id)
# ------------------------------- setup logging ------------------------------ #
# todo: move to Coordinator
DIR = DirManager(output_dir=args.output_dir, session_id=session_id)
log_file = DIR.RECORDS / "coordinator.log"
log_file = DIR.RESULTS / "coordinator.log"
logger.add(log_file)
# ---------------------------- initialize and run ---------------------------- #
@ -49,7 +53,7 @@ def run():
recipes_dir=args.recipes_dir,
timeout=args.timeout,
)
coordinator.setup_test()
coordinator.run_test()
coordinator.prepare_tests()
coordinator.run_tests()
coordinator.combine_html()
coordinator.collect_traces()

View file

@ -1,15 +1,18 @@
import importlib
import json
import re
import sys
from pathlib import Path
from loguru import logger
from tabulate import tabulate # type: ignore
from pytest_abra.dir_manager import DirManager
from pytest_abra.env_manager import EnvFile, EnvManager
from pytest_abra.html_helper import merge_html_reports
from pytest_abra.runner import Runner
from pytest_abra.utils import rmtree
from pytest_abra.shared_types import TestResult
from pytest_abra.utils import generate_random_string, load_json_to_environ, rmtree
class Coordinator:
@ -32,21 +35,24 @@ class Coordinator:
self.ENV = EnvManager(env_paths=env_paths, RUNNER_DICT=self.RUNNER_DICT)
self.TIMEOUT = timeout
def setup_test(self) -> None:
logger.info("calling setup_test()")
def prepare_tests(self) -> None:
logger.info("calling prepare_tests()")
self.DIR.create_all_dirs()
self.ENV.copy_env_files(self.DIR)
self.ENV.copy_env_files(self.ENV.env_files, self.DIR)
self.load_test_credentials(self.DIR)
def run_test(self) -> None:
logger.info("calling run_test()")
def run_tests(self) -> None:
logger.info("calling run_tests()")
self.runners: list[Runner] = self._load_runners(self.ENV.env_files)
status_list: list[TestResult] = []
for runner in self.runners:
runner.run_setups()
status_list.extend(runner.run_setups())
for runner in self.runners:
runner.run_tests()
status_list.extend(runner.run_tests())
for runner in self.runners:
runner.run_cleanups()
logger.info("run_test() finished")
status_list.extend(runner.run_cleanups())
status_table = tabulate([[t.test_name, t.status] for t in status_list], headers=["name", "status"])
logger.info(f"run_tests() finished\n{status_table}")
def _load_runners(self, env_files: list[EnvFile]) -> list[Runner]:
"""Creates an instance of the correct Runner class for each given env file"""
@ -58,13 +64,13 @@ class Coordinator:
def combine_html(self) -> None:
"""combines all generated pytest html reports into one"""
in_dir_path = str(self.DIR.RECORDS / "html")
out_file_path = str(self.DIR.RECORDS / "full-report.html")
in_dir_path = str(self.DIR.RESULTS / "html")
out_file_path = str(self.DIR.RESULTS / "full-report.html")
title = "combined.html"
merge_html_reports(in_dir_path, out_file_path, title)
def collect_traces(self):
"""moves all traces into SESSION/RECORDS dir
"""moves all traces into SESSION/RESULTS dir
if tests are rerun and generate another trace, the new trace will get a unique name such as
tracename-0
@ -80,14 +86,34 @@ class Coordinator:
index += 1
return get_new_path(root_dir, base_name, index=index)
trace_root_dir = self.DIR.RECORDS / "traces"
trace_root_dir = self.DIR.RESULTS / "traces"
for f in trace_root_dir.rglob("*/trace.zip"):
new_path = get_new_path(self.DIR.RECORDS, f.parent.name)
new_path = get_new_path(self.DIR.RESULTS, f.parent.name)
f.parent.rename(new_path)
rmtree(trace_root_dir)
@staticmethod
def create_runner_dict(recipes_dir: Path) -> dict[str, type["Runner"]]:
def load_test_credentials(DIR: DirManager):
"""Load test user credentials. If not available, create them randomly.
Test users are created during testing but should be deleted after the test. In case test
users are not deleted after tests by accident, the user credentials are not known to an
attacker."""
test_credentials_path = DIR.STATES / "credentials_test.json"
if not test_credentials_path.is_file():
test_credentials = {
"TEST_USER": "test-" + generate_random_string(6),
"TEST_PASS": generate_random_string(12, punctuation=True),
}
with open(test_credentials_path, "w") as json_file:
json.dump(test_credentials, json_file)
load_json_to_environ(test_credentials_path)
@staticmethod
def create_runner_dict(recipes_dir: Path) -> dict[str, type[Runner]]:
"""Creates a dictionary holding all the RunnerClasses that can be discovered in recipes_dir
example:
@ -101,18 +127,19 @@ class Coordinator:
because recipes_dir is added to sys.path.
"""
RUNNER_DICT: dict[str, type["Runner"]] = dict()
RUNNER_DICT: dict[str, type[Runner]] = dict()
runner_discovery_pattern = re.compile("Runner.+")
# make it possible to import modules from recipes_dir
sys.path.append(recipes_dir.as_posix())
for module_path in recipes_dir.rglob("*/runner*.py"):
for module_path in recipes_dir.rglob("*/runner_*.py"):
rel_path = module_path.relative_to(recipes_dir).as_posix().replace("/", ".").replace(".py", "")
module = importlib.import_module(rel_path)
runner_class_names = [name for name in dir(module) if runner_discovery_pattern.match(name)]
assert len(runner_class_names) == 1
runner_class_name = runner_class_names[0]
RunnerClass: type[Runner] = getattr(module, runner_class_name)
RunnerClass._tests_path = module_path.parent
RUNNER_DICT[RunnerClass.env_type] = RunnerClass
return RUNNER_DICT

View file

@ -3,7 +3,8 @@
import os
import re
from datetime import datetime, timedelta
# from datetime import datetime, timedelta
from pathlib import Path
from typing import Generator, Protocol, TypedDict
@ -11,7 +12,7 @@ import pytest
from dotenv import dotenv_values
from icecream import ic # type: ignore
from imbox import Imbox # type: ignore
from playwright.sync_api import APIRequestContext, BrowserContext, Playwright, expect
from playwright.sync_api import BrowserContext, expect
from pytest import Parser
from pytest_abra import BaseUrl, DirManager, EnvFile
@ -49,9 +50,9 @@ def DIR(request) -> DirManager:
DIR.OUTPUT
DIR.SESSION
DIR.RECORDS
DIR.STATES
DIR.RESULTS"""
DIR.RESULTS
DIR.STATUS"""
output_dir = request.config.getoption("--output_dir")
assert output_dir, "pytest argument --output_dir not set"
@ -93,13 +94,13 @@ def URL(env_config: dict[str, str]) -> BaseUrl:
@pytest.fixture(scope="session")
def imap_client() -> None:
def imap_client() -> Generator[Imbox, None, None]:
"""imap email client using credentials from environment variables"""
assert os.environ["IMAP_HOST"]
assert os.environ["IMAP_PORT"]
assert os.environ["IMAP_USER"]
assert os.environ["IMAP_PASS"]
assert os.environ["IMAP_HOST"], "required environment variable is undefined"
assert os.environ["IMAP_PORT"], "required environment variable is undefined"
assert os.environ["IMAP_USER"], "required environment variable is undefined"
assert os.environ["IMAP_PASS"], "required environment variable is undefined"
imbox = Imbox(
hostname=os.environ["IMAP_HOST"],
@ -138,9 +139,8 @@ def imap_recent_messages(imap_client: Imbox) -> list[Message]:
for uid, message in messages:
print(uid, message.subject, message.date)"""
N_MINUTES = 30
n_minutes_ago = datetime.now() - timedelta(minutes=N_MINUTES)
# N_MINUTES = 30
# n_minutes_ago = datetime.now() - timedelta(minutes=N_MINUTES)
uids: list[bytes] = []
messages: list[Message] = []
# for uid, message in imap_client.messages(date__gt=n_minutes_ago):
@ -150,14 +150,3 @@ def imap_recent_messages(imap_client: Imbox) -> list[Message]:
messages.append(message)
return messages
@pytest.fixture(scope="session")
def api_request_context(
playwright: Playwright,
DIR: DirManager,
) -> Generator[APIRequestContext, None, None]:
state_file = DIR.STATES / "authentik_admin_state.json"
request_context = playwright.request.new_context(storage_state=state_file)
yield request_context
request_context.dispose()

View file

@ -11,11 +11,11 @@ class DirManager:
The structures is as follows:
tests dir/
session_id-1/
records
results
states
status
session_id-2/
records
results
...
"""
@ -32,11 +32,11 @@ class DirManager:
dirs: list[Path] = [
self.OUTPUT_DIR,
self.SESSION,
self.RECORDS,
self.HTML,
self.STATES,
self.ENV_FILES,
self.RESULTS,
self.HTML,
self.STATUS,
]
for d in dirs:
d.mkdir(exist_ok=True)
@ -49,14 +49,6 @@ class DirManager:
def SESSION(self):
return self.OUTPUT_DIR / self.session_id
@property
def RECORDS(self):
return self.SESSION / "records"
@property
def HTML(self):
return self.RECORDS / "html"
@property
def STATES(self):
return self.SESSION / "states"
@ -69,6 +61,14 @@ class DirManager:
def RESULTS(self):
return self.SESSION / "results"
@property
def HTML(self):
return self.RESULTS / "html"
@property
def STATUS(self):
return self.SESSION / "status"
@property
def RECIPES(self):
return self.recipes_dir
@ -80,7 +80,13 @@ class DirManager:
@staticmethod
def get_latest_session_id(output_dir: Path) -> Optional[str]:
"""returns the name of the newest dir inside of output_dir"""
"""returns the name of the newest dir inside of output_dir
if output_dir does not exists or is empty, None is returned"""
if not output_dir.is_dir():
return None
all_dirs = [d for d in output_dir.iterdir() if d.is_dir()]
if all_dirs:
newest_dir: Path = max(all_dirs, key=lambda x: x.stat().st_ctime)

View file

@ -4,9 +4,10 @@ from typing import TYPE_CHECKING, NamedTuple
from dotenv import dotenv_values
from pytest_abra.utils import files_are_same
if TYPE_CHECKING:
from pytest_abra.dir_manager import DirManager
from pytest_abra.runner import Runner
from pytest_abra import DirManager, Runner
class EnvFile(NamedTuple):
@ -45,6 +46,7 @@ class EnvManager:
def _get_dependency_rules(env_files: list[EnvFile], RUNNER_DICT: dict[str, type["Runner"]]) -> list[DependencyRule]:
dependency_rules: list[DependencyRule] = []
for env_file in env_files:
assert env_file.env_type in RUNNER_DICT, f"no runner for env_type={env_file.env_type} found in RUNNER_DICT"
child_runner_class = RUNNER_DICT[env_file.env_type]
for dependency in child_runner_class.dependencies:
dependency_rule = DependencyRule(child=child_runner_class.env_type, dependency=dependency)
@ -93,11 +95,25 @@ class EnvManager:
"Could not resolve test order. This is possibly due to a circular dependency (a on b, b on c, c on a)"
)
def copy_env_files(self, DIR: "DirManager") -> None:
"""Copies all env files to STATES/env_files. Files will be renamed to
<index>-<env_type>-<original_name>
00-authentik-login.test.dev.local-it.cloud.env"""
@staticmethod
def copy_env_files(env_files: list[EnvFile], DIR: "DirManager") -> None:
"""Copies all env files to STATES/env_files.
for index, env_file in enumerate(self.env_files):
Files will be renamed to <index>-<env_type>-<original_name>. Example:
00-authentik-login.test.dev.local-it.cloud.env
Does nothing when called twice with same env_files. Throws an AssertionError if either
contents or filenames of env_files have changed (probably test rerun with different input)"""
dir_was_not_empty = len(list(DIR.ENV_FILES.iterdir())) > 0
for index, env_file in enumerate(env_files):
file_name = "-".join([str(index).zfill(2), env_file.env_type, env_file.env_path.name])
if dir_was_not_empty:
# check that the copied env files have not changed
present_files = [f.name for f in DIR.ENV_FILES.iterdir()]
assert (
file_name in present_files and files_are_same(env_file.env_path, DIR.ENV_FILES / file_name)
), "It appears that you are resuming a test while the input env files have changed. Start a new test instead"
shutil.copy(env_file.env_path, DIR.ENV_FILES / file_name)

View file

@ -6,9 +6,10 @@ from typing import TYPE_CHECKING, Callable, NamedTuple
import pytest
from loguru import logger
from pytest_abra.shared_types import STATUS, TestResult
if TYPE_CHECKING:
from pytest_abra.coordinator import Coordinator
from pytest_abra.env_manager import EnvFile
from pytest_abra import Coordinator, DirManager, EnvFile
class ConditionArgs(NamedTuple):
@ -30,6 +31,7 @@ class Runner:
tests: list[Test] = []
cleanups: list[Test] = []
dependencies: list[str] = []
_tests_path: Path = Path()
def __init__(self, coordinator: "Coordinator", runner_index: int):
self.coordinator = coordinator
@ -41,62 +43,58 @@ class Runner:
logger.info(f"creating instance of {self.__class__.__name__}")
def run_setups(self):
def run_setups(self) -> list[TestResult]:
"""runs the setup scripts if available"""
self._execute_test_list(self.setups)
return self._execute_tests_list(self.setups)
def run_tests(self):
def run_tests(self) -> list[TestResult]:
"""runs the test scripts if available"""
self._execute_test_list(self.tests)
return self._execute_tests_list(self.tests)
def run_cleanups(self):
def run_cleanups(self) -> list[TestResult]:
"""runs the cleanup scripts if available"""
self._execute_test_list(self.cleanups)
return self._execute_tests_list(self.cleanups)
def _execute_test_list(self, test_list: list[Test]):
"""runs the main test script and if available and sub test scripts if their running condition is met"""
def _execute_tests_list(self, test_list: list[Test]) -> list[TestResult]:
"""Runs all tests given in the list. If condition is defined, it is also checked."""
# check if required dependencies have passed
if not self._dependencies_passed():
logger.warning(f"skipping run_tests() of {self.env_type} (one or more dependencies have not passed)")
return
return [TestResult("skipped_dep", test.test_file) for test in test_list]
for test in test_list:
self._run_test_with_checks(test)
def _run_test_with_checks(self, test: Test):
# dependency passed: true / false
# already_passed: true / false
# prevent_skip: true / false
# condition_available: true / pass
# condition_met: true / false
return [self._run_test_with_checks(test) for test in test_list]
def _run_test_with_checks(self, test: Test) -> TestResult:
identifier_string = self.combine_names(self.env_type, test.test_file)
results = list(self.DIR.RECIPES.rglob(test.test_file))
assert len(results) == 1, f"{test.test_file} should exist exactly 1 time, but found {len(results)} times"
full_test_path = results[0]
test_files = list(self._tests_path.rglob(test.test_file))
assert len(test_files) == 1, f"{test.test_file} should exist exactly once, but found {len(test_files)} times"
full_test_path = test_files[0]
# check if test aleady passed
if self._is_test_passed(identifier_string, remove_existing=True):
if self._is_test_passed(self.DIR, identifier_string):
if test.prevent_skip:
logger.info(f"continuing {identifier_string} (passed before but prevent_skip=True)")
else:
logger.info(f"skipping {identifier_string} (test has passed)")
return
return TestResult("skipped_pas", test.test_file)
if test.condition:
condition_result = self._run_condition(test.condition)
condition_result = self._call_condition_function(test.condition)
if not condition_result:
# test condition is defined but not met
logger.info(f"skipping {identifier_string} (test condition is not met)")
return
self._create_status_file(self.DIR, status="skipped_con", identifier_string=identifier_string)
return TestResult("skipped_con", test.test_file)
# test condition is undefined or not met
logger.info(f"running {identifier_string}")
result = self._call_pytest(full_test_path)
self._create_result_file(result=result, identifier_string=identifier_string)
exit_code = self._call_pytest(full_test_path)
status = self.exit_code_to_str(exit_code)
self._create_status_file(self.DIR, status=status, identifier_string=identifier_string)
return TestResult(status, test.test_file)
def _run_condition(self, condition_function: Callable[[ConditionArgs], bool]):
def _call_condition_function(self, condition_function: Callable[[ConditionArgs], bool]):
"""run the test condition function with multiple arguments"""
# more arguments can be added later without changing the function signature
conditon_args = ConditionArgs(
@ -106,24 +104,40 @@ class Runner:
)
return condition_function(conditon_args)
def _is_test_passed(self, identifier_string: str, remove_existing: bool = False) -> bool:
"""returns True if the selected test matching identifier_string already passed
@classmethod
def _create_status_file(
cls,
DIR: "DirManager",
status: STATUS,
identifier_string: str,
):
"""create result file to indicated passed/failed/skipped test"""
This is determined by the presence of a specific output file in the RESULTS folder that
matches identifier_string
# remove matching files
for status_file in cls._get_status_files(DIR, identifier_string):
status_file.unlink()
remove_existing: If True, result files matching identifier_string with a status
other than 'passed' will be deleted"""
full_name = cls.combine_names(status, identifier_string)
file_path = DIR.STATUS / full_name
with open(file_path, "w") as _:
pass # create empty file
already_passed = False
for result in self.DIR.RESULTS.glob("*"):
if identifier_string in result.name:
# process any result file (passed / failed / skipped) if it exists
if "passed" in result.name:
already_passed = True
elif remove_existing:
result.unlink()
return already_passed
@staticmethod
def _get_status_files(DIR: "DirManager", identifier_string: str) -> list[Path]:
return [f for f in DIR.STATUS.glob("*") if identifier_string in f.name]
@classmethod
def _is_test_passed(cls, DIR: "DirManager", identifier_string: str) -> bool:
"""returns True if the selected test matching identifier_string already passed"""
matching_files = cls._get_status_files(DIR, identifier_string)
if len(matching_files) == 1:
status_file = matching_files[0]
if "passed" in status_file.name:
return True
elif len(matching_files) > 1:
logger.warning("more than one matching status file found")
return False
def _call_pytest(self, full_test_path: Path) -> int:
"""runs pytest programmatically with a specific file
@ -155,7 +169,7 @@ class Runner:
# --output only works with the given context and page fixture
# folder needs to be unique! traces will not appear, if every pytest run has same output dir
command_arguments.append("--output")
command_arguments.append(str(self.DIR.RECORDS / "traces" / full_test_path.stem))
command_arguments.append(str(self.DIR.RESULTS / "traces" / full_test_path.stem))
# tracing
command_arguments.append("--tracing") # "on", "off", "retain-on-failure"
@ -170,28 +184,16 @@ class Runner:
# command_arguments.append("--headed")
# html report. Will be combined into one file later.
command_arguments.append(f"--html={self.DIR.RECORDS / 'html' / full_test_path.with_suffix('.html').name}")
command_arguments.append(f"--html={self.DIR.RESULTS / 'html' / full_test_path.with_suffix('.html').name}")
return pytest.main(command_arguments)
def _create_result_file(
self,
result: int,
identifier_string: str,
):
"""create result file to indicated passed/failed or skipped test"""
full_name = self.combine_names(self.result_int_to_str(result), identifier_string)
file_path = self.DIR.RESULTS / full_name
with open(file_path, "w") as _:
pass # create empty file
def _dependencies_passed(self):
"""returns true if all setups of each dependency have passed"""
# todo: what about conditional setups?
passed_tests = [r.name for r in self.DIR.RESULTS.glob("*") if "passed" in r.name]
passed_tests = [r.name for r in self.DIR.STATUS.glob("*") if "passed" in r.name]
results = []
for dependency in self.dependencies:
dependency_runner = self.coordinator.RUNNER_DICT[dependency]
@ -201,11 +203,9 @@ class Runner:
return all(results)
@staticmethod
def result_int_to_str(result_int: int) -> str:
def exit_code_to_str(result_int: int) -> STATUS:
"""converts the pytest exit code (int) into a meaningful string"""
match result_int:
case -1:
return "skipped"
case 0:
return "passed"
case _:

View file

@ -0,0 +1,16 @@
from typing import Literal, NamedTuple
"""
passed: test passed
failed: test failed
skipped_con: test skipped because condition was not met
skipped_dep: test skipped because dependencies did not finish
skipped_pas: test skipped because it passed before
"""
STATUS = Literal["passed", "failed", "skipped_con", "skipped_dep", "skipped_pas"]
class TestResult(NamedTuple):
status: STATUS
test_name: str

View file

@ -1,8 +1,17 @@
import json
import os
import random
import string
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional
from urllib.parse import urlunparse
from loguru import logger
from pytest_abra.dir_manager import DirManager
@dataclass
class BaseUrl:
@ -24,7 +33,7 @@ def get_datetime_string() -> str:
return current_datetime.strftime("%Y-%m-%d-%H-%M-%S")
def rmtree(root_dir: Path):
def rmtree(root_dir: Path) -> None:
"""removes a folder with content recursively"""
if not root_dir.is_dir():
return
@ -35,3 +44,43 @@ def rmtree(root_dir: Path):
child.unlink()
root_dir.rmdir()
def generate_random_string(length: int, punctuation=False) -> str:
"""returns a random string of the given length"""
characters = string.ascii_letters + string.digits
if punctuation:
characters += string.punctuation
random_string = "".join(random.choice(characters) for _ in range(length))
return random_string
def load_json_to_environ(cred_file: Path) -> None:
"""Load the contents of a json file directly into os.environ. Variable names are inherited"""
if not cred_file.is_file():
logger.warning(f"{cred_file} could not be found, no credentials loaded")
return
with open(cred_file, "r") as f:
CREDENTIALS = json.load(f)
for key, value in CREDENTIALS.items():
os.environ[key] = value
def get_session_id(args_output_dir: Path, args_resume: bool, args_session_id: Optional[str]) -> str:
"""converts the cli arguments to the correct session_id"""
session_id = args_session_id
if not session_id:
session_id = "test-" + get_datetime_string()
if args_resume:
latest_session_id = DirManager.get_latest_session_id(args_output_dir)
if latest_session_id:
session_id = latest_session_id
return session_id
def files_are_same(file1: Path, file2: Path) -> bool:
with open(file1, "r") as f1, open(file2, "r") as f2:
return f1.read() == f2.read()