diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index df537b062f..2ec344361e 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -3329,6 +3329,179 @@ def _resolve_installed_extension( raise typer.Exit(1) +def _validate_safe_cache_dir(project_root: Path) -> Path: + """Validate (and create) the per-project download cache directory safely. + + Walks ``.specify/extensions/.cache/downloads`` one component at a time, + refusing symlinked components and re-resolving each existing/new component + under ``project_root`` to catch non-symlink directory aliases (Windows + junctions / mount points) and mid-creation parent swaps. ``mkdir(parents= + True)`` would silently traverse a symlinked ancestor, so each component is + checked and created individually instead. + + Returns the validated download directory. Raises ``typer.Exit`` on any + safety violation (after printing a user-facing error). + """ + download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads" + project_root_resolved = project_root.resolve() + try: + current = project_root + for part in download_dir.relative_to(project_root).parts: + current = current / part + if current.is_symlink(): + console.print("[red]Error:[/red] Refusing to use symlinked download cache directory") + raise typer.Exit(1) + if current.exists(): + if not current.is_dir(): + console.print( + f"[red]Error:[/red] Download cache path is not a directory: {current}" + ) + raise typer.Exit(1) + try: + current.resolve().relative_to(project_root_resolved) + except (OSError, ValueError): + console.print("[red]Error:[/red] Download cache directory escapes project root") + raise typer.Exit(1) + continue + # Race-tolerant create: a concurrent `extension add --from` + # process may have created the same component between the + # `current.exists()` check above and this line. Treat + # `FileExistsError` as success and re-run the symlink / + # containment / is_dir checks against whatever is now there. + try: + current.mkdir() + except FileExistsError: + pass + if not current.is_dir(): + console.print( + f"[red]Error:[/red] Download cache path is not a directory: {current}" + ) + raise typer.Exit(1) + if current.is_symlink(): + console.print("[red]Error:[/red] Refusing to use symlinked download cache directory") + raise typer.Exit(1) + try: + current.resolve().relative_to(project_root_resolved) + except (OSError, ValueError): + console.print("[red]Error:[/red] Download cache directory escapes project root") + raise typer.Exit(1) + except OSError as exc: + console.print( + f"[red]Error:[/red] Could not prepare download cache directory: {exc}" + ) + raise typer.Exit(1) + return download_dir + + +def _safe_open_download_zip(project_root: Path, download_dir: Path, zip_filename: str) -> int: + """Open ``download_dir / zip_filename`` for writing without following any symlink. + + Closes the TOCTOU window between cache-ancestor validation and the actual + file write. On POSIX, walks each component of ``download_dir`` using + ``dir_fd`` + ``O_NOFOLLOW`` so a symlink swap of *any* component + (including ancestors) after validation is rejected. The leaf is created + with ``O_EXCL`` so the unique UUID filename cannot collide with anything + an attacker pre-staged. + + On platforms without ``dir_fd``/``O_NOFOLLOW`` support (notably Windows), + no atomic primitive exists in the standard library to bind the leaf open + to a previously-validated ancestor chain. A path-based open after a + separate validation walk is racy: an attacker who can write inside the + project (junction creation on Windows does not require elevation) can + swap a cache ancestor to a junction between the validation loop and the + leaf open, redirecting the write outside the project root. To avoid that + silent escape, this helper fails closed on such platforms instead. + Callers should surface a clear error and direct users to ``--dev`` or + catalog-based installs as alternatives. + + Returns an open file descriptor; the caller owns and must close it. + Raises ``OSError`` (e.g. ``ELOOP``, ``EEXIST``, ``EACCES``) if any + component is a symlink or the leaf already exists, or + ``NotImplementedError`` on platforms lacking ``dir_fd`` support. + """ + o_nofollow = getattr(os, "O_NOFOLLOW", 0) + o_directory = getattr(os, "O_DIRECTORY", 0) + leaf_flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | o_nofollow + + if os.open not in os.supports_dir_fd: + # Fail closed: see docstring. No safe primitive available here. + raise NotImplementedError( + "URL-based extension installs require POSIX-style " + "dir_fd + O_NOFOLLOW support, which is not available on this " + "platform. Use --dev for a local directory, or install from " + "the bundled catalog instead." + ) + + rel_parts = download_dir.relative_to(project_root).parts + parent_fd = os.open(project_root, os.O_RDONLY | o_directory) + try: + for part in rel_parts: + new_fd = os.open( + part, os.O_RDONLY | o_directory | o_nofollow, dir_fd=parent_fd + ) + os.close(parent_fd) + parent_fd = new_fd + return os.open(zip_filename, leaf_flags, 0o600, dir_fd=parent_fd) + finally: + os.close(parent_fd) + + +def _safe_unlink_download_zip( + project_root: Path, download_dir: Path, zip_filename: str +) -> None: + """Best-effort unlink of the downloaded ZIP without following symlinks. + + On POSIX, walks each cache ancestor with ``dir_fd`` + ``O_NOFOLLOW`` and + unlinks via ``os.unlink(zip_filename, dir_fd=parent_fd)`` so an attacker + who swaps a cache ancestor between the write and the cleanup cannot + redirect the unlink onto a same-named file outside the project. + + On platforms without ``dir_fd``/``O_NOFOLLOW`` support, this function is + a no-op for the same reason :func:`_safe_open_download_zip` fails closed + there: a path-based ``unlink`` after a separate validation walk is racy + and could be redirected through a swapped ancestor onto a same-named + file outside the project. Skipping cleanup leaves a stale ZIP in the + cache directory but does not leak deletes; subsequent installs use + fresh UUID filenames so the stale file is harmless. (In practice this + branch is unreachable in the normal install flow because + :func:`_safe_open_download_zip` already fails closed before any ZIP is + written.) + + Errors are swallowed; this is best-effort cleanup. + """ + o_nofollow = getattr(os, "O_NOFOLLOW", 0) + o_directory = getattr(os, "O_DIRECTORY", 0) + + if os.open not in os.supports_dir_fd: + # Fail closed: see docstring. + return + + rel_parts = download_dir.relative_to(project_root).parts + try: + parent_fd = os.open(project_root, os.O_RDONLY | o_directory) + except OSError: + return + try: + for part in rel_parts: + try: + new_fd = os.open( + part, os.O_RDONLY | o_directory | o_nofollow, dir_fd=parent_fd + ) + except OSError: + return + os.close(parent_fd) + parent_fd = new_fd + try: + os.unlink(zip_filename, dir_fd=parent_fd) + except OSError: + pass + finally: + try: + os.close(parent_fd) + except OSError: + pass + + def _resolve_catalog_extension( argument: str, catalog, @@ -3603,6 +3776,16 @@ def extension_add( console.print("[red]Error:[/red] Priority must be a positive integer (1 or higher)") raise typer.Exit(1) + # For URL installs, validate (and create) the download cache *before* + # constructing ExtensionManager. The manager's constructor reads + # ``.specify/extensions/.registry`` via ``ExtensionRegistry``; if a + # ``.specify/extensions`` ancestor is symlinked, that read would follow + # the symlink before any guard ran. Performing the safe per-component + # walk first ensures the manager only ever opens registry files inside a + # validated, project-contained tree. + if from_url: + _download_dir = _validate_safe_cache_dir(project_root) + manager = ExtensionManager(project_root) speckit_version = get_speckit_version() @@ -3641,27 +3824,128 @@ def extension_add( console.print("Only install extensions from sources you trust.\n") console.print(f"Downloading from {from_url}...") - # Download ZIP to temp location - download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads" - download_dir.mkdir(parents=True, exist_ok=True) - zip_path = download_dir / f"{extension}-url-download.zip" + # The download cache directory has already been safely + # validated/created above (before ExtensionManager construction) + # via `_validate_safe_cache_dir(project_root)`. Reuse the same + # validated `download_dir` here. + import uuid as _uuid + download_dir = _download_dir + project_root_resolved = project_root.resolve() + + safe_name = Path(extension).name.replace("/", "_").replace("\\", "_") or "download" + safe_name = safe_name[:64] # cap length to avoid filesystem errors + zip_filename = f"{safe_name}-{_uuid.uuid4().hex[:8]}.zip" + zip_path = download_dir / zip_filename + + # Containment guard: the resolved zip_path must stay inside the + # validated cache directory. + try: + zip_path.resolve().relative_to(download_dir.resolve()) + except ValueError: + console.print("[red]Error:[/red] Invalid extension name (path traversal detected)") + raise typer.Exit(1) try: from specify_cli.authentication.http import open_url as _open_url with _open_url(from_url, timeout=60) as response: zip_data = response.read() - zip_path.write_bytes(zip_data) - - # Install from downloaded ZIP - manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority) except urllib.error.URLError as e: console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}") raise typer.Exit(1) + + # Re-validate cache containment immediately before opening the + # write fd. On POSIX, `_safe_open_download_zip` additionally + # re-walks ancestors with `dir_fd + O_NOFOLLOW` so any swap is + # rejected at open time as well; on platforms without that + # support the helper fails closed (see its docstring). + try: + download_dir.resolve().relative_to(project_root_resolved) + except (OSError, ValueError): + console.print("[red]Error:[/red] Download cache directory escapes project root") + raise typer.Exit(1) + + # Outer try/finally guarantees the symlink-safe cleanup runs + # regardless of where in the (open -> write -> install) + # pipeline a failure occurs, including a partial write that + # raises OSError after the fd has been opened. The narrower + # try/except blocks below classify *which* phase failed so + # error messages remain accurate. + try: + try: + # Open the ZIP file for writing without following any + # symlink in the path. On POSIX, the helper walks + # each ancestor of `download_dir` using + # dir_fd + O_NOFOLLOW so a symlink swap of *any* + # component (including ancestors) between validation + # and this write is rejected. On platforms without + # dir_fd / O_NOFOLLOW (Windows), the helper raises + # NotImplementedError; URL installs require the + # POSIX-style atomic primitives to be safe against + # ancestor swaps. + try: + _fd = _safe_open_download_zip( + project_root, download_dir, zip_filename + ) + except NotImplementedError as _ni: + console.print(f"[red]Error:[/red] {_ni}") + raise typer.Exit(1) + except OSError as _open_err: + console.print( + f"[red]Error:[/red] Could not safely create download file: {_open_err}" + ) + raise typer.Exit(1) + try: + with os.fdopen(_fd, "wb") as _zf: + _zf.write(zip_data) + except OSError as _write_err: + # Write failed (e.g. ENOSPC, EIO). os.fdopen took + # ownership of the fd; the with-block close will + # have already run on the way out. + console.print( + f"[red]Error:[/red] Failed to write download cache: {_write_err}" + ) + raise typer.Exit(1) + except BaseException: + # Non-OSError failure before/inside the + # with-block: defensively close the fd in case + # os.fdopen never took ownership, then re-raise + # so the outer finally still cleans up. + try: + os.close(_fd) + except OSError: + pass + raise + except OSError as e: + # Catches OSError raised from `_safe_open_download_zip` + # below the typed handlers above; preserves the + # narrow "cache write" framing. + console.print( + f"[red]Error:[/red] Failed to write download cache: {e}" + ) + raise typer.Exit(1) + + # Install from downloaded ZIP. Errors from the manager + # (ValidationError / CompatibilityError / ExtensionError) + # are caught by the outer handler so users see the + # extension-specific message rather than a generic + # "Failed to write download cache". + manifest = manager.install_from_zip( + zip_path, speckit_version, priority=priority + ) finally: - # Clean up downloaded ZIP - if zip_path.exists(): - zip_path.unlink() + # Symlink-safe cleanup of the downloaded ZIP. Runs on + # every exit path (success, write failure, install + # failure) so a partial ZIP from an interrupted write is + # not left behind. On platforms without dir_fd / + # O_NOFOLLOW the helper is a no-op (fails closed) since + # a path-based unlink there could be redirected through + # a swapped ancestor onto a same-named file outside the + # project; in that case `_safe_open_download_zip` has + # already failed closed, so no ZIP exists to clean up. + _safe_unlink_download_zip( + project_root, download_dir, zip_filename + ) else: # Try bundled extensions first (shipped with spec-kit) diff --git a/tests/test_extension_add_path_traversal.py b/tests/test_extension_add_path_traversal.py new file mode 100644 index 0000000000..789b51432a --- /dev/null +++ b/tests/test_extension_add_path_traversal.py @@ -0,0 +1,369 @@ +"""Tests for path traversal guard in `specify extension add --from` (GHSA-67q9-p54f-7cpr). + +The extension argument is used to construct a temporary ZIP download path. +Without sanitisation, absolute paths or ``../`` segments in the argument can +escape the intended cache directory, causing arbitrary file writes and deletes. +""" + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import os + +import pytest +from typer.testing import CliRunner + +from specify_cli import app + +runner = CliRunner() + + +def _require_symlinks(tmp_path) -> None: + """Skip the calling test when symlink creation is not permitted. + + On Windows, symlink creation requires elevated privileges or developer + mode and may raise ``OSError`` (``EPERM``/``EACCES``/``WinError 1314``). + The shared-infra test suite uses the same pattern. + """ + probe_target = tmp_path / "_symlink_probe_target" + probe_target.mkdir(exist_ok=True) + probe_link = tmp_path / "_symlink_probe_link" + try: + probe_link.symlink_to(probe_target, target_is_directory=True) + except (OSError, NotImplementedError) as exc: + pytest.skip(f"Symlink creation not supported in this environment: {exc}") + finally: + if probe_link.exists() or probe_link.is_symlink(): + try: + probe_link.unlink() + except OSError: + pass + +def _is_absolute_like(payload: str) -> bool: + """Detect payloads that may be treated as an absolute path on any supported OS. + + Used to skip the delete-side regression test for those payloads, since + constructing a sentinel relative to ``cache_dir`` for an absolute-like + payload would land outside the pytest sandbox (especially on Windows + where ``Path('C:\\\\...')`` and ``\\\\\\\\server\\\\share`` are roots). + The write-side test still covers containment for these cases. + """ + if payload.startswith(("/", "\\")): + return True + # Drive letter forms: C:\..., C:/... + if len(payload) >= 2 and payload[1] == ":" and payload[0].isalpha(): + return True + return False + + +TRAVERSAL_PAYLOADS = [ + "../pwned", + "../../etc/passwd", + "subdir/../../escape", + "..\\pwned", + "..\\..\\etc\\passwd", + "/tmp/evil", + # Deep relative traversal long enough to escape from + # `.specify/extensions/.cache/downloads` (4 segments) all the way out + # of project_root and beyond. + "../../../../../etc/shadow", + # Windows absolute path forms. The CI matrix runs these tests on + # windows-latest, so the sanitiser must reject drive-letter and UNC + # roots in addition to POSIX absolute paths. + "C:\\tmp\\evil", + "C:/tmp/evil", + "\\\\server\\share\\evil", +] + + +@pytest.fixture() +def project_dir(tmp_path, monkeypatch): + proj = tmp_path / "project" + proj.mkdir() + (proj / ".specify").mkdir() + monkeypatch.chdir(proj) + return proj + + +def _mock_open_url(): + """Return a mock open_url that yields fake ZIP bytes.""" + mock_response = MagicMock() + mock_response.read.return_value = b"PK\x03\x04fake" + mock_response.__enter__ = MagicMock(return_value=mock_response) + mock_response.__exit__ = MagicMock(return_value=False) + return mock_response + + +class TestExtensionAddFromPathTraversal: + """Path traversal payloads in the extension name must not escape the download cache.""" + + @pytest.mark.parametrize("bad_name", TRAVERSAL_PAYLOADS) + def test_traversal_payload_writes_inside_cache(self, project_dir, bad_name): + """The zip_path passed to install_from_zip must resolve inside the cache dir.""" + cache_dir = project_dir / ".specify" / "extensions" / ".cache" / "downloads" + with patch("specify_cli.authentication.http.open_url", return_value=_mock_open_url()), \ + patch("specify_cli.extensions.ExtensionManager.install_from_zip", return_value=MagicMock(id="x", name="X", version="1.0.0")) as mock_install: + result = runner.invoke( + app, + ["extension", "add", bad_name, "--from", "https://example.com/ext.zip"], + ) + assert result.exit_code == 0, result.output + mock_install.assert_called_once() + zip_arg = mock_install.call_args[0][0] # positional arg: zip_path + zip_arg.resolve().relative_to(cache_dir.resolve()) # raises ValueError if outside + + @pytest.mark.parametrize("bad_name", [p for p in TRAVERSAL_PAYLOADS if not _is_absolute_like(p)]) + def test_traversal_payload_cannot_delete_outside_cache(self, project_dir, bad_name): + """The finally-block cleanup must not delete files outside the cache dir. + + Absolute-like payloads (POSIX absolute, Windows drive letter, UNC) are + excluded here to avoid writing sentinels outside the pytest sandbox; + the write-side test above already proves the production guard contains + absolute payloads. + """ + # Place a sentinel at the path the pre-fix code would have constructed + cache_dir = project_dir / ".specify" / "extensions" / ".cache" / "downloads" + cache_dir.mkdir(parents=True, exist_ok=True) + pre_fix_path = cache_dir / f"{bad_name}-url-download.zip" + pre_fix_path.parent.mkdir(parents=True, exist_ok=True) + pre_fix_path.write_text("sentinel") + + with patch("specify_cli.authentication.http.open_url", return_value=_mock_open_url()), \ + patch("specify_cli.extensions.ExtensionManager.install_from_zip", return_value=MagicMock(id="x", name="X", version="1.0.0")): + result = runner.invoke( + app, + ["extension", "add", bad_name, "--from", "https://example.com/ext.zip"], + ) + # The command must succeed: a non-zero exit would mean the install + # failed before reaching cleanup, which would leave the sentinel + # untouched for the wrong reason and mask any cleanup-side regression. + assert result.exit_code == 0, result.output + assert pre_fix_path.exists(), f"Sentinel deleted by cleanup: {pre_fix_path}" + assert pre_fix_path.read_text() == "sentinel" + + def test_clean_name_is_unaffected(self, project_dir): + """A normal extension name should not trigger the guard.""" + with patch("specify_cli.authentication.http.open_url", return_value=_mock_open_url()), \ + patch("specify_cli.extensions.ExtensionManager.install_from_zip") as mock_install: + mock_install.return_value = MagicMock(id="my-ext", name="My Ext", version="1.0.0") + result = runner.invoke( + app, + ["extension", "add", "my-ext", "--from", "https://example.com/ext.zip"], + ) + + assert result.exit_code == 0, result.output + mock_install.assert_called_once() + # Verify the zip path is inside the cache + zip_arg = mock_install.call_args[0][0] + cache_dir = project_dir / ".specify" / "extensions" / ".cache" / "downloads" + zip_arg.resolve().relative_to(cache_dir.resolve()) + assert "path traversal" not in (result.output or "").lower() + + +class TestExtensionAddFromSymlinkedCache: + """A symlinked download-cache ancestor must be rejected before any write.""" + + @pytest.mark.parametrize( + "ancestor_parts", + [ + (".specify",), + (".specify", "extensions"), + (".specify", "extensions", ".cache"), + (".specify", "extensions", ".cache", "downloads"), + ], + ) + def test_symlinked_ancestor_is_refused(self, tmp_path, monkeypatch, ancestor_parts): + """Symlinking any cache ancestor outside the project must abort the command.""" + _require_symlinks(tmp_path) + project_dir = tmp_path / "project" + project_dir.mkdir() + outside = tmp_path / "outside" + outside.mkdir() + monkeypatch.chdir(project_dir) + + # Build the parent of the symlinked component using real directories. + parent = project_dir + for part in ancestor_parts[:-1]: + parent = parent / part + parent.mkdir() + + # Replace the final ancestor component with a symlink pointing outside the project. + symlink_path = parent / ancestor_parts[-1] + symlink_path.symlink_to(outside, target_is_directory=True) + + with patch("specify_cli.authentication.http.open_url", return_value=_mock_open_url()), \ + patch("specify_cli.extensions.ExtensionManager.install_from_zip") as mock_install: + result = runner.invoke( + app, + ["extension", "add", "my-ext", "--from", "https://example.com/ext.zip"], + ) + + assert result.exit_code != 0 + assert "symlinked download cache" in (result.output or "").lower() + mock_install.assert_not_called() + # No download was written through the symlink + assert list(outside.iterdir()) == [] + + +class TestExtensionAddFromAncestorEscape: + """A non-symlink ancestor whose ``.resolve()`` lands outside the project must be refused. + + Simulates the Windows junction / mount-point case using a manually-pre-resolved + directory swap: we cannot create real junctions on POSIX, but we can patch the + ``.resolve()`` of an existing ancestor to land outside the project root. The + production guard must reject it before any download happens. + """ + + def test_ancestor_resolves_outside_project_is_refused(self, tmp_path, monkeypatch): + project_dir = tmp_path / "project" + project_dir.mkdir() + outside = tmp_path / "outside" + outside.mkdir() + monkeypatch.chdir(project_dir) + + # Pre-create the cache ancestors as real directories so the per-component + # walk reaches the resolve()/relative_to() check on each existing one. + cache_root = project_dir / ".specify" / "extensions" / ".cache" + cache_root.mkdir(parents=True) + + real_resolve = Path.resolve + + def fake_resolve(self, *a, **kw): + # Pretend the .cache ancestor resolves outside the project. + if self == cache_root: + return real_resolve(outside, *a, **kw) + return real_resolve(self, *a, **kw) + + with patch.object(Path, "resolve", fake_resolve), \ + patch("specify_cli.authentication.http.open_url", return_value=_mock_open_url()), \ + patch("specify_cli.extensions.ExtensionManager.install_from_zip") as mock_install: + result = runner.invoke( + app, + ["extension", "add", "my-ext", "--from", "https://example.com/ext.zip"], + ) + + assert result.exit_code != 0 + assert "escapes project root" in (result.output or "").lower() + mock_install.assert_not_called() + assert list(outside.iterdir()) == [] + + +class TestExtensionAddFromTOCTOUWrite: + """The download write must not follow a symlink swapped in after validation. + + These tests rely on POSIX ``O_NOFOLLOW`` + ``dir_fd``-relative open and on + the runner being able to create symlinks. Skipped on platforms where either + is unavailable (notably Windows without elevated privileges). + """ + + def test_swapped_zip_path_symlink_is_refused(self, tmp_path, monkeypatch): + """If zip_path is replaced by a symlink between validation and write, refuse.""" + if not getattr(os, "O_NOFOLLOW", 0) or os.open not in os.supports_dir_fd: + pytest.skip("Requires POSIX O_NOFOLLOW + dir_fd support") + _require_symlinks(tmp_path) + project_dir = tmp_path / "project" + project_dir.mkdir() + (project_dir / ".specify").mkdir() + outside = tmp_path / "outside" + outside.mkdir() + target = outside / "stolen.bin" + monkeypatch.chdir(project_dir) + + from specify_cli import _safe_open_download_zip as _orig_safe_open + + def swap_then_open(project_root, download_dir, zip_filename): + # Race: pre-stage a symlink at the leaf inside the validated cache. + leaf = download_dir / zip_filename + try: + leaf.symlink_to(target) + except (OSError, NotImplementedError): + pytest.skip("Cannot create symlink for leaf-swap race") + return _orig_safe_open(project_root, download_dir, zip_filename) + + with patch("specify_cli.authentication.http.open_url", return_value=_mock_open_url()), \ + patch("specify_cli.extensions.ExtensionManager.install_from_zip") as mock_install, \ + patch("specify_cli._safe_open_download_zip", side_effect=swap_then_open): + result = runner.invoke( + app, + ["extension", "add", "my-ext", "--from", "https://example.com/ext.zip"], + ) + + assert result.exit_code != 0 + mock_install.assert_not_called() + # The write must not have followed the symlink to `outside`. + assert not target.exists(), "write followed swapped leaf symlink" + + def test_swapped_ancestor_symlink_is_refused(self, tmp_path, monkeypatch): + """If a *cache ancestor* is replaced by a symlink between validation and write, refuse. + + Covers the case `O_NOFOLLOW` alone misses: it only protects the final + component, but the per-ancestor walk in ``_safe_open_download_zip`` + (using ``dir_fd`` + ``O_NOFOLLOW`` on each component) catches this. + """ + if not getattr(os, "O_NOFOLLOW", 0) or os.open not in os.supports_dir_fd: + pytest.skip("Requires POSIX O_NOFOLLOW + dir_fd support") + _require_symlinks(tmp_path) + project_dir = tmp_path / "project" + project_dir.mkdir() + (project_dir / ".specify").mkdir() + outside = tmp_path / "outside" + outside.mkdir() + monkeypatch.chdir(project_dir) + + # Pre-create the cache ancestors so the production validation walk + # passes (the swap happens *after* validation, before the write). + cache_dir = project_dir / ".specify" / "extensions" / ".cache" / "downloads" + cache_dir.mkdir(parents=True) + ancestor_to_swap = project_dir / ".specify" / "extensions" / ".cache" + + from specify_cli import _safe_open_download_zip as _orig_safe_open + + def swap_then_open(project_root, download_dir, zip_filename): + # Race: replace `.cache` with a symlink to `outside` after the + # caller's per-ancestor validation finished. + import shutil + shutil.rmtree(ancestor_to_swap) + try: + ancestor_to_swap.symlink_to(outside, target_is_directory=True) + except (OSError, NotImplementedError): + pytest.skip("Cannot create symlink for ancestor-swap race") + return _orig_safe_open(project_root, download_dir, zip_filename) + + with patch("specify_cli.authentication.http.open_url", return_value=_mock_open_url()), \ + patch("specify_cli.extensions.ExtensionManager.install_from_zip") as mock_install, \ + patch("specify_cli._safe_open_download_zip", side_effect=swap_then_open): + result = runner.invoke( + app, + ["extension", "add", "my-ext", "--from", "https://example.com/ext.zip"], + ) + + assert result.exit_code != 0 + # Clean CLI error from the OSError handler, not a raw traceback. + assert "could not safely create download file" in (result.output or "").lower() \ + or "failed to write download cache" in (result.output or "").lower() + mock_install.assert_not_called() + # No download leaked through the swapped ancestor symlink. + assert list(outside.iterdir()) == [] + + +class TestExtensionAddFromCacheAncestorIsFile: + """A non-directory file at a cache-ancestor path must be refused with a clean error.""" + + def test_file_at_extensions_path_is_rejected_cleanly(self, tmp_path, monkeypatch): + project_dir = tmp_path / "project" + project_dir.mkdir() + (project_dir / ".specify").mkdir() + # Place a regular file where `.specify/extensions/` would be created. + (project_dir / ".specify" / "extensions").write_text("i am a file") + monkeypatch.chdir(project_dir) + + with patch("specify_cli.authentication.http.open_url", return_value=_mock_open_url()), \ + patch("specify_cli.extensions.ExtensionManager.install_from_zip") as mock_install: + result = runner.invoke( + app, + ["extension", "add", "my-ext", "--from", "https://example.com/ext.zip"], + ) + + assert result.exit_code != 0 + assert "not a directory" in (result.output or "").lower() + mock_install.assert_not_called()