Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 295 additions & 11 deletions src/specify_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3329,6 +3329,179 @@ def _resolve_installed_extension(
raise typer.Exit(1)


def _validate_safe_cache_dir(project_root: Path) -> Path:
"""Validate (and create) the per-project download cache directory safely.

Walks ``.specify/extensions/.cache/downloads`` one component at a time,
refusing symlinked components and re-resolving each existing/new component
under ``project_root`` to catch non-symlink directory aliases (Windows
junctions / mount points) and mid-creation parent swaps. ``mkdir(parents=
True)`` would silently traverse a symlinked ancestor, so each component is
checked and created individually instead.

Returns the validated download directory. Raises ``typer.Exit`` on any
safety violation (after printing a user-facing error).
"""
download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads"
project_root_resolved = project_root.resolve()
try:
current = project_root
for part in download_dir.relative_to(project_root).parts:
current = current / part
if current.is_symlink():
console.print("[red]Error:[/red] Refusing to use symlinked download cache directory")
raise typer.Exit(1)
if current.exists():
if not current.is_dir():
console.print(
f"[red]Error:[/red] Download cache path is not a directory: {current}"
)
raise typer.Exit(1)
try:
current.resolve().relative_to(project_root_resolved)
except (OSError, ValueError):
console.print("[red]Error:[/red] Download cache directory escapes project root")
raise typer.Exit(1)
continue
# Race-tolerant create: a concurrent `extension add --from`
# process may have created the same component between the
# `current.exists()` check above and this line. Treat
# `FileExistsError` as success and re-run the symlink /
# containment / is_dir checks against whatever is now there.
try:
current.mkdir()
except FileExistsError:
pass
if not current.is_dir():
console.print(
f"[red]Error:[/red] Download cache path is not a directory: {current}"
)
raise typer.Exit(1)
if current.is_symlink():
console.print("[red]Error:[/red] Refusing to use symlinked download cache directory")
raise typer.Exit(1)
try:
current.resolve().relative_to(project_root_resolved)
except (OSError, ValueError):
console.print("[red]Error:[/red] Download cache directory escapes project root")
raise typer.Exit(1)
Comment on lines +3347 to +3387
except OSError as exc:
console.print(
f"[red]Error:[/red] Could not prepare download cache directory: {exc}"
)
raise typer.Exit(1)
return download_dir


def _safe_open_download_zip(project_root: Path, download_dir: Path, zip_filename: str) -> int:
"""Open ``download_dir / zip_filename`` for writing without following any symlink.

Closes the TOCTOU window between cache-ancestor validation and the actual
file write. On POSIX, walks each component of ``download_dir`` using
``dir_fd`` + ``O_NOFOLLOW`` so a symlink swap of *any* component
(including ancestors) after validation is rejected. The leaf is created
with ``O_EXCL`` so the unique UUID filename cannot collide with anything
an attacker pre-staged.

On platforms without ``dir_fd``/``O_NOFOLLOW`` support (notably Windows),
no atomic primitive exists in the standard library to bind the leaf open
to a previously-validated ancestor chain. A path-based open after a
separate validation walk is racy: an attacker who can write inside the
project (junction creation on Windows does not require elevation) can
swap a cache ancestor to a junction between the validation loop and the
leaf open, redirecting the write outside the project root. To avoid that
silent escape, this helper fails closed on such platforms instead.
Callers should surface a clear error and direct users to ``--dev`` or
catalog-based installs as alternatives.

Returns an open file descriptor; the caller owns and must close it.
Raises ``OSError`` (e.g. ``ELOOP``, ``EEXIST``, ``EACCES``) if any
component is a symlink or the leaf already exists, or
``NotImplementedError`` on platforms lacking ``dir_fd`` support.
"""
o_nofollow = getattr(os, "O_NOFOLLOW", 0)
o_directory = getattr(os, "O_DIRECTORY", 0)
leaf_flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | o_nofollow

if os.open not in os.supports_dir_fd:
# Fail closed: see docstring. No safe primitive available here.
Comment on lines +3422 to +3427
raise NotImplementedError(
"URL-based extension installs require POSIX-style "
"dir_fd + O_NOFOLLOW support, which is not available on this "
"platform. Use --dev for a local directory, or install from "
"the bundled catalog instead."
)

Comment on lines +3412 to +3434
rel_parts = download_dir.relative_to(project_root).parts
parent_fd = os.open(project_root, os.O_RDONLY | o_directory)
try:
for part in rel_parts:
new_fd = os.open(
part, os.O_RDONLY | o_directory | o_nofollow, dir_fd=parent_fd
)
os.close(parent_fd)
parent_fd = new_fd
return os.open(zip_filename, leaf_flags, 0o600, dir_fd=parent_fd)
finally:
os.close(parent_fd)


def _safe_unlink_download_zip(
project_root: Path, download_dir: Path, zip_filename: str
) -> None:
"""Best-effort unlink of the downloaded ZIP without following symlinks.

On POSIX, walks each cache ancestor with ``dir_fd`` + ``O_NOFOLLOW`` and
unlinks via ``os.unlink(zip_filename, dir_fd=parent_fd)`` so an attacker
who swaps a cache ancestor between the write and the cleanup cannot
redirect the unlink onto a same-named file outside the project.

On platforms without ``dir_fd``/``O_NOFOLLOW`` support, this function is
a no-op for the same reason :func:`_safe_open_download_zip` fails closed
there: a path-based ``unlink`` after a separate validation walk is racy
and could be redirected through a swapped ancestor onto a same-named
file outside the project. Skipping cleanup leaves a stale ZIP in the
cache directory but does not leak deletes; subsequent installs use
fresh UUID filenames so the stale file is harmless. (In practice this
branch is unreachable in the normal install flow because
:func:`_safe_open_download_zip` already fails closed before any ZIP is
written.)

Errors are swallowed; this is best-effort cleanup.
"""
o_nofollow = getattr(os, "O_NOFOLLOW", 0)
o_directory = getattr(os, "O_DIRECTORY", 0)

if os.open not in os.supports_dir_fd:
Comment on lines +3472 to +3475
# Fail closed: see docstring.
return

rel_parts = download_dir.relative_to(project_root).parts
try:
parent_fd = os.open(project_root, os.O_RDONLY | o_directory)
except OSError:
return
try:
for part in rel_parts:
try:
new_fd = os.open(
part, os.O_RDONLY | o_directory | o_nofollow, dir_fd=parent_fd
)
except OSError:
return
os.close(parent_fd)
parent_fd = new_fd
try:
os.unlink(zip_filename, dir_fd=parent_fd)
except OSError:
pass
finally:
try:
os.close(parent_fd)
except OSError:
pass


def _resolve_catalog_extension(
argument: str,
catalog,
Expand Down Expand Up @@ -3603,6 +3776,16 @@ def extension_add(
console.print("[red]Error:[/red] Priority must be a positive integer (1 or higher)")
raise typer.Exit(1)

# For URL installs, validate (and create) the download cache *before*
# constructing ExtensionManager. The manager's constructor reads
# ``.specify/extensions/.registry`` via ``ExtensionRegistry``; if a
# ``.specify/extensions`` ancestor is symlinked, that read would follow
# the symlink before any guard ran. Performing the safe per-component
# walk first ensures the manager only ever opens registry files inside a
# validated, project-contained tree.
if from_url:
_download_dir = _validate_safe_cache_dir(project_root)

manager = ExtensionManager(project_root)
speckit_version = get_speckit_version()

Expand Down Expand Up @@ -3641,27 +3824,128 @@ def extension_add(
console.print("Only install extensions from sources you trust.\n")
console.print(f"Downloading from {from_url}...")

# Download ZIP to temp location
download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads"
download_dir.mkdir(parents=True, exist_ok=True)
zip_path = download_dir / f"{extension}-url-download.zip"
# The download cache directory has already been safely
# validated/created above (before ExtensionManager construction)
# via `_validate_safe_cache_dir(project_root)`. Reuse the same
# validated `download_dir` here.
import uuid as _uuid
download_dir = _download_dir
project_root_resolved = project_root.resolve()

safe_name = Path(extension).name.replace("/", "_").replace("\\", "_") or "download"
safe_name = safe_name[:64] # cap length to avoid filesystem errors
zip_filename = f"{safe_name}-{_uuid.uuid4().hex[:8]}.zip"
zip_path = download_dir / zip_filename
Comment thread
mnriem marked this conversation as resolved.

# Containment guard: the resolved zip_path must stay inside the
# validated cache directory.
try:
zip_path.resolve().relative_to(download_dir.resolve())
except ValueError:
console.print("[red]Error:[/red] Invalid extension name (path traversal detected)")
raise typer.Exit(1)

try:
from specify_cli.authentication.http import open_url as _open_url

with _open_url(from_url, timeout=60) as response:
zip_data = response.read()
zip_path.write_bytes(zip_data)

# Install from downloaded ZIP
manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority)
except urllib.error.URLError as e:
console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}")
raise typer.Exit(1)

# Re-validate cache containment immediately before opening the
# write fd. On POSIX, `_safe_open_download_zip` additionally
# re-walks ancestors with `dir_fd + O_NOFOLLOW` so any swap is
# rejected at open time as well; on platforms without that
# support the helper fails closed (see its docstring).
try:
download_dir.resolve().relative_to(project_root_resolved)
except (OSError, ValueError):
console.print("[red]Error:[/red] Download cache directory escapes project root")
raise typer.Exit(1)

# Outer try/finally guarantees the symlink-safe cleanup runs
# regardless of where in the (open -> write -> install)
# pipeline a failure occurs, including a partial write that
# raises OSError after the fd has been opened. The narrower
# try/except blocks below classify *which* phase failed so
# error messages remain accurate.
try:
try:
# Open the ZIP file for writing without following any
# symlink in the path. On POSIX, the helper walks
# each ancestor of `download_dir` using
# dir_fd + O_NOFOLLOW so a symlink swap of *any*
# component (including ancestors) between validation
# and this write is rejected. On platforms without
# dir_fd / O_NOFOLLOW (Windows), the helper raises
# NotImplementedError; URL installs require the
# POSIX-style atomic primitives to be safe against
# ancestor swaps.
try:
_fd = _safe_open_download_zip(
project_root, download_dir, zip_filename
)
except NotImplementedError as _ni:
console.print(f"[red]Error:[/red] {_ni}")
raise typer.Exit(1)
except OSError as _open_err:
console.print(
f"[red]Error:[/red] Could not safely create download file: {_open_err}"
)
raise typer.Exit(1)
try:
with os.fdopen(_fd, "wb") as _zf:
_zf.write(zip_data)
except OSError as _write_err:
# Write failed (e.g. ENOSPC, EIO). os.fdopen took
# ownership of the fd; the with-block close will
# have already run on the way out.
console.print(
f"[red]Error:[/red] Failed to write download cache: {_write_err}"
)
raise typer.Exit(1)
except BaseException:
# Non-OSError failure before/inside the
# with-block: defensively close the fd in case
# os.fdopen never took ownership, then re-raise
# so the outer finally still cleans up.
try:
os.close(_fd)
except OSError:
pass
raise
except OSError as e:
# Catches OSError raised from `_safe_open_download_zip`
# below the typed handlers above; preserves the
# narrow "cache write" framing.
console.print(
f"[red]Error:[/red] Failed to write download cache: {e}"
)
raise typer.Exit(1)

# Install from downloaded ZIP. Errors from the manager
# (ValidationError / CompatibilityError / ExtensionError)
# are caught by the outer handler so users see the
# extension-specific message rather than a generic
# "Failed to write download cache".
manifest = manager.install_from_zip(
zip_path, speckit_version, priority=priority
)
Comment on lines +3933 to +3935
finally:
# Clean up downloaded ZIP
if zip_path.exists():
zip_path.unlink()
# Symlink-safe cleanup of the downloaded ZIP. Runs on
# every exit path (success, write failure, install
# failure) so a partial ZIP from an interrupted write is
# not left behind. On platforms without dir_fd /
# O_NOFOLLOW the helper is a no-op (fails closed) since
# a path-based unlink there could be redirected through
# a swapped ancestor onto a same-named file outside the
# project; in that case `_safe_open_download_zip` has
# already failed closed, so no ZIP exists to clean up.
_safe_unlink_download_zip(
project_root, download_dir, zip_filename
)

else:
# Try bundled extensions first (shipped with spec-kit)
Expand Down
Loading
Loading