From 86f4c6d64fd8282eb1a831be8c5c30f238133e16 Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Wed, 11 Feb 2026 18:04:26 +0400 Subject: [PATCH 01/12] Add E2E helper utilities for process and config management Subprocess utilities for Anvil/ampd lifecycle, config generation from templates, and dataset management via AdminClient. --- tests/e2e/__init__.py | 0 tests/e2e/fixtures/config.toml.template | 10 ++ tests/e2e/fixtures/provider.toml.template | 3 + tests/e2e/helpers/__init__.py | 0 tests/e2e/helpers/config.py | 71 ++++++++++ tests/e2e/helpers/dataset_manager.py | 37 +++++ tests/e2e/helpers/process_manager.py | 158 ++++++++++++++++++++++ 7 files changed, 279 insertions(+) create mode 100644 tests/e2e/__init__.py create mode 100644 tests/e2e/fixtures/config.toml.template create mode 100644 tests/e2e/fixtures/provider.toml.template create mode 100644 tests/e2e/helpers/__init__.py create mode 100644 tests/e2e/helpers/config.py create mode 100644 tests/e2e/helpers/dataset_manager.py create mode 100644 tests/e2e/helpers/process_manager.py diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/e2e/fixtures/config.toml.template b/tests/e2e/fixtures/config.toml.template new file mode 100644 index 0000000..a132b3a --- /dev/null +++ b/tests/e2e/fixtures/config.toml.template @@ -0,0 +1,10 @@ +manifests_dir = "{manifests_dir}" +providers_dir = "{providers_dir}" +data_dir = "{data_dir}" +max_mem_mb = 2000 +admin_api_addr = "127.0.0.1:{admin_port}" +flight_addr = "127.0.0.1:{flight_port}" +jsonl_addr = "127.0.0.1:{jsonl_port}" + +[metadata_db] +url = "{pg_url}" diff --git a/tests/e2e/fixtures/provider.toml.template b/tests/e2e/fixtures/provider.toml.template new file mode 100644 index 0000000..1ec7f89 --- /dev/null +++ b/tests/e2e/fixtures/provider.toml.template @@ -0,0 +1,3 @@ +kind = "evm-rpc" +url = "{anvil_url}" +network = "anvil" diff --git a/tests/e2e/helpers/__init__.py b/tests/e2e/helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/e2e/helpers/config.py b/tests/e2e/helpers/config.py new file mode 100644 index 0000000..4571e98 --- /dev/null +++ b/tests/e2e/helpers/config.py @@ -0,0 +1,71 @@ +"""Config file generation for ampd E2E tests.""" + +import json +import shutil +from pathlib import Path + +FIXTURES_DIR = Path(__file__).parent.parent / 'fixtures' + + +def generate_ampd_config( + config_dir: Path, + pg_url: str, + admin_port: int, + flight_port: int, + jsonl_port: int, +) -> Path: + """Generate ampd config.toml and required subdirectories. + + Returns the path to config.toml. + """ + for subdir in ('data', 'manifests', 'providers', 'provider_sources'): + (config_dir / subdir).mkdir(parents=True, exist_ok=True) + + pg_url = pg_url.replace('postgresql+psycopg2://', 'postgresql://') + + template = (FIXTURES_DIR / 'config.toml.template').read_text() + config = template.format( + manifests_dir=config_dir / 'manifests', + providers_dir=config_dir / 'providers', + data_dir=config_dir / 'data', + admin_port=admin_port, + flight_port=flight_port, + jsonl_port=jsonl_port, + pg_url=pg_url, + ) + + config_path = config_dir / 'config.toml' + config_path.write_text(config) + return config_path + + +def generate_provider_toml(config_dir: Path, anvil_url: str) -> Path: + """Write provider source TOML for Anvil. + + Returns the path to the provider source file. + """ + provider_sources_dir = config_dir / 'provider_sources' + provider_sources_dir.mkdir(parents=True, exist_ok=True) + + template = (FIXTURES_DIR / 'provider.toml.template').read_text() + provider_toml = template.format(anvil_url=anvil_url) + + path = provider_sources_dir / 'anvil.toml' + path.write_text(provider_toml) + return path + + +def copy_anvil_manifest(config_dir: Path) -> Path: + """Copy anvil.json fixture into the manifests directory. + + Returns the path to the copied manifest. + """ + dest = config_dir / 'manifests' / 'anvil.json' + shutil.copy(FIXTURES_DIR / 'anvil.json', dest) + return dest + + +def load_anvil_manifest() -> dict: + """Load the anvil.json manifest as a dict.""" + with open(FIXTURES_DIR / 'anvil.json') as f: + return json.load(f) diff --git a/tests/e2e/helpers/dataset_manager.py b/tests/e2e/helpers/dataset_manager.py new file mode 100644 index 0000000..abdc6aa --- /dev/null +++ b/tests/e2e/helpers/dataset_manager.py @@ -0,0 +1,37 @@ +"""Dataset registration and deployment for E2E tests via Admin API.""" + +import json +import logging +import tomllib +from pathlib import Path + +from amp.admin import AdminClient + +logger = logging.getLogger(__name__) + + +class DatasetManager: + """Manages provider registration, dataset registration, and deployment.""" + + def __init__(self, admin_url: str): + self._admin = AdminClient(base_url=admin_url) + + def register_provider(self, name: str, provider_source_path: Path) -> None: + with open(provider_source_path, 'rb') as f: + config = tomllib.load(f) + config['name'] = name + self._admin._request('POST', '/providers', json=config) + logger.info(f'Registered provider {name}') + + def register_dataset(self, namespace: str, name: str, manifest_path: Path, tag: str) -> None: + with open(manifest_path) as f: + manifest = json.load(f) + self._admin.datasets.register(namespace, name, tag, manifest) + logger.info(f'Registered dataset {namespace}/{name}@{tag}') + + def deploy_dataset(self, namespace: str, name: str, version: str, end_block: str = 'latest') -> None: + self._admin.datasets.deploy(namespace, name, version, end_block=end_block) + logger.info(f'Deployed {namespace}/{name}@{version}') + + def close(self) -> None: + self._admin.close() diff --git a/tests/e2e/helpers/process_manager.py b/tests/e2e/helpers/process_manager.py new file mode 100644 index 0000000..50794e5 --- /dev/null +++ b/tests/e2e/helpers/process_manager.py @@ -0,0 +1,158 @@ +"""Subprocess utilities for managing Anvil and ampd processes in E2E tests.""" + +import logging +import os +import socket +import subprocess +import time +from dataclasses import dataclass, field +from pathlib import Path + +import httpx + +logger = logging.getLogger(__name__) + + +@dataclass +class ManagedProcess: + """Wrapper around subprocess.Popen with cleanup.""" + + process: subprocess.Popen + name: str + _log_files: list = field(default_factory=list, repr=False) + + def terminate(self, timeout: int = 5) -> None: + """Terminate the process with a kill fallback.""" + if self.process.poll() is not None: + self._close_logs() + return + self.process.terminate() + try: + self.process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + logger.warning(f'{self.name} did not terminate in {timeout}s, killing') + self.process.kill() + self.process.wait(timeout=5) + self._close_logs() + + def _close_logs(self) -> None: + for f in self._log_files: + f.close() + self._log_files.clear() + + def is_alive(self) -> bool: + return self.process.poll() is None + + +def get_free_port() -> int: + """Bind to port 0 and return the assigned port.""" + with socket.socket() as s: + s.bind(('127.0.0.1', 0)) + return s.getsockname()[1] + + +def spawn_anvil(log_dir: Path) -> tuple[ManagedProcess, str]: + """Spawn an Anvil process on a random free port. + + Returns the managed process and the HTTP URL. + """ + port = get_free_port() + log_dir.mkdir(parents=True, exist_ok=True) + + stdout_f = open(log_dir / 'anvil_stdout.log', 'w') + stderr_f = open(log_dir / 'anvil_stderr.log', 'w') + + process = subprocess.Popen( + ['anvil', '--port', str(port)], + stdout=stdout_f, + stderr=stderr_f, + ) + + url = f'http://127.0.0.1:{port}' + _wait_for_jsonrpc(url, timeout=30) + + return ( + ManagedProcess(process=process, name='anvil', _log_files=[stdout_f, stderr_f]), + url, + ) + + +def _wait_for_jsonrpc(url: str, timeout: int = 30) -> None: + """Poll a JSON-RPC endpoint until it responds.""" + start = time.monotonic() + with httpx.Client() as client: + while time.monotonic() - start < timeout: + try: + resp = client.post( + url, + json={'jsonrpc': '2.0', 'method': 'eth_blockNumber', 'params': [], 'id': 1}, + ) + if resp.status_code == 200: + return + except httpx.ConnectError: + pass + time.sleep(0.2) + raise TimeoutError(f'JSON-RPC at {url} not ready after {timeout}s') + + +def mine_blocks(anvil_url: str, count: int) -> None: + """Mine blocks on an Anvil instance via JSON-RPC evm_mine.""" + with httpx.Client() as client: + for _ in range(count): + resp = client.post( + anvil_url, + json={'jsonrpc': '2.0', 'method': 'evm_mine', 'params': [], 'id': 1}, + ) + resp.raise_for_status() + + +def spawn_ampd(config_path: Path, log_dir: Path) -> ManagedProcess: + """Spawn ampd dev with the given config file.""" + log_dir.mkdir(parents=True, exist_ok=True) + + stdout_f = open(log_dir / 'ampd_stdout.log', 'w') + stderr_f = open(log_dir / 'ampd_stderr.log', 'w') + + process = subprocess.Popen( + ['ampd', 'dev'], + env={**os.environ, 'AMP_CONFIG': str(config_path)}, + stdout=stdout_f, + stderr=stderr_f, + ) + + return ManagedProcess(process=process, name='ampd', _log_files=[stdout_f, stderr_f]) + + +def wait_for_ampd_ready(admin_port: int, timeout: int = 60) -> None: + """Poll the Admin API until ampd is ready.""" + url = f'http://127.0.0.1:{admin_port}/datasets' + start = time.monotonic() + with httpx.Client() as client: + while time.monotonic() - start < timeout: + try: + resp = client.get(url) + if resp.status_code == 200: + logger.info(f'ampd ready after {time.monotonic() - start:.1f}s') + return + except httpx.ConnectError: + pass + time.sleep(0.5) + raise TimeoutError(f'ampd admin API not ready after {timeout}s') + + +def wait_for_data_ready(flight_port: int, timeout: int = 60) -> None: + """Poll Flight SQL until data is queryable.""" + from amp.client import Client + + start = time.monotonic() + while time.monotonic() - start < timeout: + try: + client = Client(query_url=f'grpc://127.0.0.1:{flight_port}') + table = client.sql('SELECT block_num FROM anvil.blocks LIMIT 1').to_arrow() + if len(table) > 0: + logger.info(f'Data ready after {time.monotonic() - start:.1f}s') + return + except Exception: + pass + time.sleep(2) + raise TimeoutError(f'Flight SQL data not queryable after {timeout}s') From 36752c8228b3e3f8b46d69f1c4b8625aaf808183 Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Tue, 10 Mar 2026 16:35:02 +0400 Subject: [PATCH 02/12] Add E2E test fixtures and session infrastructure --- tests/e2e/conftest.py | 118 ++++++++++ tests/e2e/fixtures/anvil.json | 407 ++++++++++++++++++++++++++++++++++ 2 files changed, 525 insertions(+) create mode 100644 tests/e2e/conftest.py create mode 100644 tests/e2e/fixtures/anvil.json diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py new file mode 100644 index 0000000..23cceed --- /dev/null +++ b/tests/e2e/conftest.py @@ -0,0 +1,118 @@ +"""E2E test fixtures: session-scoped ampd + Anvil infrastructure.""" + +import logging +import os +import shutil +import tempfile +from pathlib import Path + +import pytest + +from .helpers.config import copy_anvil_manifest, generate_ampd_config, generate_provider_toml +from .helpers.dataset_manager import DatasetManager +from .helpers.process_manager import ( + get_free_port, + mine_blocks, + spawn_ampd, + spawn_anvil, + wait_for_ampd_ready, + wait_for_data_ready, +) + +logger = logging.getLogger(__name__) + + +def _skip_if_missing_deps(): + missing = [b for b in ('anvil', 'ampd') if not shutil.which(b)] + if missing: + pytest.skip(f'Missing binaries: {", ".join(missing)}') + + +@pytest.fixture(scope='session') +def e2e_temp_dir(): + d = tempfile.mkdtemp(prefix='amp_e2e_') + yield Path(d) + shutil.rmtree(d, ignore_errors=True) + + +@pytest.fixture(scope='session') +def e2e_ports(): + return { + 'admin': get_free_port(), + 'flight': get_free_port(), + 'jsonl': get_free_port(), + } + + +@pytest.fixture(scope='session') +def e2e_postgres_url(): + _skip_if_missing_deps() + + if 'DOCKER_HOST' not in os.environ: + colima_socket = Path.home() / '.colima' / 'default' / 'docker.sock' + if colima_socket.exists(): + os.environ['DOCKER_HOST'] = f'unix://{colima_socket}' + + if 'TESTCONTAINERS_RYUK_DISABLED' not in os.environ: + os.environ['TESTCONTAINERS_RYUK_DISABLED'] = 'true' + + from testcontainers.postgres import PostgresContainer + + with PostgresContainer('postgres:16') as pg: + yield pg.get_connection_url() + + +@pytest.fixture(scope='session') +def e2e_anvil(e2e_temp_dir): + log_dir = e2e_temp_dir / 'logs' + proc, url = spawn_anvil(log_dir) + mine_blocks(url, 10) + yield proc, url + proc.terminate() + + +@pytest.fixture(scope='session') +def e2e_ampd_config(e2e_temp_dir, e2e_postgres_url, e2e_anvil, e2e_ports): + _, anvil_url = e2e_anvil + config_path = generate_ampd_config( + e2e_temp_dir, + e2e_postgres_url, + e2e_ports['admin'], + e2e_ports['flight'], + e2e_ports['jsonl'], + ) + provider_path = generate_provider_toml(e2e_temp_dir, anvil_url) + manifest_path = copy_anvil_manifest(e2e_temp_dir) + return config_path, provider_path, manifest_path + + +@pytest.fixture(scope='session') +def e2e_ampd(e2e_temp_dir, e2e_ampd_config, e2e_ports): + config_path, _, _ = e2e_ampd_config + log_dir = e2e_temp_dir / 'logs' + proc = spawn_ampd(config_path, log_dir) + wait_for_ampd_ready(e2e_ports['admin']) + yield proc + proc.terminate() + + +@pytest.fixture(scope='session') +def e2e_dataset(e2e_ampd, e2e_ampd_config, e2e_ports): + _, provider_path, manifest_path = e2e_ampd_config + admin_url = f'http://127.0.0.1:{e2e_ports["admin"]}' + + manager = DatasetManager(admin_url) + try: + manager.register_provider('anvil', provider_path) + manager.register_dataset('_', 'anvil', manifest_path, '0.0.1') + manager.deploy_dataset('_', 'anvil', '0.0.1') + wait_for_data_ready(e2e_ports['flight']) + finally: + manager.close() + + +@pytest.fixture(scope='session') +def e2e_client(e2e_dataset, e2e_ports): + from amp.client import Client + + return Client(query_url=f'grpc://127.0.0.1:{e2e_ports["flight"]}') diff --git a/tests/e2e/fixtures/anvil.json b/tests/e2e/fixtures/anvil.json new file mode 100644 index 0000000..63fbe64 --- /dev/null +++ b/tests/e2e/fixtures/anvil.json @@ -0,0 +1,407 @@ +{ + "kind": "evm-rpc", + "network": "anvil", + "start_block": 0, + "finalized_blocks_only": false, + "tables": { + "blocks": { + "schema": { + "arrow": { + "fields": [ + { + "name": "block_num", + "type": "UInt64", + "nullable": false + }, + { + "name": "timestamp", + "type": { + "Timestamp": [ + "Nanosecond", + "+00:00" + ] + }, + "nullable": false + }, + { + "name": "hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "parent_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "ommers_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "miner", + "type": { + "FixedSizeBinary": 20 + }, + "nullable": false + }, + { + "name": "state_root", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "transactions_root", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "receipt_root", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "logs_bloom", + "type": "Binary", + "nullable": false + }, + { + "name": "difficulty", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": false + }, + { + "name": "total_difficulty", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": true + }, + { + "name": "gas_limit", + "type": "UInt64", + "nullable": false + }, + { + "name": "gas_used", + "type": "UInt64", + "nullable": false + }, + { + "name": "extra_data", + "type": "Binary", + "nullable": false + }, + { + "name": "mix_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "nonce", + "type": "UInt64", + "nullable": false + }, + { + "name": "base_fee_per_gas", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": true + }, + { + "name": "withdrawals_root", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": true + }, + { + "name": "blob_gas_used", + "type": "UInt64", + "nullable": true + }, + { + "name": "excess_blob_gas", + "type": "UInt64", + "nullable": true + }, + { + "name": "parent_beacon_root", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": true + } + ] + } + }, + "network": "anvil" + }, + "logs": { + "schema": { + "arrow": { + "fields": [ + { + "name": "block_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "block_num", + "type": "UInt64", + "nullable": false + }, + { + "name": "timestamp", + "type": { + "Timestamp": [ + "Nanosecond", + "+00:00" + ] + }, + "nullable": false + }, + { + "name": "tx_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "tx_index", + "type": "UInt32", + "nullable": false + }, + { + "name": "log_index", + "type": "UInt32", + "nullable": false + }, + { + "name": "address", + "type": { + "FixedSizeBinary": 20 + }, + "nullable": false + }, + { + "name": "topic0", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": true + }, + { + "name": "topic1", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": true + }, + { + "name": "topic2", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": true + }, + { + "name": "topic3", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": true + }, + { + "name": "data", + "type": "Binary", + "nullable": false + } + ] + } + }, + "network": "anvil" + }, + "transactions": { + "schema": { + "arrow": { + "fields": [ + { + "name": "block_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "block_num", + "type": "UInt64", + "nullable": false + }, + { + "name": "timestamp", + "type": { + "Timestamp": [ + "Nanosecond", + "+00:00" + ] + }, + "nullable": false + }, + { + "name": "tx_index", + "type": "UInt32", + "nullable": false + }, + { + "name": "tx_hash", + "type": { + "FixedSizeBinary": 32 + }, + "nullable": false + }, + { + "name": "to", + "type": { + "FixedSizeBinary": 20 + }, + "nullable": true + }, + { + "name": "nonce", + "type": "UInt64", + "nullable": false + }, + { + "name": "gas_price", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": true + }, + { + "name": "gas_limit", + "type": "UInt64", + "nullable": false + }, + { + "name": "value", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": false + }, + { + "name": "input", + "type": "Binary", + "nullable": false + }, + { + "name": "v", + "type": "Binary", + "nullable": false + }, + { + "name": "r", + "type": "Binary", + "nullable": false + }, + { + "name": "s", + "type": "Binary", + "nullable": false + }, + { + "name": "gas_used", + "type": "UInt64", + "nullable": false + }, + { + "name": "type", + "type": "Int32", + "nullable": false + }, + { + "name": "max_fee_per_gas", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": true + }, + { + "name": "max_priority_fee_per_gas", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": true + }, + { + "name": "max_fee_per_blob_gas", + "type": { + "Decimal128": [ + 38, + 0 + ] + }, + "nullable": true + }, + { + "name": "from", + "type": { + "FixedSizeBinary": 20 + }, + "nullable": false + }, + { + "name": "status", + "type": "Boolean", + "nullable": false + } + ] + } + }, + "network": "anvil" + } + } +} \ No newline at end of file From 11cf46ed5863bf63c8af7747767a988e4df76de5 Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Tue, 10 Mar 2026 16:41:58 +0400 Subject: [PATCH 03/12] Add E2E query tests and wire up test runner --- Makefile | 6 +++++ pyproject.toml | 1 + tests/conftest.py | 1 + tests/e2e/test_queries.py | 55 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 63 insertions(+) create mode 100644 tests/e2e/test_queries.py diff --git a/Makefile b/Makefile index 64572ad..7b86d2c 100644 --- a/Makefile +++ b/Makefile @@ -54,6 +54,11 @@ test-performance: @echo "🏇 Running performance tests..." $(PYTHON) pytest tests/performance/ -m "performance" -v --log-cli-level=ERROR +# E2E tests (require anvil, ampd, Docker) +test-e2e: + @echo "Running E2E tests..." + $(PYTHON) pytest tests/e2e/ -m "e2e" -v --log-cli-level=INFO -x --timeout=300 + # Code quality (using your ruff config) lint: @echo "🔍 Linting code..." @@ -133,6 +138,7 @@ help: @echo " make test-redis - Run Redis tests" @echo " make test-snowflake - Run Snowflake tests" @echo " make test-performance - Run performance tests" + @echo " make test-e2e - Run E2E tests (require anvil, ampd, Docker)" @echo " make lint - Lint code with ruff" @echo " make format - Format code with ruff" @echo " make test-setup - Start test databases" diff --git a/pyproject.toml b/pyproject.toml index 258bc24..98ffd92 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -123,6 +123,7 @@ markers = [ "iceberg: Tests requiring Apache Iceberg", "snowflake: Tests requiring Snowflake", "cloud: Tests requiring cloud storage access", + "e2e: End-to-end tests (require anvil, ampd, Docker)", ] [tool.ruff] diff --git a/tests/conftest.py b/tests/conftest.py index f0bbc4d..900b316 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -461,6 +461,7 @@ def pytest_configure(config): config.addinivalue_line('markers', 'snowflake: Tests requiring Snowflake') config.addinivalue_line('markers', 'lmdb: Tests requiring LMDB') config.addinivalue_line('markers', 'slow: Slow tests (> 30 seconds)') + config.addinivalue_line('markers', 'e2e: End-to-end tests (require anvil, ampd, Docker)') # Utility fixtures for mocking diff --git a/tests/e2e/test_queries.py b/tests/e2e/test_queries.py new file mode 100644 index 0000000..2d139f4 --- /dev/null +++ b/tests/e2e/test_queries.py @@ -0,0 +1,55 @@ +"""E2E query validation tests against real ampd + Anvil.""" + +import pyarrow as pa +import pytest + + +@pytest.mark.e2e +class TestBlocksQuery: + def test_query_blocks(self, e2e_client): + table = e2e_client.sql('SELECT block_num, hash FROM anvil.blocks ORDER BY block_num').to_arrow() + + assert len(table) == 11 + block_nums = table.column('block_num').to_pylist() + assert block_nums == list(range(11)) + assert all(h is not None for h in table.column('hash').to_pylist()) + + def test_blocks_schema(self, e2e_client): + table = e2e_client.sql('SELECT * FROM anvil.blocks LIMIT 1').to_arrow() + schema = table.schema + + assert schema.field('block_num').type == pa.uint64() + assert schema.field('hash').type == pa.binary(32) + assert schema.field('parent_hash').type == pa.binary(32) + assert schema.field('miner').type == pa.binary(20) + assert schema.field('gas_used').type == pa.uint64() + assert hasattr(schema.field('timestamp').type, 'tz') + + +@pytest.mark.e2e +class TestTransactionsQuery: + def test_query_transactions(self, e2e_client): + table = e2e_client.sql('SELECT * FROM anvil.transactions LIMIT 5').to_arrow() + expected_cols = {'block_num', 'hash', 'from', 'to', 'value', 'gas_used'} + assert expected_cols.issubset(set(table.column_names)) + + +@pytest.mark.e2e +class TestLogsQuery: + def test_query_logs(self, e2e_client): + table = e2e_client.sql('SELECT * FROM anvil.logs LIMIT 5').to_arrow() + expected_cols = {'block_num', 'log_index', 'address', 'data'} + assert expected_cols.issubset(set(table.column_names)) + + +@pytest.mark.e2e +class TestQueryBehavior: + def test_query_with_where_clause(self, e2e_client): + table = e2e_client.sql('SELECT block_num FROM anvil.blocks WHERE block_num > 5 ORDER BY block_num').to_arrow() + + assert len(table) == 5 + assert table.column('block_num').to_pylist() == [6, 7, 8, 9, 10] + + def test_query_with_aggregation(self, e2e_client): + table = e2e_client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() + assert table.column('cnt').to_pylist()[0] == 11 From 7db4c04331c88131f499ee250cb1151af02231ba Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Tue, 10 Mar 2026 17:15:42 +0400 Subject: [PATCH 04/12] Use ampd managed Postgres and fix transaction column name --- tests/e2e/conftest.py | 25 +++---------------------- tests/e2e/fixtures/config.toml.template | 3 --- tests/e2e/helpers/config.py | 4 ---- tests/e2e/test_queries.py | 2 +- 4 files changed, 4 insertions(+), 30 deletions(-) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 23cceed..762932a 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -1,7 +1,6 @@ """E2E test fixtures: session-scoped ampd + Anvil infrastructure.""" import logging -import os import shutil import tempfile from pathlib import Path @@ -23,13 +22,14 @@ def _skip_if_missing_deps(): - missing = [b for b in ('anvil', 'ampd') if not shutil.which(b)] + missing = [b for b in ('anvil', 'ampd', 'initdb', 'postgres') if not shutil.which(b)] if missing: pytest.skip(f'Missing binaries: {", ".join(missing)}') @pytest.fixture(scope='session') def e2e_temp_dir(): + _skip_if_missing_deps() d = tempfile.mkdtemp(prefix='amp_e2e_') yield Path(d) shutil.rmtree(d, ignore_errors=True) @@ -44,24 +44,6 @@ def e2e_ports(): } -@pytest.fixture(scope='session') -def e2e_postgres_url(): - _skip_if_missing_deps() - - if 'DOCKER_HOST' not in os.environ: - colima_socket = Path.home() / '.colima' / 'default' / 'docker.sock' - if colima_socket.exists(): - os.environ['DOCKER_HOST'] = f'unix://{colima_socket}' - - if 'TESTCONTAINERS_RYUK_DISABLED' not in os.environ: - os.environ['TESTCONTAINERS_RYUK_DISABLED'] = 'true' - - from testcontainers.postgres import PostgresContainer - - with PostgresContainer('postgres:16') as pg: - yield pg.get_connection_url() - - @pytest.fixture(scope='session') def e2e_anvil(e2e_temp_dir): log_dir = e2e_temp_dir / 'logs' @@ -72,11 +54,10 @@ def e2e_anvil(e2e_temp_dir): @pytest.fixture(scope='session') -def e2e_ampd_config(e2e_temp_dir, e2e_postgres_url, e2e_anvil, e2e_ports): +def e2e_ampd_config(e2e_temp_dir, e2e_anvil, e2e_ports): _, anvil_url = e2e_anvil config_path = generate_ampd_config( e2e_temp_dir, - e2e_postgres_url, e2e_ports['admin'], e2e_ports['flight'], e2e_ports['jsonl'], diff --git a/tests/e2e/fixtures/config.toml.template b/tests/e2e/fixtures/config.toml.template index a132b3a..517daf8 100644 --- a/tests/e2e/fixtures/config.toml.template +++ b/tests/e2e/fixtures/config.toml.template @@ -5,6 +5,3 @@ max_mem_mb = 2000 admin_api_addr = "127.0.0.1:{admin_port}" flight_addr = "127.0.0.1:{flight_port}" jsonl_addr = "127.0.0.1:{jsonl_port}" - -[metadata_db] -url = "{pg_url}" diff --git a/tests/e2e/helpers/config.py b/tests/e2e/helpers/config.py index 4571e98..f61c696 100644 --- a/tests/e2e/helpers/config.py +++ b/tests/e2e/helpers/config.py @@ -9,7 +9,6 @@ def generate_ampd_config( config_dir: Path, - pg_url: str, admin_port: int, flight_port: int, jsonl_port: int, @@ -21,8 +20,6 @@ def generate_ampd_config( for subdir in ('data', 'manifests', 'providers', 'provider_sources'): (config_dir / subdir).mkdir(parents=True, exist_ok=True) - pg_url = pg_url.replace('postgresql+psycopg2://', 'postgresql://') - template = (FIXTURES_DIR / 'config.toml.template').read_text() config = template.format( manifests_dir=config_dir / 'manifests', @@ -31,7 +28,6 @@ def generate_ampd_config( admin_port=admin_port, flight_port=flight_port, jsonl_port=jsonl_port, - pg_url=pg_url, ) config_path = config_dir / 'config.toml' diff --git a/tests/e2e/test_queries.py b/tests/e2e/test_queries.py index 2d139f4..0db64e7 100644 --- a/tests/e2e/test_queries.py +++ b/tests/e2e/test_queries.py @@ -30,7 +30,7 @@ def test_blocks_schema(self, e2e_client): class TestTransactionsQuery: def test_query_transactions(self, e2e_client): table = e2e_client.sql('SELECT * FROM anvil.transactions LIMIT 5').to_arrow() - expected_cols = {'block_num', 'hash', 'from', 'to', 'value', 'gas_used'} + expected_cols = {'block_num', 'tx_hash', 'from', 'to', 'value', 'gas_used'} assert expected_cols.issubset(set(table.column_names)) From 83c72db5af3245bfb702f4296d19fd065d488075 Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Tue, 10 Mar 2026 18:20:15 +0400 Subject: [PATCH 05/12] Flatten test classes into plain functions --- tests/e2e/test_queries.py | 68 +++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/tests/e2e/test_queries.py b/tests/e2e/test_queries.py index 0db64e7..a784269 100644 --- a/tests/e2e/test_queries.py +++ b/tests/e2e/test_queries.py @@ -5,51 +5,51 @@ @pytest.mark.e2e -class TestBlocksQuery: - def test_query_blocks(self, e2e_client): - table = e2e_client.sql('SELECT block_num, hash FROM anvil.blocks ORDER BY block_num').to_arrow() +def test_query_blocks(e2e_client): + table = e2e_client.sql('SELECT block_num, hash FROM anvil.blocks ORDER BY block_num').to_arrow() - assert len(table) == 11 - block_nums = table.column('block_num').to_pylist() - assert block_nums == list(range(11)) - assert all(h is not None for h in table.column('hash').to_pylist()) + assert len(table) == 11 + block_nums = table.column('block_num').to_pylist() + assert block_nums == list(range(11)) + assert all(h is not None for h in table.column('hash').to_pylist()) - def test_blocks_schema(self, e2e_client): - table = e2e_client.sql('SELECT * FROM anvil.blocks LIMIT 1').to_arrow() - schema = table.schema - assert schema.field('block_num').type == pa.uint64() - assert schema.field('hash').type == pa.binary(32) - assert schema.field('parent_hash').type == pa.binary(32) - assert schema.field('miner').type == pa.binary(20) - assert schema.field('gas_used').type == pa.uint64() - assert hasattr(schema.field('timestamp').type, 'tz') +@pytest.mark.e2e +def test_blocks_schema(e2e_client): + table = e2e_client.sql('SELECT * FROM anvil.blocks LIMIT 1').to_arrow() + schema = table.schema + + assert schema.field('block_num').type == pa.uint64() + assert schema.field('hash').type == pa.binary(32) + assert schema.field('parent_hash').type == pa.binary(32) + assert schema.field('miner').type == pa.binary(20) + assert schema.field('gas_used').type == pa.uint64() + assert hasattr(schema.field('timestamp').type, 'tz') @pytest.mark.e2e -class TestTransactionsQuery: - def test_query_transactions(self, e2e_client): - table = e2e_client.sql('SELECT * FROM anvil.transactions LIMIT 5').to_arrow() - expected_cols = {'block_num', 'tx_hash', 'from', 'to', 'value', 'gas_used'} - assert expected_cols.issubset(set(table.column_names)) +def test_query_transactions(e2e_client): + table = e2e_client.sql('SELECT * FROM anvil.transactions LIMIT 5').to_arrow() + expected_cols = {'block_num', 'tx_hash', 'from', 'to', 'value', 'gas_used'} + assert expected_cols.issubset(set(table.column_names)) @pytest.mark.e2e -class TestLogsQuery: - def test_query_logs(self, e2e_client): - table = e2e_client.sql('SELECT * FROM anvil.logs LIMIT 5').to_arrow() - expected_cols = {'block_num', 'log_index', 'address', 'data'} - assert expected_cols.issubset(set(table.column_names)) +def test_query_logs(e2e_client): + table = e2e_client.sql('SELECT * FROM anvil.logs LIMIT 5').to_arrow() + expected_cols = {'block_num', 'log_index', 'address', 'data'} + assert expected_cols.issubset(set(table.column_names)) @pytest.mark.e2e -class TestQueryBehavior: - def test_query_with_where_clause(self, e2e_client): - table = e2e_client.sql('SELECT block_num FROM anvil.blocks WHERE block_num > 5 ORDER BY block_num').to_arrow() +def test_query_with_where_clause(e2e_client): + table = e2e_client.sql('SELECT block_num FROM anvil.blocks WHERE block_num > 5 ORDER BY block_num').to_arrow() - assert len(table) == 5 - assert table.column('block_num').to_pylist() == [6, 7, 8, 9, 10] + assert len(table) == 5 + assert table.column('block_num').to_pylist() == [6, 7, 8, 9, 10] - def test_query_with_aggregation(self, e2e_client): - table = e2e_client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() - assert table.column('cnt').to_pylist()[0] == 11 + +@pytest.mark.e2e +def test_query_with_aggregation(e2e_client): + table = e2e_client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() + assert table.column('cnt').to_pylist()[0] == 11 From e6df0a3c36a7a0f3d303d1a5b893018d11711faf Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Tue, 10 Mar 2026 18:37:15 +0400 Subject: [PATCH 06/12] Add function-scoped amp_test_server fixture for isolated tests --- tests/e2e/conftest.py | 124 +++++++++++++++++++++++--------------- tests/e2e/test_queries.py | 7 +++ 2 files changed, 82 insertions(+), 49 deletions(-) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 762932a..721ae52 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -1,8 +1,9 @@ -"""E2E test fixtures: session-scoped ampd + Anvil infrastructure.""" +"""E2E test fixtures: session-scoped and function-scoped ampd + Anvil infrastructure.""" import logging import shutil import tempfile +from dataclasses import dataclass from pathlib import Path import pytest @@ -27,73 +28,98 @@ def _skip_if_missing_deps(): pytest.skip(f'Missing binaries: {", ".join(missing)}') -@pytest.fixture(scope='session') -def e2e_temp_dir(): - _skip_if_missing_deps() - d = tempfile.mkdtemp(prefix='amp_e2e_') - yield Path(d) - shutil.rmtree(d, ignore_errors=True) +@dataclass +class AmpTestServer: + """An ampd + Anvil stack with a connected client.""" + client: object + anvil_url: str + admin_url: str + ports: dict -@pytest.fixture(scope='session') -def e2e_ports(): - return { + +def _setup_amp_stack(num_blocks: int = 10): + """Spin up anvil + ampd + register + deploy. + + Returns (AmpTestServer, cleanup_fn). + """ + from amp.client import Client + + temp_dir = Path(tempfile.mkdtemp(prefix='amp_e2e_')) + log_dir = temp_dir / 'logs' + ports = { 'admin': get_free_port(), 'flight': get_free_port(), 'jsonl': get_free_port(), } + anvil_proc, anvil_url = spawn_anvil(log_dir) + mine_blocks(anvil_url, num_blocks) -@pytest.fixture(scope='session') -def e2e_anvil(e2e_temp_dir): - log_dir = e2e_temp_dir / 'logs' - proc, url = spawn_anvil(log_dir) - mine_blocks(url, 10) - yield proc, url - proc.terminate() - - -@pytest.fixture(scope='session') -def e2e_ampd_config(e2e_temp_dir, e2e_anvil, e2e_ports): - _, anvil_url = e2e_anvil config_path = generate_ampd_config( - e2e_temp_dir, - e2e_ports['admin'], - e2e_ports['flight'], - e2e_ports['jsonl'], + temp_dir, + ports['admin'], + ports['flight'], + ports['jsonl'], ) - provider_path = generate_provider_toml(e2e_temp_dir, anvil_url) - manifest_path = copy_anvil_manifest(e2e_temp_dir) - return config_path, provider_path, manifest_path - + generate_provider_toml(temp_dir, anvil_url) + manifest_path = copy_anvil_manifest(temp_dir) -@pytest.fixture(scope='session') -def e2e_ampd(e2e_temp_dir, e2e_ampd_config, e2e_ports): - config_path, _, _ = e2e_ampd_config - log_dir = e2e_temp_dir / 'logs' - proc = spawn_ampd(config_path, log_dir) - wait_for_ampd_ready(e2e_ports['admin']) - yield proc - proc.terminate() - - -@pytest.fixture(scope='session') -def e2e_dataset(e2e_ampd, e2e_ampd_config, e2e_ports): - _, provider_path, manifest_path = e2e_ampd_config - admin_url = f'http://127.0.0.1:{e2e_ports["admin"]}' + ampd_proc = spawn_ampd(config_path, log_dir) + wait_for_ampd_ready(ports['admin']) + admin_url = f'http://127.0.0.1:{ports["admin"]}' manager = DatasetManager(admin_url) try: - manager.register_provider('anvil', provider_path) + manager.register_provider('anvil', temp_dir / 'provider_sources' / 'anvil.toml') manager.register_dataset('_', 'anvil', manifest_path, '0.0.1') manager.deploy_dataset('_', 'anvil', '0.0.1') - wait_for_data_ready(e2e_ports['flight']) + wait_for_data_ready(ports['flight']) finally: manager.close() + server = AmpTestServer( + client=Client(query_url=f'grpc://127.0.0.1:{ports["flight"]}'), + anvil_url=anvil_url, + admin_url=admin_url, + ports=ports, + ) + + def cleanup(): + ampd_proc.terminate() + anvil_proc.terminate() + shutil.rmtree(temp_dir, ignore_errors=True) + + return server, cleanup + + +# --------------------------------------------------------------------------- +# Session-scoped fixtures +# --------------------------------------------------------------------------- + @pytest.fixture(scope='session') -def e2e_client(e2e_dataset, e2e_ports): - from amp.client import Client +def e2e_server(): + _skip_if_missing_deps() + server, cleanup = _setup_amp_stack() + yield server + cleanup() - return Client(query_url=f'grpc://127.0.0.1:{e2e_ports["flight"]}') + +@pytest.fixture(scope='session') +def e2e_client(e2e_server): + return e2e_server.client + + +# --------------------------------------------------------------------------- +# Function-scoped fixture +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def amp_test_server(): + """Isolated ampd + Anvil stack for a single test.""" + _skip_if_missing_deps() + server, cleanup = _setup_amp_stack() + yield server + cleanup() diff --git a/tests/e2e/test_queries.py b/tests/e2e/test_queries.py index a784269..c3ad41e 100644 --- a/tests/e2e/test_queries.py +++ b/tests/e2e/test_queries.py @@ -53,3 +53,10 @@ def test_query_with_where_clause(e2e_client): def test_query_with_aggregation(e2e_client): table = e2e_client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() assert table.column('cnt').to_pylist()[0] == 11 + + +@pytest.mark.e2e +def test_isolated_server(amp_test_server): + """Verify function-scoped stack works on its own ports.""" + table = amp_test_server.client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() + assert table.column('cnt').to_pylist()[0] == 11 From 6959edb453d46404c0fe4c4501e1ad516045df4c Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Wed, 11 Mar 2026 15:44:00 +0400 Subject: [PATCH 07/12] Clean up E2E test infrastructure - Gitignore .amp and .amp_state runtime directories - Clean up temp dir on setup failure to avoid /tmp leaks - Remove unused load_anvil_manifest() and json import - Add trailing newline to anvil.json --- .gitignore | 2 ++ tests/e2e/conftest.py | 64 +++++++++++++++++++---------------- tests/e2e/fixtures/anvil.json | 2 +- tests/e2e/helpers/config.py | 7 ---- 4 files changed, 37 insertions(+), 38 deletions(-) diff --git a/.gitignore b/.gitignore index ba8d0fb..881b1fc 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,8 @@ .env .test.env *.env +.amp +.amp_state # Kubernetes secrets (NEVER commit these!) k8s/secret.yaml diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 721ae52..5ba458b 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -46,37 +46,41 @@ def _setup_amp_stack(num_blocks: int = 10): from amp.client import Client temp_dir = Path(tempfile.mkdtemp(prefix='amp_e2e_')) - log_dir = temp_dir / 'logs' - ports = { - 'admin': get_free_port(), - 'flight': get_free_port(), - 'jsonl': get_free_port(), - } - - anvil_proc, anvil_url = spawn_anvil(log_dir) - mine_blocks(anvil_url, num_blocks) - - config_path = generate_ampd_config( - temp_dir, - ports['admin'], - ports['flight'], - ports['jsonl'], - ) - generate_provider_toml(temp_dir, anvil_url) - manifest_path = copy_anvil_manifest(temp_dir) - - ampd_proc = spawn_ampd(config_path, log_dir) - wait_for_ampd_ready(ports['admin']) - - admin_url = f'http://127.0.0.1:{ports["admin"]}' - manager = DatasetManager(admin_url) try: - manager.register_provider('anvil', temp_dir / 'provider_sources' / 'anvil.toml') - manager.register_dataset('_', 'anvil', manifest_path, '0.0.1') - manager.deploy_dataset('_', 'anvil', '0.0.1') - wait_for_data_ready(ports['flight']) - finally: - manager.close() + log_dir = temp_dir / 'logs' + ports = { + 'admin': get_free_port(), + 'flight': get_free_port(), + 'jsonl': get_free_port(), + } + + anvil_proc, anvil_url = spawn_anvil(log_dir) + mine_blocks(anvil_url, num_blocks) + + config_path = generate_ampd_config( + temp_dir, + ports['admin'], + ports['flight'], + ports['jsonl'], + ) + generate_provider_toml(temp_dir, anvil_url) + manifest_path = copy_anvil_manifest(temp_dir) + + ampd_proc = spawn_ampd(config_path, log_dir) + wait_for_ampd_ready(ports['admin']) + + admin_url = f'http://127.0.0.1:{ports["admin"]}' + manager = DatasetManager(admin_url) + try: + manager.register_provider('anvil', temp_dir / 'provider_sources' / 'anvil.toml') + manager.register_dataset('_', 'anvil', manifest_path, '0.0.1') + manager.deploy_dataset('_', 'anvil', '0.0.1') + wait_for_data_ready(ports['flight']) + finally: + manager.close() + except Exception: + shutil.rmtree(temp_dir, ignore_errors=True) + raise server = AmpTestServer( client=Client(query_url=f'grpc://127.0.0.1:{ports["flight"]}'), diff --git a/tests/e2e/fixtures/anvil.json b/tests/e2e/fixtures/anvil.json index 63fbe64..10a2aeb 100644 --- a/tests/e2e/fixtures/anvil.json +++ b/tests/e2e/fixtures/anvil.json @@ -404,4 +404,4 @@ "network": "anvil" } } -} \ No newline at end of file +} diff --git a/tests/e2e/helpers/config.py b/tests/e2e/helpers/config.py index f61c696..128d03b 100644 --- a/tests/e2e/helpers/config.py +++ b/tests/e2e/helpers/config.py @@ -1,6 +1,5 @@ """Config file generation for ampd E2E tests.""" -import json import shutil from pathlib import Path @@ -59,9 +58,3 @@ def copy_anvil_manifest(config_dir: Path) -> Path: dest = config_dir / 'manifests' / 'anvil.json' shutil.copy(FIXTURES_DIR / 'anvil.json', dest) return dest - - -def load_anvil_manifest() -> dict: - """Load the anvil.json manifest as a dict.""" - with open(FIXTURES_DIR / 'anvil.json') as f: - return json.load(f) From 94dc9ec6bcbccef1c18b0abd2c420cef89791c97 Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Wed, 11 Mar 2026 15:53:53 +0400 Subject: [PATCH 08/12] Set AMP_DIR per instance to isolate managed Postgres state --- tests/e2e/helpers/process_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/e2e/helpers/process_manager.py b/tests/e2e/helpers/process_manager.py index 50794e5..b66108e 100644 --- a/tests/e2e/helpers/process_manager.py +++ b/tests/e2e/helpers/process_manager.py @@ -113,9 +113,10 @@ def spawn_ampd(config_path: Path, log_dir: Path) -> ManagedProcess: stdout_f = open(log_dir / 'ampd_stdout.log', 'w') stderr_f = open(log_dir / 'ampd_stderr.log', 'w') + amp_dir = config_path.parent / '.amp' process = subprocess.Popen( ['ampd', 'dev'], - env={**os.environ, 'AMP_CONFIG': str(config_path)}, + env={**os.environ, 'AMP_CONFIG': str(config_path), 'AMP_DIR': str(amp_dir)}, stdout=stdout_f, stderr=stderr_f, ) From 3ce59d6c5262a36228deec2edd8f0d223c8f60f2 Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Wed, 11 Mar 2026 16:26:16 +0400 Subject: [PATCH 09/12] Add streaming metadata and continuous ingestion E2E tests --- tests/e2e/conftest.py | 4 +- tests/e2e/helpers/process_manager.py | 35 +++++++++++-- tests/e2e/test_streaming.py | 78 ++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 6 deletions(-) create mode 100644 tests/e2e/test_streaming.py diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 5ba458b..ea9fad2 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -38,7 +38,7 @@ class AmpTestServer: ports: dict -def _setup_amp_stack(num_blocks: int = 10): +def _setup_amp_stack(num_blocks: int = 10, end_block: str = 'latest'): """Spin up anvil + ampd + register + deploy. Returns (AmpTestServer, cleanup_fn). @@ -74,7 +74,7 @@ def _setup_amp_stack(num_blocks: int = 10): try: manager.register_provider('anvil', temp_dir / 'provider_sources' / 'anvil.toml') manager.register_dataset('_', 'anvil', manifest_path, '0.0.1') - manager.deploy_dataset('_', 'anvil', '0.0.1') + manager.deploy_dataset('_', 'anvil', '0.0.1', end_block=end_block) wait_for_data_ready(ports['flight']) finally: manager.close() diff --git a/tests/e2e/helpers/process_manager.py b/tests/e2e/helpers/process_manager.py index b66108e..efa3d99 100644 --- a/tests/e2e/helpers/process_manager.py +++ b/tests/e2e/helpers/process_manager.py @@ -106,6 +106,28 @@ def mine_blocks(anvil_url: str, count: int) -> None: resp.raise_for_status() +def evm_snapshot(anvil_url: str) -> str: + """Take a snapshot of the current anvil state. Returns snapshot ID.""" + with httpx.Client() as client: + resp = client.post( + anvil_url, + json={'jsonrpc': '2.0', 'method': 'evm_snapshot', 'params': [], 'id': 1}, + ) + resp.raise_for_status() + return resp.json()['result'] + + +def evm_revert(anvil_url: str, snapshot_id: str) -> None: + """Revert anvil to a previous snapshot.""" + with httpx.Client() as client: + resp = client.post( + anvil_url, + json={'jsonrpc': '2.0', 'method': 'evm_revert', 'params': [snapshot_id], 'id': 1}, + ) + resp.raise_for_status() + assert resp.json()['result'] is True, 'evm_revert failed' + + def spawn_ampd(config_path: Path, log_dir: Path) -> ManagedProcess: """Spawn ampd dev with the given config file.""" log_dir.mkdir(parents=True, exist_ok=True) @@ -143,17 +165,22 @@ def wait_for_ampd_ready(admin_port: int, timeout: int = 60) -> None: def wait_for_data_ready(flight_port: int, timeout: int = 60) -> None: """Poll Flight SQL until data is queryable.""" + wait_for_block(flight_port, 0, timeout=timeout) + + +def wait_for_block(flight_port: int, block_num: int, timeout: int = 60) -> None: + """Poll Flight SQL until a specific block number is available.""" from amp.client import Client start = time.monotonic() while time.monotonic() - start < timeout: try: client = Client(query_url=f'grpc://127.0.0.1:{flight_port}') - table = client.sql('SELECT block_num FROM anvil.blocks LIMIT 1').to_arrow() + table = client.sql(f'SELECT block_num FROM anvil.blocks WHERE block_num = {block_num}').to_arrow() if len(table) > 0: - logger.info(f'Data ready after {time.monotonic() - start:.1f}s') + logger.info(f'Block {block_num} available after {time.monotonic() - start:.1f}s') return except Exception: pass - time.sleep(2) - raise TimeoutError(f'Flight SQL data not queryable after {timeout}s') + time.sleep(1) + raise TimeoutError(f'Block {block_num} not available after {timeout}s') diff --git a/tests/e2e/test_streaming.py b/tests/e2e/test_streaming.py new file mode 100644 index 0000000..c706fd1 --- /dev/null +++ b/tests/e2e/test_streaming.py @@ -0,0 +1,78 @@ +"""E2E streaming and continuous ingestion tests against real ampd + Anvil.""" + +import pytest + +from .helpers.process_manager import mine_blocks, wait_for_block + + +@pytest.fixture() +def continuous_server(): + """Isolated ampd + Anvil stack with continuous deploy.""" + from .conftest import _setup_amp_stack, _skip_if_missing_deps + + _skip_if_missing_deps() + server, cleanup = _setup_amp_stack(num_blocks=10, end_block=None) + yield server + cleanup() + + +@pytest.mark.e2e +def test_continuous_ingestion(continuous_server): + """Verify ampd ingests new blocks in continuous mode.""" + anvil_url = continuous_server.anvil_url + flight_port = continuous_server.ports['flight'] + client = continuous_server.client + + # Verify initial 11 blocks (0-10) + table = client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() + assert table.column('cnt').to_pylist()[0] == 11 + + # Mine 5 more blocks + mine_blocks(anvil_url, 5) + wait_for_block(flight_port, 15) + + # Verify all 16 blocks present + table = client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() + assert table.column('cnt').to_pylist()[0] == 16 + + +@pytest.mark.e2e +def test_streaming_metadata_parsing(continuous_server): + """Verify real server app_metadata parses into BatchMetadata with hashes.""" + from google.protobuf.any_pb2 import Any + from pyarrow import flight + + from amp import FlightSql_pb2 + from amp.streaming.types import BatchMetadata + + query = 'SELECT block_num, hash FROM anvil.blocks SETTINGS stream = true' + + command_query = FlightSql_pb2.CommandStatementQuery() + command_query.query = query + any_command = Any() + any_command.Pack(command_query) + cmd = any_command.SerializeToString() + + client = continuous_server.client + flight_descriptor = flight.FlightDescriptor.for_command(cmd) + info = client.conn.get_flight_info(flight_descriptor) + reader = client.conn.do_get(info.endpoints[0].ticket) + + # Read chunks until we get one with data rows (skip watermark/empty batches) + metadata = None + for _ in range(20): + chunk = reader.read_chunk() + if chunk.app_metadata is not None: + md = BatchMetadata.from_flight_data(chunk.app_metadata) + if md.ranges: + metadata = md + break + + reader.cancel() + + assert metadata is not None, 'Should receive at least one batch with block ranges' + for r in metadata.ranges: + assert r.network is not None + assert r.start >= 0 + assert r.end >= r.start + assert r.hash is not None, 'Server should send block hashes' From 875f28c1ac04da4859d813f3d81b2414b02a820f Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Thu, 12 Mar 2026 12:07:23 +0400 Subject: [PATCH 10/12] Add reorg detection test and consolidate E2E fixtures --- tests/e2e/conftest.py | 36 +++++++++---- tests/e2e/helpers/process_manager.py | 2 +- tests/e2e/test_streaming.py | 76 +++++++++++++++++++++++----- 3 files changed, 90 insertions(+), 24 deletions(-) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index ea9fad2..c814c38 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -46,6 +46,8 @@ def _setup_amp_stack(num_blocks: int = 10, end_block: str = 'latest'): from amp.client import Client temp_dir = Path(tempfile.mkdtemp(prefix='amp_e2e_')) + anvil_proc = None + ampd_proc = None try: log_dir = temp_dir / 'logs' ports = { @@ -79,6 +81,10 @@ def _setup_amp_stack(num_blocks: int = 10, end_block: str = 'latest'): finally: manager.close() except Exception: + if ampd_proc: + ampd_proc.terminate() + if anvil_proc: + anvil_proc.terminate() shutil.rmtree(temp_dir, ignore_errors=True) raise @@ -97,17 +103,22 @@ def cleanup(): return server, cleanup +def _amp_fixture(**stack_kwargs): + """Yield an AmpTestServer, handling skip-check and cleanup.""" + _skip_if_missing_deps() + server, cleanup = _setup_amp_stack(**stack_kwargs) + yield server + cleanup() + + # --------------------------------------------------------------------------- -# Session-scoped fixtures +# Session-scoped fixtures (read-only query tests) # --------------------------------------------------------------------------- @pytest.fixture(scope='session') def e2e_server(): - _skip_if_missing_deps() - server, cleanup = _setup_amp_stack() - yield server - cleanup() + yield from _amp_fixture() @pytest.fixture(scope='session') @@ -115,15 +126,20 @@ def e2e_client(e2e_server): return e2e_server.client +@pytest.fixture(scope='session') +def continuous_server(): + """Session-scoped ampd + Anvil with continuous deploy.""" + yield from _amp_fixture(end_block=None) + + # --------------------------------------------------------------------------- -# Function-scoped fixture +# Function-scoped fixtures (tests that mutate chain state) # --------------------------------------------------------------------------- @pytest.fixture() def amp_test_server(): """Isolated ampd + Anvil stack for a single test.""" - _skip_if_missing_deps() - server, cleanup = _setup_amp_stack() - yield server - cleanup() + yield from _amp_fixture() + + diff --git a/tests/e2e/helpers/process_manager.py b/tests/e2e/helpers/process_manager.py index efa3d99..1e21cce 100644 --- a/tests/e2e/helpers/process_manager.py +++ b/tests/e2e/helpers/process_manager.py @@ -172,10 +172,10 @@ def wait_for_block(flight_port: int, block_num: int, timeout: int = 60) -> None: """Poll Flight SQL until a specific block number is available.""" from amp.client import Client + client = Client(query_url=f'grpc://127.0.0.1:{flight_port}') start = time.monotonic() while time.monotonic() - start < timeout: try: - client = Client(query_url=f'grpc://127.0.0.1:{flight_port}') table = client.sql(f'SELECT block_num FROM anvil.blocks WHERE block_num = {block_num}').to_arrow() if len(table) > 0: logger.info(f'Block {block_num} available after {time.monotonic() - start:.1f}s') diff --git a/tests/e2e/test_streaming.py b/tests/e2e/test_streaming.py index c706fd1..339f36b 100644 --- a/tests/e2e/test_streaming.py +++ b/tests/e2e/test_streaming.py @@ -1,19 +1,15 @@ -"""E2E streaming and continuous ingestion tests against real ampd + Anvil.""" - -import pytest - -from .helpers.process_manager import mine_blocks, wait_for_block +"""E2E streaming, continuous ingestion, and reorg detection tests.""" +import time -@pytest.fixture() -def continuous_server(): - """Isolated ampd + Anvil stack with continuous deploy.""" - from .conftest import _setup_amp_stack, _skip_if_missing_deps +import pytest - _skip_if_missing_deps() - server, cleanup = _setup_amp_stack(num_blocks=10, end_block=None) - yield server - cleanup() +from .helpers.process_manager import ( + evm_revert, + evm_snapshot, + mine_blocks, + wait_for_block, +) @pytest.mark.e2e @@ -76,3 +72,57 @@ def test_streaming_metadata_parsing(continuous_server): assert r.start >= 0 assert r.end >= r.start assert r.hash is not None, 'Server should send block hashes' + + +@pytest.fixture() +def reorg_server(): + """Isolated ampd + Anvil stack for reorg testing.""" + from .conftest import _amp_fixture + + yield from _amp_fixture(end_block=None) + + +@pytest.mark.e2e +def test_reorg_detection(reorg_server): + """Verify ampd detects a reorg when new blocks break the hash chain.""" + anvil_url = reorg_server.anvil_url + flight_port = reorg_server.ports['flight'] + client = reorg_server.client + + # Snapshot at block 10 + snapshot_id = evm_snapshot(anvil_url) + + # Mine blocks 11-15, wait for ingestion + mine_blocks(anvil_url, 5) + wait_for_block(flight_port, 15) + + # Capture pre-reorg hashes + pre_reorg = client.sql( + 'SELECT block_num, hash FROM anvil.blocks WHERE block_num >= 11 ORDER BY block_num' + ).to_arrow() + assert len(pre_reorg) == 5 + pre_hashes = pre_reorg.column('hash').to_pylist() + + # Revert to snapshot and mine past the old tip so the worker + # writes new blocks whose prev_hash won't match the stored hash, + # triggering fork detection and re-materialization. + evm_revert(anvil_url, snapshot_id) + mine_blocks(anvil_url, 10) # blocks 11-20, different hashes + + # Wait for ampd to detect reorg and re-ingest + timeout = 30 + start = time.monotonic() + while time.monotonic() - start < timeout: + post_reorg = client.sql( + 'SELECT block_num, hash FROM anvil.blocks WHERE block_num >= 11 AND block_num <= 15 ORDER BY block_num' + ).to_arrow() + if len(post_reorg) == 5: + post_hashes = post_reorg.column('hash').to_pylist() + if post_hashes != pre_hashes: + break + time.sleep(1) + else: + pytest.fail(f'ampd did not detect reorg within {timeout}s') + + assert pre_reorg.column('block_num').to_pylist() == post_reorg.column('block_num').to_pylist() + assert post_hashes != pre_hashes From 2492898075763963f53689df7e1ba01a53177b0e Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Thu, 12 Mar 2026 12:25:00 +0400 Subject: [PATCH 11/12] Refactor E2E test fixtures and helpers --- Makefile | 2 +- tests/e2e/conftest.py | 12 ++++++++---- tests/e2e/fixtures/anvil.json | 1 + tests/e2e/helpers/dataset_manager.py | 2 +- tests/e2e/helpers/process_manager.py | 13 ++++++------- tests/e2e/test_queries.py | 9 ++------- tests/e2e/test_streaming.py | 25 ++++++++----------------- 7 files changed, 27 insertions(+), 37 deletions(-) diff --git a/Makefile b/Makefile index 7b86d2c..bf653fc 100644 --- a/Makefile +++ b/Makefile @@ -57,7 +57,7 @@ test-performance: # E2E tests (require anvil, ampd, Docker) test-e2e: @echo "Running E2E tests..." - $(PYTHON) pytest tests/e2e/ -m "e2e" -v --log-cli-level=INFO -x --timeout=300 + $(PYTHON) pytest tests/e2e/ -m "e2e" -v --log-cli-level=INFO -x # Code quality (using your ruff config) lint: diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index c814c38..dd16fe4 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -8,6 +8,8 @@ import pytest +from amp.client import Client + from .helpers.config import copy_anvil_manifest, generate_ampd_config, generate_provider_toml from .helpers.dataset_manager import DatasetManager from .helpers.process_manager import ( @@ -32,19 +34,17 @@ def _skip_if_missing_deps(): class AmpTestServer: """An ampd + Anvil stack with a connected client.""" - client: object + client: Client anvil_url: str admin_url: str ports: dict -def _setup_amp_stack(num_blocks: int = 10, end_block: str = 'latest'): +def _setup_amp_stack(num_blocks: int = 10, end_block: str | None = 'latest'): """Spin up anvil + ampd + register + deploy. Returns (AmpTestServer, cleanup_fn). """ - from amp.client import Client - temp_dir = Path(tempfile.mkdtemp(prefix='amp_e2e_')) anvil_proc = None ampd_proc = None @@ -143,3 +143,7 @@ def amp_test_server(): yield from _amp_fixture() +@pytest.fixture() +def reorg_server(): + """Isolated ampd + Anvil stack for reorg testing.""" + yield from _amp_fixture(end_block=None) diff --git a/tests/e2e/fixtures/anvil.json b/tests/e2e/fixtures/anvil.json index 10a2aeb..6a303e2 100644 --- a/tests/e2e/fixtures/anvil.json +++ b/tests/e2e/fixtures/anvil.json @@ -1,4 +1,5 @@ { + "$comment": "Generated with: ampctl manifest generate --kind evm-rpc --network anvil --start-block 0", "kind": "evm-rpc", "network": "anvil", "start_block": 0, diff --git a/tests/e2e/helpers/dataset_manager.py b/tests/e2e/helpers/dataset_manager.py index abdc6aa..e8fc83a 100644 --- a/tests/e2e/helpers/dataset_manager.py +++ b/tests/e2e/helpers/dataset_manager.py @@ -29,7 +29,7 @@ def register_dataset(self, namespace: str, name: str, manifest_path: Path, tag: self._admin.datasets.register(namespace, name, tag, manifest) logger.info(f'Registered dataset {namespace}/{name}@{tag}') - def deploy_dataset(self, namespace: str, name: str, version: str, end_block: str = 'latest') -> None: + def deploy_dataset(self, namespace: str, name: str, version: str, end_block: str | None = 'latest') -> None: self._admin.datasets.deploy(namespace, name, version, end_block=end_block) logger.info(f'Deployed {namespace}/{name}@{version}') diff --git a/tests/e2e/helpers/process_manager.py b/tests/e2e/helpers/process_manager.py index 1e21cce..5e849ba 100644 --- a/tests/e2e/helpers/process_manager.py +++ b/tests/e2e/helpers/process_manager.py @@ -96,14 +96,13 @@ def _wait_for_jsonrpc(url: str, timeout: int = 30) -> None: def mine_blocks(anvil_url: str, count: int) -> None: - """Mine blocks on an Anvil instance via JSON-RPC evm_mine.""" + """Mine blocks on an Anvil instance via JSON-RPC anvil_mine.""" with httpx.Client() as client: - for _ in range(count): - resp = client.post( - anvil_url, - json={'jsonrpc': '2.0', 'method': 'evm_mine', 'params': [], 'id': 1}, - ) - resp.raise_for_status() + resp = client.post( + anvil_url, + json={'jsonrpc': '2.0', 'method': 'anvil_mine', 'params': [count, 0], 'id': 1}, + ) + resp.raise_for_status() def evm_snapshot(anvil_url: str) -> str: diff --git a/tests/e2e/test_queries.py b/tests/e2e/test_queries.py index c3ad41e..1d31bfd 100644 --- a/tests/e2e/test_queries.py +++ b/tests/e2e/test_queries.py @@ -3,8 +3,9 @@ import pyarrow as pa import pytest +pytestmark = pytest.mark.e2e + -@pytest.mark.e2e def test_query_blocks(e2e_client): table = e2e_client.sql('SELECT block_num, hash FROM anvil.blocks ORDER BY block_num').to_arrow() @@ -14,7 +15,6 @@ def test_query_blocks(e2e_client): assert all(h is not None for h in table.column('hash').to_pylist()) -@pytest.mark.e2e def test_blocks_schema(e2e_client): table = e2e_client.sql('SELECT * FROM anvil.blocks LIMIT 1').to_arrow() schema = table.schema @@ -27,21 +27,18 @@ def test_blocks_schema(e2e_client): assert hasattr(schema.field('timestamp').type, 'tz') -@pytest.mark.e2e def test_query_transactions(e2e_client): table = e2e_client.sql('SELECT * FROM anvil.transactions LIMIT 5').to_arrow() expected_cols = {'block_num', 'tx_hash', 'from', 'to', 'value', 'gas_used'} assert expected_cols.issubset(set(table.column_names)) -@pytest.mark.e2e def test_query_logs(e2e_client): table = e2e_client.sql('SELECT * FROM anvil.logs LIMIT 5').to_arrow() expected_cols = {'block_num', 'log_index', 'address', 'data'} assert expected_cols.issubset(set(table.column_names)) -@pytest.mark.e2e def test_query_with_where_clause(e2e_client): table = e2e_client.sql('SELECT block_num FROM anvil.blocks WHERE block_num > 5 ORDER BY block_num').to_arrow() @@ -49,13 +46,11 @@ def test_query_with_where_clause(e2e_client): assert table.column('block_num').to_pylist() == [6, 7, 8, 9, 10] -@pytest.mark.e2e def test_query_with_aggregation(e2e_client): table = e2e_client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() assert table.column('cnt').to_pylist()[0] == 11 -@pytest.mark.e2e def test_isolated_server(amp_test_server): """Verify function-scoped stack works on its own ports.""" table = amp_test_server.client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() diff --git a/tests/e2e/test_streaming.py b/tests/e2e/test_streaming.py index 339f36b..3540342 100644 --- a/tests/e2e/test_streaming.py +++ b/tests/e2e/test_streaming.py @@ -11,28 +11,27 @@ wait_for_block, ) +pytestmark = pytest.mark.e2e + -@pytest.mark.e2e def test_continuous_ingestion(continuous_server): """Verify ampd ingests new blocks in continuous mode.""" anvil_url = continuous_server.anvil_url flight_port = continuous_server.ports['flight'] client = continuous_server.client - # Verify initial 11 blocks (0-10) table = client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() - assert table.column('cnt').to_pylist()[0] == 11 + count_before = table.column('cnt').to_pylist()[0] + assert count_before >= 11 - # Mine 5 more blocks mine_blocks(anvil_url, 5) - wait_for_block(flight_port, 15) + wait_for_block(flight_port, count_before + 4) - # Verify all 16 blocks present table = client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() - assert table.column('cnt').to_pylist()[0] == 16 + count_after = table.column('cnt').to_pylist()[0] + assert count_after >= count_before + 5 -@pytest.mark.e2e def test_streaming_metadata_parsing(continuous_server): """Verify real server app_metadata parses into BatchMetadata with hashes.""" from google.protobuf.any_pb2 import Any @@ -74,15 +73,6 @@ def test_streaming_metadata_parsing(continuous_server): assert r.hash is not None, 'Server should send block hashes' -@pytest.fixture() -def reorg_server(): - """Isolated ampd + Anvil stack for reorg testing.""" - from .conftest import _amp_fixture - - yield from _amp_fixture(end_block=None) - - -@pytest.mark.e2e def test_reorg_detection(reorg_server): """Verify ampd detects a reorg when new blocks break the hash chain.""" anvil_url = reorg_server.anvil_url @@ -111,6 +101,7 @@ def test_reorg_detection(reorg_server): # Wait for ampd to detect reorg and re-ingest timeout = 30 + post_hashes = None start = time.monotonic() while time.monotonic() - start < timeout: post_reorg = client.sql( From 2fde1c75a4334f3de8a009cfa1415b2d2adb2c8b Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Thu, 12 Mar 2026 13:19:36 +0400 Subject: [PATCH 12/12] Add real transaction to E2E setup and clean up query tests --- tests/e2e/conftest.py | 8 ++++++++ tests/e2e/helpers/process_manager.py | 16 ++++++++++++++++ tests/e2e/test_queries.py | 25 +++++++++---------------- tests/e2e/test_streaming.py | 19 +++++++++++++------ 4 files changed, 46 insertions(+), 22 deletions(-) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index dd16fe4..45cf3f9 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -15,6 +15,7 @@ from .helpers.process_manager import ( get_free_port, mine_blocks, + send_eth, spawn_ampd, spawn_anvil, wait_for_ampd_ready, @@ -57,6 +58,13 @@ def _setup_amp_stack(num_blocks: int = 10, end_block: str | None = 'latest'): } anvil_proc, anvil_url = spawn_anvil(log_dir) + # Send a transaction so transactions table has data + send_eth( + anvil_url, + from_addr='0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266', + to_addr='0x70997970C51812dc3A010C7d01b50e0d17dc79C8', + value_wei=10**18, + ) mine_blocks(anvil_url, num_blocks) config_path = generate_ampd_config( diff --git a/tests/e2e/helpers/process_manager.py b/tests/e2e/helpers/process_manager.py index 5e849ba..1d4cf00 100644 --- a/tests/e2e/helpers/process_manager.py +++ b/tests/e2e/helpers/process_manager.py @@ -105,6 +105,22 @@ def mine_blocks(anvil_url: str, count: int) -> None: resp.raise_for_status() +def send_eth(anvil_url: str, from_addr: str, to_addr: str, value_wei: int) -> str: + """Send an ETH transfer on Anvil. Returns the transaction hash.""" + with httpx.Client() as client: + resp = client.post( + anvil_url, + json={ + 'jsonrpc': '2.0', + 'method': 'eth_sendTransaction', + 'params': [{'from': from_addr, 'to': to_addr, 'value': hex(value_wei)}], + 'id': 1, + }, + ) + resp.raise_for_status() + return resp.json()['result'] + + def evm_snapshot(anvil_url: str) -> str: """Take a snapshot of the current anvil state. Returns snapshot ID.""" with httpx.Client() as client: diff --git a/tests/e2e/test_queries.py b/tests/e2e/test_queries.py index 1d31bfd..cf62e1c 100644 --- a/tests/e2e/test_queries.py +++ b/tests/e2e/test_queries.py @@ -5,13 +5,16 @@ pytestmark = pytest.mark.e2e +# genesis + 1 tx block (auto-mined) + 10 mined blocks +EXPECTED_BLOCKS = 12 + def test_query_blocks(e2e_client): table = e2e_client.sql('SELECT block_num, hash FROM anvil.blocks ORDER BY block_num').to_arrow() - assert len(table) == 11 + assert len(table) == EXPECTED_BLOCKS block_nums = table.column('block_num').to_pylist() - assert block_nums == list(range(11)) + assert block_nums == list(range(EXPECTED_BLOCKS)) assert all(h is not None for h in table.column('hash').to_pylist()) @@ -29,29 +32,19 @@ def test_blocks_schema(e2e_client): def test_query_transactions(e2e_client): table = e2e_client.sql('SELECT * FROM anvil.transactions LIMIT 5').to_arrow() + assert len(table) >= 1 expected_cols = {'block_num', 'tx_hash', 'from', 'to', 'value', 'gas_used'} assert expected_cols.issubset(set(table.column_names)) -def test_query_logs(e2e_client): - table = e2e_client.sql('SELECT * FROM anvil.logs LIMIT 5').to_arrow() - expected_cols = {'block_num', 'log_index', 'address', 'data'} - assert expected_cols.issubset(set(table.column_names)) - - def test_query_with_where_clause(e2e_client): table = e2e_client.sql('SELECT block_num FROM anvil.blocks WHERE block_num > 5 ORDER BY block_num').to_arrow() - assert len(table) == 5 - assert table.column('block_num').to_pylist() == [6, 7, 8, 9, 10] - - -def test_query_with_aggregation(e2e_client): - table = e2e_client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() - assert table.column('cnt').to_pylist()[0] == 11 + assert len(table) == EXPECTED_BLOCKS - 6 + assert table.column('block_num').to_pylist() == list(range(6, EXPECTED_BLOCKS)) def test_isolated_server(amp_test_server): """Verify function-scoped stack works on its own ports.""" table = amp_test_server.client.sql('SELECT COUNT(*) AS cnt FROM anvil.blocks').to_arrow() - assert table.column('cnt').to_pylist()[0] == 11 + assert table.column('cnt').to_pylist()[0] == EXPECTED_BLOCKS diff --git a/tests/e2e/test_streaming.py b/tests/e2e/test_streaming.py index 3540342..2e935bf 100644 --- a/tests/e2e/test_streaming.py +++ b/tests/e2e/test_streaming.py @@ -79,16 +79,22 @@ def test_reorg_detection(reorg_server): flight_port = reorg_server.ports['flight'] client = reorg_server.client - # Snapshot at block 10 + # Find current chain tip + tip = client.sql('SELECT MAX(block_num) AS tip FROM anvil.blocks').to_arrow() + tip_block = tip.column('tip').to_pylist()[0] + snapshot_id = evm_snapshot(anvil_url) - # Mine blocks 11-15, wait for ingestion + # Mine 5 blocks past the tip, wait for ingestion mine_blocks(anvil_url, 5) - wait_for_block(flight_port, 15) + first_new = tip_block + 1 + last_new = tip_block + 5 + wait_for_block(flight_port, last_new) # Capture pre-reorg hashes pre_reorg = client.sql( - 'SELECT block_num, hash FROM anvil.blocks WHERE block_num >= 11 ORDER BY block_num' + f'SELECT block_num, hash FROM anvil.blocks ' + f'WHERE block_num >= {first_new} ORDER BY block_num' ).to_arrow() assert len(pre_reorg) == 5 pre_hashes = pre_reorg.column('hash').to_pylist() @@ -97,7 +103,7 @@ def test_reorg_detection(reorg_server): # writes new blocks whose prev_hash won't match the stored hash, # triggering fork detection and re-materialization. evm_revert(anvil_url, snapshot_id) - mine_blocks(anvil_url, 10) # blocks 11-20, different hashes + mine_blocks(anvil_url, 10) # Wait for ampd to detect reorg and re-ingest timeout = 30 @@ -105,7 +111,8 @@ def test_reorg_detection(reorg_server): start = time.monotonic() while time.monotonic() - start < timeout: post_reorg = client.sql( - 'SELECT block_num, hash FROM anvil.blocks WHERE block_num >= 11 AND block_num <= 15 ORDER BY block_num' + f'SELECT block_num, hash FROM anvil.blocks ' + f'WHERE block_num >= {first_new} AND block_num <= {last_new} ORDER BY block_num' ).to_arrow() if len(post_reorg) == 5: post_hashes = post_reorg.column('hash').to_pylist()