Conversation
- Classes: StreamingResultIterator, ReorgAwareStream, BlockRange, BatchMetadata, ResponseBatch, ResponseBatchType, ResponseBatchWithReorg, ResumeWatermark - Not yet wired up with user callable functions
- Add load_stream_continous() for streaming with reorg handling - Enhance LoadResult to be prepared to handle reorgs
- Add query_and_load_streaming() to Client - Use query_and_load_streaming() in QueryBuilder.load() if stream=True
- Add test_reorg_result_string_representation - Enhance test_all_loaders_implement_required_methods to check whether it has real implementation in each data loader - Remove now redundant test test_create_table_from_schema_not_just_pass
incrypto32
reviewed
Oct 20, 2025
Comment on lines
+536
to
+539
| iceberg_table = self._catalog.load_table(table_identifier) | ||
| except NoSuchTableError: | ||
| self.logger.warning(f"Table '{table_identifier}' does not exist, skipping reorg handling") | ||
| return |
Member
There was a problem hiding this comment.
Wouldn't a full table scan be problematic for very large datasets?
incrypto32
reviewed
Oct 20, 2025
Comment on lines
+715
to
+717
| row_mask = pa.array([j == i for j in range(current_table.num_rows)]) | ||
| keep_mask = pa.compute.and_(keep_mask, pa.compute.invert(row_mask)) | ||
| break |
Member
There was a problem hiding this comment.
This is very inn-efficient, crates huge arrays unneccesarily. A simple bool flag list would work here instead
incrypto32
approved these changes
Oct 20, 2025
Member
incrypto32
left a comment
There was a problem hiding this comment.
LGTM apart from the full table scans for Iceberg and Deltalake loader and the deletion logic in Deltalake loader.
I'll add the commits to fix that
Signed-off-by: Ford <ford@edgeandnode.com>
Signed-off-by: Ford <ford@edgeandnode.com>
Contributor
Author
|
I'm going to go ahead and merge this and consider the specific loader implementations to be in beta (needing optimization and hardening). |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Reorg Aware Streaming Support for amp-python
Summary
This PR adds comprehensive blockchain reorganization (reorg) handling and streaming support to the amp-python client library. It enables real-time data streaming with automatic handling of blockchain reorganizations across all supported data loaders.
Key Features
1. Streaming Infrastructure
ResponseBatchWithReorgtype for streaming data and reorg eventsBlockRangemetadata tracking for multi-network support2. Enhanced Client API
load()method in QueryBuilder with streaming supportstreamparameter enables continuous data streaminghandle_reorgsparameter3. Universal Reorg Support
4. Metadata Architecture
_meta_block_rangescolumn across all loaders[{"network": "ethereum", "start": 100, "end": 110}]Changes by Component
Core Library (552 lines added)
src/amp/client.py: Enhanced QueryBuilder.load() with streaming supportsrc/amp/loaders/base.py: Addedload_stream_continuous()and_handle_reorg()src/amp/streaming/: New module with iterator, types, and reorg handlingData Loaders (740 lines added)
Testing (2,063 lines added)
Documentation (355 lines added)
Usage Example
Commits