diff --git a/CHANGES b/CHANGES index 8471ae7e0..562c3586c 100644 --- a/CHANGES +++ b/CHANGES @@ -39,6 +39,44 @@ _Notes on upcoming releases will be added here_ ### New features +#### `vcspull sync`: parallel batch sync via `--jobs N` + +Sync up to N repositories in parallel (default `min(8, CPU*2)`). The +batch's wallclock now scales with the slowest few repos rather than +the sum of all of them -- a 50-repo workspace of already-up-to-date +repos finishes in roughly ~1/Nth the time it used to. + +Tweak with `--jobs N` / `-j N` or the `VCSPULL_JOBS` environment +variable. Pass `--jobs 1` to force the legacy single-spinner UX bit- +for-bit. The `min(8, ...)` cap is empirical -- it stays polite to +GitHub's per-IP rate limits while still saturating most local mirrors; +big workspaces of 50+ repos may want `VCSPULL_JOBS=2-3` to avoid +bursts. + +In a TTY, the indicator switches from a single spinner row + 3-line +live-trail panel to a fixed-height active region of N spinner rows at +the bottom of the terminal. Permanent `✓ Synced ` +lines scroll into scrollback above the active region as repos finish +-- the same trick `cargo build` and `pueue` use. The 3-line panel is +disabled in multi-slot mode (a shared deque with N concurrent writers +reads as noise); each slot's most-recent libvcs progress message +becomes the per-row suffix instead. + +JSON / NDJSON events emit in completion order via +`asyncio.as_completed`, matching the streaming model and using +constant memory regardless of repo count. + +The orchestrator is asyncio-based (`asyncio.Semaphore(jobs)` + +`asyncio.as_completed`) over per-task daemon threads. Daemon threads +bridge libvcs's synchronous `update_repo` into the loop and avoid the +default `ThreadPoolExecutor`'s atexit-join footgun: a wedged libvcs +subprocess at interpreter shutdown would otherwise hang the process. + +`--exit-on-error` in parallel mode sets a stop event so queued tasks +short-circuit before starting, but in-flight tasks are allowed to +complete so their output is captured -- the user still sees what was +already running. + #### `vcspull sync`: collapsing 3-line live-trail above the spinner Streaming subprocess output (git's `From `, `* [new branch]`, diff --git a/src/vcspull/cli/__init__.py b/src/vcspull/cli/__init__.py index 6cf53da44..50a7cc6e3 100644 --- a/src/vcspull/cli/__init__.py +++ b/src/vcspull/cli/__init__.py @@ -477,6 +477,7 @@ def cli(_args: list[str] | None = None) -> None: log_file=getattr(args, "log_file", None), no_log_file=getattr(args, "no_log_file", False), panel_lines=getattr(args, "panel_lines", None), + jobs=getattr(args, "jobs", None), ) elif args.subparser_name == "list": list_repos( diff --git a/src/vcspull/cli/_progress.py b/src/vcspull/cli/_progress.py index 116c04913..1a02966d3 100644 --- a/src/vcspull/cli/_progress.py +++ b/src/vcspull/cli/_progress.py @@ -1,4 +1,4 @@ -"""Live status indicator for ``vcspull sync``. +r"""Live status indicator for ``vcspull sync``. Shows the user which repository is currently being synced and how long it has been running. In a TTY a single-line braille spinner refreshes every ~100 ms; @@ -6,6 +6,13 @@ "still syncing" heartbeats so the output stream keeps moving without flooding the log. +When ``slots > 1`` the indicator switches into multi-row mode: a fixed-height +active region of ``slots`` spinner rows lives at the bottom of the terminal, +and permanent ``✓ Synced ...`` lines scroll into scrollback above it as +repos finish. This is the same trick ``cargo build`` and ``pueue`` use -- +write the permanent line ABOVE the active region so a ``\n`` from the +viewport bottom scrolls one row out of the active region into history. + Inspired by ``tmuxp``'s spinner module -- stdlib + ANSI only, no ``rich`` dependency. """ @@ -14,6 +21,7 @@ import atexit import collections +import dataclasses import io import itertools import logging @@ -83,13 +91,30 @@ def _close_indicator_quietly(indicator: SyncStatusIndicator) -> None: atexit.register(_restore_cursors_on_exit) +@dataclasses.dataclass +class _Slot: + """Per-slot state for multi-slot indicator mode.""" + + name: str + started_at: float + last_message: str = "" + + class SyncStatusIndicator: """Owns the "which repo is running now" UI for a sync session. - Exactly one repo is considered "active" at a time -- this matches the - current sequential sync loop. Callers drive the indicator with + In single-slot mode (``slots=1``, the default) exactly one repo is + considered "active" at a time. Callers drive the indicator with :meth:`start_repo` / :meth:`stop_repo`, or via the context manager returned by :meth:`repo`. + + In multi-slot mode (``slots > 1``) the indicator owns a fixed-height + active region of ``slots`` rows. Callers reserve a slot via + :meth:`acquire_slot`, push per-slot progress messages via + :meth:`update_slot_message`, and finalise via :meth:`release_slot` + with a permanent line that scrolls into scrollback above the active + region. The single-row legacy methods continue to work in + multi-slot mode -- they map to slot 0. """ def __init__( @@ -100,6 +125,7 @@ def __init__( tty: bool | None = None, colors: Colors | None = None, output_lines: int = _DEFAULT_OUTPUT_LINES, + slots: int = 1, ) -> None: self._stream = stream if stream is not None else sys.stdout # Respect the explicit tty override (tests, ``--color=never``) but @@ -118,6 +144,14 @@ def __init__( # captured streams in tests. self._colors = colors if colors is not None else Colors(ColorMode.NEVER) + self._slot_count = max(1, slots) + # Multi-slot mode disables the live-trail panel: the panel is a + # single-source-of-output deque and looks like noise when N + # workers feed it concurrently. Each slot's most-recent message + # becomes the per-row suffix instead. + if self._slot_count > 1: + output_lines = 0 + # Live-trail panel above the spinner. ``0`` disables the panel and # ``add_output_line`` falls back to plain ``write()`` semantics. # ``-1`` means unbounded -- rare; useful when piping ``-vv`` output @@ -146,6 +180,20 @@ def __init__( self._last_line_len = 0 self._cursor_hidden = False + # Multi-slot state. Even in single-slot mode the legacy + # ``_active_repo`` / ``_repo_started_at`` continue to drive the + # render path, so ``_slots`` is unused there. In multi-slot mode + # ``acquire_slot`` / ``release_slot`` populate ``_slots`` and + # ``_pending_permanents`` queues lines drained at the next tick. + self._slots: list[_Slot | None] = [None] * self._slot_count + self._pending_permanents: list[str] = [] + # Number of active-region rows we drew last frame. The next + # frame walks up ``_prev_active_rows - 1`` rows + CR to reach the + # top of the previous active region. Stays at ``slot_count`` + # during the run; reset to ``0`` by ``write()`` / ``close()`` + # after they erase the region. + self._prev_active_rows = 0 + # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ @@ -279,6 +327,112 @@ def add_output_line(self, text: str) -> None: if stripped: self._panel_buffer.append(stripped) + def acquire_slot(self, name: str) -> int: + """Reserve an idle slot for ``name``; returns the slot index. + + In multi-slot mode the dispatcher acquires a slot before kicking + off a per-repo daemon thread; the index is later passed to + :meth:`release_slot` and :meth:`update_slot_message`. Raises + ``RuntimeError`` if no idle slot is available -- the caller is + expected to gate dispatch on a semaphore matching ``slot_count``, + so over-subscription is a programming error. + + Returns ``-1`` when the indicator is disabled; callers should + treat that as "no row to update" and skip per-slot message + updates. + """ + if not self._enabled: + return -1 + with self._lock: + for idx in range(self._slot_count): + if self._slots[idx] is None: + now = time.monotonic() + self._slots[idx] = _Slot(name=name, started_at=now) + if self._last_heartbeat_at is None: + self._last_heartbeat_at = now + break + else: + msg = ( + f"acquire_slot called with all {self._slot_count} slots " + "busy; gate dispatch on a semaphore matching slot_count" + ) + raise RuntimeError(msg) + + if self._tty: + self._ensure_tty_thread() + else: + # Headless mode: emit a once-on-start line per repo. + self._emit_line(f"Syncing {name}") + return idx + + def release_slot( + self, + slot: int, + final_line: str | None = None, + ) -> bool: + r"""Release ``slot``; queue ``final_line`` to scroll into scrollback. + + When ``final_line`` is provided AND the indicator is rendering on + a TTY in multi-slot mode, the line is appended to a pending- + permanents queue that the next render tick drains -- writing it + ABOVE the active region so a ``\n`` from the viewport bottom + scrolls one row out of the active region into history. Returns + ``True`` when the line was queued (caller should skip its own + ``formatter.emit_text``); ``False`` otherwise (caller emits + normally). + + Mirrors the contract of :meth:`stop_repo` so dispatchers can + treat the slot-aware and legacy call sites uniformly. + """ + if not self._enabled: + return False + if slot < 0 or slot >= self._slot_count: + return False + with self._lock: + self._slots[slot] = None + if not self._tty: + # Headless: caller is responsible for surfacing + # ``final_line`` itself. We've still freed the slot. + return False + if final_line is not None: + self._pending_permanents.append(final_line) + return final_line is not None + + def update_slot_message(self, slot: int, message: str) -> None: + """Update the per-slot 'last activity' string shown after the name. + + Multi-row equivalent of :meth:`add_output_line` for a specific + slot. Long messages are trimmed; only the last non-blank line + of a multi-line chunk is kept (libvcs's progress callback often + delivers multi-line bursts; only the most recent line is + interesting in the per-slot suffix). + + In non-TTY mode (CI logs, ``capsys`` capture) the per-slot + suffix has no render path, so we fall through to the same + write-the-chunk-to-the-stream behaviour as + :meth:`add_output_line`. This keeps libvcs progress output + visible in log capture and pipelines. + """ + if not self._enabled or slot < 0 or slot >= self._slot_count: + return + if not message: + return + if not self._tty: + payload = message if message.endswith("\n") else message + "\n" + self.write(payload) + return + last_line = "" + for line in message.splitlines(): + candidate = line.rstrip() + if candidate: + last_line = candidate + if not last_line: + return + with self._lock: + current = self._slots[slot] + if current is not None: + current.last_message = last_line + def repo(self, name: str) -> _RepoContext: """Context manager form of :meth:`start_repo` / :meth:`stop_repo`.""" return _RepoContext(self, name) @@ -304,11 +458,17 @@ def write(self, text: str) -> None: return with self._lock: try: - if self._tty and (self._last_line_len or self._panel_visible_lines): - # Walk back over the panel + spinner before printing - # so the log record lands on a fresh row instead of - # over-writing the trail. The next render redraws the - # whole frame from scratch. + if self._tty and self._slot_count > 1 and self._prev_active_rows > 0: + # Multi-slot: walk back over the active region (no panel). + # The next render redraws the whole frame from scratch. + self._stream.write(_CURSOR_TO_COL0 + _ERASE_LINE) + for _ in range(self._prev_active_rows - 1): + self._stream.write("\x1b[1A" + _ERASE_LINE) + self._prev_active_rows = 0 + elif self._tty and (self._last_line_len or self._panel_visible_lines): + # Single-slot: walk back over the panel + spinner before + # printing so the log record lands on a fresh row instead + # of over-writing the trail. self._stream.write(_CURSOR_TO_COL0 + _ERASE_LINE) for _ in range(self._panel_visible_lines): self._stream.write("\x1b[1A" + _ERASE_LINE) @@ -333,10 +493,33 @@ def close(self) -> None: self._thread = None if thread is not None: thread.join(timeout=1.0) - # Erase the whole frame (panel + spinner) on close, not just the - # spinner row -- otherwise leftover panel rows linger in - # scrollback when the indicator is closed mid-repo (e.g. on - # KeyboardInterrupt before ``stop_repo``). + + # Multi-slot mode: drain any pending permanents the render thread + # never picked up (race between the last ``release_slot`` and the + # ``stop_event`` flip). They scroll into scrollback as plain lines. + if self._slot_count > 1 and self._pending_permanents: + with self._lock: + pending = self._pending_permanents + self._pending_permanents = [] + try: + # Walk back over the active region once before emitting -- + # otherwise the pending lines land BELOW the active rows + # which is the opposite of "scroll into scrollback". + if self._tty and self._prev_active_rows > 0: + self._stream.write(_CURSOR_TO_COL0 + _ERASE_LINE) + for _ in range(self._prev_active_rows - 1): + self._stream.write("\x1b[1A" + _ERASE_LINE) + self._prev_active_rows = 0 + for line in pending: + self._stream.write(line + "\n") + self._stream.flush() + except (OSError, ValueError): + pass + + # Erase the whole frame (panel + spinner, or N slot rows) on close + # -- otherwise leftover panel rows linger in scrollback when the + # indicator is closed mid-repo (e.g. on KeyboardInterrupt before + # ``stop_repo``). self._clear_block() if self._cursor_hidden: try: @@ -376,14 +559,17 @@ def _tty_loop(self) -> None: frames = itertools.cycle(_SPINNER_FRAMES) while not self._stop_event.is_set(): frame = next(frames) - with self._lock: - name = self._active_repo - started = self._repo_started_at - if name is not None and started is not None: - elapsed = time.monotonic() - started - self._render_tty(frame, name, elapsed) + if self._slot_count > 1: + self._render_tty_multi(frame) else: - self._clear_line() + with self._lock: + name = self._active_repo + started = self._repo_started_at + if name is not None and started is not None: + elapsed = time.monotonic() - started + self._render_tty(frame, name, elapsed) + else: + self._clear_line() self._stop_event.wait(_TTY_REFRESH_INTERVAL) def _render_tty(self, frame: str, name: str, elapsed: float) -> None: @@ -450,6 +636,99 @@ def _render_tty(self, frame: str, name: str, elapsed: float) -> None: except (OSError, ValueError): pass + def _format_slot_row( + self, + frame: str, + slot: _Slot, + elapsed: float, + ) -> tuple[str, str]: + """Build the (visible, coloured) pair for one slot row. + + ``visible`` is used for column-width tracking (no ANSI), ``coloured`` + is the actual write payload. Mirrors :meth:`_render_tty`'s shape so + the multi-row render reads consistently with the single-row one. + """ + coloured_frame = self._colors.info(frame) + coloured_name = self._colors.info(slot.name) + suffix = "" + if slot.last_message: + # Trim long messages to keep the row from wrapping. Libvcs + # progress chunks include things like ``Receiving objects: + # 100% (1234/1234)`` -- ~80 columns is enough for the part + # that matters. + short = slot.last_message[:80] + suffix = f" {self._colors.muted(short)}" + visible_suffix = f" {short}" + else: + visible_suffix = "" + visible = f"{frame} Syncing {slot.name} ... {elapsed:4.1f}s{visible_suffix}" + coloured = ( + f"{coloured_frame} Syncing {coloured_name} ... {elapsed:4.1f}s{suffix}" + ) + return visible, coloured + + def _render_tty_multi(self, frame: str) -> None: + r"""Render the multi-row active region in one synchronized write. + + Layout each frame: + + - Walk up ``_prev_active_rows - 1`` rows (cursor sits at end of + previous frame's bottom row), then ``\r``. + - For each pending permanent line: ``ERASE_LINE + line + \n`` -- + this is the cargo/pueue trick that scrolls one row out of the + active region into scrollback when we're at the viewport bottom. + - For each slot (idle or active): ``ERASE_LINE + content`` with + a trailing ``\n`` on every row except the last, so the cursor + stays at the end of the bottom slot row for the next frame's + walk-up math. + + Idle slots render as blank rows so the active-region height is + constant at ``slot_count`` -- predictable geometry beats variable + height for the walk-up arithmetic. + """ + with self._lock: + slots = list(self._slots) + pending = self._pending_permanents + self._pending_permanents = [] + prev_rows = self._prev_active_rows + any_active = any(s is not None for s in slots) + # Nothing to draw, nothing previously drawn -- skip the tick. + if not pending and not any_active and prev_rows == 0: + return + + now = time.monotonic() + parts: list[str] = [_SYNC_START] + if prev_rows > 0: + if prev_rows > 1: + parts.append(f"\x1b[{prev_rows - 1}A") + parts.append(_CURSOR_TO_COL0) + + # Drain pending permanents -- these scroll into scrollback. + parts.extend(_ERASE_LINE + permanent + "\n" for permanent in pending) + + # Render each slot row. Idle slots render blank. + last_idx = len(slots) - 1 + for idx, slot in enumerate(slots): + if slot is None: + coloured = "" + else: + elapsed = now - slot.started_at + _, coloured = self._format_slot_row(frame, slot, elapsed) + if idx < last_idx: + parts.append(_ERASE_LINE + coloured + "\n") + else: + # Bottom row: no trailing \n so the cursor stays put + # for the next frame's walk-up. + parts.append(_ERASE_LINE + coloured) + + parts.append(_SYNC_END) + try: + self._stream.write("".join(parts)) + self._stream.flush() + self._prev_active_rows = len(slots) + except (OSError, ValueError): + pass + def _emit_line(self, line: str) -> None: try: self._stream.write(line + "\n") @@ -476,9 +755,25 @@ def _clear_block(self) -> None: call lands on a clean row. Without this, the panel rows would stay sticky in scrollback and the ``stop_repo`` -> permanent-line transition would scroll-leak history. + + In multi-slot mode the active region is ``_prev_active_rows`` tall + and the legacy single-row state is unused; walk back over the + slot rows instead of the panel + spinner row. """ if not self._tty: return + if self._slot_count > 1: + if self._prev_active_rows == 0: + return + try: + self._stream.write(_CURSOR_TO_COL0 + _ERASE_LINE) + for _ in range(self._prev_active_rows - 1): + self._stream.write("\x1b[1A" + _ERASE_LINE) + self._stream.flush() + except (OSError, ValueError): + pass + self._prev_active_rows = 0 + return try: # Walk up panel rows + the spinner row, erasing each. self._stream.write(_CURSOR_TO_COL0 + _ERASE_LINE) @@ -547,6 +842,7 @@ def build_indicator( tty: bool | None = None, colors: Colors | None = None, output_lines: int = _DEFAULT_OUTPUT_LINES, + slots: int = 1, ) -> SyncStatusIndicator: """Return a ``SyncStatusIndicator`` configured for the current session. @@ -555,7 +851,9 @@ def build_indicator( ``output_lines`` controls the live-trail panel above the spinner: ``3`` (default) shows the last 3 streamed lines; ``0`` hides the - panel entirely; ``-1`` is unbounded. + panel entirely; ``-1`` is unbounded. ``slots > 1`` switches the + indicator into multi-row mode for parallel ``vcspull sync --jobs N`` + runs and forces ``output_lines`` to ``0`` internally. """ enabled = human and color != "never" # io.StringIO satisfies the TextIO protocol at runtime; tests use this to @@ -571,4 +869,5 @@ def build_indicator( tty=tty, colors=resolved_colors, output_lines=output_lines, + slots=slots, ) diff --git a/src/vcspull/cli/sync.py b/src/vcspull/cli/sync.py index 6f6b44ebf..e2efb7a1b 100644 --- a/src/vcspull/cli/sync.py +++ b/src/vcspull/cli/sync.py @@ -683,6 +683,19 @@ def create_sync_subparser(parser: argparse.ArgumentParser) -> argparse.ArgumentP action="store_true", help="disable the debug log file entirely", ) + parser.add_argument( + "-j", + "--jobs", + dest="jobs", + type=int, + default=None, + metavar="N", + help=( + "number of repositories to sync in parallel " + f"(default: min({_DEFAULT_MAX_JOBS}, CPU*2); env: VCSPULL_JOBS). " + "Pass 1 to force the legacy serial UX." + ), + ) try: import shtab @@ -703,6 +716,21 @@ class _TimedOutRepo: duration: float +@dataclass +class _ParallelResult: + """Per-repo outcome surfaced by ``_run_parallel_sync_loop_async``. + + ``slot`` is the indicator slot the repo occupied (``-1`` when the + indicator is disabled or the task was cancelled before + ``acquire_slot`` ran). The as-completed loop uses the slot to call + ``release_slot`` with the permanent line. + """ + + repo: ConfigDict + slot: int + outcome: _SyncOutcome + + @dataclass class _SyncOutcome: """Result of attempting to sync a single repository.""" @@ -760,6 +788,37 @@ def _resolve_panel_lines(cli_value: int | None) -> int: return _DEFAULT_PANEL_LINES +#: Cap on the default ``--jobs`` value. The plan-build phase already runs +#: with ``DEFAULT_PLAN_CONCURRENCY = max(1, min(32, CPU * 2))``; for +#: ``sync`` we use the same heuristic but cap at 8 because each worker +#: runs a real ``git fetch`` and bursting much higher hits per-IP rate +#: limits on GitHub. Users with authenticated tokens or local mirrors +#: can override via ``--jobs N`` or ``VCSPULL_JOBS=N``. +_DEFAULT_MAX_JOBS = 8 + + +def _resolve_jobs(cli_jobs: int | None) -> int: + """Resolve the parallel-sync concurrency: CLI flag > env > default. + + Mirrors the precedence shape of :func:`_resolve_repo_timeout`. A value + of ``1`` forces the legacy serial sync path bit-for-bit (same UX as + before this change shipped). Larger values switch the dispatcher into + asyncio-orchestrated parallel mode. + """ + if cli_jobs is not None and cli_jobs > 0: + return cli_jobs + env_value = os.environ.get("VCSPULL_JOBS") + if env_value: + try: + parsed = int(env_value) + except ValueError: + log.warning("Ignoring non-integer VCSPULL_JOBS=%s", env_value) + else: + if parsed > 0: + return parsed + return max(1, min(_DEFAULT_MAX_JOBS, (os.cpu_count() or 4) * 2)) + + class _SyncInterruptedAfterSummary(KeyboardInterrupt): """Internal marker: ``_sync_impl`` already emitted the interrupt summary. @@ -1040,6 +1099,7 @@ def sync( log_file: str | pathlib.Path | None = None, no_log_file: bool = False, panel_lines: int | None = None, + jobs: int | None = None, ) -> None: """Entry point for ``vcspull sync``.""" # Prevent git from blocking on credential prompts during batch sync @@ -1047,6 +1107,7 @@ def sync( repo_timeout = _resolve_repo_timeout(timeout) resolved_panel_lines = _resolve_panel_lines(panel_lines) + resolved_jobs = _resolve_jobs(jobs) # Set up the debug log file early so libvcs's per-repo activity is also # captured. The path is surfaced to the user only when something failed @@ -1093,6 +1154,7 @@ def sync( log_file_path=log_file_path, dry_run=dry_run, panel_lines=resolved_panel_lines, + jobs=resolved_jobs, ) except KeyboardInterrupt as err: # Catch Ctrl-C from ANY phase of the sync -- the repo loop (where @@ -1147,6 +1209,7 @@ def _sync_impl( repo_timeout: int, log_file_path: pathlib.Path | None, panel_lines: int, + jobs: int, ) -> None: """Run the core body of :func:`sync`. @@ -1332,6 +1395,7 @@ def _sync_impl( color=color, colors=colors, output_lines=panel_lines, + slots=jobs, ) progress_callback: ProgressCallback @@ -1362,22 +1426,43 @@ def silent_progress(output: str, timestamp: datetime) -> None: interrupted = False try: - _run_sync_loop( - found_repos=found_repos, - formatter=formatter, - colors=colors, - summary=summary, - timed_out_repos=timed_out_repos, - progress_callback=progress_callback, - is_human=is_human, - repo_timeout=repo_timeout, - exit_on_error=exit_on_error, - include_worktrees=include_worktrees, - dry_run=dry_run, - parser=parser, - log_file_path=log_file_path, - indicator=indicator, - ) + if jobs <= 1: + _run_sync_loop( + found_repos=found_repos, + formatter=formatter, + colors=colors, + summary=summary, + timed_out_repos=timed_out_repos, + progress_callback=progress_callback, + is_human=is_human, + repo_timeout=repo_timeout, + exit_on_error=exit_on_error, + include_worktrees=include_worktrees, + dry_run=dry_run, + parser=parser, + log_file_path=log_file_path, + indicator=indicator, + ) + else: + asyncio.run( + _run_parallel_sync_loop_async( + found_repos=found_repos, + jobs=jobs, + formatter=formatter, + colors=colors, + summary=summary, + timed_out_repos=timed_out_repos, + progress_callback=progress_callback, + is_human=is_human, + repo_timeout=repo_timeout, + exit_on_error=exit_on_error, + include_worktrees=include_worktrees, + dry_run=dry_run, + parser=parser, + log_file_path=log_file_path, + indicator=indicator, + ), + ) except KeyboardInterrupt: # Ctrl-C during the loop: stop the indicator cleanly, print a # partial summary via the formatter, then hand termination @@ -1434,6 +1519,164 @@ def silent_progress(output: str, timestamp: datetime) -> None: formatter.finalize() +def _emit_repo_result( + *, + repo: ConfigDict, + outcome: _SyncOutcome, + formatter: OutputFormatter, + colors: Colors, + summary: dict[str, int], + timed_out_repos: list[_TimedOutRepo], + is_human: bool, + final_writer: Callable[[str | None], bool], +) -> str: + """Update summary, emit event, write permanent line for one outcome. + + ``final_writer`` is the indicator hook -- ``stop_repo`` for the serial + path or a slot-bound ``release_slot`` for the parallel path. It + accepts the permanent line (or ``None``) and returns ``True`` when + the indicator wrote the line itself (caller then skips its own + ``formatter.emit_text``). Returns the outcome's status so the caller + can fork on success / failure / timeout for control flow. + """ + repo_name = str(repo.get("name", "unknown")) + repo_path = repo.get("path", "unknown") + workspace_label = repo.get("workspace_root", "") + display_repo_path = str(PrivatePath(repo_path)) + + summary["total"] = summary.get("total", 0) + 1 + + event: dict[str, t.Any] = { + "reason": "sync", + "name": repo_name, + "path": display_repo_path, + "workspace_root": str(workspace_label), + } + + if outcome.status == "timed_out": + summary["timed_out"] = summary.get("timed_out", 0) + 1 + summary["failed"] = summary.get("failed", 0) + 1 + timed_out_repos.append( + _TimedOutRepo( + name=repo_name, + path=str(repo_path), + workspace_root=str(workspace_label), + duration=outcome.duration, + ), + ) + event["status"] = "timed_out" + event["duration_ms"] = int(outcome.duration * 1000) + if outcome.captured_output: + event["details"] = outcome.captured_output.strip() + permanent = ( + f"{colors.warning('-')} Timed out {colors.info(repo_name)} " + f"after {colors.warning(f'{outcome.duration:.1f}s')} " + f"{colors.muted('->')} {display_repo_path}" + ) + wrote_final = final_writer(permanent if is_human else None) + formatter.emit(event) + if not wrote_final: + formatter.emit_text(permanent) + return "timed_out" + + if outcome.status == "failed": + summary["failed"] = summary.get("failed", 0) + 1 + err = outcome.error + err_msg = str(err) if err is not None else "unknown error" + event["status"] = "error" + event["error"] = err_msg + if outcome.captured_output: + event["details"] = outcome.captured_output.strip() + permanent = ( + f"{colors.error('✗')} Failed syncing {colors.info(repo_name)}: " + f"{colors.error(err_msg)}" + ) + wrote_final = final_writer(permanent if is_human else None) + formatter.emit(event) + if is_human: + log.debug("Failed syncing %s", repo_name) + if log.isEnabledFor(logging.DEBUG) and err is not None: + import traceback + + traceback.print_exception(type(err), err, err.__traceback__) + if not wrote_final: + formatter.emit_text(permanent) + return "failed" + + summary["synced"] = summary.get("synced", 0) + 1 + event["status"] = "synced" + permanent = ( + f"{colors.success('✓')} Synced {colors.info(repo_name)} " + f"{colors.muted('→')} {display_repo_path}" + ) + wrote_final = final_writer(permanent if is_human else None) + formatter.emit(event) + if not wrote_final: + formatter.emit_text(permanent) + return "synced" + + +def _emit_worktree_results( + *, + repo: ConfigDict, + formatter: OutputFormatter, + colors: Colors, + summary: dict[str, int], + dry_run: bool, +) -> int: + """Run worktree sync for ``repo`` (if configured) and emit results. + + Returns the number of worktree errors so callers can fold them into + ``--exit-on-error`` semantics. + """ + worktrees_config = repo.get("worktrees") + if not worktrees_config: + return 0 + workspace_label = repo.get("workspace_root", "") + workspace_path = expand_dir(pathlib.Path(str(workspace_label))) + repo_path_obj = pathlib.Path(str(repo.get("path", ""))) + + wt_result = sync_all_worktrees( + repo_path_obj, + worktrees_config, + workspace_path, + dry_run=dry_run, + ) + for entry in wt_result.entries: + ref_display = f"{entry.ref_type}:{entry.ref_value}" + wt_path_display = str(PrivatePath(entry.worktree_path)) + if entry.action == WorktreeAction.CREATE: + sym = colors.success("+") + ref = colors.info(ref_display) + arrow = colors.muted("→") + formatter.emit_text( + f" {sym} worktree {ref} {arrow} {wt_path_display}", + ) + elif entry.action == WorktreeAction.UPDATE: + sym = colors.warning("~") + ref = colors.info(ref_display) + arrow = colors.muted("→") + formatter.emit_text( + f" {sym} worktree {ref} {arrow} {wt_path_display}", + ) + elif entry.action == WorktreeAction.BLOCKED: + sym = colors.warning("⚠") + ref = colors.info(ref_display) + formatter.emit_text( + f" {sym} worktree {ref} blocked: {entry.detail}", + ) + elif entry.action == WorktreeAction.ERROR: + formatter.emit_text( + f" {colors.error('✗')} worktree {colors.info(ref_display)} " + f"error: {entry.error}", + ) + summary["worktree_created"] = summary.get("worktree_created", 0) + wt_result.created + summary["worktree_updated"] = summary.get("worktree_updated", 0) + wt_result.updated + summary["worktree_failed"] = summary.get("worktree_failed", 0) + wt_result.errors + summary["failed"] = summary.get("failed", 0) + wt_result.errors + return wt_result.errors + + def _run_sync_loop( *, found_repos: list[ConfigDict], @@ -1454,20 +1697,9 @@ def _run_sync_loop( """Iterate the repositories and drive the watchdog + indicator.""" for repo in found_repos: repo_name = repo.get("name", "unknown") - repo_path = repo.get("path", "unknown") - workspace_label = repo.get("workspace_root", "") - display_repo_path = str(PrivatePath(repo_path)) - summary["total"] += 1 indicator.heartbeat() - event: dict[str, t.Any] = { - "reason": "sync", - "name": repo_name, - "path": display_repo_path, - "workspace_root": str(workspace_label), - } - # Manual ``start_repo`` / ``stop_repo`` instead of the # ``with indicator.repo(...)`` context manager: we want # ``stop_repo`` to receive the permanent line so the spinner @@ -1477,7 +1709,7 @@ def _run_sync_loop( # spinner clears, then the formatter writes the permanent line # in a separate stream call. That two-step is the source of the # flicker reporters have called out. - indicator.start_repo(repo_name) + indicator.start_repo(str(repo_name)) try: outcome = _sync_repo_with_watchdog( repo, @@ -1493,32 +1725,18 @@ def _run_sync_loop( indicator.stop_repo() raise - if outcome.status == "timed_out": - summary["timed_out"] += 1 - summary["failed"] += 1 - timed_out_repos.append( - _TimedOutRepo( - name=repo_name, - path=str(repo_path), - workspace_root=str(workspace_label), - duration=outcome.duration, - ), - ) - event["status"] = "timed_out" - event["duration_ms"] = int(outcome.duration * 1000) - if outcome.captured_output: - event["details"] = outcome.captured_output.strip() - permanent = ( - f"{colors.warning('-')} Timed out {colors.info(repo_name)} " - f"after {colors.warning(f'{outcome.duration:.1f}s')} " - f"{colors.muted('->')} {display_repo_path}" - ) - wrote_final = indicator.stop_repo( - final_line=permanent if is_human else None, - ) - formatter.emit(event) - if not wrote_final: - formatter.emit_text(permanent) + status = _emit_repo_result( + repo=repo, + outcome=outcome, + formatter=formatter, + colors=colors, + summary=summary, + timed_out_repos=timed_out_repos, + is_human=is_human, + final_writer=lambda line: indicator.stop_repo(final_line=line), + ) + + if status == "timed_out": if exit_on_error: _emit_rerun_recipe( formatter, @@ -1538,113 +1756,236 @@ def _run_sync_loop( raise SystemExit(EXIT_ON_ERROR_MSG) continue - if outcome.status == "failed": - summary["failed"] += 1 - err = outcome.error - err_msg = str(err) if err is not None else "unknown error" - event["status"] = "error" - event["error"] = err_msg - if outcome.captured_output: - event["details"] = outcome.captured_output.strip() - permanent = ( - f"{colors.error('✗')} Failed syncing {colors.info(repo_name)}: " - f"{colors.error(err_msg)}" - ) - wrote_final = indicator.stop_repo( - final_line=permanent if is_human else None, - ) - formatter.emit(event) - if is_human: - log.debug("Failed syncing %s", repo_name) - if log.isEnabledFor(logging.DEBUG) and err is not None: - import traceback - - traceback.print_exception(type(err), err, err.__traceback__) - if not wrote_final: - formatter.emit_text(permanent) + if status == "failed": if exit_on_error: _emit_summary(formatter, colors, summary) formatter.finalize() if parser is not None: parser.exit(status=1, message=EXIT_ON_ERROR_MSG) - raise SystemExit(EXIT_ON_ERROR_MSG) from err + raise SystemExit(EXIT_ON_ERROR_MSG) from outcome.error continue - summary["synced"] += 1 - event["status"] = "synced" - permanent = ( - f"{colors.success('✓')} Synced {colors.info(repo_name)} " - f"{colors.muted('→')} {display_repo_path}" - ) - wrote_final = indicator.stop_repo( - final_line=permanent if is_human else None, - ) - formatter.emit(event) - if not wrote_final: - formatter.emit_text(permanent) - # Sync worktrees if enabled and configured - worktrees_config = repo.get("worktrees") - if include_worktrees and worktrees_config: - workspace_path = expand_dir(pathlib.Path(str(workspace_label))) - repo_path_obj = pathlib.Path(str(repo_path)) - - wt_result = sync_all_worktrees( - repo_path_obj, - worktrees_config, - workspace_path, + if include_worktrees: + errors = _emit_worktree_results( + repo=repo, + formatter=formatter, + colors=colors, + summary=summary, dry_run=dry_run, ) + if exit_on_error and errors > 0: + _emit_summary(formatter, colors, summary) + formatter.finalize() + if parser is not None: + parser.exit(status=1, message=EXIT_ON_ERROR_MSG) + raise SystemExit(EXIT_ON_ERROR_MSG) - for entry in wt_result.entries: - ref_display = f"{entry.ref_type}:{entry.ref_value}" - wt_path_display = str(PrivatePath(entry.worktree_path)) - if entry.action == WorktreeAction.CREATE: - sym = colors.success("+") - ref = colors.info(ref_display) - arrow = colors.muted("→") - formatter.emit_text( - f" {sym} worktree {ref} {arrow} {wt_path_display}", - ) - elif entry.action == WorktreeAction.UPDATE: - sym = colors.warning("~") - ref = colors.info(ref_display) - arrow = colors.muted("→") - formatter.emit_text( - f" {sym} worktree {ref} {arrow} {wt_path_display}", - ) - elif entry.action == WorktreeAction.BLOCKED: - sym = colors.warning("⚠") - ref = colors.info(ref_display) - formatter.emit_text( - f" {sym} worktree {ref} blocked: {entry.detail}", - ) - elif entry.action == WorktreeAction.ERROR: - formatter.emit_text( - f" {colors.error('✗')} worktree {colors.info(ref_display)} " - f"error: {entry.error}", +async def _run_parallel_sync_loop_async( + *, + found_repos: list[ConfigDict], + jobs: int, + formatter: OutputFormatter, + colors: Colors, + summary: dict[str, int], + timed_out_repos: list[_TimedOutRepo], + progress_callback: ProgressCallback, + is_human: bool, + repo_timeout: int, + exit_on_error: bool, + include_worktrees: bool, + dry_run: bool, + parser: argparse.ArgumentParser | None, + log_file_path: pathlib.Path | None, + indicator: SyncStatusIndicator, +) -> None: + """Iterate the repositories in parallel via asyncio + daemon threads. + + ``asyncio.Semaphore(jobs)`` caps in-flight workers; per-task daemon + threads bridge libvcs's synchronous ``update_repo`` into the loop; + ``asyncio.as_completed`` streams results in completion order. Daemon + threads (instead of ``asyncio.to_thread``) avoid the default + ``ThreadPoolExecutor`` atexit-join footgun documented in + :func:`_sync_repo_with_watchdog` -- a wedged libvcs subprocess at + interpreter shutdown would otherwise hang the process. + """ + semaphore = asyncio.Semaphore(jobs) + loop = asyncio.get_running_loop() + stop_event = asyncio.Event() # set on first failure with --exit-on-error + + def _safe_set_result(fut: asyncio.Future[t.Any], value: t.Any) -> None: + if not fut.done(): + fut.set_result(value) + + def _safe_set_exception(fut: asyncio.Future[t.Any], exc: BaseException) -> None: + if not fut.done(): + fut.set_exception(exc) + + async def _sync_one(repo: ConfigDict) -> _ParallelResult: + repo_name = str(repo.get("name", "unknown")) + if stop_event.is_set(): + return _ParallelResult( + repo=repo, + slot=-1, + outcome=_SyncOutcome( + status="failed", + error=RuntimeError("cancelled before start"), + ), + ) + async with semaphore: + if stop_event.is_set(): + return _ParallelResult( + repo=repo, + slot=-1, + outcome=_SyncOutcome( + status="failed", + error=RuntimeError("cancelled before start"), + ), + ) + slot_idx = indicator.acquire_slot(repo_name) + future: asyncio.Future[_SyncOutcome] = loop.create_future() + + def _slot_progress(output: str, timestamp: datetime) -> None: + # Per-slot progress callback. Falls back to the global + # callback (which usually goes to add_output_line for the + # legacy panel, or silent for JSON / NDJSON) when the + # indicator is disabled and slot_idx is -1. + if slot_idx >= 0: + indicator.update_slot_message(slot_idx, output) + else: + progress_callback(output, timestamp) + + def _worker() -> None: + try: + outcome = _sync_repo_with_watchdog( + repo, + progress_callback=_slot_progress, + timeout=repo_timeout, + is_human=is_human, ) + loop.call_soon_threadsafe(_safe_set_result, future, outcome) + except BaseException as exc: + loop.call_soon_threadsafe(_safe_set_exception, future, exc) + + threading.Thread( + target=_worker, + daemon=True, + name=f"vcspull-sync-{repo_name}", + ).start() + try: + outcome = await future + except BaseException: + # Cancellation before the worker delivered: free the slot + # so other tasks can proceed. The daemon thread continues + # until libvcs's internal timeout fires; harmless. + if slot_idx >= 0: + indicator.release_slot(slot_idx) + raise + return _ParallelResult(repo=repo, slot=slot_idx, outcome=outcome) + + tasks = [ + asyncio.create_task(_sync_one(r), name=f"sync:{r.get('name', 'unknown')}") + for r in found_repos + ] + + exit_on_error_fired = False + try: + for coro in asyncio.as_completed(tasks): + try: + result = await coro + except asyncio.CancelledError: + # Task was cancelled (--exit-on-error or KeyboardInterrupt). + # Drain the rest of as_completed so all tasks finish. + continue + except KeyboardInterrupt: + raise + except BaseException as exc: + # Worker raised something unusual (e.g. SystemExit). Log + # and continue -- per-repo failures are usually represented + # via _SyncOutcome.status='failed', not exception escape. + log.error("Worker exception: %s", exc, exc_info=True) + continue - # Tally worktree results into summary - summary["worktree_created"] = ( - summary.get("worktree_created", 0) + wt_result.created + slot = result.slot + outcome = result.outcome + repo = result.repo + + if slot < 0 and stop_event.is_set(): + # Cancelled before start; don't count toward summary. + continue + + def _release(line: str | None, _slot: int = slot) -> bool: + if _slot < 0: + return False + return indicator.release_slot(_slot, final_line=line) + + status = _emit_repo_result( + repo=repo, + outcome=outcome, + formatter=formatter, + colors=colors, + summary=summary, + timed_out_repos=timed_out_repos, + is_human=is_human, + final_writer=_release, ) - summary["worktree_updated"] = ( - summary.get("worktree_updated", 0) + wt_result.updated + + if status in ("timed_out", "failed"): + if exit_on_error and not exit_on_error_fired: + exit_on_error_fired = True + # ``stop_event`` short-circuits queued tasks (those + # still waiting on the semaphore). In-flight tasks + # are allowed to complete so their output is + # captured and emitted -- mirrors the serial path's + # promise that the user sees results for repos that + # had already started. + stop_event.set() + continue + + # synced -- run worktree sync (sequential across repos; the + # main sync was the parallel hot path). + if include_worktrees: + errors = _emit_worktree_results( + repo=repo, + formatter=formatter, + colors=colors, + summary=summary, + dry_run=dry_run, + ) + if exit_on_error and errors > 0 and not exit_on_error_fired: + exit_on_error_fired = True + stop_event.set() + except (KeyboardInterrupt, asyncio.CancelledError): + stop_event.set() + for task in tasks: + task.cancel() + # Drain so cancellations finish; daemon worker threads are abandoned + # and forcibly terminated when the process exits via _exit_on_sigint. + await asyncio.gather(*tasks, return_exceptions=True) + raise + + if exit_on_error_fired: + # Drain remaining results so cancelled tasks complete cleanly. + await asyncio.gather(*tasks, return_exceptions=True) + # Mirror serial path's exit semantics. + if timed_out_repos: + _emit_rerun_recipe( + formatter, + colors, + timed_out_repos=timed_out_repos, + timeout=repo_timeout, ) - summary["worktree_failed"] = ( - summary.get("worktree_failed", 0) + wt_result.errors + _emit_summary(formatter, colors, summary) + if log_file_path is not None: + formatter.emit_text( + f"{colors.info('->')} Full debug log: " + f"{colors.muted(str(log_file_path))}", ) - # Count worktree errors as failures for exit code - summary["failed"] += wt_result.errors - - if exit_on_error and wt_result.errors > 0: - _emit_summary(formatter, colors, summary) - formatter.finalize() - if parser is not None: - parser.exit(status=1, message=EXIT_ON_ERROR_MSG) - raise SystemExit(EXIT_ON_ERROR_MSG) + formatter.finalize() + if parser is not None: + parser.exit(status=1, message=EXIT_ON_ERROR_MSG) + raise SystemExit(EXIT_ON_ERROR_MSG) def _emit_summary( diff --git a/tests/cli/test_sync_progress.py b/tests/cli/test_sync_progress.py index 6084755ba..4c5ff0df7 100644 --- a/tests/cli/test_sync_progress.py +++ b/tests/cli/test_sync_progress.py @@ -506,3 +506,219 @@ def test_render_tty_skips_stale_tick_when_active_repo_changed() -> None: after = stream.getvalue() # Stale tick: nothing should land on the stream. assert after == before + + +def test_multi_slot_acquires_distinct_indices() -> None: + """Each ``acquire_slot`` returns a different idle index until full.""" + stream = io.StringIO() + indicator = SyncStatusIndicator( + enabled=True, + stream=stream, + tty=False, + slots=4, + ) + + indices = [ + indicator.acquire_slot("alpha"), + indicator.acquire_slot("beta"), + indicator.acquire_slot("gamma"), + ] + assert indices == [0, 1, 2] + + # Release one and re-acquire -- the freed slot is reused. + indicator.release_slot(1) + next_idx = indicator.acquire_slot("delta") + assert next_idx == 1 + + +def test_multi_slot_oversubscription_raises() -> None: + """All slots busy + acquire_slot -> RuntimeError (caller must gate).""" + stream = io.StringIO() + indicator = SyncStatusIndicator( + enabled=True, + stream=stream, + tty=False, + slots=2, + ) + indicator.acquire_slot("a") + indicator.acquire_slot("b") + with pytest.raises(RuntimeError): + indicator.acquire_slot("c") + + +def test_multi_slot_disabled_acquire_returns_minus_one() -> None: + """A disabled indicator's ``acquire_slot`` returns -1 (no row to update).""" + stream = io.StringIO() + indicator = SyncStatusIndicator( + enabled=False, + stream=stream, + tty=False, + slots=4, + ) + assert indicator.acquire_slot("a") == -1 + + +def test_multi_slot_panel_is_force_disabled_above_one_slot() -> None: + """``slots > 1`` forces the live-trail panel off internally.""" + stream = io.StringIO() + indicator = SyncStatusIndicator( + enabled=True, + stream=stream, + tty=True, + slots=4, + output_lines=3, # caller asked for a panel, but multi-slot disables it + ) + assert indicator._panel_buffer.maxlen == 0 + + +def test_multi_slot_release_queues_pending_for_tty() -> None: + """``release_slot(slot, final_line=...)`` queues for the next tick (TTY).""" + stream = io.StringIO() + indicator = SyncStatusIndicator( + enabled=True, + stream=stream, + tty=True, + slots=2, + ) + slot = indicator.acquire_slot("alpha") + wrote = indicator.release_slot(slot, final_line="✓ Synced alpha") + # The line is queued for the next render tick to scroll into + # scrollback above the active region; caller skips its own + # ``formatter.emit_text``. + assert wrote is True + assert "✓ Synced alpha" in indicator._pending_permanents + + +def test_multi_slot_release_non_tty_returns_false() -> None: + """Non-TTY ``release_slot`` returns False so the caller emits the line.""" + stream = io.StringIO() + indicator = SyncStatusIndicator( + enabled=True, + stream=stream, + tty=False, + slots=2, + ) + slot = indicator.acquire_slot("alpha") + wrote = indicator.release_slot(slot, final_line="✓ Synced alpha") + assert wrote is False + + +def test_render_tty_multi_writes_one_row_per_slot() -> None: + """Active region renders ``slot_count`` rows in one synchronized write.""" + stream = io.StringIO() + indicator = SyncStatusIndicator( + enabled=True, + stream=stream, + tty=True, + slots=3, + ) + indicator.acquire_slot("alpha") + indicator.acquire_slot("beta") + # Third slot intentionally idle. + + indicator._render_tty_multi(frame="⠋") + + output = stream.getvalue() + # Three rows in the active region: alpha, beta, blank idle slot. + assert "Syncing alpha" in output + assert "Syncing beta" in output + # Pending permanents queue is empty (nothing released yet). + assert indicator._pending_permanents == [] + # Synchronized-output bracket wraps the frame. + assert "\x1b[?2026h" in output + assert "\x1b[?2026l" in output + # Active region geometry tracked for the next walk-up. + assert indicator._prev_active_rows == 3 + + +def test_render_tty_multi_drains_pending_permanents() -> None: + """Pending permanents drain at the next tick + scroll into scrollback.""" + stream = io.StringIO() + indicator = SyncStatusIndicator( + enabled=True, + stream=stream, + tty=True, + slots=2, + ) + indicator.acquire_slot("alpha") + indicator.acquire_slot("beta") + indicator._render_tty_multi(frame="⠋") # establish prev_active_rows + stream.truncate(0) + stream.seek(0) + + indicator.release_slot(0, final_line="✓ Synced alpha") + indicator._render_tty_multi(frame="⠙") + + output = stream.getvalue() + # Permanent line is written ABOVE the active region (with \n) so + # it scrolls into scrollback. + assert "✓ Synced alpha" in output + # Drained: no longer queued. + assert indicator._pending_permanents == [] + + +def test_update_slot_message_non_tty_falls_through_to_stream() -> None: + """Non-TTY ``update_slot_message`` writes the chunk to the stream. + + In non-TTY mode the per-slot suffix has no render path, so libvcs + progress lines must surface to the stream so log capture / pipelines + still see them. This is the equivalent of ``add_output_line``'s + non-TTY fallback. + """ + stream = io.StringIO() + indicator = SyncStatusIndicator( + enabled=True, + stream=stream, + tty=False, + slots=4, + ) + slot = indicator.acquire_slot("alpha") + + indicator.update_slot_message(slot, "Already on 'master'\n") + assert "Already on 'master'" in stream.getvalue() + + +def test_update_slot_message_tty_stores_per_slot_suffix() -> None: + """TTY ``update_slot_message`` stores the last line under the slot.""" + stream = io.StringIO() + indicator = SyncStatusIndicator( + enabled=True, + stream=stream, + tty=True, + slots=2, + ) + slot = indicator.acquire_slot("alpha") + indicator.update_slot_message( + slot, + "Receiving objects: 10%\nReceiving objects: 20%\n", + ) + + stored = indicator._slots[slot] + assert stored is not None + assert stored.last_message == "Receiving objects: 20%" + + +def test_close_clears_multi_slot_active_region() -> None: + """``close()`` walks back over the slot rows and emits any pending lines.""" + stream = io.StringIO() + indicator = SyncStatusIndicator( + enabled=True, + stream=stream, + tty=True, + slots=3, + ) + indicator.acquire_slot("alpha") + indicator._render_tty_multi(frame="⠋") # write the active region + indicator.release_slot(0, final_line="✓ Synced alpha") + indicator.close() + + # After close, prev_active_rows is reset and the pending permanent + # surfaces somewhere in the captured stream. + assert indicator._prev_active_rows == 0 + assert "✓ Synced alpha" in stream.getvalue() + + +def test_build_indicator_forwards_slots() -> None: + """``build_indicator(slots=N)`` constructs a multi-slot indicator.""" + indicator = build_indicator(human=True, color="auto", slots=4) + assert indicator._slot_count == 4 diff --git a/tests/cli/test_sync_watchdog.py b/tests/cli/test_sync_watchdog.py index 695cb376e..0a9ce756a 100644 --- a/tests/cli/test_sync_watchdog.py +++ b/tests/cli/test_sync_watchdog.py @@ -474,3 +474,205 @@ def test_resolve_panel_lines_ignores_bogus_env_value( monkeypatch.setenv("VCSPULL_PROGRESS_LINES", "many") assert _resolve_panel_lines(None) == 3 + + +def test_resolve_jobs_prefers_cli_flag( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """CLI ``--jobs N`` should win over the env var and the default.""" + from vcspull.cli.sync import _resolve_jobs + + monkeypatch.setenv("VCSPULL_JOBS", "16") + assert _resolve_jobs(4) == 4 + + +def test_resolve_jobs_falls_back_to_env_var( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Without a CLI flag, ``VCSPULL_JOBS`` takes over.""" + from vcspull.cli.sync import _resolve_jobs + + monkeypatch.setenv("VCSPULL_JOBS", "12") + assert _resolve_jobs(None) == 12 + + +def test_resolve_jobs_uses_default_when_unset( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Default is ``min(8, CPU*2)`` when neither flag nor env is set.""" + from vcspull.cli.sync import _DEFAULT_MAX_JOBS, _resolve_jobs + + monkeypatch.delenv("VCSPULL_JOBS", raising=False) + resolved = _resolve_jobs(None) + assert 1 <= resolved <= _DEFAULT_MAX_JOBS + + +def test_resolve_jobs_ignores_bogus_env_value( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A non-integer env value is logged and ignored; default applies.""" + from vcspull.cli.sync import _DEFAULT_MAX_JOBS, _resolve_jobs + + monkeypatch.setenv("VCSPULL_JOBS", "all-of-them") + resolved = _resolve_jobs(None) + assert 1 <= resolved <= _DEFAULT_MAX_JOBS + + +def test_resolve_jobs_ignores_zero_and_negative( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Non-positive flag values fall through to env / default.""" + from vcspull.cli.sync import _resolve_jobs + + monkeypatch.setenv("VCSPULL_JOBS", "5") + assert _resolve_jobs(0) == 5 + assert _resolve_jobs(-1) == 5 + + +def test_run_parallel_sync_loop_async_dispatches_each_repo_once( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """``jobs=4`` against 10 fake repos -- every repo gets one sync event. + + Mocks ``update_repo`` to a near-no-op so the test exercises the + asyncio orchestrator (``Semaphore`` cap, ``as_completed`` streaming) + end-to-end without touching real VCS state. The semaphore is the + only thing keeping in-flight count bounded; we assert via summary + counters that each repo ran exactly once. + """ + import asyncio + + from vcspull.cli._progress import build_indicator + from vcspull.cli.sync import _run_parallel_sync_loop_async + + repos: list[dict[str, t.Any]] = [ + { + "name": f"repo-{i}", + "path": f"/tmp/vcspull-test/repo-{i}", + "url": f"git+file:///tmp/fake/{i}", + "vcs": "git", + "workspace_root": "/tmp/vcspull-test/", + } + for i in range(10) + ] + call_count = {"n": 0} + + def fake_update_repo(repo: dict[str, t.Any], **_: t.Any) -> None: + call_count["n"] += 1 + # Simulate a tiny amount of subprocess time so workers + # actually overlap in flight under the semaphore. + time.sleep(0.005) + + monkeypatch.setattr(sync_module, "update_repo", fake_update_repo) + + formatter = OutputFormatter(OutputMode.NDJSON) + colors = Colors(ColorMode.NEVER) + indicator = build_indicator(human=False, color="never", slots=4) + summary: dict[str, int] = { + "total": 0, + "synced": 0, + "previewed": 0, + "failed": 0, + "timed_out": 0, + "unmatched": 0, + } + timed_out: list[_TimedOutRepo] = [] + + asyncio.run( + _run_parallel_sync_loop_async( + found_repos=t.cast("t.Any", repos), + jobs=4, + formatter=formatter, + colors=colors, + summary=summary, + timed_out_repos=timed_out, + progress_callback=_noop_progress, + is_human=False, + repo_timeout=10, + exit_on_error=False, + include_worktrees=False, + dry_run=False, + parser=None, + log_file_path=None, + indicator=indicator, + ), + ) + indicator.close() + + assert call_count["n"] == 10 + assert summary["total"] == 10 + assert summary["synced"] == 10 + assert summary["failed"] == 0 + + +def test_run_parallel_sync_loop_async_respects_semaphore( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Concurrent in-flight count never exceeds ``jobs``.""" + import asyncio + import threading + + from vcspull.cli._progress import build_indicator + from vcspull.cli.sync import _run_parallel_sync_loop_async + + in_flight = 0 + peak = 0 + lock = threading.Lock() + + def fake_update_repo(repo: dict[str, t.Any], **_: t.Any) -> None: + nonlocal in_flight, peak + with lock: + in_flight += 1 + peak = max(peak, in_flight) + time.sleep(0.02) + with lock: + in_flight -= 1 + + monkeypatch.setattr(sync_module, "update_repo", fake_update_repo) + + repos: list[dict[str, t.Any]] = [ + { + "name": f"r-{i}", + "path": f"/tmp/vcspull-test/r-{i}", + "url": f"git+file:///tmp/fake/{i}", + "vcs": "git", + "workspace_root": "/tmp/vcspull-test/", + } + for i in range(20) + ] + + formatter = OutputFormatter(OutputMode.NDJSON) + colors = Colors(ColorMode.NEVER) + indicator = build_indicator(human=False, color="never", slots=3) + summary: dict[str, int] = { + "total": 0, + "synced": 0, + "previewed": 0, + "failed": 0, + "timed_out": 0, + "unmatched": 0, + } + + asyncio.run( + _run_parallel_sync_loop_async( + found_repos=t.cast("t.Any", repos), + jobs=3, + formatter=formatter, + colors=colors, + summary=summary, + timed_out_repos=[], + progress_callback=_noop_progress, + is_human=False, + repo_timeout=10, + exit_on_error=False, + include_worktrees=False, + dry_run=False, + parser=None, + log_file_path=None, + indicator=indicator, + ), + ) + indicator.close() + + assert peak <= 3 + assert summary["synced"] == 20 diff --git a/tests/conftest.py b/tests/conftest.py index 95746e852..2429888b8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -30,3 +30,17 @@ def snapshot_json(snapshot: SnapshotAssertion) -> SnapshotAssertion: def snapshot_yaml(snapshot: SnapshotAssertion) -> SnapshotAssertion: """YAML-formatted snapshot assertions.""" return snapshot.with_defaults(extension_class=YamlSnapshotExtension) + + +@pytest.fixture(autouse=True) +def _default_serial_jobs(monkeypatch: pytest.MonkeyPatch) -> None: + """Default ``vcspull sync`` to serial mode in tests. + + Production defaults to ``--jobs min(8, CPU*2)`` for batch speedups, + but most existing tests assert order-dependent behaviour (e.g. + ``--exit-on-error`` fixtures that pair a "good first" repo with a + "bad second" one). Pinning ``VCSPULL_JOBS=1`` here gives those + tests deterministic ordering; new tests that exercise the parallel + orchestrator override the env var inside their own scope. + """ + monkeypatch.setenv("VCSPULL_JOBS", "1")