From 746eacf6b7fd8ac98c25ff40603585971dc092e4 Mon Sep 17 00:00:00 2001 From: Marc Olivier Bergeron Date: Mon, 25 May 2026 16:32:25 -0400 Subject: [PATCH] Moving stuff around and using importlib to access resources. --- .github/workflows/tests.yml | 2 +- CHANGELOG.md | 4 +- ctf/__main__.py | 58 ++-- ctf/{ => commands}/askgod/__init__.py | 0 ctf/{ => commands}/askgod/stats.py | 2 +- ctf/{ => commands}/check.py | 9 +- ctf/{ => commands}/deploy.py | 56 ++-- ctf/{ => commands}/destroy.py | 25 +- ctf/{ => commands}/flags.py | 24 +- ctf/{ => commands}/generate.py | 49 ++-- ctf/{ => commands}/init.py | 54 ++-- ctf/{ => commands}/list.py | 19 +- ctf/commands/new.py | 351 ++++++++++++++++++++++++ ctf/{ => commands}/post/__init__.py | 0 ctf/{ => commands}/post/new.py | 10 +- ctf/{ => commands}/redeploy.py | 4 +- ctf/{ => commands}/services.py | 20 +- ctf/{ => commands}/stats.py | 42 ++- ctf/{ => commands}/validate.py | 51 ++-- ctf/{ => commands}/version.py | 2 +- ctf/{ => common}/logger.py | 0 ctf/{ => common}/models.py | 2 +- ctf/{ => common}/utils.py | 191 ++++++------- ctf/{ => common}/validators.py | 165 +++++------ ctf/new.py | 381 -------------------------- ctf/templates/new/common/main.tf.j2 | 2 +- ctf/validate_json_schemas.py | 19 +- 27 files changed, 707 insertions(+), 835 deletions(-) rename ctf/{ => commands}/askgod/__init__.py (100%) rename ctf/{ => commands}/askgod/stats.py (99%) rename ctf/{ => commands}/check.py (85%) rename ctf/{ => commands}/deploy.py (92%) rename ctf/{ => commands}/destroy.py (92%) rename ctf/{ => commands}/flags.py (74%) rename ctf/{ => commands}/generate.py (77%) rename ctf/{ => commands}/init.py (53%) rename ctf/{ => commands}/list.py (75%) create mode 100644 ctf/commands/new.py rename ctf/{ => commands}/post/__init__.py (100%) rename ctf/{ => commands}/post/new.py (97%) rename ctf/{ => commands}/redeploy.py (97%) rename ctf/{ => commands}/services.py (87%) rename ctf/{ => commands}/stats.py (94%) rename ctf/{ => commands}/validate.py (70%) rename ctf/{ => commands}/version.py (74%) rename ctf/{ => common}/logger.py (100%) rename ctf/{ => common}/models.py (98%) rename ctf/{ => common}/utils.py (73%) rename ctf/{ => common}/validators.py (84%) delete mode 100644 ctf/new.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 460004d..bfa8aa3 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -163,7 +163,7 @@ jobs: - name: Test deployment looping through tracks working-directory: test-ctf run: | - IFS=" " read -r -a tracks <<< "$(python3 -c 'from ctf.utils import get_all_available_tracks,track_has_virtual_machine,validate_track_can_be_deployed;print(str([t.name for t in get_all_available_tracks() if validate_track_can_be_deployed(t) and not track_has_virtual_machine(t)]).strip("[]\x27").replace("\x27, \x27"," "))')" + IFS=" " read -r -a tracks <<< "$(python3 -c 'from ctf.common.utils import get_all_available_tracks,track_has_virtual_machine,validate_track_can_be_deployed;print(str([t.name for t in get_all_available_tracks() if validate_track_can_be_deployed(t) and not track_has_virtual_machine(t)]).strip("[]\x27").replace("\x27, \x27"," "))')" [ "${#tracks[@]}" -eq 0 ] && exit 1 diff --git a/CHANGELOG.md b/CHANGELOG.md index ad67fb6..d9578aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,12 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Breaking changes -- **`ctf deploy` no longer passes `ansible_incus_remote` as an Ansible extra variable.** +- **`ctf deploy` no longer passes `ansible_incus_remote` as an Ansible extra variable.** `ansible-playbook` with `-e ansible_incus_remote=` overrides inventory on every host, which broke mixed containers / VMs deployments. **Migration:** Do not depend on `ansible_incus_remote` being injected by deploy for playbook-wide VM/cluster targeting. - + Use something like: ``` vars: diff --git a/ctf/__main__.py b/ctf/__main__.py index 97fe8c8..797d812 100644 --- a/ctf/__main__.py +++ b/ctf/__main__.py @@ -2,10 +2,9 @@ import json import logging import os -import pathlib -import sys import time import urllib.request +from pathlib import Path import rich import typer @@ -14,23 +13,23 @@ from typing_extensions import Annotated from ctf import ENV, STATE -from ctf.askgod import app as askgod_app -from ctf.check import app as check_app -from ctf.deploy import app as deploy_app -from ctf.destroy import app as destroy_app -from ctf.flags import app as flags_app -from ctf.generate import app as generate_app -from ctf.init import app as init_app -from ctf.list import app as list_app -from ctf.logger import LOG -from ctf.new import app as new_app -from ctf.post import app as post_app -from ctf.redeploy import app as redeploy_app -from ctf.services import app as services_app -from ctf.stats import app as stats_app -from ctf.utils import find_ctf_root_directory, get_version, show_version -from ctf.validate import app as validate_app -from ctf.version import app as version_app +from ctf.commands.askgod import app as askgod_app +from ctf.commands.check import app as check_app +from ctf.commands.deploy import app as deploy_app +from ctf.commands.destroy import app as destroy_app +from ctf.commands.flags import app as flags_app +from ctf.commands.generate import app as generate_app +from ctf.commands.init import app as init_app +from ctf.commands.list import app as list_app +from ctf.commands.new import app as new_app +from ctf.commands.post import app as post_app +from ctf.commands.redeploy import app as redeploy_app +from ctf.commands.services import app as services_app +from ctf.commands.stats import app as stats_app +from ctf.commands.validate import app as validate_app +from ctf.commands.version import app as version_app +from ctf.common.logger import LOG +from ctf.common.utils import get_version, show_version app = typer.Typer( help="CLI tool to manage CTF challenges as code. Run from the root CTF repo directory or set the CTF_ROOT_DIR environment variable to run the tool.", @@ -65,13 +64,12 @@ def check_tool_version() -> None: - # Check at most once per day - stamp = ( - pathlib.Path( + stamp: Path = ( + Path( os.environ.get( "XDG_CACHE_HOME", - str(pathlib.Path(os.environ.get("HOME", "~")).expanduser() / ".cache"), + Path(os.environ.get("HOME", "~")).expanduser() / ".cache", ) ) / "ctf-script" @@ -188,17 +186,3 @@ def main(): console = rich.get_console() console.width = 150 if console.width < 150 else console.width app() - - -if __name__ == "__main__": - import sys - - if "version" not in sys.argv and "init" not in sys.argv: - if not os.path.isdir( - s=(p := os.path.join(find_ctf_root_directory(), "challenges")) - ): - LOG.error( - msg=f"Directory `{p}` not found. Make sure this script is ran from the root directory OR set the CTF_ROOT_DIR environment variable to the root directory." - ) - exit(code=1) - main() diff --git a/ctf/askgod/__init__.py b/ctf/commands/askgod/__init__.py similarity index 100% rename from ctf/askgod/__init__.py rename to ctf/commands/askgod/__init__.py diff --git a/ctf/askgod/stats.py b/ctf/commands/askgod/stats.py similarity index 99% rename from ctf/askgod/stats.py rename to ctf/commands/askgod/stats.py index f3ad844..004fd01 100644 --- a/ctf/askgod/stats.py +++ b/ctf/commands/askgod/stats.py @@ -6,7 +6,7 @@ import typer from typing_extensions import Annotated -from ctf.logger import LOG +from ctf.common.logger import LOG app = typer.Typer() diff --git a/ctf/check.py b/ctf/commands/check.py similarity index 85% rename from ctf/check.py rename to ctf/commands/check.py index 2a696e5..dc91e30 100644 --- a/ctf/check.py +++ b/ctf/commands/check.py @@ -1,13 +1,12 @@ -import os import subprocess import typer from typing_extensions import Annotated from ctf import ENV -from ctf.generate import generate -from ctf.logger import LOG -from ctf.utils import check_git_lfs, find_ctf_root_directory, terraform_binary +from ctf.commands.generate import generate +from ctf.common.logger import LOG +from ctf.common.utils import check_git_lfs, find_ctf_root_directory, terraform_binary app = typer.Typer() @@ -46,7 +45,7 @@ def check( # Then run terraform plan. subprocess.run( args=[terraform_binary(), "plan"], - cwd=os.path.join(find_ctf_root_directory(), ".deploy"), + cwd=find_ctf_root_directory() / ".deploy", check=True, ) diff --git a/ctf/deploy.py b/ctf/commands/deploy.py similarity index 92% rename from ctf/deploy.py rename to ctf/commands/deploy.py index 92cc031..d252a8b 100644 --- a/ctf/deploy.py +++ b/ctf/commands/deploy.py @@ -4,17 +4,18 @@ import subprocess import textwrap import time +from pathlib import Path import typer from rich.prompt import IntPrompt from typing_extensions import Annotated from ctf import ENV, STATE -from ctf.destroy import destroy -from ctf.generate import generate -from ctf.logger import LOG -from ctf.models import Track, TrackYaml -from ctf.utils import ( +from ctf.commands.destroy import destroy +from ctf.commands.generate import generate +from ctf.common.logger import LOG +from ctf.common.models import Track, TrackYaml +from ctf.common.utils import ( add_tracks_to_terraform_modules, check_git_lfs, find_ctf_root_directory, @@ -122,7 +123,7 @@ def deploy( LOG.critical( msg="Git LFS is missing from your system. Install it before deploying." ) - exit(code=1) + exit(1) # Pull LFS files LOG.debug("Pulling Git LFS files for specific tracks.") @@ -161,9 +162,7 @@ def deploy( remote=remote, production=production, track=track.name, - path=os.path.join( - find_ctf_root_directory(), "challenges", track.name, "ansible" - ), + path=find_ctf_root_directory() / "challenges" / track.name / "ansible", playbook="build.yaml", vm_remote=vm_remote, vm_project=vm_project, @@ -187,18 +186,14 @@ def deploy( ): distinct_tracks = regenerated_tracks - if not os.path.exists( - path=( - path := os.path.join( - find_ctf_root_directory(), "challenges", track.name, "ansible" - ) - ) - ): + if not ( + path := find_ctf_root_directory() / "challenges" / track.name / "ansible" + ).exists(): continue if track.has_virtual_machine: incus_list = json.loads( - s=subprocess.run( + subprocess.run( args=["incus", "list", f"--project={track}", "--format", "json"], check=True, capture_output=True, @@ -283,10 +278,10 @@ def deploy( )[0]["address"] ipv6_to_container_name[ipv6_address] = machine["name"] - LOG.debug(msg=f"Mapping: {ipv6_to_container_name}") + LOG.debug(f"Mapping: {ipv6_to_container_name}") if remote == "local": - LOG.debug(msg=f"Parsing track.yaml for track {track}") + LOG.debug(f"Parsing track.yaml for track {track}") track_yaml: TrackYaml = TrackYaml.model_validate( parse_track_yaml(track_name=track.name) ) @@ -342,13 +337,13 @@ def deploy( check=True, ) - LOG.info(msg=f"Running `incus --project={track} list`") + LOG.info(f"Running `incus --project={track} list`") subprocess.run( args=["incus", f"--project={track}", "list"], check=True, env=ENV ) if distinct_tracks: - LOG.info(msg="Applying post-deploy Terraform resources...") + LOG.info("Applying post-deploy Terraform resources...") try: terraform_apply( tracks=tracks, @@ -362,7 +357,7 @@ def deploy( LOG.critical( "Could not apply post-deploy Terraform resources. Fix the Terraform configuration and rerun `ctf deploy`." ) - exit(code=1) + exit(1) if not production and distinct_tracks: tracks_list = list(distinct_tracks) @@ -408,7 +403,7 @@ def terraform_apply( try: subprocess.run( args=args, - cwd=os.path.join(find_ctf_root_directory(), ".deploy"), + cwd=find_ctf_root_directory() / ".deploy", check=True, ) except subprocess.CalledProcessError: @@ -441,7 +436,7 @@ def terraform_apply( subprocess.run( args=args, - cwd=os.path.join(find_ctf_root_directory(), ".deploy"), + cwd=find_ctf_root_directory() / ".deploy", check=True, ) @@ -469,7 +464,7 @@ def terraform_apply( remote=remote, force=True, ) - exit(code=0) + exit(0) return set() @@ -478,7 +473,7 @@ def run_ansible_playbook( remote: str, production: bool, track: str, - path: str, + path: Path, playbook: str = "deploy.yaml", vm_remote: str | None = None, vm_project: str | None = None, @@ -503,7 +498,7 @@ def run_ansible_playbook( extra_args += ["-e", "nsec_production=true"] if not skip_pre_common and execute_common: - LOG.info(msg=f"Running pre-common.yaml with ansible for track {track}...") + LOG.info(f"Running pre-common.yaml with ansible for track {track}...") ansible_args = [ "ansible-playbook", os.path.join("..", "..", "..", ".deploy", "ansible", "common.yaml"), @@ -516,7 +511,7 @@ def run_ansible_playbook( check=True, ) - LOG.info(msg=f"Running {playbook} with ansible for track {track}...") + LOG.info(f"Running {playbook} with ansible for track {track}...") ansible_args = [ "ansible-playbook", playbook, @@ -526,7 +521,7 @@ def run_ansible_playbook( subprocess.run(args=ansible_args, cwd=path, check=True) if not skip_post_common and execute_common: - LOG.info(msg=f"Running post-common.yaml with ansible for track {track}...") + LOG.info(f"Running post-common.yaml with ansible for track {track}...") ansible_args = ( [ "ansible-playbook", @@ -543,6 +538,5 @@ def run_ansible_playbook( check=True, ) - artifacts_path = os.path.join(path, "artifacts") - if os.path.exists(path=artifacts_path): + if (artifacts_path := path / "artifacts").exists(): shutil.rmtree(artifacts_path) diff --git a/ctf/destroy.py b/ctf/commands/destroy.py similarity index 92% rename from ctf/destroy.py rename to ctf/commands/destroy.py index ca9ab50..9437c05 100644 --- a/ctf/destroy.py +++ b/ctf/commands/destroy.py @@ -1,5 +1,4 @@ import json -import os import subprocess import typer @@ -7,9 +6,9 @@ from typing_extensions import Annotated from ctf import ENV -from ctf.logger import LOG -from ctf.models import Track -from ctf.utils import ( +from ctf.common.logger import LOG +from ctf.common.models import Track +from ctf.common.utils import ( find_ctf_root_directory, get_terraform_tracks_from_modules, remove_tracks_from_terraform_modules, @@ -65,11 +64,9 @@ def destroy( ) -> None: ENV["INCUS_REMOTE"] = remote - if not os.path.exists( - path=os.path.join(find_ctf_root_directory(), ".deploy", "modules.tf") - ): - LOG.critical(msg="Nothing to destroy.") - exit(code=1) + if not (find_ctf_root_directory() / ".deploy" / "modules.tf").exists(): + LOG.critical("Nothing to destroy.") + exit(1) terraform_tracks: set[Track] = get_terraform_tracks_from_modules() @@ -114,7 +111,7 @@ def destroy( LOG.critical( msg="No project to switch to. This should never happen as the default should always exists." ) - exit(code=1) + exit(1) cmd = [ "incus", @@ -123,7 +120,7 @@ def destroy( "default" if "default" in project_list else project_list[0].name, ] - LOG.info(msg=f"Running `{' '.join(cmd)}`") + LOG.info(f"Running `{' '.join(cmd)}`") subprocess.run(args=cmd, check=True, env=ENV) subprocess.run( @@ -139,7 +136,7 @@ def destroy( ] ), ], - cwd=os.path.join(find_ctf_root_directory(), ".deploy"), + cwd=find_ctf_root_directory() / ".deploy", check=False, ) @@ -183,7 +180,7 @@ def destroy( for module in terraform_tracks: if module in projects: - LOG.warning(msg=f"The project {module.name} was not destroyed properly.") + LOG.warning(f"The project {module.name} was not destroyed properly.") if ( force or (input("Do you want to destroy it? [Y/n] ").lower() or "y") == "y" @@ -234,7 +231,7 @@ def destroy( production=production, ) if total_deployed_tracks == len(terraform_tracks): - LOG.info(msg="Successfully destroyed every track") + LOG.info("Successfully destroyed every track") else: LOG.info( f"Successfully destroyed: {', '.join([track.name for track in terraform_tracks])}" diff --git a/ctf/flags.py b/ctf/commands/flags.py similarity index 74% rename from ctf/flags.py rename to ctf/commands/flags.py index 0bb3783..9bc5260 100644 --- a/ctf/flags.py +++ b/ctf/commands/flags.py @@ -9,9 +9,9 @@ import yaml from typing_extensions import Annotated -from ctf.logger import LOG -from ctf.models import Track -from ctf.utils import find_ctf_root_directory, parse_track_yaml +from ctf.common.logger import LOG +from ctf.common.models import Track +from ctf.common.utils import find_ctf_root_directory, parse_track_yaml app = typer.Typer() @@ -40,15 +40,11 @@ def flags( distinct_tracks: set[Track] = set() for entry in os.listdir( - path=( - challenges_directory := os.path.join( - find_ctf_root_directory(), "challenges" - ) - ) + challenges_directory := (find_ctf_root_directory() / "challenges") ): - if os.path.isdir( - s=(track_directory := os.path.join(challenges_directory, entry)) - ) and os.path.exists(path=os.path.join(track_directory, "track.yaml")): + if (track_directory := challenges_directory / entry).is_dir() and ( + track_directory / "track.yaml" + ).exists(): if not tracks: distinct_tracks.add(Track(name=entry)) elif entry in tracks: @@ -56,11 +52,11 @@ def flags( flags = [] for track in distinct_tracks: - LOG.debug(msg=f"Parsing track.yaml for track {track.name}") + LOG.debug(f"Parsing track.yaml for track {track.name}") track_yaml = parse_track_yaml(track_name=track.name) if len(track_yaml["flags"]) == 0: - LOG.debug(msg=f"No flag in track {track.name}. Skipping...") + LOG.debug(f"No flag in track {track.name}. Skipping...") continue track_flags = track_yaml["flags"] @@ -71,7 +67,7 @@ def flags( flags.extend(track_flags) if not flags: - LOG.warning(msg="No flag found...") + LOG.warning("No flag found...") return if format == OutputFormat.JSON: diff --git a/ctf/generate.py b/ctf/commands/generate.py similarity index 77% rename from ctf/generate.py rename to ctf/commands/generate.py index 738bab8..851155a 100644 --- a/ctf/generate.py +++ b/ctf/commands/generate.py @@ -5,9 +5,9 @@ from typing_extensions import Annotated from ctf import ENV -from ctf.logger import LOG -from ctf.models import Track -from ctf.utils import ( +from ctf.common.logger import LOG +from ctf.common.models import Track +from ctf.common.utils import ( add_tracks_to_terraform_modules, create_terraform_modules_file, does_track_require_build_container, @@ -89,7 +89,7 @@ def generate( ) if distinct_tracks: - LOG.debug(msg=f"Found {len(distinct_tracks)} tracks") + LOG.debug(f"Found {len(distinct_tracks)} tracks") # Generate the Terraform modules file. if not keep_already_deployed: create_terraform_modules_file(remote=remote, production=production) @@ -117,59 +117,60 @@ def generate( for track in distinct_tracks: relpath = os.path.relpath( - os.path.join(find_ctf_root_directory(), ".deploy", "common"), + find_ctf_root_directory() / ".deploy" / "common", ( - terraform_directory := os.path.join( - find_ctf_root_directory(), "challenges", track.name, "terraform" + terraform_directory := ( + find_ctf_root_directory() + / "challenges" + / track.name + / "terraform" ) ), ) # If the file exists and is a symlink, refresh it by deleting it first. - if os.path.exists( - path=(p := os.path.join(terraform_directory, "variables.tf")) - ) and os.path.islink(path=p): - os.unlink(path=p) + if ( + p := (terraform_directory / "variables.tf") + ).exists() and p.is_symlink(): + p.unlink() - LOG.debug(msg=f"Refreshing symlink {p}.") + LOG.debug(f"Refreshing symlink {p}.") - if not os.path.exists(path=p): + if not p.exists(): os.symlink( src=os.path.join(relpath, "variables.tf"), dst=p, ) - LOG.debug(msg=f"Created symlink {p}.") + LOG.debug(f"Created symlink {p}.") # If the file exists and is a symlink, refresh it by deleting it first. - if os.path.exists( - path=(p := os.path.join(terraform_directory, "versions.tf")) - ) and os.path.islink(path=p): - os.unlink(path=p) + if (p := (terraform_directory / "versions.tf")).exists() and p.is_symlink(): + p.unlink() - LOG.debug(msg=f"Refreshing symlink {p}.") + LOG.debug(f"Refreshing symlink {p}.") - if not os.path.exists(path=p): + if not p.exists(): os.symlink( src=os.path.join(relpath, "versions.tf"), dst=p, ) - LOG.debug(msg=f"Created symlink {p}.") + LOG.debug(f"Created symlink {p}.") subprocess.run( args=[terraform_binary(), "init", "-upgrade"], - cwd=os.path.join(find_ctf_root_directory(), ".deploy"), + cwd=find_ctf_root_directory() / ".deploy", stdout=subprocess.DEVNULL, check=True, ) subprocess.run( args=[terraform_binary(), "validate"], - cwd=os.path.join(find_ctf_root_directory(), ".deploy"), + cwd=find_ctf_root_directory() / ".deploy", check=True, ) else: LOG.critical("No track was found") - exit(code=1) + exit(1) return distinct_tracks diff --git a/ctf/init.py b/ctf/commands/init.py similarity index 53% rename from ctf/init.py rename to ctf/commands/init.py index 0164b84..69793dd 100644 --- a/ctf/init.py +++ b/ctf/commands/init.py @@ -1,12 +1,12 @@ -import os +import importlib.resources import shutil +from pathlib import Path import typer from typing_extensions import Annotated from ctf import ENV -from ctf.logger import LOG -from ctf.utils import get_ctf_script_templates_directory +from ctf.common.logger import LOG app = typer.Typer() @@ -16,8 +16,8 @@ ) def init( path: Annotated[ - str, typer.Argument(help="Directory in which to initialize a CTF") - ] = "", + Path | None, typer.Argument(help="Directory in which to initialize a CTF") + ] = None, force: Annotated[ bool, typer.Option( @@ -27,40 +27,36 @@ def init( ) -> None: # If path is not set, take the one from --location or CTF_ROOT_DIR, else it's the current directory. if not path: - path = ( - str(ENV.get("CTF_ROOT_DIR")) - if "CTF_ROOT_DIR" in ENV - else os.path.join(os.getcwd(), ".") - ) + path = Path(ENV.get("CTF_ROOT_DIR", ".")) + + path = path.expanduser().resolve() created_directory = False - created_assets: list[str] = [] + created_assets: list[Path] = [] try: - if not os.path.isdir(path): - os.mkdir(path) + if not path.is_dir(): LOG.info(f'Creating directory "{path}"') + path.mkdir() created_directory = True elif ( - os.path.isdir(os.path.join(path, "challenges")) - or os.path.isdir(os.path.join(path, ".deploy")) + (path / "challenges").is_dir() or (path / ".deploy").is_dir() ) and not force: LOG.error( f'Directory "{path}" is already initialized. Use --force to overwrite.' ) - exit(code=1) + exit(1) - for asset in os.listdir( - p := os.path.join(get_ctf_script_templates_directory(), "init") - ): - dst_asset = os.path.join(path, asset) - if os.path.isdir(src_asset := os.path.join(p, asset)): - shutil.copytree(src_asset, dst_asset, dirs_exist_ok=True) - LOG.info(f'Created "{dst_asset}" folder') - else: - shutil.copy(src_asset, dst_asset) - LOG.info(f'Created "{dst_asset}" file') + with importlib.resources.path("ctf.templates", "init") as templates_location: + for asset in templates_location.iterdir(): + dst_asset: Path = path / asset.name + if asset.is_dir(): + shutil.copytree(asset, dst_asset, dirs_exist_ok=True) + LOG.info(f'Created "{dst_asset}" folder') + else: + shutil.copy(asset, dst_asset) + LOG.info(f'Created "{dst_asset}" file') - created_assets.append(dst_asset) + created_assets.append(dst_asset) except Exception: import traceback @@ -70,11 +66,11 @@ def init( LOG.info(f'Removed created "{path}" folder') else: for asset in created_assets: - if os.path.isdir(asset): + if asset.is_dir(): shutil.rmtree(asset) LOG.info(f'Removed created "{asset}" folder') else: - os.unlink(asset) + asset.unlink() LOG.info(f'Removed created "{asset}" file') LOG.critical(traceback.format_exc()) diff --git a/ctf/list.py b/ctf/commands/list.py similarity index 75% rename from ctf/list.py rename to ctf/commands/list.py index 76b2069..87accf6 100644 --- a/ctf/list.py +++ b/ctf/commands/list.py @@ -1,4 +1,3 @@ -import os from enum import StrEnum import rich @@ -6,8 +5,8 @@ from rich.table import Table from typing_extensions import Annotated -from ctf.models import Track -from ctf.utils import find_ctf_root_directory, parse_post_yamls, parse_track_yaml +from ctf.common.models import Track +from ctf.common.utils import find_ctf_root_directory, parse_post_yamls, parse_track_yaml app = typer.Typer() @@ -23,15 +22,11 @@ def list_tracks( ] = ListOutputFormat.PRETTY, ) -> None: tracks: set[Track] = set() - for track in os.listdir(path=os.path.join(find_ctf_root_directory(), "challenges")): - if os.path.isdir( - s=os.path.join(find_ctf_root_directory(), "challenges", track) - ) and os.path.exists( - path=os.path.join( - find_ctf_root_directory(), "challenges", track, "track.yaml" - ) - ): - tracks.add(Track(name=track)) + for track in (find_ctf_root_directory() / "challenges").iterdir(): + if (find_ctf_root_directory() / "challenges" / track).is_dir() and ( + find_ctf_root_directory() / "challenges" / track / "track.yaml" + ).exists(): + tracks.add(Track(name=track.name)) parsed_tracks = [] for track in tracks: diff --git a/ctf/commands/new.py b/ctf/commands/new.py new file mode 100644 index 0000000..3f5fd21 --- /dev/null +++ b/ctf/commands/new.py @@ -0,0 +1,351 @@ +import importlib.resources +import os +import re +import secrets +import shutil +from enum import StrEnum +from pathlib import Path + +import jinja2 +import typer +from typing_extensions import Annotated + +from ctf.common.logger import LOG +from ctf.common.utils import find_ctf_root_directory + +app = typer.Typer() + + +class Template(StrEnum): + INFRA_SKELETON = "infra-skeleton" + TRACK_YAML_ONLY = "track-yaml-only" + FILES_ONLY = "files-only" + APACHE_PHP = "apache-php" + PYTHON_SERVICE = "python-service" + RUST_WEBSERVICE = "rust-webservice" + WINDOWS_VM = "windows-vm" + + +@app.command(help="Create a new CTF track with a given name") +def new( + name: Annotated[ + str, + typer.Option( + help="Track name. No space, use dashes if needed.", + prompt="Track name. No space, use dashes if needed.", + ), + ], + template: Annotated[ + Template, + typer.Option( + "--template", + "-t", + help="Template to use for the track.", + prompt="Template to use for the track.", + ), + ] = Template.INFRA_SKELETON, + force: Annotated[ + bool, + typer.Option( + "--force", + help="If directory already exists, delete it and create it again.", + ), + ] = False, + with_build_container: Annotated[ + bool, + typer.Option( + "--with-build", + help="If a build container is required.", + ), + ] = False, + with_virtual_machine: Annotated[ + bool, + typer.Option( + "--vm", + "--with-virtual-machine", + help="If a virtual machine is required.", + ), + ] = False, +) -> None: + LOG.info(f"Creating a new track: {name}") + if not re.match(pattern=r"^[a-z][a-z0-9\-]{0,61}[a-z0-9]$", string=name): + LOG.critical( + msg="""The track name Valid instance names must fulfill the following requirements: +* The name must be between 1 and 63 characters long; +* The name must contain only letters, numbers and dashes from the ASCII table; +* The name must not start with a digit or a dash; +* The name must not end with a dash.""" + ) + exit(1) + + if template == Template.RUST_WEBSERVICE: + with_build_container = True + + if ( + new_challenge_directory := find_ctf_root_directory() / "challenges" / name + ).exists(): + if force: + LOG.debug(f"Deleting {new_challenge_directory}") + shutil.rmtree(new_challenge_directory) + else: + LOG.critical( + "Track already exists with that name. Use `--force` to overwrite the track." + ) + exit(1) + + new_challenge_directory.mkdir() + + LOG.debug(f"Directory {new_challenge_directory} created.") + + with importlib.resources.path("ctf.templates", "new") as templates_location: + env = jinja2.Environment( + loader=jinja2.FileSystemLoader( + searchpath=templates_location, encoding="utf-8" + ) + ) + + ipv6_subnet = f"9000:d37e:c40b:{secrets.choice('0123456789abcdef')}{secrets.choice('0123456789abcdef')}{secrets.choice('0123456789abcdef')}{secrets.choice('0123456789abcdef')}" + + rb = [ + secrets.choice("0123456789abcdef"), + secrets.choice("0123456789abcdef"), + secrets.choice("0123456789abcdef"), + secrets.choice("0123456789abcdef"), + secrets.choice("0123456789abcdef"), + secrets.choice("0123456789abcdef"), + secrets.choice("0123456789abcdef"), + secrets.choice("0123456789abcdef"), + secrets.choice("0123456789abcdef"), + secrets.choice("0123456789abcdef"), + secrets.choice("0123456789abcdef"), + secrets.choice("0123456789abcdef"), + ] + hardware_address = f"00:16:3e:{rb[0]}{rb[1]}:{rb[2]}{rb[3]}:{rb[4]}{rb[5]}" + ipv6_address = f"216:3eff:fe{rb[0]}{rb[1]}:{rb[2]}{rb[3]}{rb[4]}{rb[5]}" + full_ipv6_address = f"{ipv6_subnet}:{ipv6_address}" + + track_template = env.get_template(os.path.join("common", "track.yaml.j2")) + render = track_template.render( + data={ + "name": name, + "full_ipv6_address": full_ipv6_address, + "hardware_address": hardware_address, + "is_windows": template == Template.WINDOWS_VM, + "template": template.value, + "with_build": with_build_container, + "with_virtual_machine": with_virtual_machine, + } + ) + with open( + (p := new_challenge_directory / "track.yaml"), mode="w", encoding="utf-8" + ) as f: + f.write(render) + + LOG.debug(f"Wrote {p}.") + + readme_template = env.get_template(name=os.path.join("common", "README.md.j2")) + render = readme_template.render(data={"name": name}) + with open( + (p := new_challenge_directory / "README.md"), mode="w", encoding="utf-8" + ) as f: + f.write(render) + + LOG.debug(f"Wrote {p}.") + + posts_directory: Path = new_challenge_directory / "posts" + posts_directory.mkdir() + + LOG.debug(f"Directory {posts_directory} created.") + + track_template = env.get_template(name=os.path.join("common", "topic.yaml.j2")) + render = track_template.render(data={"name": name}) + with open( + (p := posts_directory / f"{name}.yaml"), mode="w", encoding="utf-8" + ) as f: + f.write(render) + + LOG.debug(f"Wrote {p}.") + + track_template = env.get_template(name=os.path.join("common", "post.yaml.j2")) + render = track_template.render(data={"name": name}) + with open( + (p := os.path.join(posts_directory, f"{name}_flag1.yaml")), + mode="w", + encoding="utf-8", + ) as f: + f.write(render) + + LOG.debug(f"Wrote {p}.") + + if template == Template.TRACK_YAML_ONLY: + return + + files_directory: Path = new_challenge_directory / "files" + files_directory.mkdir() + + LOG.debug(f"Directory {files_directory} created.") + + if template == Template.FILES_ONLY: + return + + terraform_directory: Path = new_challenge_directory / "terraform" + terraform_directory.mkdir() + + LOG.debug(f"Directory {terraform_directory} created.") + + track_template = env.get_template(name=os.path.join("common", "main.tf.j2")) + + render = track_template.render( + data={ + "name": name, + "ipv6_subnet": ipv6_subnet, + "full_ipv6_address": full_ipv6_address, + "with_build": with_build_container, + "with_virtual_machine": with_virtual_machine, + "is_windows": template == Template.WINDOWS_VM, + } + ) + with open( + (p := terraform_directory / "main.tf"), mode="w", encoding="utf-8" + ) as f: + f.write(render) + + LOG.debug(f"Wrote {p}.") + + relpath = os.path.relpath( + find_ctf_root_directory() / ".deploy" / "common", terraform_directory + ) + + os.symlink( + src=os.path.join(relpath, "variables.tf"), + dst=(p := terraform_directory / "variables.tf"), + ) + + LOG.debug(f"Wrote {p}.") + + os.symlink( + src=os.path.join(relpath, "versions.tf"), + dst=(p := terraform_directory / "versions.tf"), + ) + + LOG.debug(f"Wrote {p}.") + + ansible_directory: Path = new_challenge_directory / "ansible" + ansible_directory.mkdir() + + LOG.debug(f"Directory {ansible_directory} created.") + + track_template = env.get_template(name=os.path.join(template, "deploy.yaml.j2")) + render = track_template.render( + data={ + "name": name, + "with_build": with_build_container, + "with_virtual_machine": with_virtual_machine, + } + ) + with open( + (p := ansible_directory / "deploy.yaml"), mode="w", encoding="utf-8" + ) as f: + f.write(render) + + LOG.debug(f"Wrote {p}.") + + if with_build_container: + try: + track_template = env.get_template( + os.path.join(template, "build.yaml.j2") + ) + except jinja2.TemplateNotFound: + track_template = env.get_template( + os.path.join("common", "build.yaml.j2") + ) + + render = track_template.render( + data={"name": name, "with_build": with_build_container} + ) + + with open( + (p := ansible_directory / "build.yaml"), mode="w", encoding="utf-8" + ) as f: + f.write(render) + LOG.debug(f"Wrote {p}.") + + track_template = env.get_template(name=os.path.join("common", "inventory.j2")) + render = track_template.render( + data={ + "name": name, + "with_build": with_build_container, + "with_virtual_machine": with_virtual_machine, + "is_windows": template == Template.WINDOWS_VM, + } + ) + with open( + (p := ansible_directory / "inventory"), mode="w", encoding="utf-8" + ) as f: + f.write(render) + + LOG.debug(f"Wrote {p}.") + + ansible_challenge_directory: Path = ansible_directory / "challenge" + ansible_challenge_directory.mkdir() + + LOG.debug(f"Directory {ansible_challenge_directory} created.") + + if template == Template.APACHE_PHP: + track_template = env.get_template( + os.path.join(Template.APACHE_PHP, "index.php.j2") + ) + render = track_template.render(data={"name": name}) + with open( + (p := ansible_challenge_directory / "index.php"), + mode="w", + encoding="utf-8", + ) as f: + f.write(render) + + LOG.debug(f"Wrote {p}.") + + if template == Template.PYTHON_SERVICE: + track_template = env.get_template( + os.path.join(Template.PYTHON_SERVICE, "app.py.j2") + ) + render = track_template.render(data={"name": name}) + with open( + (p := ansible_challenge_directory / "app.py"), + mode="w", + encoding="utf-8", + ) as f: + f.write(render) + + LOG.debug(f"Wrote {p}.") + + with open( + (p := ansible_challenge_directory / "flag-1.txt"), + mode="w", + encoding="utf-8", + ) as f: + f.write(f"{{{{ track_flags.{name}_flag_1 }}}} (1/2)\n") + + LOG.debug(f"Wrote {p}.") + + if template == Template.RUST_WEBSERVICE: + # Copy the entire challenge template + shutil.copytree( + templates_location / Template.RUST_WEBSERVICE / "source", + ansible_challenge_directory, + dirs_exist_ok=True, + ) + LOG.debug(f"Wrote files to {ansible_challenge_directory}") + + manifest_template = env.get_template( + os.path.join(Template.RUST_WEBSERVICE, "Cargo.toml.j2") + ) + render = manifest_template.render(data={"name": name}) + with open( + (p := ansible_challenge_directory / "Cargo.toml"), + mode="w", + encoding="utf-8", + ) as f: + f.write(render) + + LOG.debug(f"Wrote {p}.") diff --git a/ctf/post/__init__.py b/ctf/commands/post/__init__.py similarity index 100% rename from ctf/post/__init__.py rename to ctf/commands/post/__init__.py diff --git a/ctf/post/new.py b/ctf/commands/post/new.py similarity index 97% rename from ctf/post/new.py rename to ctf/commands/post/new.py index a8378d1..dd37690 100644 --- a/ctf/post/new.py +++ b/ctf/commands/post/new.py @@ -6,9 +6,9 @@ import typer from typing_extensions import Annotated -from ctf.logger import LOG -from ctf.models import Track -from ctf.utils import ( +from ctf.common.logger import LOG +from ctf.common.models import Track +from ctf.common.utils import ( get_all_available_tracks, parse_track_yaml, ) @@ -72,7 +72,7 @@ def _resolve_post_file_path( name: str | None, tag: str | None, force: bool, -) -> str: +) -> Path: filename = ( f"{track}-{name}.yaml" if name @@ -86,7 +86,7 @@ def _resolve_post_file_path( if not force: filename = _add_counter_to_filename(posts_directory, filename) - return os.path.join(posts_directory, filename) + return posts_directory / filename def _render_post_yaml( diff --git a/ctf/redeploy.py b/ctf/commands/redeploy.py similarity index 97% rename from ctf/redeploy.py rename to ctf/commands/redeploy.py index 3784641..3ad671e 100644 --- a/ctf/redeploy.py +++ b/ctf/commands/redeploy.py @@ -2,8 +2,8 @@ from typing_extensions import Annotated from ctf import ENV -from ctf.deploy import deploy -from ctf.destroy import destroy +from ctf.commands.deploy import deploy +from ctf.commands.destroy import destroy app = typer.Typer() diff --git a/ctf/services.py b/ctf/commands/services.py similarity index 87% rename from ctf/services.py rename to ctf/commands/services.py index f0d99cc..03de2ad 100644 --- a/ctf/services.py +++ b/ctf/commands/services.py @@ -6,8 +6,8 @@ import typer from typing_extensions import Annotated -from ctf.logger import LOG -from ctf.utils import find_ctf_root_directory, parse_track_yaml +from ctf.common.logger import LOG +from ctf.common.utils import find_ctf_root_directory, parse_track_yaml app = typer.Typer() @@ -28,15 +28,11 @@ def services( ) -> None: distinct_tracks: set[str] = set() for entry in os.listdir( - path=( - challenges_directory := os.path.join( - find_ctf_root_directory(), "challenges" - ) - ) + challenges_directory := (find_ctf_root_directory() / "challenges") ): - if os.path.isdir( - s=(track_directory := os.path.join(challenges_directory, entry)) - ) and os.path.exists(path=os.path.join(track_directory, "track.yaml")): + if (track_directory := (challenges_directory / entry)).is_dir() and ( + track_directory / "track.yaml" + ).exists(): if not tracks: distinct_tracks.add(entry) elif entry in tracks: @@ -45,7 +41,7 @@ def services( all_services = [] for track in distinct_tracks: - LOG.debug(msg=f"Parsing track.yaml for track {track}") + LOG.debug(f"Parsing track.yaml for track {track}") track_yaml = parse_track_yaml(track_name=track) services = track_yaml.get("services", []) @@ -56,7 +52,7 @@ def services( ] if len(services) == 0: - LOG.debug(msg=f"No service in track {track}. Skipping...") + LOG.debug(f"No service in track {track}. Skipping...") continue for service in services: diff --git a/ctf/stats.py b/ctf/commands/stats.py similarity index 94% rename from ctf/stats.py rename to ctf/commands/stats.py index 0c3e117..e1fd143 100644 --- a/ctf/stats.py +++ b/ctf/commands/stats.py @@ -10,8 +10,8 @@ import typer from typing_extensions import Annotated -from ctf.logger import LOG -from ctf.utils import find_ctf_root_directory, parse_track_yaml +from ctf.common.logger import LOG +from ctf.common.utils import find_ctf_root_directory, parse_track_yaml try: import pybadges @@ -64,19 +64,19 @@ def stats( ), ] = False, ) -> None: - LOG.debug(msg="Generating statistics...") + LOG.debug("Generating statistics...") stats = {} distinct_tracks: set[str] = set() - for entry in os.listdir( - (challenges_directory := os.path.join(find_ctf_root_directory(), "challenges")) - ): - if os.path.isdir( - (track_directory := os.path.join(challenges_directory, entry)) - ) and os.path.isfile(os.path.join(track_directory, "track.yaml")): + for entry in ( + challenges_directory := (find_ctf_root_directory() / "challenges") + ).iterdir(): + if (track_directory := (challenges_directory / entry)).is_dir() and ( + track_directory / "track.yaml" + ).is_file(): if not tracks: - distinct_tracks.add(entry) + distinct_tracks.add(entry.name) elif entry in tracks: - distinct_tracks.add(entry) + distinct_tracks.add(entry.name) stats["number_of_tracks"] = len(distinct_tracks) stats["number_of_tracks_integrated_with_scenario"] = 0 @@ -139,10 +139,8 @@ def stats( if not qa - track_designers: stats["qa_not_done"].append(track) - if os.path.exists( - path=(files_directory := os.path.join(challenges_directory, track, "files")) - ): - for file in os.listdir(path=files_directory): + if (files_directory := (challenges_directory / track / "files")).exists(): + for _ in files_directory.iterdir(): stats["number_of_files"] += 1 stats["median_flag_value"] = statistics.median(flags) stats["mean_flag_value"] = round(statistics.mean(flags), 2) @@ -169,9 +167,9 @@ def stats( rich.print(json.dumps(stats, indent=2, ensure_ascii=False)) if generate_badges: if not _has_pybadges: - LOG.critical(msg="Module pybadges was not found.") - exit(code=1) - LOG.info(msg="Generating badges...") + LOG.critical("Module pybadges was not found.") + exit(1) + LOG.info("Generating badges...") os.makedirs(name=".badges", exist_ok=True) write_badge( "flag", @@ -230,9 +228,9 @@ def stats( if charts: if not _has_matplotlib: - LOG.critical(msg="Module matplotlib was not found.") - exit(code=1) - LOG.info(msg="Generating charts...") + LOG.critical("Module matplotlib was not found.") + exit(1) + LOG.info("Generating charts...") mpl_logger = logging.getLogger("matplotlib") mpl_logger.setLevel(logging.INFO) os.makedirs(name=".charts", exist_ok=True) @@ -420,7 +418,7 @@ def stats( plt.savefig(os.path.join(".charts", "points_over_time.png")) plt.clf() - LOG.debug(msg="Done...") + LOG.debug("Done...") def write_badge(name: str, svg: str) -> None: diff --git a/ctf/validate.py b/ctf/commands/validate.py similarity index 70% rename from ctf/validate.py rename to ctf/commands/validate.py index 3aca085..4be262b 100644 --- a/ctf/validate.py +++ b/ctf/commands/validate.py @@ -1,4 +1,3 @@ -import os import re import subprocess import textwrap @@ -13,10 +12,10 @@ TimeRemainingColumn, ) -from ctf.logger import LOG -from ctf.utils import find_ctf_root_directory, get_ctf_script_schemas_directory +from ctf.common.logger import LOG +from ctf.common.utils import find_ctf_root_directory, get_ctf_script_schemas_directory +from ctf.common.validators import ValidationError, validators_list from ctf.validate_json_schemas import validate_with_json_schemas -from ctf.validators import ValidationError, validators_list app = typer.Typer() @@ -25,43 +24,39 @@ help="Run many static validations to ensure coherence and quality in the tracks and repo as a whole." ) def validate() -> None: - LOG.info(msg="Starting ctf validate...") + LOG.info("Starting ctf validate...") - LOG.info(msg=f"Found {len(validators_list)} Validators") + LOG.info(f"Found {len(validators_list)} Validators") validators = [validator_class() for validator_class in validators_list] tracks = [] - for track in os.listdir(path=os.path.join(find_ctf_root_directory(), "challenges")): - if os.path.isdir( - s=os.path.join(find_ctf_root_directory(), "challenges", track) - ) and os.path.exists( - path=os.path.join( - find_ctf_root_directory(), "challenges", track, "track.yaml" - ) - ): - tracks.append(track) + for track in (find_ctf_root_directory() / "challenges").iterdir(): + if (find_ctf_root_directory() / "challenges" / track).is_dir() and ( + find_ctf_root_directory() / "challenges" / track / "track.yaml" + ).exists(): + tracks.append(track.name) - LOG.info(msg=f"Found {len(tracks)} tracks") + LOG.info(f"Found {len(tracks)} tracks") errors: list[ValidationError] = [] - LOG.debug(msg="Validating track.yaml files against JSON Schema...") + LOG.debug("Validating track.yaml files against JSON Schema...") validate_with_json_schemas( - schema=os.path.join(get_ctf_script_schemas_directory(), "track.yaml.json"), - files_pattern=os.path.join( - find_ctf_root_directory(), "challenges", "*", "track.yaml" + schema=get_ctf_script_schemas_directory() / "track.yaml.json", + files_pattern=str( + find_ctf_root_directory() / "challenges" / "*" / "track.yaml" ), ) - LOG.debug(msg="Validating discourse post YAML files against JSON Schema...") + LOG.debug("Validating discourse post YAML files against JSON Schema...") validate_with_json_schemas( - schema=os.path.join(get_ctf_script_schemas_directory(), "post.json"), - files_pattern=os.path.join( - find_ctf_root_directory(), "challenges", "*", "posts", "*.yaml" + schema=get_ctf_script_schemas_directory() / "post.json", + files_pattern=str( + find_ctf_root_directory() / "challenges" / "*" / "posts" / "*.yaml" ), ) - LOG.info(msg="Validating terraform files format...") + LOG.info("Validating terraform files format...") r = subprocess.run( args=[ "tofu", @@ -104,7 +99,7 @@ def validate() -> None: ) for validator in validators: - LOG.debug(msg=f"Running {type(validator).__name__}") + LOG.debug(f"Running {type(validator).__name__}") for track in tracks: errors += validator.validate(track_name=track) progress.update(task, advance=1) @@ -115,7 +110,7 @@ def validate() -> None: progress.update(task, advance=1) if not errors: - LOG.info(msg="No error found!") + LOG.info("No error found!") else: LOG.error(msg=f"{len(errors)} errors found.") @@ -148,4 +143,4 @@ def validate() -> None: rich.print(table) - exit(code=1) + exit(1) diff --git a/ctf/version.py b/ctf/commands/version.py similarity index 74% rename from ctf/version.py rename to ctf/commands/version.py index 9b7b38b..c15ffff 100644 --- a/ctf/version.py +++ b/ctf/commands/version.py @@ -1,6 +1,6 @@ import typer -from ctf.utils import show_version +from ctf.common.utils import show_version app = typer.Typer() diff --git a/ctf/logger.py b/ctf/common/logger.py similarity index 100% rename from ctf/logger.py rename to ctf/common/logger.py diff --git a/ctf/models.py b/ctf/common/models.py similarity index 98% rename from ctf/models.py rename to ctf/common/models.py index 6bd4754..b2c1647 100644 --- a/ctf/models.py +++ b/ctf/common/models.py @@ -28,7 +28,7 @@ class Track(BaseModel): @property def location(self): - from ctf.utils import find_ctf_root_directory + from ctf.common.utils import find_ctf_root_directory return find_ctf_root_directory() / "challenges" / self.name diff --git a/ctf/utils.py b/ctf/common/utils.py similarity index 73% rename from ctf/utils.py rename to ctf/common/utils.py index 3c88ad1..f482210 100644 --- a/ctf/utils.py +++ b/ctf/common/utils.py @@ -12,8 +12,8 @@ import yaml from ctf import ENV -from ctf.logger import LOG -from ctf.models import Track, TrackYaml +from ctf.common.logger import LOG +from ctf.common.models import Track, TrackYaml __CTF_ROOT_DIRECTORY = "" @@ -37,47 +37,40 @@ def check_git_lfs() -> bool: def get_all_available_tracks() -> set[Track]: tracks = set() - for entry in os.listdir( - path=( - challenges_directory := os.path.join( - find_ctf_root_directory(), "challenges" - ) - ) - ): - if not os.path.isdir(s=os.path.join(challenges_directory, entry)): + for entry in ( + challenges_directory := (find_ctf_root_directory() / "challenges") + ).iterdir(): + if not (challenges_directory / entry).is_dir(): continue - tracks.add(Track(name=entry)) + tracks.add(Track(name=entry.name)) return tracks def does_track_require_build_container(track: Track) -> bool: - return os.path.isfile( - build_yaml_file_path := os.path.join( - find_ctf_root_directory(), - "challenges", - track.name, - "ansible", - "build.yaml", + return ( + build_yaml_file_path := ( + find_ctf_root_directory() + / "challenges" + / track.name + / "ansible" + / "build.yaml" ) - ) and bool(load_yaml_file(build_yaml_file_path)) + ).is_file() and bool(load_yaml_file(build_yaml_file_path)) def track_has_virtual_machine(track: str | Track) -> bool: track_yaml: TrackYaml = TrackYaml.model_validate( parse_track_yaml(track_name=track.name if isinstance(track, Track) else track) ) - with open( - os.path.join( - find_ctf_root_directory(), - "challenges", - track.name if isinstance(track, Track) else track, - "terraform", - "main.tf", - ), - "r", - ) as f: + with ( + find_ctf_root_directory() + / "challenges" + / (track.name if isinstance(track, Track) else track) + / "terraform" + / "main.tf" + ).open(mode="r") as f: return ( track_yaml.instances and any( @@ -95,40 +88,32 @@ def track_has_virtual_machine(track: str | Track) -> bool: def validate_track_can_be_deployed(track: Track) -> bool: return ( - os.path.exists( - path=os.path.join( - find_ctf_root_directory(), - "challenges", - track.name, - "terraform", - "main.tf", - ) - ) - and os.path.exists( - path=os.path.join( - find_ctf_root_directory(), - "challenges", - track.name, - "ansible", - "deploy.yaml", - ) - ) - and os.path.exists( - path=os.path.join( - find_ctf_root_directory(), - "challenges", - track.name, - "ansible", - "inventory", - ) - ) + ( + find_ctf_root_directory() + / "challenges" + / track.name + / "terraform" + / "main.tf" + ).exists() + and ( + find_ctf_root_directory() + / "challenges" + / track.name + / "ansible" + / "deploy.yaml" + ).exists() + and ( + find_ctf_root_directory() + / "challenges" + / track.name + / "ansible" + / "inventory" + ).exists() ) def add_tracks_to_terraform_modules(tracks: set[Track]): - with open( - file=os.path.join(find_ctf_root_directory(), ".deploy", "modules.tf"), mode="a" - ) as fd: + with (find_ctf_root_directory() / ".deploy" / "modules.tf").open(mode="a") as fd: template = jinja2.Environment().from_string( source=textwrap.dedent( text="""\ @@ -161,9 +146,7 @@ def create_terraform_modules_file( remote: str, production: bool = False, ) -> None: - with open( - file=os.path.join(find_ctf_root_directory(), ".deploy", "modules.tf"), mode="w+" - ) as fd: + with (find_ctf_root_directory() / ".deploy" / "modules.tf").open(mode="w+") as fd: template = jinja2.Environment().from_string( source=textwrap.dedent( text="""\ @@ -195,14 +178,12 @@ def get_common_modules_output_variables() -> set[str]: variables: set[str] = set() - for file in os.listdir( - path := os.path.join(find_ctf_root_directory(), ".deploy", "common") - ): - if file == "versions.tf": + for file in (find_ctf_root_directory() / ".deploy" / "common").iterdir(): + if file.name == "versions.tf": continue - with open(os.path.join(path, file), "r") as f: - match file: + with file.open(mode="r") as f: + match file.name: case "variables.tf": for i in variable_regex.findall(f.read()): variables.add(i) @@ -229,7 +210,9 @@ def get_common_modules_output_variables() -> set[str]: var_type = input("What is the type? [string] ") or "string" - with open(os.path.join(path, "variables.tf"), "a") as f: + with ( + find_ctf_root_directory() / ".deploy" / "common" / "variables.tf" + ).open(mode="a") as f: f.write("\n") template = jinja2.Environment().from_string( source=textwrap.dedent( @@ -263,9 +246,7 @@ def get_common_modules_output_variables() -> set[str]: def get_terraform_tracks_from_modules() -> set[Track]: - with open( - file=os.path.join(find_ctf_root_directory(), ".deploy", "modules.tf"), mode="r" - ) as f: + with (find_ctf_root_directory() / ".deploy" / "modules.tf").open(mode="r") as f: modules_tf = f.read() module_line_regex = re.compile( @@ -349,40 +330,30 @@ def remove_tracks_from_terraform_modules( add_tracks_to_terraform_modules(tracks=(current_tracks - tracks)) -def get_all_file_paths_recursively(path: str) -> Generator[str, None, None]: - if os.path.isfile(path=path): +def get_all_file_paths_recursively(path: Path) -> Generator[Path, None, None]: + if path.is_file(): yield remove_ctf_script_root_directory_from_path(path=path) else: - for file in os.listdir(path=path): - for f in get_all_file_paths_recursively(path=os.path.join(path, file)): + for file in path.iterdir(): + for f in get_all_file_paths_recursively(file): yield f -def get_ctf_script_root_directory() -> str: - return os.path.dirname(p=__file__) +def get_ctf_script_schemas_directory() -> Path: + return find_ctf_root_directory() / "schemas" -def get_ctf_script_templates_directory() -> str: - return os.path.join(get_ctf_script_root_directory(), "templates") +def remove_ctf_script_root_directory_from_path(path: Path) -> Path: + return Path(os.path.relpath(path=path, start=find_ctf_root_directory())) -def get_ctf_script_schemas_directory() -> str: - return os.path.join(find_ctf_root_directory(), "schemas") - - -def remove_ctf_script_root_directory_from_path(path: str) -> str: - return os.path.relpath(path=path, start=find_ctf_root_directory()) - - -def load_yaml_file(file: str) -> dict[str, Any]: +def load_yaml_file(file: Path) -> dict[str, Any]: return yaml.safe_load(stream=open(file, mode="r", encoding="utf-8")) def parse_track_yaml(track_name: str) -> dict[str, Any]: r = load_yaml_file( - p := os.path.join( - find_ctf_root_directory(), "challenges", track_name, "track.yaml" - ) + p := (find_ctf_root_directory() / "challenges" / track_name / "track.yaml") ) r["file_location"] = remove_ctf_script_root_directory_from_path(path=p) @@ -391,15 +362,11 @@ def parse_track_yaml(track_name: str) -> dict[str, Any]: def parse_post_yamls(track_name: str) -> list[dict]: posts = [] - for post in os.listdir( - path=( - posts_dir := os.path.join( - find_ctf_root_directory(), "challenges", track_name, "posts" - ) - ) - ): - if post.endswith(".yml") or post.endswith(".yaml"): - r = load_yaml_file(os.path.join(posts_dir, post)) + for post in ( + posts_dir := (find_ctf_root_directory() / "challenges" / track_name / "posts") + ).iterdir(): + if post.name.endswith(".yml") or post.name.endswith(".yaml"): + r = load_yaml_file(posts_dir / post) r["file_location"] = remove_ctf_script_root_directory_from_path( path=posts_dir ) @@ -413,31 +380,27 @@ def find_ctf_root_directory() -> Path: if __CTF_ROOT_DIRECTORY: return Path(__CTF_ROOT_DIRECTORY) - path: str = ( - str(ENV.get("CTF_ROOT_DIR")) - if "CTF_ROOT_DIR" in ENV - else os.path.join(os.getcwd(), ".") - ) + path: Path = (Path(ENV.get("CTF_ROOT_DIR", "."))).expanduser().resolve() if not is_ctf_dir(path=path): - while path != (path := os.path.dirname(p=path)): + while path != (path := path / ".."): ctf_dir = is_ctf_dir(path) if ctf_dir: break - if path == "/": + if path == Path("/"): LOG.critical( msg='Could not automatically find the root directory nor the "CTF_ROOT_DIR" environment variable. To initialize a new root directory, use `ctf init [path]`' ) exit(1) - LOG.debug(msg=f"Found root directory: {path}") - return Path(__CTF_ROOT_DIRECTORY := path) + LOG.debug(f"Found root directory: {path}") + return (__CTF_ROOT_DIRECTORY := path) -def is_ctf_dir(path): +def is_ctf_dir(path: Path): ctf_dir = True - dir = os.listdir(path=path) + dir = [p.name for p in path.iterdir()] if ".deploy" not in dir: ctf_dir = False if "challenges" not in dir: @@ -456,7 +419,7 @@ def show_version(value: bool) -> None: raise typer.Exit() -def terraform_binary() -> str: +def terraform_binary() -> Path: path = shutil.which(cmd="tofu") if not path: path = shutil.which(cmd="terraform") @@ -464,4 +427,4 @@ def terraform_binary() -> str: if not path: raise Exception("Couldn't find Terraform or OpenTofu") - return path + return Path(path) diff --git a/ctf/validators.py b/ctf/common/validators.py similarity index 84% rename from ctf/validators.py rename to ctf/common/validators.py index f6662f0..e97ad6f 100644 --- a/ctf/validators.py +++ b/ctf/common/validators.py @@ -2,9 +2,10 @@ import glob import os import re +from pathlib import Path -from ctf.models import TrackYaml, ValidationError -from ctf.utils import ( +from ctf.common.models import TrackYaml, ValidationError +from ctf.common.utils import ( find_ctf_root_directory, get_all_file_paths_recursively, parse_post_yamls, @@ -29,13 +30,9 @@ def __init__(self): self.files_mapping = {} def validate(self, track_name: str) -> list[ValidationError]: - if os.path.exists( - path=( - path := os.path.join( - find_ctf_root_directory(), "challenges", track_name, "files" - ) - ) - ): + if ( + path := (find_ctf_root_directory() / "challenges" / track_name / "files") + ).exists(): for file in get_all_file_paths_recursively(path=path): # Lower the file name to avoid human error file = os.path.relpath(path=file, start=path).lower() @@ -123,11 +120,16 @@ def validate(self, track_name: str) -> list[ValidationError]: def finalize(self) -> list[ValidationError]: errors: list[ValidationError] = [] - sound_path = os.path.join( - find_ctf_root_directory(), "challenges", "*", "files", "askgod", "sounds" + sound_path = ( + find_ctf_root_directory() + / "challenges" + / "*" + / "files" + / "askgod" + / "sounds" ) for sound_tag, track_names in self.sound_tags_mapping.items(): - if len(glob.glob(pathname=os.path.join(sound_path, sound_tag))) == 0: + if len(glob.glob(pathname=sound_path / sound_tag)) == 0: errors.append( ValidationError( error_name="Fireworks sound file not found", @@ -139,11 +141,11 @@ def finalize(self) -> list[ValidationError]: ) ) - gif_path = os.path.join( - find_ctf_root_directory(), "challenges", "*", "files", "askgod", "gifs" + gif_path: Path = ( + find_ctf_root_directory() / "challenges" / "*" / "files" / "askgod" / "gifs" ) for gif_tag, track_names in self.gif_tags_mapping.items(): - if len(glob.glob(pathname=os.path.join(gif_path, gif_tag))) == 0: + if len(glob.glob(pathname=gif_path / gif_tag)) == 0: errors.append( ValidationError( error_name="Fireworks gif file not found", @@ -184,15 +186,13 @@ def validate(self, track_name: str) -> list[ValidationError]: for discourse_post in discourse_posts: if discourse_post.get("type", "") == "post": self.discourse_posts.append((track_name, discourse_post)) - if not os.path.exists( - os.path.join( - find_ctf_root_directory(), - "challenges", - track_name, - "posts", - discourse_post["topic"] + ".yaml", - ) - ): + if not ( + find_ctf_root_directory() + / "challenges" + / track_name + / "posts" + / str(discourse_post["topic"] + ".yaml") + ).exists(): errors.append( ValidationError( error_name="Discourse post topic not found", @@ -200,11 +200,11 @@ def validate(self, track_name: str) -> list[ValidationError]: track_name=track_name, details={ "Topic": discourse_post["topic"], - "Posts directory": os.path.join( - find_ctf_root_directory(), - "challenges", - track_name, - "posts", + "Posts directory": str( + find_ctf_root_directory() + / "challenges" + / track_name + / "posts" ), }, ) @@ -259,60 +259,51 @@ def validate(self, track_name: str) -> list[ValidationError]: files = [] # Checking placeholders in terraform/main.tf - if os.path.exists( - path=( - path := os.path.join( - find_ctf_root_directory(), - "challenges", - track_name, - "terraform", - "main.tf", - ) + if ( + path := ( + find_ctf_root_directory() + / "challenges" + / track_name + / "terraform" + / "main.tf" ) - ): + ).exists(): files += [path] # Checking placeholders in track.yml - if os.path.exists( - path=( - path := os.path.join( - find_ctf_root_directory(), "challenges", track_name, "track.yaml" - ) + if ( + path := ( + find_ctf_root_directory() / "challenges" / track_name / "track.yaml" ) - ): + ).exists(): files += [path] # Checking placeholders in ansible/inventory - if os.path.exists( - path=( - path := os.path.join( - find_ctf_root_directory(), - "challenges", - track_name, - "ansible", - "inventory", - ) + if ( + path := ( + find_ctf_root_directory() + / "challenges" + / track_name + / "ansible" + / "inventory" ) - ): + ).exists(): files += [path] # Checking placeholders in posts/*.yaml - if integrated_with_scenario and os.path.exists( - path=( - path := os.path.join( - find_ctf_root_directory(), "challenges", track_name, "posts" + if ( + integrated_with_scenario + and ( + path := ( + find_ctf_root_directory() / "challenges" / track_name / "posts" ) - ) + ).exists() ): - files += list(glob.glob(pathname=os.path.join(path, "*.yaml"))) + files += list(glob.glob(pathname=str(path / "*.yaml"))) # Checking placeholders in ansible/*.yaml - if os.path.exists( - path=( - path := os.path.join( - find_ctf_root_directory(), "challenges", track_name, "ansible" - ) - ) - ): - files += list(glob.glob(pathname=os.path.join(path, "*.yaml"))) + if ( + path := (find_ctf_root_directory() / "challenges" / track_name / "ansible") + ).exists(): + files += list(glob.glob(pathname=str(path / "*.yaml"))) for file in files: with open(file=file, mode="r") as f: @@ -326,8 +317,10 @@ def validate(self, track_name: str) -> list[ValidationError]: error_name="Placeholder value found", error_description="A placeholder value was found in a challenge file. This indicates that a value was not changed.", details={ - "File location": remove_ctf_script_root_directory_from_path( - path=file + "File location": str( + remove_ctf_script_root_directory_from_path( + path=file + ) ), "Value found": "\n".join(s), }, @@ -347,14 +340,10 @@ def validate(self, track_name: str) -> list[ValidationError]: files = [] # Checking placeholders in posts/*.yaml - if os.path.exists( - path=( - path := os.path.join( - find_ctf_root_directory(), "challenges", track_name, "posts" - ) - ) - ): - files += list(glob.glob(pathname=os.path.join(path, "*.yaml"))) + if ( + path := (find_ctf_root_directory() / "challenges" / track_name / "posts") + ).exists(): + files += list(glob.glob(pathname=str(path / "*.yaml"))) for file in files: file_name = os.path.basename(file) @@ -443,14 +432,9 @@ def validate(self, track_name: str) -> list[ValidationError]: services.append(service.name) if services: - if not os.path.exists( - path=os.path.join( - find_ctf_root_directory(), - "challenges", - track_name, - "terraform", - ) - ): + if not ( + find_ctf_root_directory() / "challenges" / track_name / "terraform" + ).exists(): errors.append( ValidationError( error_name="Orphan service", @@ -538,8 +522,11 @@ def validate(self, track_name: str) -> list[ValidationError]: error_description="At least one discourse post should be defined for the track.", track_name=track_name, details={ - "Posts directory": os.path.join( - find_ctf_root_directory(), "challenges", track_name, "posts" + "Posts directory": str( + find_ctf_root_directory() + / "challenges" + / track_name + / "posts" ) }, ) diff --git a/ctf/new.py b/ctf/new.py deleted file mode 100644 index 52d9155..0000000 --- a/ctf/new.py +++ /dev/null @@ -1,381 +0,0 @@ -import os -import re -import secrets -import shutil -from enum import StrEnum - -import jinja2 -import typer -from typing_extensions import Annotated - -from ctf.logger import LOG -from ctf.utils import find_ctf_root_directory, get_ctf_script_templates_directory - -app = typer.Typer() - - -class Template(StrEnum): - INFRA_SKELETON = "infra-skeleton" - TRACK_YAML_ONLY = "track-yaml-only" - FILES_ONLY = "files-only" - APACHE_PHP = "apache-php" - PYTHON_SERVICE = "python-service" - RUST_WEBSERVICE = "rust-webservice" - WINDOWS_VM = "windows-vm" - - -@app.command(help="Create a new CTF track with a given name") -def new( - name: Annotated[ - str, - typer.Option( - help="Track name. No space, use dashes if needed.", - prompt="Track name. No space, use dashes if needed.", - ), - ], - template: Annotated[ - Template, - typer.Option( - "--template", - "-t", - help="Template to use for the track.", - prompt="Template to use for the track.", - ), - ] = Template.INFRA_SKELETON, - force: Annotated[ - bool, - typer.Option( - "--force", - help="If directory already exists, delete it and create it again.", - ), - ] = False, - with_build_container: Annotated[ - bool, - typer.Option( - "--with-build", - help="If a build container is required.", - ), - ] = False, - with_virtual_machine: Annotated[ - bool, - typer.Option( - "--vm", - "--with-virtual-machine", - help="If a virtual machine is required.", - ), - ] = False, -) -> None: - LOG.info(msg=f"Creating a new track: {name}") - if not re.match(pattern=r"^[a-z][a-z0-9\-]{0,61}[a-z0-9]$", string=name): - LOG.critical( - msg="""The track name Valid instance names must fulfill the following requirements: -* The name must be between 1 and 63 characters long; -* The name must contain only letters, numbers and dashes from the ASCII table; -* The name must not start with a digit or a dash; -* The name must not end with a dash.""" - ) - exit(code=1) - - if template == Template.RUST_WEBSERVICE: - with_build_container = True - - if os.path.exists( - path=( - new_challenge_directory := os.path.join( - find_ctf_root_directory(), "challenges", name - ) - ) - ): - if force: - LOG.debug(msg=f"Deleting {new_challenge_directory}") - shutil.rmtree(new_challenge_directory) - else: - LOG.critical( - "Track already exists with that name. Use `--force` to overwrite the track." - ) - exit(code=1) - - os.mkdir(new_challenge_directory) - - LOG.debug(msg=f"Directory {new_challenge_directory} created.") - - env = jinja2.Environment( - loader=jinja2.FileSystemLoader( - searchpath=( - new_template_path := os.path.join( - get_ctf_script_templates_directory(), "new" - ) - ), - encoding="utf-8", - ) - ) - - ipv6_subnet = f"9000:d37e:c40b:{secrets.choice('0123456789abcdef')}{secrets.choice('0123456789abcdef')}{secrets.choice('0123456789abcdef')}{secrets.choice('0123456789abcdef')}" - - rb = [ - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - ] - hardware_address = f"00:16:3e:{rb[0]}{rb[1]}:{rb[2]}{rb[3]}:{rb[4]}{rb[5]}" - ipv6_address = f"216:3eff:fe{rb[0]}{rb[1]}:{rb[2]}{rb[3]}{rb[4]}{rb[5]}" - full_ipv6_address = f"{ipv6_subnet}:{ipv6_address}" - - track_template = env.get_template(name=os.path.join("common", "track.yaml.j2")) - render = track_template.render( - data={ - "name": name, - "full_ipv6_address": full_ipv6_address, - "hardware_address": hardware_address, - "is_windows": template == Template.WINDOWS_VM, - "template": template.value, - "with_build": with_build_container, - "with_virtual_machine": with_virtual_machine, - } - ) - with open( - file=(p := os.path.join(new_challenge_directory, "track.yaml")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - readme_template = env.get_template(name=os.path.join("common", "README.md.j2")) - render = readme_template.render(data={"name": name}) - with open( - file=(p := os.path.join(new_challenge_directory, "README.md")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - posts_directory = os.path.join(new_challenge_directory, "posts") - - os.mkdir(path=posts_directory) - - LOG.debug(msg=f"Directory {posts_directory} created.") - - track_template = env.get_template(name=os.path.join("common", "topic.yaml.j2")) - render = track_template.render(data={"name": name}) - with open( - file=(p := os.path.join(posts_directory, f"{name}.yaml")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - track_template = env.get_template(name=os.path.join("common", "post.yaml.j2")) - render = track_template.render(data={"name": name}) - with open( - file=(p := os.path.join(posts_directory, f"{name}_flag1.yaml")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - if template == Template.TRACK_YAML_ONLY: - return - - files_directory = os.path.join(new_challenge_directory, "files") - - os.mkdir(path=files_directory) - - LOG.debug(msg=f"Directory {files_directory} created.") - - if template == Template.FILES_ONLY: - return - - terraform_directory = os.path.join(new_challenge_directory, "terraform") - - os.mkdir(path=terraform_directory) - - LOG.debug(msg=f"Directory {terraform_directory} created.") - - track_template = env.get_template(name=os.path.join("common", "main.tf.j2")) - - render = track_template.render( - data={ - "name": name, - "ipv6_subnet": ipv6_subnet, - "full_ipv6_address": full_ipv6_address, - "with_build": with_build_container, - "with_virtual_machine": with_virtual_machine, - "is_windows": template == Template.WINDOWS_VM, - } - ) - with open( - file=(p := os.path.join(terraform_directory, "main.tf")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - relpath = os.path.relpath( - os.path.join(find_ctf_root_directory(), ".deploy", "common"), - terraform_directory, - ) - - os.symlink( - src=os.path.join(relpath, "variables.tf"), - dst=(p := os.path.join(terraform_directory, "variables.tf")), - ) - - LOG.debug(msg=f"Wrote {p}.") - - os.symlink( - src=os.path.join(relpath, "versions.tf"), - dst=(p := os.path.join(terraform_directory, "versions.tf")), - ) - - LOG.debug(msg=f"Wrote {p}.") - - ansible_directory = os.path.join(new_challenge_directory, "ansible") - - os.mkdir(path=ansible_directory) - - LOG.debug(msg=f"Directory {ansible_directory} created.") - - track_template = env.get_template(name=os.path.join(template, "deploy.yaml.j2")) - render = track_template.render( - data={ - "name": name, - "with_build": with_build_container, - "with_virtual_machine": with_virtual_machine, - } - ) - with open( - file=(p := os.path.join(ansible_directory, "deploy.yaml")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - if with_build_container: - try: - track_template = env.get_template( - name=os.path.join(template, "build.yaml.j2") - ) - except jinja2.TemplateNotFound: - track_template = env.get_template( - name=os.path.join("common", "build.yaml.j2") - ) - - render = track_template.render( - data={"name": name, "with_build": with_build_container} - ) - - with open( - file=(p := os.path.join(ansible_directory, "build.yaml")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - LOG.debug(msg=f"Wrote {p}.") - - track_template = env.get_template(name=os.path.join("common", "inventory.j2")) - render = track_template.render( - data={ - "name": name, - "with_build": with_build_container, - "with_virtual_machine": with_virtual_machine, - "is_windows": template == Template.WINDOWS_VM, - } - ) - with open( - file=(p := os.path.join(ansible_directory, "inventory")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - ansible_challenge_directory = os.path.join(ansible_directory, "challenge") - - os.mkdir(path=ansible_challenge_directory) - - LOG.debug(msg=f"Directory {ansible_challenge_directory} created.") - - if template == Template.APACHE_PHP: - track_template = env.get_template( - name=os.path.join(Template.APACHE_PHP, "index.php.j2") - ) - render = track_template.render(data={"name": name}) - with open( - file=(p := os.path.join(ansible_challenge_directory, "index.php")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - if template == Template.PYTHON_SERVICE: - track_template = env.get_template( - name=os.path.join(Template.PYTHON_SERVICE, "app.py.j2") - ) - render = track_template.render(data={"name": name}) - with open( - file=(p := os.path.join(ansible_challenge_directory, "app.py")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - with open( - file=(p := os.path.join(ansible_challenge_directory, "flag-1.txt")), - mode="w", - encoding="utf-8", - ) as f: - f.write(f"{{{{ track_flags.{name}_flag_1 }}}} (1/2)\n") - - LOG.debug(msg=f"Wrote {p}.") - - if template == Template.RUST_WEBSERVICE: - # Copy the entire challenge template - shutil.copytree( - os.path.join( - new_template_path, - Template.RUST_WEBSERVICE, - "source", - ), - ansible_challenge_directory, - dirs_exist_ok=True, - ) - LOG.debug(msg=f"Wrote files to {ansible_challenge_directory}") - - manifest_template = env.get_template( - name=os.path.join(Template.RUST_WEBSERVICE, "Cargo.toml.j2") - ) - render = manifest_template.render(data={"name": name}) - with open( - file=(p := os.path.join(ansible_challenge_directory, "Cargo.toml")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") diff --git a/ctf/templates/new/common/main.tf.j2 b/ctf/templates/new/common/main.tf.j2 index 14b4802..d8a8d40 100644 --- a/ctf/templates/new/common/main.tf.j2 +++ b/ctf/templates/new/common/main.tf.j2 @@ -56,7 +56,7 @@ resource "incus_network" "this" { "ipv6.address" = "{{ data.ipv6_subnet }}::1/64" }, var.already_deployed ? { - "security.acls" = join(",", [var.ctf_acl_network /*Uncomment if using your own ACL.*//*, incus_network_acl.this[0].name*/]) + "security.acls" = join(",", [var.ctf_acl_network /*Uncomment the following if using your own ACL.*/ /*, incus_network_acl.this[0].name*/]) } : {}, var.deploy == "production" ? { "ipv4.address" = "none" diff --git a/ctf/validate_json_schemas.py b/ctf/validate_json_schemas.py index d0b4c9d..17fd452 100644 --- a/ctf/validate_json_schemas.py +++ b/ctf/validate_json_schemas.py @@ -1,6 +1,7 @@ import argparse import glob import json +from pathlib import Path import jsonschema import rich @@ -14,18 +15,18 @@ ) from rich.table import Table -from ctf.logger import LOG +from ctf.common.logger import LOG -def validate_with_json_schemas(schema: str, files_pattern: str) -> None: - LOG.debug(msg="Starting JSON Schema validator") - LOG.debug(msg=f"Schema: {schema}") +def validate_with_json_schemas(schema: Path, files_pattern: str) -> None: + LOG.debug("Starting JSON Schema validator") + LOG.debug(f"Schema: {schema}") - schema = json.load(open(file=schema, mode="r", encoding="utf-8")) + schema = json.load(open(schema, mode="r", encoding="utf-8")) if not isinstance(schema, dict): LOG.error(msg=f"Loaded schema was not a dictionary: {schema}") - exit(code=1) + exit(1) errors = [] with Progress( @@ -37,7 +38,7 @@ def validate_with_json_schemas(schema: str, files_pattern: str) -> None: files = list(glob.glob(pathname=files_pattern)) task = progress.add_task(f"Validating JSON ({files_pattern})", total=len(files)) for file in files: - LOG.debug(msg=f"Validating {file}") + LOG.debug(f"Validating {file}") yaml_document = yaml.safe_load( stream=open(file=file, mode="r", encoding="utf-8") ) @@ -55,9 +56,9 @@ def validate_with_json_schemas(schema: str, files_pattern: str) -> None: for filename, error in errors: table.add_row(filename, error.message) rich.print(table) - exit(code=1) + exit(1) else: - LOG.debug(msg="No error found!") + LOG.debug("No error found!") if __name__ == "__main__":