diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml new file mode 100644 index 0000000..b3ae2f1 --- /dev/null +++ b/.github/workflows/e2e-tests.yml @@ -0,0 +1,66 @@ +name: E2E Tests +permissions: + contents: read + +on: + push: + branches: [ main ] + paths: + - 'src/**/*.py' + - 'tests/e2e/**' + - '.github/workflows/e2e-tests.yml' + pull_request: + branches: [ main ] + paths: + - 'src/**/*.py' + - 'tests/e2e/**' + - '.github/workflows/e2e-tests.yml' + workflow_dispatch: + +jobs: + e2e-tests: + runs-on: ubuntu-latest + timeout-minutes: 15 + + steps: + - uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + enable-cache: true + cache-dependency-glob: "**/pyproject.toml" + + - name: Set up Python 3.12 + run: uv python install 3.12 + + - name: Install dependencies + run: uv sync --all-groups + + - name: Install PostgreSQL + run: | + sudo apt-get update + sudo apt-get install -y postgresql + echo "/usr/lib/postgresql/$(ls /usr/lib/postgresql/)/bin" >> "$GITHUB_PATH" + + - name: Install Foundry (anvil) + uses: foundry-rs/foundry-toolchain@v1 + + - name: Install amp tools + run: | + gh release download --repo edgeandnode/amp --pattern 'ampd-linux-x86_64' --output /usr/local/bin/ampd + chmod +x /usr/local/bin/ampd + env: + GH_TOKEN: ${{ github.token }} + + - name: Verify tools + run: | + anvil --version + ampd --version + initdb --version + postgres --version + + - name: Run E2E tests + run: | + uv run pytest tests/e2e/ -v --log-cli-level=INFO -m "e2e" -x + diff --git a/.gitignore b/.gitignore index ba8d0fb..881b1fc 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,8 @@ .env .test.env *.env +.amp +.amp_state # Kubernetes secrets (NEVER commit these!) k8s/secret.yaml diff --git a/Makefile b/Makefile index 64572ad..bf653fc 100644 --- a/Makefile +++ b/Makefile @@ -54,6 +54,11 @@ test-performance: @echo "🏇 Running performance tests..." 
$(PYTHON) pytest tests/performance/ -m "performance" -v --log-cli-level=ERROR +# E2E tests (require anvil, ampd, PostgreSQL) +test-e2e: + @echo "Running E2E tests..." + $(PYTHON) pytest tests/e2e/ -m "e2e" -v --log-cli-level=INFO -x + # Code quality (using your ruff config) lint: @echo "🔍 Linting code..." @@ -133,6 +138,7 @@ help: @echo " make test-redis - Run Redis tests" @echo " make test-snowflake - Run Snowflake tests" @echo " make test-performance - Run performance tests" + @echo " make test-e2e - Run E2E tests (require anvil, ampd, PostgreSQL)" @echo " make lint - Lint code with ruff" @echo " make format - Format code with ruff" @echo " make test-setup - Start test databases" diff --git a/pyproject.toml b/pyproject.toml index 258bc24..98ffd92 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -123,6 +123,7 @@ markers = [ "iceberg: Tests requiring Apache Iceberg", "snowflake: Tests requiring Snowflake", "cloud: Tests requiring cloud storage access", + "e2e: End-to-end tests (require anvil, ampd, PostgreSQL)", ] [tool.ruff] diff --git a/tests/conftest.py b/tests/conftest.py index f0bbc4d..900b316 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -461,6 +461,7 @@ def pytest_configure(config): config.addinivalue_line('markers', 'snowflake: Tests requiring Snowflake') config.addinivalue_line('markers', 'lmdb: Tests requiring LMDB') config.addinivalue_line('markers', 'slow: Slow tests (> 30 seconds)') + config.addinivalue_line('markers', 'e2e: End-to-end tests (require anvil, ampd, PostgreSQL)') # Utility fixtures for mocking diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py new file mode 100644 index 0000000..e82bbcc --- /dev/null +++ b/tests/e2e/conftest.py @@ -0,0 +1,157 @@ +"""E2E test fixtures: session-scoped and function-scoped ampd + Anvil infrastructure.""" + +import logging +import shutil +import tempfile +from dataclasses import dataclass +from pathlib
import Path + +import pytest + +from amp.client import Client + +from .helpers.config import copy_anvil_manifest, generate_ampd_config, generate_provider_toml +from .helpers.dataset_manager import DatasetManager +from .helpers.process_manager import ( + get_free_port, + mine_blocks, + send_eth, + spawn_ampd, + spawn_anvil, + wait_for_ampd_ready, + wait_for_data_ready, +) + +logger = logging.getLogger(__name__) + + +def _check_deps(): + missing = [b for b in ('anvil', 'ampd', 'initdb', 'postgres') if not shutil.which(b)] + if missing: + pytest.fail(f'Missing binaries: {", ".join(missing)}') + + +@dataclass +class AmpTestServer: + """An ampd + Anvil stack with a connected client.""" + + client: Client + anvil_url: str + admin_url: str + ports: dict + + +def _setup_amp_stack(num_blocks: int = 10, end_block: str | None = 'latest'): + """Spin up anvil + ampd + register + deploy. + + Returns (AmpTestServer, cleanup_fn). + """ + temp_dir = Path(tempfile.mkdtemp(prefix='amp_e2e_')) + anvil_proc = None + ampd_proc = None + try: + log_dir = temp_dir / 'logs' + ports = { + 'admin': get_free_port(), + 'flight': get_free_port(), + 'jsonl': get_free_port(), + } + + anvil_proc, anvil_url = spawn_anvil(log_dir) + # Send a transaction so transactions table has data + send_eth( + anvil_url, + from_addr='0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266', + to_addr='0x70997970C51812dc3A010C7d01b50e0d17dc79C8', + value_wei=10**18, + ) + mine_blocks(anvil_url, num_blocks) + + config_path = generate_ampd_config( + temp_dir, + ports['admin'], + ports['flight'], + ports['jsonl'], + ) + generate_provider_toml(temp_dir, anvil_url) + manifest_path = copy_anvil_manifest(temp_dir) + + ampd_proc = spawn_ampd(config_path, log_dir) + wait_for_ampd_ready(ports['admin']) + + admin_url = f'http://127.0.0.1:{ports["admin"]}' + manager = DatasetManager(admin_url) + try: + manager.register_provider('anvil', temp_dir / 'provider_sources' / 'anvil.toml') + manager.register_dataset('_', 'anvil', manifest_path, 
'0.0.1') + manager.deploy_dataset('_', 'anvil', '0.0.1', end_block=end_block) + wait_for_data_ready(ports['flight']) + finally: + manager.close() + except Exception: + if ampd_proc: + ampd_proc.terminate() + if anvil_proc: + anvil_proc.terminate() + shutil.rmtree(temp_dir, ignore_errors=True) + raise + + server = AmpTestServer( + client=Client(query_url=f'grpc://127.0.0.1:{ports["flight"]}'), + anvil_url=anvil_url, + admin_url=admin_url, + ports=ports, + ) + + def cleanup(): + ampd_proc.terminate() + anvil_proc.terminate() + shutil.rmtree(temp_dir, ignore_errors=True) + + return server, cleanup + + +def _amp_fixture(**stack_kwargs): + """Yield an AmpTestServer, handling skip-check and cleanup.""" + _check_deps() + server, cleanup = _setup_amp_stack(**stack_kwargs) + yield server + cleanup() + + +# --------------------------------------------------------------------------- +# Session-scoped fixtures (read-only query tests) +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope='session') +def e2e_server(): + yield from _amp_fixture() + + +@pytest.fixture(scope='session') +def e2e_client(e2e_server): + return e2e_server.client + + +@pytest.fixture(scope='session') +def continuous_server(): + """Session-scoped ampd + Anvil with continuous deploy.""" + yield from _amp_fixture(end_block=None) + + +# --------------------------------------------------------------------------- +# Function-scoped fixtures (tests that mutate chain state) +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def amp_test_server(): + """Isolated ampd + Anvil stack for a single test.""" + yield from _amp_fixture() + + +@pytest.fixture() +def reorg_server(): + """Isolated ampd + Anvil stack for reorg testing.""" + yield from _amp_fixture(end_block=None) diff --git a/tests/e2e/fixtures/anvil.json b/tests/e2e/fixtures/anvil.json new file mode 100644 index 0000000..6a303e2 --- /dev/null +++ 
b/tests/e2e/fixtures/anvil.json @@ -0,0 +1,408 @@ +{ + "$comment": "Generated with: ampctl manifest generate --kind evm-rpc --network anvil --start-block 0", + "kind": "evm-rpc", + "network": "anvil", + "start_block": 0, + "finalized_blocks_only": false, + "tables": { + "blocks": { + "schema": { + "arrow": { + "fields": [ + { + "name": "block_num", + "type": "UInt64", + "nullable": false + }, + { + "name": "timestamp", + "type": { + "Timestamp": [ + "Nanosecond", + "+00:00" + ] + }, + "nullable": false + }, + { + "name": "hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "parent_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "ommers_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "miner", + "type": { + "FixedSizeBinary": 20 + }, + "nullable": false + }, + { + "name": "state_root", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "transactions_root", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "receipt_root", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "logs_bloom", + "type": "Binary", + "nullable": false + }, + { + "name": "difficulty", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": false + }, + { + "name": "total_difficulty", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": true + }, + { + "name": "gas_limit", + "type": "UInt64", + "nullable": false + }, + { + "name": "gas_used", + "type": "UInt64", + "nullable": false + }, + { + "name": "extra_data", + "type": "Binary", + "nullable": false + }, + { + "name": "mix_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "nonce", + "type": "UInt64", + "nullable": false + }, + { + "name": "base_fee_per_gas", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": true + }, + { + "name": "withdrawals_root", + "type": { + 
"FixedSizeBinary": 32 + }, + "nullable": true + }, + { + "name": "blob_gas_used", + "type": "UInt64", + "nullable": true + }, + { + "name": "excess_blob_gas", + "type": "UInt64", + "nullable": true + }, + { + "name": "parent_beacon_root", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": true + } + ] + } + }, + "network": "anvil" + }, + "logs": { + "schema": { + "arrow": { + "fields": [ + { + "name": "block_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "block_num", + "type": "UInt64", + "nullable": false + }, + { + "name": "timestamp", + "type": { + "Timestamp": [ + "Nanosecond", + "+00:00" + ] + }, + "nullable": false + }, + { + "name": "tx_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "tx_index", + "type": "UInt32", + "nullable": false + }, + { + "name": "log_index", + "type": "UInt32", + "nullable": false + }, + { + "name": "address", + "type": { + "FixedSizeBinary": 20 + }, + "nullable": false + }, + { + "name": "topic0", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": true + }, + { + "name": "topic1", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": true + }, + { + "name": "topic2", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": true + }, + { + "name": "topic3", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": true + }, + { + "name": "data", + "type": "Binary", + "nullable": false + } + ] + } + }, + "network": "anvil" + }, + "transactions": { + "schema": { + "arrow": { + "fields": [ + { + "name": "block_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "block_num", + "type": "UInt64", + "nullable": false + }, + { + "name": "timestamp", + "type": { + "Timestamp": [ + "Nanosecond", + "+00:00" + ] + }, + "nullable": false + }, + { + "name": "tx_index", + "type": "UInt32", + "nullable": false + }, + { + "name": "tx_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "to", + "type": 
{ + "FixedSizeBinary": 20 + }, + "nullable": true + }, + { + "name": "nonce", + "type": "UInt64", + "nullable": false + }, + { + "name": "gas_price", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": true + }, + { + "name": "gas_limit", + "type": "UInt64", + "nullable": false + }, + { + "name": "value", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": false + }, + { + "name": "input", + "type": "Binary", + "nullable": false + }, + { + "name": "v", + "type": "Binary", + "nullable": false + }, + { + "name": "r", + "type": "Binary", + "nullable": false + }, + { + "name": "s", + "type": "Binary", + "nullable": false + }, + { + "name": "gas_used", + "type": "UInt64", + "nullable": false + }, + { + "name": "type", + "type": "Int32", + "nullable": false + }, + { + "name": "max_fee_per_gas", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": true + }, + { + "name": "max_priority_fee_per_gas", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": true + }, + { + "name": "max_fee_per_blob_gas", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": true + }, + { + "name": "from", + "type": { + "FixedSizeBinary": 20 + }, + "nullable": false + }, + { + "name": "status", + "type": "Boolean", + "nullable": false + } + ] + } + }, + "network": "anvil" + } + } +} diff --git a/tests/e2e/fixtures/config.toml.template b/tests/e2e/fixtures/config.toml.template new file mode 100644 index 0000000..517daf8 --- /dev/null +++ b/tests/e2e/fixtures/config.toml.template @@ -0,0 +1,7 @@ +manifests_dir = "{manifests_dir}" +providers_dir = "{providers_dir}" +data_dir = "{data_dir}" +max_mem_mb = 2000 +admin_api_addr = "127.0.0.1:{admin_port}" +flight_addr = "127.0.0.1:{flight_port}" +jsonl_addr = "127.0.0.1:{jsonl_port}" diff --git a/tests/e2e/fixtures/provider.toml.template b/tests/e2e/fixtures/provider.toml.template new file mode 100644 index 0000000..1ec7f89 --- /dev/null +++ b/tests/e2e/fixtures/provider.toml.template @@ -0,0 +1,3 @@ 
+kind = "evm-rpc" +url = "{anvil_url}" +network = "anvil" diff --git a/tests/e2e/helpers/__init__.py b/tests/e2e/helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/e2e/helpers/config.py b/tests/e2e/helpers/config.py new file mode 100644 index 0000000..128d03b --- /dev/null +++ b/tests/e2e/helpers/config.py @@ -0,0 +1,60 @@ +"""Config file generation for ampd E2E tests.""" + +import shutil +from pathlib import Path + +FIXTURES_DIR = Path(__file__).parent.parent / 'fixtures' + + +def generate_ampd_config( + config_dir: Path, + admin_port: int, + flight_port: int, + jsonl_port: int, +) -> Path: + """Generate ampd config.toml and required subdirectories. + + Returns the path to config.toml. + """ + for subdir in ('data', 'manifests', 'providers', 'provider_sources'): + (config_dir / subdir).mkdir(parents=True, exist_ok=True) + + template = (FIXTURES_DIR / 'config.toml.template').read_text() + config = template.format( + manifests_dir=config_dir / 'manifests', + providers_dir=config_dir / 'providers', + data_dir=config_dir / 'data', + admin_port=admin_port, + flight_port=flight_port, + jsonl_port=jsonl_port, + ) + + config_path = config_dir / 'config.toml' + config_path.write_text(config) + return config_path + + +def generate_provider_toml(config_dir: Path, anvil_url: str) -> Path: + """Write provider source TOML for Anvil. + + Returns the path to the provider source file. + """ + provider_sources_dir = config_dir / 'provider_sources' + provider_sources_dir.mkdir(parents=True, exist_ok=True) + + template = (FIXTURES_DIR / 'provider.toml.template').read_text() + provider_toml = template.format(anvil_url=anvil_url) + + path = provider_sources_dir / 'anvil.toml' + path.write_text(provider_toml) + return path + + +def copy_anvil_manifest(config_dir: Path) -> Path: + """Copy anvil.json fixture into the manifests directory. + + Returns the path to the copied manifest. 
+ """ + dest = config_dir / 'manifests' / 'anvil.json' + shutil.copy(FIXTURES_DIR / 'anvil.json', dest) + return dest diff --git a/tests/e2e/helpers/dataset_manager.py b/tests/e2e/helpers/dataset_manager.py new file mode 100644 index 0000000..e8fc83a --- /dev/null +++ b/tests/e2e/helpers/dataset_manager.py @@ -0,0 +1,37 @@ +"""Dataset registration and deployment for E2E tests via Admin API.""" + +import json +import logging +import tomllib +from pathlib import Path + +from amp.admin import AdminClient + +logger = logging.getLogger(__name__) + + +class DatasetManager: + """Manages provider registration, dataset registration, and deployment.""" + + def __init__(self, admin_url: str): + self._admin = AdminClient(base_url=admin_url) + + def register_provider(self, name: str, provider_source_path: Path) -> None: + with open(provider_source_path, 'rb') as f: + config = tomllib.load(f) + config['name'] = name + self._admin._request('POST', '/providers', json=config) + logger.info(f'Registered provider {name}') + + def register_dataset(self, namespace: str, name: str, manifest_path: Path, tag: str) -> None: + with open(manifest_path) as f: + manifest = json.load(f) + self._admin.datasets.register(namespace, name, tag, manifest) + logger.info(f'Registered dataset {namespace}/{name}@{tag}') + + def deploy_dataset(self, namespace: str, name: str, version: str, end_block: str | None = 'latest') -> None: + self._admin.datasets.deploy(namespace, name, version, end_block=end_block) + logger.info(f'Deployed {namespace}/{name}@{version}') + + def close(self) -> None: + self._admin.close() diff --git a/tests/e2e/helpers/process_manager.py b/tests/e2e/helpers/process_manager.py new file mode 100644 index 0000000..1d4cf00 --- /dev/null +++ b/tests/e2e/helpers/process_manager.py @@ -0,0 +1,201 @@ +"""Subprocess utilities for managing Anvil and ampd processes in E2E tests.""" + +import logging +import os +import socket +import subprocess +import time +from dataclasses import dataclass, 
field +from pathlib import Path + +import httpx + +logger = logging.getLogger(__name__) + + +@dataclass +class ManagedProcess: + """Wrapper around subprocess.Popen with cleanup.""" + + process: subprocess.Popen + name: str + _log_files: list = field(default_factory=list, repr=False) + + def terminate(self, timeout: int = 5) -> None: + """Terminate the process with a kill fallback.""" + if self.process.poll() is not None: + self._close_logs() + return + self.process.terminate() + try: + self.process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + logger.warning(f'{self.name} did not terminate in {timeout}s, killing') + self.process.kill() + self.process.wait(timeout=5) + self._close_logs() + + def _close_logs(self) -> None: + for f in self._log_files: + f.close() + self._log_files.clear() + + def is_alive(self) -> bool: + return self.process.poll() is None + + +def get_free_port() -> int: + """Bind to port 0 and return the assigned port.""" + with socket.socket() as s: + s.bind(('127.0.0.1', 0)) + return s.getsockname()[1] + + +def spawn_anvil(log_dir: Path) -> tuple[ManagedProcess, str]: + """Spawn an Anvil process on a random free port. + + Returns the managed process and the HTTP URL. 
+ """ + port = get_free_port() + log_dir.mkdir(parents=True, exist_ok=True) + + stdout_f = open(log_dir / 'anvil_stdout.log', 'w') + stderr_f = open(log_dir / 'anvil_stderr.log', 'w') + + process = subprocess.Popen( + ['anvil', '--port', str(port)], + stdout=stdout_f, + stderr=stderr_f, + ) + + url = f'http://127.0.0.1:{port}' + _wait_for_jsonrpc(url, timeout=30) + + return ( + ManagedProcess(process=process, name='anvil', _log_files=[stdout_f, stderr_f]), + url, + ) + + +def _wait_for_jsonrpc(url: str, timeout: int = 30) -> None: + """Poll a JSON-RPC endpoint until it responds.""" + start = time.monotonic() + with httpx.Client() as client: + while time.monotonic() - start < timeout: + try: + resp = client.post( + url, + json={'jsonrpc': '2.0', 'method': 'eth_blockNumber', 'params': [], 'id': 1}, + ) + if resp.status_code == 200: + return + except httpx.ConnectError: + pass + time.sleep(0.2) + raise TimeoutError(f'JSON-RPC at {url} not ready after {timeout}s') + + +def mine_blocks(anvil_url: str, count: int) -> None: + """Mine blocks on an Anvil instance via JSON-RPC anvil_mine.""" + with httpx.Client() as client: + resp = client.post( + anvil_url, + json={'jsonrpc': '2.0', 'method': 'anvil_mine', 'params': [count, 0], 'id': 1}, + ) + resp.raise_for_status() + + +def send_eth(anvil_url: str, from_addr: str, to_addr: str, value_wei: int) -> str: + """Send an ETH transfer on Anvil. Returns the transaction hash.""" + with httpx.Client() as client: + resp = client.post( + anvil_url, + json={ + 'jsonrpc': '2.0', + 'method': 'eth_sendTransaction', + 'params': [{'from': from_addr, 'to': to_addr, 'value': hex(value_wei)}], + 'id': 1, + }, + ) + resp.raise_for_status() + return resp.json()['result'] + + +def evm_snapshot(anvil_url: str) -> str: + """Take a snapshot of the current anvil state. 
Returns snapshot ID.""" + with httpx.Client() as client: + resp = client.post( + anvil_url, + json={'jsonrpc': '2.0', 'method': 'evm_snapshot', 'params': [], 'id': 1}, + ) + resp.raise_for_status() + return resp.json()['result'] + + +def evm_revert(anvil_url: str, snapshot_id: str) -> None: + """Revert anvil to a previous snapshot.""" + with httpx.Client() as client: + resp = client.post( + anvil_url, + json={'jsonrpc': '2.0', 'method': 'evm_revert', 'params': [snapshot_id], 'id': 1}, + ) + resp.raise_for_status() + assert resp.json()['result'] is True, 'evm_revert failed' + + +def spawn_ampd(config_path: Path, log_dir: Path) -> ManagedProcess: + """Spawn ampd dev with the given config file.""" + log_dir.mkdir(parents=True, exist_ok=True) + + stdout_f = open(log_dir / 'ampd_stdout.log', 'w') + stderr_f = open(log_dir / 'ampd_stderr.log', 'w') + + amp_dir = config_path.parent / '.amp' + process = subprocess.Popen( + ['ampd', 'dev'], + env={**os.environ, 'AMP_CONFIG': str(config_path), 'AMP_DIR': str(amp_dir)}, + stdout=stdout_f, + stderr=stderr_f, + ) + + return ManagedProcess(process=process, name='ampd', _log_files=[stdout_f, stderr_f]) + + +def wait_for_ampd_ready(admin_port: int, timeout: int = 60) -> None: + """Poll the Admin API until ampd is ready.""" + url = f'http://127.0.0.1:{admin_port}/datasets' + start = time.monotonic() + with httpx.Client() as client: + while time.monotonic() - start < timeout: + try: + resp = client.get(url) + if resp.status_code == 200: + logger.info(f'ampd ready after {time.monotonic() - start:.1f}s') + return + except httpx.ConnectError: + pass + time.sleep(0.5) + raise TimeoutError(f'ampd admin API not ready after {timeout}s') + + +def wait_for_data_ready(flight_port: int, timeout: int = 60) -> None: + """Poll Flight SQL until data is queryable.""" + wait_for_block(flight_port, 0, timeout=timeout) + + +def wait_for_block(flight_port: int, block_num: int, timeout: int = 60) -> None: + """Poll Flight SQL until a specific block 
number is available.""" + from amp.client import Client + + client = Client(query_url=f'grpc://127.0.0.1:{flight_port}') + start = time.monotonic() + while time.monotonic() - start < timeout: + try: + table = client.sql(f'SELECT block_num FROM anvil.blocks WHERE block_num = {block_num}').to_arrow() + if len(table) > 0: + logger.info(f'Block {block_num} available after {time.monotonic() - start:.1f}s') + return + except Exception: + pass + time.sleep(1) + raise TimeoutError(f'Block {block_num} not available after {timeout}s') diff --git a/tests/e2e/test_queries.py b/tests/e2e/test_queries.py new file mode 100644 index 0000000..cf62e1c --- /dev/null +++ b/tests/e2e/test_queries.py @@ -0,0 +1,50 @@ +"""E2E query validation tests against real ampd + Anvil.""" + +import pyarrow as pa +import pytest + +pytestmark = pytest.mark.e2e + +# genesis + 1 tx block (auto-mined) + 10 mined blocks +EXPECTED_BLOCKS = 12 + + +def test_query_blocks(e2e_client): + table = e2e_client.sql('SELECT block_num, hash FROM anvil.blocks ORDER BY block_num').to_arrow() + + assert len(table) == EXPECTED_BLOCKS + block_nums = table.column('block_num').to_pylist() + assert block_nums == list(range(EXPECTED_BLOCKS)) + assert all(h is not None for h in table.column('hash').to_pylist()) + + +def test_blocks_schema(e2e_client): + table = e2e_client.sql('SELECT * FROM anvil.blocks LIMIT 1').to_arrow() + schema = table.schema + + assert schema.field('block_num').type == pa.uint64() + assert schema.field('hash').type == pa.binary(32) + assert schema.field('parent_hash').type == pa.binary(32) + assert schema.field('miner').type == pa.binary(20) + assert schema.field('gas_used').type == pa.uint64() + assert hasattr(schema.field('timestamp').type, 'tz') + + +def test_query_transactions(e2e_client): + table = e2e_client.sql('SELECT * FROM anvil.transactions LIMIT 5').to_arrow() + assert len(table) >= 1 + expected_cols = {'block_num', 'tx_hash', 'from', 'to', 'value', 'gas_used'} + assert 
expected_cols.issubset(set(table.column_names)) + + +def test_query_with_where_clause(e2e_client): + table = e2e_client.sql('SELECT block_num FROM anvil.blocks WHERE block_num > 5 ORDER BY block_num').to_arrow() + + assert len(table) == EXPECTED_BLOCKS - 6 + assert table.column('block_num').to_pylist() == list(range(6, EXPECTED_BLOCKS)) + + +def test_isolated_server(amp_test_server): + """Verify function-scoped stack works on its own ports.""" + table = amp_test_server.client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() + assert table.column('cnt').to_pylist()[0] == EXPECTED_BLOCKS diff --git a/tests/e2e/test_streaming.py b/tests/e2e/test_streaming.py new file mode 100644 index 0000000..755fcb2 --- /dev/null +++ b/tests/e2e/test_streaming.py @@ -0,0 +1,125 @@ +"""E2E streaming, continuous ingestion, and reorg detection tests.""" + +import time + +import pytest + +from .helpers.process_manager import ( + evm_revert, + evm_snapshot, + mine_blocks, + wait_for_block, +) + +pytestmark = pytest.mark.e2e + + +def test_continuous_ingestion(continuous_server): + """Verify ampd ingests new blocks in continuous mode.""" + anvil_url = continuous_server.anvil_url + flight_port = continuous_server.ports['flight'] + client = continuous_server.client + + table = client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() + count_before = table.column('cnt').to_pylist()[0] + assert count_before >= 11 + + mine_blocks(anvil_url, 5) + wait_for_block(flight_port, count_before + 4) + + table = client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() + count_after = table.column('cnt').to_pylist()[0] + assert count_after >= count_before + 5 + + +def test_streaming_metadata_parsing(continuous_server): + """Verify real server app_metadata parses into BatchMetadata with hashes.""" + from google.protobuf.any_pb2 import Any + from pyarrow import flight + + from amp import FlightSql_pb2 + from amp.streaming.types import BatchMetadata + + query = 'SELECT block_num, 
hash FROM anvil.blocks SETTINGS stream = true' + + command_query = FlightSql_pb2.CommandStatementQuery() + command_query.query = query + any_command = Any() + any_command.Pack(command_query) + cmd = any_command.SerializeToString() + + client = continuous_server.client + flight_descriptor = flight.FlightDescriptor.for_command(cmd) + info = client.conn.get_flight_info(flight_descriptor) + reader = client.conn.do_get(info.endpoints[0].ticket) + + # Read chunks until we get one with data rows (skip watermark/empty batches) + metadata = None + for _ in range(20): + chunk = reader.read_chunk() + if chunk.app_metadata is not None: + md = BatchMetadata.from_flight_data(chunk.app_metadata) + if md.ranges: + metadata = md + break + + reader.cancel() + + assert metadata is not None, 'Should receive at least one batch with block ranges' + for r in metadata.ranges: + assert r.network is not None + assert r.start >= 0 + assert r.end >= r.start + assert r.hash is not None, 'Server should send block hashes' + + +def test_reorg_detection(reorg_server): + """Verify ampd detects a reorg when new blocks break the hash chain.""" + anvil_url = reorg_server.anvil_url + flight_port = reorg_server.ports['flight'] + client = reorg_server.client + + # Find current chain tip + tip = client.sql('SELECT MAX(block_num) AS tip FROM anvil.blocks').to_arrow() + tip_block = tip.column('tip').to_pylist()[0] + + snapshot_id = evm_snapshot(anvil_url) + + # Mine 5 blocks past the tip, wait for ingestion + mine_blocks(anvil_url, 5) + first_new = tip_block + 1 + last_new = tip_block + 5 + wait_for_block(flight_port, last_new) + + # Capture pre-reorg hashes + pre_reorg = client.sql( + f'SELECT block_num, hash FROM anvil.blocks WHERE block_num >= {first_new} ORDER BY block_num' + ).to_arrow() + assert len(pre_reorg) == 5 + pre_hashes = pre_reorg.column('hash').to_pylist() + + # Revert to snapshot and mine past the old tip so the worker + # writes new blocks whose prev_hash won't match the stored hash, + # 
triggering fork detection and re-materialization. + evm_revert(anvil_url, snapshot_id) + mine_blocks(anvil_url, 10) + + # Wait for ampd to detect reorg and re-ingest + timeout = 30 + post_hashes = None + start = time.monotonic() + while time.monotonic() - start < timeout: + post_reorg = client.sql( + f'SELECT block_num, hash FROM anvil.blocks ' + f'WHERE block_num >= {first_new} AND block_num <= {last_new} ORDER BY block_num' + ).to_arrow() + if len(post_reorg) == 5: + post_hashes = post_reorg.column('hash').to_pylist() + if post_hashes != pre_hashes: + break + time.sleep(1) + else: + pytest.fail(f'ampd did not detect reorg within {timeout}s') + + assert pre_reorg.column('block_num').to_pylist() == post_reorg.column('block_num').to_pylist() + assert post_hashes != pre_hashes