diff --git a/fastapi_startkit/src/fastapi_startkit/process/__init__.py b/fastapi_startkit/src/fastapi_startkit/process/__init__.py new file mode 100644 index 00000000..ec70d28b --- /dev/null +++ b/fastapi_startkit/src/fastapi_startkit/process/__init__.py @@ -0,0 +1,4 @@ +from .process import Process +from .result import ProcessResult + +__all__ = ["Process", "ProcessResult"] diff --git a/fastapi_startkit/src/fastapi_startkit/process/exception.py b/fastapi_startkit/src/fastapi_startkit/process/exception.py new file mode 100644 index 00000000..1b12a5ac --- /dev/null +++ b/fastapi_startkit/src/fastapi_startkit/process/exception.py @@ -0,0 +1,12 @@ +class ProcessFailedException(Exception): + def __init__(self, result): + self.result = result + super().__init__( + f"Process [{result.command()}] failed with exit code {result.exit_code()}.\n{result.error_output()}" + ) + + +class ProcessTimedOutException(Exception): + def __init__(self, command): + self.command = command + super().__init__(f"Process [{command}] timed out.") diff --git a/fastapi_startkit/src/fastapi_startkit/process/fake.py b/fastapi_startkit/src/fastapi_startkit/process/fake.py new file mode 100644 index 00000000..5a42547b --- /dev/null +++ b/fastapi_startkit/src/fastapi_startkit/process/fake.py @@ -0,0 +1,31 @@ +from .result import ProcessResult + + +class FakeProcessDescription: + def __init__(self) -> None: + self._output_sequences: list[tuple[str, str]] = [] + self._exit_code: int = 0 + + def output(self, text: str) -> "FakeProcessDescription": + """Add a stdout line to the fake output.""" + self._output_sequences.append(("stdout", text)) + return self + + def error_output(self, text: str) -> "FakeProcessDescription": + """Add a stderr line to the fake output.""" + self._output_sequences.append(("stderr", text)) + return self + + def exit_code(self, code: int) -> "FakeProcessDescription": + self._exit_code = code + return self + + def to_result(self, command: str) -> "ProcessResult": + stdout_lines = [t for kind, t in self._output_sequences if kind == "stdout"] + stderr_lines = [t for kind, t in self._output_sequences if kind == "stderr"] + return ProcessResult( + stdout="\n".join(stdout_lines), + stderr="\n".join(stderr_lines), + returncode=self._exit_code, + args=command, + ) diff --git a/fastapi_startkit/src/fastapi_startkit/process/process.py b/fastapi_startkit/src/fastapi_startkit/process/process.py new file mode 100644 index 00000000..f08866ff --- /dev/null +++ b/fastapi_startkit/src/fastapi_startkit/process/process.py @@ -0,0 +1,680 @@ +from __future__ import annotations + +import asyncio +import os +import subprocess +import threading +import time +from typing import Callable, Optional, Union + +from .exception import ProcessFailedException, ProcessTimedOutException # noqa: F401 +from .fake import FakeProcessDescription +from .result import ProcessResult + + +# --------------------------------------------------------------------------- +# InvokedProcess — returned by PendingProcess.start() +# --------------------------------------------------------------------------- + + +class InvokedProcess: + def __init__( + self, + process: subprocess.Popen, + timeout: float | None = None, + callback: Callable | None = None, + ) -> None: + self._process = process + self._timeout = timeout + self._callback = callback + self._timed_out = False + self._start_time = time.monotonic() + self._stdout_buf: list[str] = [] + self._stderr_buf: list[str] = [] + self._stdout_thread: Optional[threading.Thread] = None + self._stderr_thread: Optional[threading.Thread] = None + self._start_reader_threads() + + def _start_reader_threads(self) -> None: + def read(pipe, kind, buf): + for line in iter(pipe.readline, ""): + buf.append(line) + if self._callback: + self._callback(kind, line) + + if self._process.stdout: + t = threading.Thread( + target=read, + args=(self._process.stdout, "stdout", self._stdout_buf), + daemon=True, + ) + t.start() + self._stdout_thread = t + + if self._process.stderr: + t = threading.Thread( + target=read, + args=(self._process.stderr, "stderr", self._stderr_buf), + daemon=True, + ) + t.start() + self._stderr_thread = t + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def running(self) -> bool: + return self._process.poll() is None + + def pid(self) -> int: + return self._process.pid + + def signal(self, sig: int) -> "InvokedProcess": + """Send a signal to the process.""" + self._process.send_signal(sig) + return self + + def kill(self) -> "InvokedProcess": + """Kill the process immediately.""" + self._process.kill() + return self + + def ensure_not_timed_out(self) -> "InvokedProcess": + """Raise ProcessTimedOutException if the process has exceeded its timeout.""" + if self._timed_out: + raise ProcessTimedOutException(self._process.args) + + if self._timeout is not None: + elapsed = time.monotonic() - self._start_time + if elapsed >= self._timeout: + self._timed_out = True + self._process.kill() + raise ProcessTimedOutException(self._process.args) + + return self + + def wait(self) -> ProcessResult: + """Block until the process finishes and return a ProcessResult.""" + try: + self._process.wait(timeout=self._timeout) + except subprocess.TimeoutExpired: + self._timed_out = True + self._process.kill() + self._process.wait() + raise ProcessTimedOutException(self._process.args) + + for t in (self._stdout_thread, self._stderr_thread): + if t: + t.join() + + return ProcessResult( + stdout="".join(self._stdout_buf), + stderr="".join(self._stderr_buf), + returncode=self._process.returncode, + args=self._process.args, + ) + + +# --------------------------------------------------------------------------- +# Pipe — used with Process.pipe(...) +# --------------------------------------------------------------------------- + + +class Pipe: + def __init__(self) -> None: + self._commands: list[str] = [] + + def command(self, cmd: str) -> "Pipe": + self._commands.append(cmd) + return self + + def to_command(self) -> str: + return " | ".join(self._commands) + + +# --------------------------------------------------------------------------- +# PoolResults +# --------------------------------------------------------------------------- + + +class PoolResults: + def __init__(self, results: list[ProcessResult]) -> None: + self._results = results + + def __getitem__(self, index: int) -> ProcessResult: + return self._results[index] + + def __iter__(self): + return iter(self._results) + + def __len__(self) -> int: + return len(self._results) + + def successful(self) -> bool: + return all(r.successful() for r in self._results) + + def failed(self) -> bool: + return not self.successful() + + +# --------------------------------------------------------------------------- +# Pool — used with Process.pool(...) +# --------------------------------------------------------------------------- + + +class _PoolEntry: + """Fluent builder for a single command inside a Pool.""" + + def __init__(self, pool: "Pool") -> None: + self._pool = pool + self._command: str | None = None + self._path: str | None = None + + def path(self, path: str) -> "_PoolEntry": + self._path = path + return self + + def command(self, cmd: str) -> "Pool": + self._command = cmd + self._pool._entries.append(self) + return self._pool + + def _cwd(self) -> str | None: + return self._path + + +class Pool: + def __init__(self, env: dict | None = None, timeout: float | None = None) -> None: + self._entries: list[_PoolEntry] = [] + self._env = env + self._timeout = timeout + self._invoked: list[InvokedProcess] = [] + + def path(self, path: str) -> _PoolEntry: + """Begin a pool entry, setting its working directory.""" + entry = _PoolEntry(self) + entry._path = path + return entry + + def command(self, cmd: str) -> "Pool": + """Add a command directly (no custom path).""" + entry = _PoolEntry(self) + entry._command = cmd + self._entries.append(entry) + return self + + def start(self, callback: Callable | None = None) -> "Pool": + """Start all pooled processes concurrently.""" + for i, entry in enumerate(self._entries): + command: str = entry._command or "" + proc = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + cwd=entry._cwd(), + env=self._env, + ) + + def make_cb(index): + if callback is None: + return None + + def _cb(kind, output): + callback(kind, output, index) + + return _cb + + self._invoked.append(InvokedProcess(proc, timeout=self._timeout, callback=make_cb(i))) + return self + + def running(self) -> list[InvokedProcess]: + """Return currently running InvokedProcesses.""" + return [p for p in self._invoked if p.running()] + + def wait(self) -> PoolResults: + """Wait for all processes and return PoolResults.""" + return PoolResults([p.wait() for p in self._invoked]) + + +# --------------------------------------------------------------------------- +# ProcessFake — testing infrastructure +# --------------------------------------------------------------------------- + + +class ProcessFake: + def __init__(self) -> None: + self._fakes: dict[str, Union[FakeProcessDescription, ProcessResult]] = {} + self._recorded: list[tuple] = [] # (command, pending, result) + + def _handle(self, command: str, pending: "PendingProcess") -> ProcessResult: + result = self._resolve(command) + self._recorded.append((command, pending, result)) + return result + + def _resolve(self, command: str) -> ProcessResult: + # Exact match first + if command in self._fakes: + fake = self._fakes[command] + return fake.to_result(command) if isinstance(fake, FakeProcessDescription) else fake + + # Wildcard + if "*" in self._fakes: + fake = self._fakes["*"] + return fake.to_result(command) if isinstance(fake, FakeProcessDescription) else fake + + # Default: successful empty result + return ProcessResult(stdout="", stderr="", returncode=0, args=command) + + # ------------------------------------------------------------------ + # Assertions + # ------------------------------------------------------------------ + + def assert_ran(self, command_or_callback) -> None: + """Assert a command was run. Accepts a string or an inspector callable.""" + if callable(command_or_callback): + for cmd, pending, result in self._recorded: + if command_or_callback(pending, result): + return + raise AssertionError("No process matching the given callback was run.") + + ran = [cmd for cmd, _, __ in self._recorded] + assert command_or_callback in ran, f"Process [{command_or_callback}] was not run.\nRan: {ran}" + + def assert_not_ran(self, command: str) -> None: + ran = [cmd for cmd, _, __ in self._recorded] + assert command not in ran, f"Process [{command}] was unexpectedly run." + + def assert_ran_times(self, command: str, times: int) -> None: + count = sum(1 for cmd, _, __ in self._recorded if cmd == command) + assert count == times, f"Process [{command}] expected to run {times} time(s) but ran {count} time(s)." + + def assert_nothing_ran(self) -> None: + assert not self._recorded, f"Unexpected processes were run: {[cmd for cmd, _, __ in self._recorded]}" + + +# --------------------------------------------------------------------------- +# PendingProcess — fluent builder +# --------------------------------------------------------------------------- + + +class PendingProcess: + def __init__(self, fake: ProcessFake | None = None) -> None: + self._fake = fake + self._timeout: float | None = 60 + self._quiet = False + self._tty = False + self._env: dict | None = None + self._cwd: str | None = None + self._input: str | None = None + + # ------------------------------------------------------------------ + # Fluent configuration + # ------------------------------------------------------------------ + + def timeout(self, seconds: float) -> "PendingProcess": + self._timeout = seconds + return self + + def forever(self) -> "PendingProcess": + """Disable timeout.""" + self._timeout = None + return self + + def quietly(self) -> "PendingProcess": + """Discard all output (stdout + stderr).""" + self._quiet = True + return self + + def tty(self, enabled: bool = True) -> "PendingProcess": + """Allocate a TTY — passes stdin/stdout/stderr through to the terminal.""" + self._tty = enabled + return self + + def env(self, env: dict) -> "PendingProcess": + self._env = {**os.environ, **env} + return self + + def path(self, cwd: str) -> "PendingProcess": + self._cwd = cwd + return self + + def input(self, data: str) -> "PendingProcess": + """Pipe a string into the process stdin.""" + self._input = data + return self + + # ------------------------------------------------------------------ + # Async execution (primary API) + # ------------------------------------------------------------------ + + async def run(self, command: str, callback: Callable | None = None) -> ProcessResult: + """Run a process asynchronously via asyncio and return a ProcessResult.""" + if self._fake is not None: + return self._fake._handle(command, self) + + if self._tty: + # TTY mode: pass through to terminal, no capture + result = subprocess.run( + command, + shell=True, + cwd=self._cwd, + env=self._env, + timeout=self._timeout, + ) + return ProcessResult( + stdout="", + stderr="", + returncode=result.returncode, + args=command, + ) + + stdout_pipe = asyncio.subprocess.DEVNULL if self._quiet else asyncio.subprocess.PIPE + stderr_pipe = asyncio.subprocess.DEVNULL if self._quiet else asyncio.subprocess.PIPE + stdin_pipe = asyncio.subprocess.PIPE if self._input else None + + proc = await asyncio.create_subprocess_shell( + command, + stdout=stdout_pipe, + stderr=stderr_pipe, + stdin=stdin_pipe, + cwd=self._cwd, + env=self._env, + ) + + try: + stdin_bytes = self._input.encode() if self._input else None + stdout_bytes, stderr_bytes = await asyncio.wait_for( + proc.communicate(input=stdin_bytes), + timeout=self._timeout, + ) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + raise ProcessTimedOutException(command) + + return ProcessResult( + stdout=(stdout_bytes.decode() if stdout_bytes else "") if not self._quiet else "", + stderr=(stderr_bytes.decode() if stderr_bytes else "") if not self._quiet else "", + returncode=proc.returncode or 0, + args=command, + ) + + async def pipe( + self, + callback: Callable[["Pipe"], None], + output_callback: Callable | None = None, + ) -> ProcessResult: + """Build a pipeline of commands and run them asynchronously.""" + p = Pipe() + callback(p) + return await self.run(p.to_command(), output_callback) + + # ------------------------------------------------------------------ + # Sync execution (for scripts / CLI without an event loop) + # ------------------------------------------------------------------ + + def run_sync(self, command: str, callback: Callable | None = None) -> ProcessResult: + """Run a process synchronously and return a ProcessResult.""" + if self._fake is not None: + return self._fake._handle(command, self) + + if self._tty: + result = subprocess.run( + command, + shell=True, + cwd=self._cwd, + env=self._env, + timeout=self._timeout, + ) + return ProcessResult( + stdout="", + stderr="", + returncode=result.returncode, + args=command, + ) + + if self._quiet: + result = subprocess.run( + command, + shell=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + cwd=self._cwd, + env=self._env, + input=self._input, + timeout=self._timeout, + ) + return ProcessResult( + stdout="", + stderr="", + returncode=result.returncode, + args=command, + ) + + if callback is not None: + # Stream output through callback then return the final result + return self.start(command, callback).wait() + + try: + result = subprocess.run( + command, + shell=True, + capture_output=True, + text=True, + cwd=self._cwd, + env=self._env, + input=self._input, + timeout=self._timeout, + ) + except subprocess.TimeoutExpired: + raise ProcessTimedOutException(command) + + return ProcessResult( + stdout=result.stdout or "", + stderr=result.stderr or "", + returncode=result.returncode, + args=command, + ) + + def pipe_sync( + self, + callback: Callable[["Pipe"], None], + output_callback: Callable | None = None, + ) -> ProcessResult: + """Build a pipeline of commands and run them synchronously.""" + p = Pipe() + callback(p) + return self.run_sync(p.to_command(), output_callback) + + # ------------------------------------------------------------------ + # Background execution + # ------------------------------------------------------------------ + + def start(self, command: str, callback: Callable | None = None) -> InvokedProcess: + """Start a process in the background and return an InvokedProcess.""" + if self._fake is not None: + raise NotImplementedError("Fake background processes are not yet supported. Use run() in tests.") + + proc = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + cwd=self._cwd, + env=self._env, + stdin=subprocess.PIPE if self._input else subprocess.DEVNULL, + ) + + if self._input and proc.stdin: + proc.stdin.write(self._input) + proc.stdin.close() + + return InvokedProcess(proc, timeout=self._timeout, callback=callback) + + def pool(self, callback: Callable[["Pool"], None]) -> Pool: + """Build a pool of concurrent processes.""" + pl = Pool(env=self._env, timeout=self._timeout) + callback(pl) + return pl + + +# --------------------------------------------------------------------------- +# Process facade +# --------------------------------------------------------------------------- + + +class Process: + """ + Facade for running shell commands. + + Primary API — async (for use inside FastAPI request handlers): + result = await Process.run('ls -la') + result = await Process.timeout(30).run('bash script.sh') + result = await Process.forever().quietly().run('bash import.sh') + + Sync API — for scripts / CLI without an event loop: + result = Process.run_sync('ls -la') + result = Process.timeout(30).run_sync('bash script.sh') + + Background execution: + process = Process.start('bash long.sh', callback=print) + while process.running(): + process.ensure_not_timed_out() + time.sleep(1) + result = process.wait() + + Pipelines: + result = await Process.pipe(lambda p: (p.command('cat f.txt'), p.command('grep foo'))) + + Pools (concurrent): + pool = Process.pool(lambda p: ( + p.command('bash job1.sh'), + p.command('bash job2.sh'), + )).start(lambda kind, output, i: print(f"[{i}] {output}")) + results = pool.wait() + + Testing: + fake = Process.fake({'bash import.sh': Process.describe().output('ok').exit_code(0)}) + await Process.run('bash import.sh') + fake.assert_ran('bash import.sh') + Process.reset_fake() + """ + + _fake: ProcessFake | None = None + + # ------------------------------------------------------------------ + # Fake / testing + # ------------------------------------------------------------------ + + @classmethod + def fake( + cls, + fakes: dict[str, Union[FakeProcessDescription, ProcessResult]] | None = None, + ) -> ProcessFake: + """Enable fake mode. Optionally supply per-command fakes.""" + fake = ProcessFake() + if fakes: + for pattern, desc in fakes.items(): + fake._fakes[pattern] = desc + cls._fake = fake + return fake + + @classmethod + def reset_fake(cls) -> None: + """Disable fake mode (call this in test teardown).""" + cls._fake = None + + @classmethod + def describe(cls) -> FakeProcessDescription: + """Create a FakeProcessDescription for use with Process.fake({...}).""" + return FakeProcessDescription() + + # ------------------------------------------------------------------ + # Fluent configuration — each returns a PendingProcess + # ------------------------------------------------------------------ + + @classmethod + def _pending(cls) -> PendingProcess: + return PendingProcess(fake=cls._fake) + + @classmethod + def timeout(cls, seconds: float) -> PendingProcess: + return cls._pending().timeout(seconds) + + @classmethod + def forever(cls) -> PendingProcess: + return cls._pending().forever() + + @classmethod + def quietly(cls) -> PendingProcess: + return cls._pending().quietly() + + @classmethod + def tty(cls, enabled: bool = True) -> PendingProcess: + return cls._pending().tty(enabled) + + @classmethod + def env(cls, env: dict) -> PendingProcess: + return cls._pending().env(env) + + @classmethod + def path(cls, cwd: str) -> PendingProcess: + return cls._pending().path(cwd) + + @classmethod + def input(cls, data: str) -> PendingProcess: + return cls._pending().input(data) + + # ------------------------------------------------------------------ + # Async execution shortcuts (primary API) + # ------------------------------------------------------------------ + + @classmethod + async def run(cls, command: str, callback: Callable | None = None) -> ProcessResult: + """Run a command asynchronously (primary API).""" + return await cls._pending().run(command, callback) + + @classmethod + async def pipe( + cls, + callback: Callable[["Pipe"], None], + output_callback: Callable | None = None, + ) -> ProcessResult: + """Build a pipeline of commands and run them asynchronously.""" + return await cls._pending().pipe(callback, output_callback) + + # ------------------------------------------------------------------ + # Sync execution shortcuts (for scripts / CLI) + # ------------------------------------------------------------------ + + @classmethod + def run_sync(cls, command: str, callback: Callable | None = None) -> ProcessResult: + """Run a command synchronously (for scripts / CLI without an event loop).""" + return cls._pending().run_sync(command, callback) + + @classmethod + def pipe_sync( + cls, + callback: Callable[["Pipe"], None], + output_callback: Callable | None = None, + ) -> ProcessResult: + """Build a pipeline of commands and run them synchronously.""" + return cls._pending().pipe_sync(callback, output_callback) + + # ------------------------------------------------------------------ + # Background execution + # ------------------------------------------------------------------ + + @classmethod + def start(cls, command: str, callback: Callable | None = None) -> InvokedProcess: + """Start a command in the background, returning an InvokedProcess.""" + return cls._pending().start(command, callback) + + @classmethod + def pool(cls, callback: Callable[["Pool"], None]) -> Pool: + """Build a pool of concurrent processes.""" + return cls._pending().pool(callback) diff --git a/fastapi_startkit/src/fastapi_startkit/process/result.py b/fastapi_startkit/src/fastapi_startkit/process/result.py new file mode 100644 index 00000000..880956dd --- /dev/null +++ b/fastapi_startkit/src/fastapi_startkit/process/result.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +import json +from typing import Any + +from .exception import ProcessFailedException + + +class ProcessResult: + def __init__( + self, + stdout: str = "", + stderr: str = "", + returncode: int = 0, + args: str = "", + ) -> None: + self._stdout = stdout + self._stderr = stderr + self._returncode = returncode + self._args = args + + def command(self) -> str: + return self._args + + def successful(self) -> bool: + return self._returncode == 0 + + def failed(self) -> bool: + return self._returncode != 0 + + def output(self) -> str: + return self._stdout or "" + + def error_output(self) -> str: + return self._stderr or "" + + def error(self) -> str: + """Alias for error_output() — stderr as a string.""" + return self.error_output() + + def exit_code(self) -> int: + return self._returncode + + def output_json(self) -> Any: + """Parse stdout as JSON.""" + return json.loads(self._stdout) + + def throw(self) -> "ProcessResult": + """Raise ProcessFailedException if the process failed.""" + if self.failed(): + raise ProcessFailedException(self) + return self + + def throw_if(self, condition: Any) -> "ProcessResult": + """Raise ProcessFailedException if condition is truthy.""" + if condition: + self.throw() + return self + + def __repr__(self) -> str: + return f"ProcessResult(exit_code={self._returncode},output={self._stdout!r})" diff --git a/fastapi_startkit/tests/process/__init__.py b/fastapi_startkit/tests/process/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fastapi_startkit/tests/process/test_pending_process.py b/fastapi_startkit/tests/process/test_pending_process.py new file mode 100644 index 00000000..050a2164 --- /dev/null +++ b/fastapi_startkit/tests/process/test_pending_process.py @@ -0,0 +1,300 @@ +"""Tests for PendingProcess fluent builder methods.""" + +import pytest + +from fastapi_startkit.process.exception import ProcessTimedOutException +from fastapi_startkit.process.process import InvokedProcess, PendingProcess, ProcessFake +from fastapi_startkit.process.result import ProcessResult + + +@pytest.fixture +def pending(): + """Fresh PendingProcess with no fake.""" + return PendingProcess() + + +@pytest.fixture +def fake_pending(): + """PendingProcess wired to a ProcessFake.""" + fake = ProcessFake() + return PendingProcess(fake=fake), fake + + +# --------------------------------------------------------------------------- +# Fluent setters +# --------------------------------------------------------------------------- + + +class TestFluentSetters: + def test_timeout_sets_value(self, pending): + result = pending.timeout(42) + assert result is pending + assert pending._timeout == 42 + + def test_forever_sets_none(self, pending): + result = pending.forever() + assert result is pending + assert pending._timeout is None + + def test_quietly_sets_flag(self, pending): + result = pending.quietly() + assert result is pending + assert pending._quiet is True + + def test_tty_sets_flag(self, pending): + result = pending.tty() + assert result is pending + assert pending._tty is True + + def test_tty_can_be_disabled(self, pending): + pending.tty(True) + result = pending.tty(False) + assert result is pending + assert pending._tty is False + + def test_env_merges_with_os_environ(self, pending): + result = pending.env({"FOO": "BAR"}) + assert result is pending + assert pending._env["FOO"] == "BAR" + assert "PATH" in pending._env # os.environ keys still present + + def test_env_allows_override_of_existing_key(self, pending): + result = pending.env({"PATH": "/custom/path"}) + assert result is pending + assert pending._env["PATH"] == "/custom/path" + + def test_path_sets_cwd(self, pending, tmp_path): + result = pending.path(str(tmp_path)) + assert result is pending + assert pending._cwd == str(tmp_path) + + def test_input_sets_stdin_data(self, pending): + result = pending.input("some input data") + assert result is pending + assert pending._input == "some input data" + + def test_chaining_multiple_setters(self, pending, tmp_path): + result = pending.timeout(10).quietly().path(str(tmp_path)).env({"MY": "VAR"}) + assert result is pending + assert pending._timeout == 10 + assert pending._quiet is True + assert pending._cwd == str(tmp_path) + assert pending._env["MY"] == "VAR" + + +# --------------------------------------------------------------------------- +# run() — async, real subprocess +# --------------------------------------------------------------------------- + + +class TestPendingProcessRun: + async def test_run_returns_process_result(self, pending): + result = await pending.run("echo hi") + assert isinstance(result, ProcessResult) + + async def test_run_captures_stdout(self, pending): + result = await pending.run("echo hello world") + assert "hello world" in result.output() + + async def test_run_captures_stderr(self, pending): + result = await pending.run("ls /nonexistent_path_xyz_abc_123") + assert result.failed() is True + + async def test_run_with_cwd(self, tmp_path): + (tmp_path / "myfile.txt").write_text("content") + p = PendingProcess() + p.path(str(tmp_path)) + result = await p.run("ls myfile.txt") + assert result.successful() is True + assert "myfile.txt" in result.output() + + async def test_run_with_env_variable(self, pending): + pending.env({"TESTVAR": "testvalue"}) + result = await pending.run("echo $TESTVAR") + assert "testvalue" in result.output() + + async def test_run_quietly_discards_output(self, pending): + pending.quietly() + result = await pending.run("echo should not appear") + assert result.output() == "" + assert result.successful() is True + + async def test_run_quietly_discards_stderr(self, pending): + pending.quietly() + result = await pending.run("ls /nonexistent_path_xyz_abc_123") + assert result.error_output() == "" + assert result.failed() is True + + async def test_run_with_input(self, pending): + pending.input("hello from stdin\n") + result = await pending.run("cat") + assert "hello from stdin" in result.output() + + async def test_run_timeout_raises(self, pending): + pending.timeout(0.1) + with pytest.raises(ProcessTimedOutException): + await pending.run("sleep 5") + + async def test_run_forever_completes(self, pending): + pending.forever() + result = await pending.run("echo quick") + assert result.successful() is True + + +# --------------------------------------------------------------------------- +# run() — fake mode +# --------------------------------------------------------------------------- + + +class TestPendingProcessFake: + async def test_run_uses_fake_when_set(self, fake_pending): + p, fake = fake_pending + result = await p.run("echo hello") + assert isinstance(result, ProcessResult) + assert result.successful() is True + + async def test_run_records_command_in_fake(self, fake_pending): + p, fake = fake_pending + await p.run("echo hello") + fake.assert_ran("echo hello") + + async def test_run_uses_registered_fake_result(self): + from fastapi_startkit.process.process import Process + + try: + Process.fake({"my command": Process.describe().output("faked output").exit_code(0)}) + result = await Process.run("my command") + assert result.output() == "faked output" + finally: + Process.reset_fake() + + +# --------------------------------------------------------------------------- +# run_sync() — synchronous execution +# --------------------------------------------------------------------------- + + +class TestPendingProcessRunSync: + def test_run_sync_returns_process_result(self, pending): + result = pending.run_sync("echo hi") + assert isinstance(result, ProcessResult) + + def test_run_sync_captures_stdout(self, pending): + result = pending.run_sync("echo hello world") + assert "hello world" in result.output() + + def test_run_sync_uses_fake_when_set(self, fake_pending): + p, fake = fake_pending + result = p.run_sync("echo hello") + assert isinstance(result, ProcessResult) + assert result.successful() is True + + def test_run_sync_timeout_raises(self, pending): + pending.timeout(0.1) + with pytest.raises(ProcessTimedOutException): + pending.run_sync("sleep 5") + + def test_run_sync_quietly_discards_output(self, pending): + pending.quietly() + result = pending.run_sync("echo quiet") + assert result.output() == "" + assert result.successful() is True + + def test_run_sync_with_input(self, pending): + pending.input("hello sync\n") + result = pending.run_sync("cat") + assert "hello sync" in result.output() + + +# --------------------------------------------------------------------------- +# start() — background invocation +# --------------------------------------------------------------------------- + + +class TestPendingProcessStart: + def test_start_returns_invoked_process(self, pending): + invoked = pending.start("echo hello") + assert isinstance(invoked, InvokedProcess) + result = invoked.wait() + assert result.successful() is True + assert "hello" in result.output() + + def test_start_fake_raises_not_implemented(self, fake_pending): + p, fake = fake_pending + with pytest.raises(NotImplementedError): + p.start("echo hello") + + def test_start_and_wait(self, pending): + invoked = pending.start("echo wait test") + result = invoked.wait() + assert isinstance(result, ProcessResult) + assert "wait test" in result.output() + + def test_start_running_check(self, pending): + invoked = pending.start("echo quick") + result = invoked.wait() + # After wait(), process is done + assert invoked.running() is False + + def test_start_timeout_raises(self, pending): + pending.timeout(0.1) + invoked = pending.start("sleep 5") + with pytest.raises(ProcessTimedOutException): + invoked.wait() + + def test_start_with_callback(self, pending): + lines_received = [] + + def on_output(kind, line): + lines_received.append((kind, line)) + + invoked = pending.start("echo callback test", callback=on_output) + result = invoked.wait() + assert result.successful() is True + assert any("callback test" in line for kind, line in lines_received) + + +# --------------------------------------------------------------------------- +# pipe() +# --------------------------------------------------------------------------- + + +class TestPendingProcessPipe: + async def test_pipe_runs_piped_commands(self, pending): + result = await pending.pipe(lambda p: (p.command("echo hello world"), p.command("grep hello"))) + assert result.successful() is True + assert "hello" in result.output() + + async def test_pipe_returns_process_result(self, pending): + result = await pending.pipe(lambda p: p.command("echo piped")) + assert isinstance(result, ProcessResult) + + +# --------------------------------------------------------------------------- +# pool() +# --------------------------------------------------------------------------- + + +class TestPendingProcessPool: + def test_pool_runs_multiple_commands(self, pending, tmp_path): + from fastapi_startkit.process.process import Pool + + pool = pending.pool(lambda p: (p.command("echo one"), p.command("echo two"))) + assert isinstance(pool, Pool) + pool.start() + results = pool.wait() + assert len(results) == 2 + assert all(r.successful() for r in results) + + def test_pool_results_iterable(self, pending): + pool = pending.pool( + lambda p: ( + p.command("echo a"), + p.command("echo b"), + p.command("echo c"), + ) + ) + pool.start() + results = pool.wait() + count = sum(1 for _ in results) + assert count == 3 diff --git a/fastapi_startkit/tests/process/test_process.py b/fastapi_startkit/tests/process/test_process.py new file mode 100644 index 00000000..4a283933 --- /dev/null +++ b/fastapi_startkit/tests/process/test_process.py @@ -0,0 +1,308 @@ +"""Tests for the Process facade and real subprocess execution.""" + +import pytest + +from fastapi_startkit.process.exception import ProcessFailedException, ProcessTimedOutException +from fastapi_startkit.process.process import Process, ProcessFake + + +@pytest.fixture(autouse=True) +def reset_process_fake(): + """Ensure Process fake state is cleared after every test.""" + yield + Process.reset_fake() + + +# --------------------------------------------------------------------------- +# Real subprocess execution via Process.run() (async) +# --------------------------------------------------------------------------- + + +class TestProcessRun: + async def test_run_echo_returns_output(self): + result = await Process.run("echo hello") + assert "hello" in result.output() + + async def test_run_successful_exit_code(self): + result = await Process.run("echo hi") + assert result.exit_code() == 0 + assert result.successful() is True + assert result.failed() is False + + async def test_run_failed_command_exit_code(self): + result = await Process.run("exit 1") + assert result.exit_code() == 1 + assert result.failed() is True + + async def test_run_ls_lists_files(self, tmp_path): + (tmp_path / "testfile.txt").write_text("x") + result = await Process.path(str(tmp_path)).run("ls") + assert "testfile.txt" in result.output() + + async def test_run_captures_stderr(self): + result = await Process.run("ls /nonexistent_path_xyz_abc_123 2>&1 || true") + # stderr redirected to stdout via 2>&1, exit code 0 from 'true' + assert result.exit_code() == 0 + + async def test_run_with_stderr_separate(self): + result = await Process.run("ls /nonexistent_path_xyz_abc_123") + assert result.failed() is True + assert result.error_output() != "" or result.exit_code() != 0 + + async def test_run_multiline_output(self): + result = await Process.run('printf "line1\nline2\nline3"') + lines = result.output().strip().splitlines() + assert len(lines) == 3 + assert lines[0] == "line1" + + async def test_run_echo_with_env_variable(self): + result = await Process.env({"MY_VAR": "testvalue"}).run("echo $MY_VAR") + assert "testvalue" in result.output() + + async def test_run_in_path(self, tmp_path): + (tmp_path / "hello.txt").write_text("hello content") + result = await Process.path(str(tmp_path)).run("ls hello.txt") + assert result.successful() is True + assert "hello.txt" in result.output() + + async def test_run_quietly_returns_empty_output(self): + result = await Process.quietly().run("echo this is quiet") + assert result.output() == "" + assert result.successful() is True + + async def test_run_quietly_stderr_empty(self): + result = await Process.quietly().run("ls /nonexistent_path_xyz_abc_123") + assert result.error_output() == "" + assert result.failed() is True + + +class TestProcessTimeout: + async def test_timeout_raises_on_slow_command(self): + with pytest.raises(ProcessTimedOutException): + await Process.timeout(0.1).run("sleep 5") + + async def test_timeout_message_contains_command(self): + with pytest.raises(ProcessTimedOutException, match="sleep"): + await Process.timeout(0.1).run("sleep 5") + + async def test_forever_disables_timeout(self): + result = await Process.forever().run("echo fast") + assert result.successful() is True + + +# --------------------------------------------------------------------------- +# Process.run_sync() — sync execution +# --------------------------------------------------------------------------- + + +class TestProcessRunSync: + def test_run_sync_echo_returns_output(self): + result = Process.run_sync("echo hello") + assert "hello" in result.output() + + def test_run_sync_successful(self): + result = Process.run_sync("echo hi") + assert result.successful() is True + + def test_run_sync_failed_command(self): + result = Process.run_sync("exit 1") + assert result.failed() is True + + def test_run_sync_timeout_raises(self): + with pytest.raises(ProcessTimedOutException): + Process.timeout(0.1).run_sync("sleep 5") + + +# --------------------------------------------------------------------------- +# Process.fake() — testing infrastructure +# --------------------------------------------------------------------------- + + +class TestProcessFake: + def test_fake_returns_process_fake_instance(self): + fake = Process.fake() + assert isinstance(fake, ProcessFake) + + async def test_fake_default_successful_result(self): + Process.fake() + result = await Process.run("any command") + assert result.successful() is True + assert result.output() == "" + + async def test_fake_with_custom_output(self): + Process.fake({"echo hello": Process.describe().output("hello").exit_code(0)}) + result = await Process.run("echo hello") + assert result.output() == "hello" + assert result.successful() is True + + async def test_fake_with_failing_result(self): + Process.fake({"bad cmd": Process.describe().error_output("fail!").exit_code(1)}) + result = await Process.run("bad cmd") + assert result.failed() is True + assert result.error_output() == "fail!" + assert result.exit_code() == 1 + + async def test_fake_wildcard_matches_any_command(self): + Process.fake({"*": Process.describe().output("wildcard output").exit_code(0)}) + result = await Process.run("something unregistered") + assert result.output() == "wildcard output" + + async def test_fake_exact_match_takes_priority_over_wildcard(self): + Process.fake( + { + "specific cmd": Process.describe().output("specific").exit_code(0), + "*": Process.describe().output("wildcard").exit_code(0), + } + ) + result = await Process.run("specific cmd") + assert result.output() == "specific" + + async def test_fake_unregistered_command_returns_default_success(self): + Process.fake() + result = await Process.run("unregistered command") + assert result.successful() is True + + async def test_reset_fake_disables_fake_mode(self): + Process.fake({"echo hi": Process.describe().output("faked").exit_code(0)}) + Process.reset_fake() + # After reset, real subprocess runs + result = await Process.run("echo hi") + assert "hi" in result.output() + + +class TestProcessFakeAssertions: + async def test_assert_ran_passes_when_command_ran(self): + fake = Process.fake() + await Process.run("echo hello") + fake.assert_ran("echo hello") + + async def test_assert_ran_fails_when_command_not_run(self): + fake = Process.fake() + with pytest.raises(AssertionError, match="not run"): + fake.assert_ran("never ran") + + async def test_assert_not_ran_passes_when_command_not_run(self): + fake = Process.fake() + await Process.run("echo something") + fake.assert_not_ran("echo other") + + async def test_assert_not_ran_fails_when_command_ran(self): + fake = Process.fake() + await Process.run("echo hello") + with pytest.raises(AssertionError, match="unexpectedly"): + fake.assert_not_ran("echo hello") + + async def test_assert_ran_times_passes(self): + fake = Process.fake() + await Process.run("echo hello") + await Process.run("echo hello") + fake.assert_ran_times("echo hello", 2) + + async def test_assert_ran_times_fails_wrong_count(self): + fake = Process.fake() + await Process.run("echo hello") + with pytest.raises(AssertionError, match="time"): + fake.assert_ran_times("echo hello", 3) + + def test_assert_nothing_ran_passes_when_nothing_ran(self): + fake = Process.fake() + fake.assert_nothing_ran() + + async def test_assert_nothing_ran_fails_when_something_ran(self): + fake = Process.fake() + await Process.run("echo hello") + with pytest.raises(AssertionError): + fake.assert_nothing_ran() + + async def test_assert_ran_with_callback(self): + fake = Process.fake() + await Process.run("echo hello") + fake.assert_ran(lambda pending, result: result.successful()) + + async def test_assert_ran_with_callback_fails_when_no_match(self): + fake = Process.fake() + await Process.run("echo hello") + with pytest.raises(AssertionError, match="callback"): + fake.assert_ran(lambda pending, result: result.failed()) + + +# --------------------------------------------------------------------------- +# Exception cases +# --------------------------------------------------------------------------- + + +class TestExceptions: + async def test_process_failed_exception_has_result(self): + result = await Process.run("exit 42") + try: + result.throw() + pytest.fail("Expected ProcessFailedException") + except ProcessFailedException as e: + assert e.result is result + + async def test_process_failed_exception_message(self): + result = await Process.run("false") + with pytest.raises(ProcessFailedException) as exc_info: + result.throw() + assert "false" in str(exc_info.value) + + async def test_process_timed_out_exception(self): + with pytest.raises(ProcessTimedOutException) as exc_info: + await Process.timeout(0.1).run("sleep 10") + assert "sleep" in str(exc_info.value) + + async def test_process_timed_out_exception_has_command(self): + try: + await Process.timeout(0.1).run("sleep 10") + except ProcessTimedOutException as e: + assert "sleep" in e.command + + +# --------------------------------------------------------------------------- +# Fluent builder (Process class methods) +# --------------------------------------------------------------------------- + + +class TestFluentBuilder: + def test_timeout_returns_pending_process(self): + from fastapi_startkit.process.process import PendingProcess + + pending = Process.timeout(30) + assert isinstance(pending, PendingProcess) + assert pending._timeout == 30 + + def test_forever_returns_pending_process_with_none_timeout(self): + from fastapi_startkit.process.process import PendingProcess + + pending = Process.forever() + assert isinstance(pending, PendingProcess) + assert pending._timeout is None + + def test_quietly_returns_pending_process(self): + from fastapi_startkit.process.process import PendingProcess + + pending = Process.quietly() + assert isinstance(pending, PendingProcess) + assert pending._quiet is True + + def test_env_merges_with_os_environ(self): + from fastapi_startkit.process.process import PendingProcess + + pending = Process.env({"MY_KEY": "MY_VAL"}) + assert isinstance(pending, PendingProcess) + assert pending._env["MY_KEY"] == "MY_VAL" + assert "PATH" in pending._env + + def test_path_sets_cwd(self, tmp_path): + from fastapi_startkit.process.process import PendingProcess + + pending = Process.path(str(tmp_path)) + assert isinstance(pending, PendingProcess) + assert pending._cwd == str(tmp_path) + + def test_input_sets_stdin_data(self): + from fastapi_startkit.process.process import PendingProcess + + pending = Process.input("stdin data") + assert isinstance(pending, PendingProcess) + assert pending._input == "stdin data" diff --git a/fastapi_startkit/tests/process/test_result.py b/fastapi_startkit/tests/process/test_result.py new file mode 100644 index 00000000..cdb23ef0 --- /dev/null +++ b/fastapi_startkit/tests/process/test_result.py @@ -0,0 +1,150 @@ +"""Tests for ProcessResult.""" + +import json + +import pytest + +from fastapi_startkit.process.exception import ProcessFailedException +from fastapi_startkit.process.result import ProcessResult + + +def _make_result(stdout="", stderr="", returncode=0, args="echo hello"): + """Helper: build a ProcessResult from plain attributes.""" + return ProcessResult(stdout=stdout, stderr=stderr, returncode=returncode, args=args) + + +class TestOutput: + def test_output_returns_stdout(self): + r = _make_result(stdout="hello world") + assert r.output() == "hello world" + + def test_output_returns_empty_string_when_empty(self): + r = ProcessResult() + assert r.output() == "" + + def test_error_output_returns_stderr(self): + r = _make_result(stderr="something went wrong") + assert r.error_output() == "something went wrong" + + def test_error_output_returns_empty_string_when_empty(self): + r = ProcessResult() + assert r.error_output() == "" + + def test_error_is_alias_for_error_output(self): + r = _make_result(stderr="err msg") + assert r.error() == r.error_output() + assert r.error() == "err msg" + + +class TestExitCode: + def test_exit_code_zero(self): + r = _make_result(returncode=0) + assert r.exit_code() == 0 + + def test_exit_code_nonzero(self): + r = _make_result(returncode=1) + assert r.exit_code() == 1 + + def test_exit_code_arbitrary(self): + r = _make_result(returncode=127) + assert r.exit_code() == 127 + + +class TestSuccessfulFailed: + def test_successful_when_returncode_zero(self): + r = _make_result(returncode=0) + assert r.successful() is True + assert r.failed() is False + + def test_failed_when_returncode_nonzero(self): + r = _make_result(returncode=1) + assert r.failed() is True + assert r.successful() is False + + def test_failed_with_large_code(self): + r = _make_result(returncode=255) + assert r.failed() is True + + +class TestThrow: + def test_throw_does_not_raise_on_success(self): + r = _make_result(returncode=0) + result = r.throw() + assert result is r + + def test_throw_raises_on_failure(self): + r = _make_result(returncode=1, stderr="oops", args="bad cmd") + with pytest.raises(ProcessFailedException) as exc_info: + r.throw() + assert exc_info.value.result is r + + def test_throw_exception_message_contains_command(self): + r = _make_result(returncode=2, args="my-command") + with pytest.raises(ProcessFailedException, match="my-command"): + r.throw() + + def test_throw_exception_message_contains_exit_code(self): + r = _make_result(returncode=42, args="cmd") + with pytest.raises(ProcessFailedException, match="42"): + r.throw() + + +class TestThrowIf: + def test_throw_if_true_raises(self): + r = _make_result(returncode=1) + with pytest.raises(ProcessFailedException): + r.throw_if(True) + + def test_throw_if_false_does_not_raise(self): + r = _make_result(returncode=1) + result = r.throw_if(False) + assert result is r + + def test_throw_if_truthy_raises(self): + r = _make_result(returncode=1) + with pytest.raises(ProcessFailedException): + r.throw_if(1) + + def test_throw_if_falsy_does_not_raise(self): + r = _make_result(returncode=1) + result = r.throw_if(0) + assert result is r + + def test_throw_if_on_successful_result_does_not_raise_even_with_true(self): + # throw_if(condition) calls throw(), which only raises when failed(). + # A successful result (returncode=0) will not raise regardless of condition. + r = _make_result(returncode=0) + result = r.throw_if(True) + assert result is r + + +class TestCommand: + def test_command_returns_args(self): + r = _make_result(args="ls -la /tmp") + assert r.command() == "ls -la /tmp" + + +class TestOutputJson: + def test_output_json_parses_valid_json(self): + r = _make_result(stdout='{"key": "value", "num": 42}') + data = r.output_json() + assert data == {"key": "value", "num": 42} + + def test_output_json_parses_list(self): + r = _make_result(stdout="[1, 2, 3]") + assert r.output_json() == [1, 2, 3] + + def test_output_json_raises_on_invalid_json(self): + r = _make_result(stdout="not valid json") + with pytest.raises(json.JSONDecodeError): + r.output_json() + + +class TestRepr: + def test_repr_contains_exit_code(self): + r = _make_result(returncode=0, stdout="hi") + assert "exit_code=0" in repr(r) + + def test_repr_contains_output(self): + r = _make_result(returncode=0, stdout="hello") + assert "hello" in repr(r)