diff --git a/src/pyspector/cli.py b/src/pyspector/cli.py index d43a705..845e9fe 100644 --- a/src/pyspector/cli.py +++ b/src/pyspector/cli.py @@ -3,10 +3,13 @@ import time import json import ast +import contextlib +import os import subprocess import tempfile import sys import warnings +from importlib.metadata import version as _pkg_version, PackageNotFoundError from pathlib import Path from typing import Optional, Dict, Any, List, cast @@ -48,6 +51,84 @@ def get_startup_note(): _list = list _tuple = tuple _ast_AST = ast.AST + + +def _dbg(debug: bool, msg: str = "", **style_kwargs) -> None: + """Emit *msg* via click.echo only when --debug is enabled. + + Used to gate progress/info chatter so the default output stays focused on + findings, warnings and errors. Errors and findings should call click.echo + directly, not this helper. + """ + if not debug: + return + if style_kwargs: + click.echo(click.style(msg, **style_kwargs)) + else: + click.echo(msg) + + +_BANNER = r""" + o__ __o o__ __o o + <| v\ /v v\ <|> + / \ <\ /> <\ < > + \o/ o/ o o _\o____ \o_ __o o__ __o __o__ | o__ __o \o__ __o + |__ _<|/ <|> <|> \_\__o__ | v\ /v |> /> \ o__/_ /v v\ | |> + | < > < > \ / \ <\ /> // o/ | /> <\ / \ < > + \o o/ \ / \o/ / \o o/ <| | \ / \o/ + | v\ /v o o | o v\ /v __o \\ o o o | + / \ <\/> <\__ __/> / \ __/> <\/> __/> _\o__ / \ + / \o/ + o | + __/> / \ +""" + + +@contextlib.contextmanager +def _silence_fd1(active: bool): + """Redirect file descriptor 1 (stdout) to /dev/null when *active* is True. + + Used to swallow ``println!`` output emitted by the Rust core during a scan + when --debug is not set. Python-side ``click.echo`` calls inside the block + are also suppressed; do not place user-facing output (findings, errors) + inside this context. + """ + if not active: + yield + return + sys.stdout.flush() + saved_fd = os.dup(1) + devnull_fd = os.open(os.devnull, os.O_WRONLY) + try: + os.dup2(devnull_fd, 1) + yield + finally: + sys.stdout.flush() + os.dup2(saved_fd, 1) + os.close(saved_fd) + os.close(devnull_fd) + + +def _get_version() -> str: + try: + return _pkg_version("pyspector") + except PackageNotFoundError: + return "unknown" + + +def _print_banner() -> None: + """Print the name banner, version, credits and the startup joke. + + Shown at the start of every scan. The verbose ``[*]`` progress lines that + follow are gated by --debug. + """ + click.echo(click.style(_BANNER)) + click.echo(f"Version: {_get_version()}") + click.echo("Made with <3 by github.com/ParzivalHack\n") + note = get_startup_note() + click.echo(click.style(f"{note}\n", fg="bright_black", italic=True)) + + _ast_iter_fields = ast.iter_fields # --- Helper function for AST serialization --- @@ -109,10 +190,36 @@ def should_skip_file(file_path: Path) -> bool: return False +def _is_path_excluded(file_path: Path, root: Path, patterns: List[str]) -> bool: + """Return True if *file_path* matches any of the *patterns* (fnmatch-style). + + Patterns are matched against the path relative to *root*, against the + absolute path, and against each individual path component. This lets + bare names like ".venv" or "node_modules" prune whole subtrees regardless + of depth. + """ + import fnmatch + try: + rel = file_path.relative_to(root) + except ValueError: + rel = file_path + rel_str = str(rel).replace("\\", "/") + abs_str = str(file_path).replace("\\", "/") + parts = set(rel.parts) | set(file_path.parts) + for pat in patterns: + if fnmatch.fnmatch(rel_str, pat) or fnmatch.fnmatch(abs_str, pat): + return True + if pat in parts: + return True + return False + + def get_python_file_asts( path: Path, enable_syntax_warnings: bool = False, _stats_meta: Optional[Dict[str, int]] = None, + debug: bool = False, + exclude: Optional[List[str]] = None, ) -> List[Dict[str, Any]]: """ Recursively finds Python files and returns their content and AST. @@ -131,7 +238,15 @@ def get_python_file_asts( _stats_meta['errors'] = 0 results = [] - files_to_scan = list(path.glob("**/*.py")) if path.is_dir() else [path] + exclude_patterns = list(exclude or []) + root = path if path.is_dir() else path.parent + if path.is_dir(): + files_to_scan = [ + p for p in path.glob("**/*.py") + if not _is_path_excluded(p, root, exclude_patterns) + ] + else: + files_to_scan = [path] with warnings.catch_warnings(): if not enable_syntax_warnings: @@ -146,11 +261,10 @@ def get_python_file_asts( ) if should_skip_file(py_file): - click.echo( - click.style( - f"Info: Skipped {display_path} (test file or fixture)", - fg="blue", - ) + _dbg( + debug, + f"Info: Skipped {display_path} (test file or fixture)", + fg="blue", ) if _stats_meta is not None: _stats_meta['skipped'] += 1 @@ -229,12 +343,13 @@ def execute_plugins( scan_path: Path, plugin_names: list, plugin_config: dict | None = None, + debug: bool = False, ): """Execute specified plugins on scan results.""" if not plugin_names: return - click.echo(f"\n[*] Loading {len(plugin_names)} plugin(s)...") + _dbg(debug, f"\n[*] Loading {len(plugin_names)} plugin(s)...") plugin_manager = get_plugin_manager() plugin_config = plugin_config or {} @@ -270,16 +385,15 @@ def execute_plugins( sys.argv = original_argv if result.get("success"): - click.echo( - click.style( - f"[+] {plugin.metadata.name}: {result.get('message', 'Success')}", - fg="green", - ) + _dbg( + debug, + f"[+] {plugin.metadata.name}: {result.get('message', 'Success')}", + fg="green", ) if result.get("output_files"): - click.echo("[*] Generated files:") + _dbg(debug, "[*] Generated files:") for file_path in result["output_files"]: - click.echo(f" - {file_path}") + _dbg(debug, f" - {file_path}") else: click.echo( click.style( @@ -297,25 +411,6 @@ def cli(): PySpector: A high-performance, security-focused static analysis tool for Python, powered by Rust. """ - banner = r""" - o__ __o o__ __o o - <| v\ /v v\ <|> - / \ <\ /> <\ < > - \o/ o/ o o _\o____ \o_ __o o__ __o __o__ | o__ __o \o__ __o - |__ _<|/ <|> <|> \_\__o__ | v\ /v |> /> \ o__/_ /v v\ | |> - | < > < > \ / \ <\ /> // o/ | /> <\ / \ < > - \o o/ \ / \o/ / \o o/ <| | \ / \o/ - | v\ /v o o | o v\ /v __o \\ o o o | - / \ <\/> <\__ __/> / \ __/> <\/> __/> _\o__ / \ - / \o/ - o | - __/> / \ -""" - click.echo(click.style(banner)) - click.echo("Version: 0.1.9\n") - click.echo("Made with <3 by github.com/ParzivalHack\n") - note = get_startup_note() - click.echo(click.style(f"{note}\n", fg="bright_black", italic=True)) def run_wizard(): @@ -422,6 +517,9 @@ def run_wizard(): "at the end of the scan (LoC/sec, memory, engine breakdown, " "top rules, top files, vulnerability density, and more)." )) +@click.option('--debug', is_flag=True, default=False, + help="Show all informational/progress messages and the banner. " + "Without this flag only findings, warnings and errors are printed.") def run_scan_command( path: Optional[Path], repo_url: Optional[str], @@ -437,9 +535,12 @@ def run_scan_command( syntax_warnings: bool, wizard: bool, show_stats: bool, + debug: bool, ): """The main scan command with plugin and stats support.""" + _print_banner() + # --- Wizard Mode --- if wizard: params = run_wizard() @@ -456,7 +557,7 @@ def run_scan_command( "URL must be a public GitHub or GitLab repository." ) with tempfile.TemporaryDirectory() as temp_dir: - click.echo(f"[*] Cloning '{params['repo_url']}' into temporary directory...") + _dbg(debug, f"[*] Cloning '{params['repo_url']}' into temporary directory...") subprocess.run( ['git', 'clone', '--depth', '1', params["repo_url"], temp_dir], check=True, capture_output=True, text=True, @@ -473,6 +574,7 @@ def run_scan_command( supply_chain_scan=params["supply_chain_scan"], syntax_warnings=params["syntax_warnings"], show_stats=params["show_stats"], + debug=debug, ) else: _execute_scan( @@ -487,6 +589,7 @@ def run_scan_command( supply_chain_scan=params["supply_chain_scan"], syntax_warnings=params["syntax_warnings"], show_stats=params["show_stats"], + debug=debug, ) return @@ -543,7 +646,7 @@ def run_scan_command( ) with tempfile.TemporaryDirectory() as temp_dir: - click.echo(f"[*] Cloning '{repo_url}' into temporary directory...") + _dbg(debug, f"[*] Cloning '{repo_url}' into temporary directory...") try: subprocess.run( ['git', 'clone', '--depth', '1', repo_url, temp_dir], @@ -553,7 +656,7 @@ def run_scan_command( Path(temp_dir), config_path, output_file, report_format, severity_level, ai_scan, plugins, plugin_config, supply_chain, - syntax_warnings, show_stats, + syntax_warnings, show_stats, debug, ) except subprocess.CalledProcessError as e: click.echo( @@ -576,7 +679,7 @@ def run_scan_command( path, config_path, output_file, report_format, severity_level, ai_scan, plugins, plugin_config, supply_chain, - syntax_warnings, show_stats, + syntax_warnings, show_stats, debug, ) @@ -592,6 +695,7 @@ def _execute_scan( supply_chain_scan: bool = False, syntax_warnings: bool = False, show_stats: bool = False, + debug: bool = False, ): """ Core scan orchestrator. @@ -616,7 +720,7 @@ def _execute_scan( if stats: stats.record_rules(rules_toml_str) - click.echo(f"[*] Starting PySpector scan on '{scan_path}'...") + _dbg(debug, f"[*] Starting PySpector scan on '{scan_path}'...") # ── Load Baseline ───────────────────────────────────────────────────── baseline_path = ( @@ -632,9 +736,10 @@ def _execute_scan( ignored_fingerprints = set( baseline_data.get("ignored_fingerprints", []) ) - click.echo( + _dbg( + debug, f"[*] Loaded baseline from '{baseline_path}', " - f"ignoring {len(ignored_fingerprints)} known issues." + f"ignoring {len(ignored_fingerprints)} known issues.", ) except json.JSONDecodeError: click.echo( @@ -651,8 +756,10 @@ def _execute_scan( scan_path, enable_syntax_warnings=syntax_warnings, _stats_meta=ast_stats_meta, + debug=debug, + exclude=list(config.get("exclude", [])), ) - click.echo(f"[*] Successfully parsed {len(python_files_data)} Python files in {time.time()-t_parse:.2f}s") + _dbg(debug, f"[*] Successfully parsed {len(python_files_data)} Python files in {time.time()-t_parse:.2f}s") if stats: stats.record_files( @@ -665,8 +772,9 @@ def _execute_scan( if supply_chain_scan: try: from pyspector._rust_core import scan_supply_chain - click.echo("\n[*] Scanning dependencies for known vulnerabilities...") - dep_vulns = scan_supply_chain(str(scan_path.resolve())) + _dbg(debug, "\n[*] Scanning dependencies for known vulnerabilities...") + with _silence_fd1(not debug): + dep_vulns = scan_supply_chain(str(scan_path.resolve())) if dep_vulns: click.echo(f"\n{'='*60}") @@ -693,7 +801,7 @@ def _execute_scan( click.echo(f" Fixed in: {vuln['fixed_version']}") click.echo() else: - click.echo("[+] No known vulnerabilities found in dependencies") + _dbg(debug, "[+] No known vulnerabilities found in dependencies") except ImportError: click.echo( click.style( @@ -707,10 +815,11 @@ def _execute_scan( # ── Run Scan (Rust core) ─────────────────────────────────────────────── t_rust = time.time() try: - raw_issues = run_scan( - str(scan_path.resolve()), rules_toml_str, config, python_files_data - ) - click.echo(f"[*] Rust core scan: {time.time()-t_rust:.2f}s") + with _silence_fd1(not debug): + raw_issues = run_scan( + str(scan_path.resolve()), rules_toml_str, config, python_files_data + ) + _dbg(debug, f"[*] Rust core scan: {time.time()-t_rust:.2f}s") except ValueError as e: click.echo( click.style( @@ -790,7 +899,7 @@ def _execute_scan( if plugins: try: - execute_plugins(findings_dict, scan_path, list(plugins), plugin_config) + execute_plugins(findings_dict, scan_path, list(plugins), plugin_config, debug=debug) except click.ClickException as exc: click.echo(click.style(f"[!] Plugin error: {exc}", fg="red")) @@ -801,21 +910,23 @@ def _execute_scan( if output_file: try: output_file.write_text(output, encoding='utf-8') - click.echo(f"\n[+] Report saved to '{output_file}'") + _dbg(debug, f"\n[+] Report saved to '{output_file}'") except IOError as e: click.echo(click.style(f"Error writing to output file: {e}", fg="red")) else: click.echo(output) end_time = time.time() - click.echo( + _dbg( + debug, f"\n[*] Scan finished in {end_time - start_time:.2f} seconds. " - f"Found {len(final_issues)} issues." + f"Found {len(final_issues)} issues.", ) if len(raw_issues) > len(final_issues): - click.echo( + _dbg( + debug, f"[*] Ignored {len(raw_issues) - len(final_issues)} issues " - f"based on severity level or baseline." + f"based on severity level or baseline.", ) # ── Stats Table ──────────────────────────────────────────────────────── diff --git a/src/pyspector/config.py b/src/pyspector/config.py index fac1241..785e181 100644 --- a/src/pyspector/config.py +++ b/src/pyspector/config.py @@ -11,6 +11,8 @@ DEFAULT_CONFIG = { "exclude": [ ".venv", "venv", ".git", "__pycache__", "build", "dist", "*.egg-info", + # Dependency / vendored directories + "node_modules", "bower_components", "vendor", # Add test fixture exclusions "*/tests/fixtures/*", "*/test/fixtures/*",