Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 15 additions & 71 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ use crate::codec::HeaderOnlyBlock;
use crate::data_source::DataSourceTemplate;
use crate::data_source::UnresolvedDataSourceTemplate;
use crate::ingestor::PollingBlockIngestor;
use crate::json_block::EthereumJsonBlock;
use crate::network::EthereumNetworkAdapters;
use crate::polling_block_stream::PollingBlockStream;
use crate::runtime::runtime_adapter::eth_call_gas;
Expand All @@ -66,7 +65,6 @@ use graph::blockchain::block_stream::{
BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamMapper, FirehoseCursor,
TriggersAdapterWrapper,
};

/// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320
const CELO_CHAIN_IDS: [u64; 3] = [42220, 44787, 62320];

Expand Down Expand Up @@ -1076,55 +1074,26 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
.ancestor_block(ptr.clone(), offset, root.clone())
.await?;

// First check if we have the ancestor in cache and can deserialize it.
// The cached JSON can be in one of three formats:
// 1. Full RPC format: {"block": {...}, "transaction_receipts": [...]}
// 2. Shallow/header-only: {"timestamp": "...", "data": null} - only timestamp, no block data
// 3. Legacy direct: block fields at root level {hash, number, transactions, ...}
// We need full format with receipts for ancestor_block (used for trigger processing).
// Use full blocks (with receipts) directly from cache.
// Light blocks (no receipts) need to be fetched from Firehose/RPC.
let block_ptr = match cached {
Some((json, ptr)) => {
let json_block = EthereumJsonBlock::new(json);
if json_block.is_shallow() {
trace!(
self.logger,
"Cached block #{} {} is shallow (header-only). Falling back to Firehose/RPC.",
ptr.number,
ptr.hash_hex(),
);
ptr
} else if json_block.is_legacy_format() {
Some((cached_block, ptr)) => match cached_block.into_full_block() {
Some(block) => {
return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
ethereum_block: block,
calls: None,
})));
}
None => {
trace!(
self.logger,
"Cached block #{} {} is legacy light format. Falling back to Firehose/RPC.",
"Cached block #{} {} is light (no receipts). Falling back to Firehose/RPC.",
ptr.number,
ptr.hash_hex(),
);
ptr
} else {
match json_block.into_full_block() {
Ok(block) => {
return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
ethereum_block: block,
calls: None,
})));
}
Err(e) => {
warn!(
self.logger,
"Failed to deserialize cached ancestor block #{} {} (offset {} from #{}): {}. \
Falling back to Firehose/RPC.",
ptr.number,
ptr.hash_hex(),
offset,
ptr_for_log.number,
e
);
ptr
}
}
}
}
},
None => {
// Cache miss - fall back to walking the chain via parent_ptr() calls.
// This provides resilience when the block cache is empty (e.g., after truncation).
Expand Down Expand Up @@ -1179,35 +1148,10 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
let block = match self.chain_client.as_ref() {
ChainClient::Firehose(endpoints) => {
let chain_store = self.chain_store.cheap_clone();
// First try to get the block from the store
// See ancestor_block() for documentation of the 3 cached JSON formats.
// First try to get the block from the store (typed cache)
if let Ok(blocks) = chain_store.blocks(vec![block.hash.clone()]).await {
if let Some(cached_json) = blocks.into_iter().next() {
let json_block = EthereumJsonBlock::new(cached_json);
if json_block.is_shallow() {
trace!(
self.logger,
"Cached block #{} {} is shallow. Falling back to Firehose.",
block.number,
block.hash_hex(),
);
} else {
match json_block.into_light_block() {
Ok(light_block) => {
return Ok(light_block.parent_ptr());
}
Err(e) => {
warn!(
self.logger,
"Failed to deserialize cached block #{} {}: {}. \
Falling back to Firehose.",
block.number,
block.hash_hex(),
e
);
}
}
}
if let Some(cached_block) = blocks.into_iter().next() {
return Ok(cached_block.light_block().parent_ptr());
}
}

Expand Down
19 changes: 1 addition & 18 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ use crate::adapter::EthereumRpcError;
use crate::adapter::ProviderStatus;
use crate::call_helper::interpret_eth_call_error;
use crate::chain::BlockFinality;
use crate::json_block::EthereumJsonBlock;
use crate::trigger::{LogPosition, LogRef};
use crate::Chain;
use crate::NodeCapabilities;
Expand Down Expand Up @@ -1641,23 +1640,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
.map_err(|e| error!(&logger, "Error accessing block cache {}", e))
.unwrap_or_default()
.into_iter()
.filter_map(|value| {
let json_block = EthereumJsonBlock::new(value);
if json_block.is_shallow() {
return None;
}
json_block
.into_light_block()
.map_err(|e| {
warn!(
&logger,
"Failed to deserialize cached block: {}. Block will be re-fetched from RPC.",
e
);
})
.ok()
})
.map(Arc::new)
.map(|cached| Arc::new(cached.into_light_block()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This then becomes .map(CachedBlock::into_light_block)

.collect();

let missing_blocks = Vec::from_iter(
Expand Down
2 changes: 0 additions & 2 deletions chain/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ mod data_source;
mod env;
mod ethereum_adapter;
mod ingestor;
mod json_block;
mod json_patch;
mod polling_block_stream;
pub mod runtime;
mod transport;
Expand Down
2 changes: 1 addition & 1 deletion chain/ethereum/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::json_patch;
use alloy::transports::{TransportError, TransportErrorKind, TransportFut};
use graph::components::ethereum::json_patch;
use graph::components::network_provider::ProviderName;
use graph::endpoint::{ConnectionType, EndpointMetrics, RequestLabels};
use graph::prelude::alloy::rpc::json_rpc::{RequestPacket, ResponsePacket};
Expand Down
8 changes: 6 additions & 2 deletions graph/src/blockchain/mock.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
bail,
components::{
ethereum::CachedBlock,
link_resolver::LinkResolver,
network_provider::ChainName,
store::{
Expand Down Expand Up @@ -527,15 +528,18 @@ impl ChainStore for MockChainStore {
) -> Result<Option<B256>, Error> {
unimplemented!()
}
async fn blocks(self: Arc<Self>, _hashes: Vec<BlockHash>) -> Result<Vec<Value>, Error> {
async fn blocks(self: Arc<Self>, _hashes: Vec<BlockHash>) -> Result<Vec<CachedBlock>, Error> {
unimplemented!()
}
// Mock stub for the raw-JSON block lookup: the argument is ignored and the
// call panics via `unimplemented!()`.
async fn blocks_as_json(self: Arc<Self>, _hashes: Vec<BlockHash>) -> Result<Vec<Value>, Error> {
unimplemented!()
}
async fn ancestor_block(
self: Arc<Self>,
_block_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<(Value, BlockPtr)>, Error> {
) -> Result<Option<(CachedBlock, BlockPtr)>, Error> {
unimplemented!()
}
async fn cleanup_cached_blocks(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use graph::prelude::serde_json::{self as json, Value};
use graph::prelude::{EthereumBlock, LightEthereumBlock};
use serde_json::{self as json, Value};

use crate::json_patch;
use super::json_patch;
use super::types::{CachedBlock, EthereumBlock, LightEthereumBlock};

#[derive(Debug)]
pub struct EthereumJsonBlock(Value);
Expand Down Expand Up @@ -49,4 +49,18 @@ impl EthereumJsonBlock {
json_patch::patch_block_transactions(&mut inner);
json::from_value(inner)
}

/// Attempts to turn the wrapped JSON into a [`CachedBlock`].
///
/// The variant is chosen by inspecting the `transaction_receipts` key: when it
/// is present and non-null the value is deserialized as a full block,
/// otherwise as a light block. Picking the target up front means only one
/// deserialization is attempted, so the JSON never has to be cloned.
///
/// Returns `None` when the chosen deserialization fails; the error itself is
/// discarded.
pub fn try_into_cached_block(self) -> Option<CachedBlock> {
    let receipts_present = matches!(
        self.0.get("transaction_receipts"),
        Some(receipts) if !receipts.is_null()
    );

    match receipts_present {
        true => self.into_full_block().map(CachedBlock::Full).ok(),
        false => self.into_light_block().map(CachedBlock::Light).ok(),
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
//!
//! Also used by `PatchingHttp` for chains that don't support EIP-2718 typed transactions.

use graph::prelude::serde_json::Value;
use serde_json::Value;

pub(crate) fn patch_type_field(obj: &mut Value) -> bool {
pub fn patch_type_field(obj: &mut Value) -> bool {
if let Value::Object(map) = obj {
if !map.contains_key("type") {
map.insert("type".to_string(), Value::String("0x0".to_string()));
Expand All @@ -19,7 +19,7 @@ pub(crate) fn patch_type_field(obj: &mut Value) -> bool {
false
}

pub(crate) fn patch_block_transactions(block: &mut Value) -> bool {
pub fn patch_block_transactions(block: &mut Value) -> bool {
let Some(txs) = block.get_mut("transactions").and_then(|t| t.as_array_mut()) else {
return false;
};
Expand All @@ -30,7 +30,7 @@ pub(crate) fn patch_block_transactions(block: &mut Value) -> bool {
patched
}

pub(crate) fn patch_receipts(result: &mut Value) -> bool {
pub fn patch_receipts(result: &mut Value) -> bool {
match result {
Value::Object(_) => patch_type_field(result),
Value::Array(arr) => {
Expand All @@ -47,7 +47,7 @@ pub(crate) fn patch_receipts(result: &mut Value) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use graph::prelude::serde_json::json;
use serde_json::json;

#[test]
fn patch_type_field_adds_missing_type() {
Expand Down
7 changes: 5 additions & 2 deletions graph/src/components/ethereum/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
pub mod json_block;
pub mod json_patch;
mod network;
mod types;

pub use self::json_block::EthereumJsonBlock;
pub use self::network::AnyNetworkBare;
pub use self::types::{
AnyBlock, AnyTransaction, AnyTransactionReceiptBare, EthereumBlock, EthereumBlockWithCalls,
EthereumCall, LightEthereumBlock, LightEthereumBlockExt,
AnyBlock, AnyTransaction, AnyTransactionReceiptBare, CachedBlock, EthereumBlock,
EthereumBlockWithCalls, EthereumCall, LightEthereumBlock, LightEthereumBlockExt,
};

// Re-export Alloy network types for convenience
Expand Down
56 changes: 55 additions & 1 deletion graph/src/components/ethereum/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ use crate::{
prelude::BlockNumber,
};

use super::json_block::EthereumJsonBlock;

pub type AnyTransaction = Transaction<AnyTxEnvelope>;
pub type AnyBlock = Block<AnyTransaction, Header<AnyHeader>>;
/// Like alloy's `AnyTransactionReceipt` but without the `WithOtherFields` wrapper,
/// avoiding `#[serde(flatten)]` overhead during deserialization.
pub type AnyTransactionReceiptBare = TransactionReceipt<AnyReceiptEnvelope<Log>>;

#[allow(dead_code)]
#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LightEthereumBlock(AnyBlock);

impl Default for LightEthereumBlock {
Expand Down Expand Up @@ -259,3 +261,55 @@ impl<'a> From<&'a EthereumCall> for BlockPtr {
BlockPtr::from((call.block_hash, call.block_number))
}
}

/// Typed cached block for Ethereum. Stores the deserialized block so that
/// repeated reads from the in-memory cache avoid `serde_json::from_value()`.
#[derive(Clone, Debug)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to get rid of Clone for this and all other block structs; that's probably a bigger puzzle and requires using Arc judiciously in various data structures, i.e., something for another PR; but conceptually, blocks are readonly and it should therefore be possible to Arc them.

#[allow(clippy::large_enum_variant)]
pub enum CachedBlock {
/// A full `EthereumBlock`. `EthereumJsonBlock::try_into_cached_block` produces
/// this variant when the JSON has a non-null `transaction_receipts` field.
Full(EthereumBlock),
/// A `LightEthereumBlock`. Produced when `transaction_receipts` is absent or
/// null; `into_full_block` returns `None` for this variant.
Light(LightEthereumBlock),
}

impl CachedBlock {
pub fn light_block(&self) -> &LightEthereumBlock {
match self {
CachedBlock::Full(block) => &block.block,
CachedBlock::Light(block) => block,
}
}

pub fn into_light_block(self) -> LightEthereumBlock {
match self {
CachedBlock::Full(block) => block.block.as_ref().clone(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you change this to

pub fn into_light_block(self) -> Arc<LightEthereumBlock> {
        match self {
            CachedBlock::Full(block) => block.block,
            CachedBlock::Light(block) => Arc::new(block),
        }
}

you can avoid a deep clone of the block

CachedBlock::Light(block) => block,
}
}

pub fn into_full_block(self) -> Option<EthereumBlock> {
match self {
CachedBlock::Full(block) => Some(block),
CachedBlock::Light(_) => None,
}
}

/// Builds a cache entry from a raw JSON value.
///
/// Returns `None` when the JSON is reported as shallow by
/// `EthereumJsonBlock::is_shallow`, or when
/// `EthereumJsonBlock::try_into_cached_block` fails to produce a block.
pub fn from_json(value: serde_json::Value) -> Option<Self> {
    let candidate = EthereumJsonBlock::new(value);
    if candidate.is_shallow() {
        None
    } else {
        candidate.try_into_cached_block()
    }
}

/// Returns the block timestamp as reported by `timestamp_u64()` on the light
/// block. The current implementation always yields `Some`.
pub fn timestamp(&self) -> Option<u64> {
    let ts = self.light_block().timestamp_u64();
    Some(ts)
}

/// Returns the parent block pointer, delegating to `parent_ptr()` on the
/// light block.
pub fn parent_ptr(&self) -> Option<BlockPtr> {
    let light = self.light_block();
    light.parent_ptr()
}

/// Returns this block's pointer, delegating to `block_ptr()` on the light
/// block.
pub fn ptr(&self) -> BlockPtr {
    let light = self.light_block();
    light.block_ptr()
}
}
11 changes: 8 additions & 3 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use async_trait::async_trait;
use super::*;
use crate::blockchain::block_stream::{EntitySourceOperation, FirehoseCursor};
use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr};
use crate::components::ethereum::CachedBlock;
use crate::components::metrics::stopwatch::StopwatchMetrics;
use crate::components::network_provider::ChainName;
use crate::components::server::index_node::VersionInfo;
Expand Down Expand Up @@ -553,8 +554,12 @@ pub trait ChainStore: ChainHeadStore {
ancestor_count: BlockNumber,
) -> Result<Option<B256>, Error>;

/// Returns the blocks present in the store.
async fn blocks(
/// Returns the blocks present in the store as typed cached blocks.
async fn blocks(self: Arc<Self>, hashes: Vec<BlockHash>) -> Result<Vec<CachedBlock>, Error>;

/// Returns blocks as raw JSON. Used by callers that need the original
/// JSON representation (e.g., GraphQL block queries, CLI tools).
async fn blocks_as_json(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should try and get rid of this one, but that's also for another PR

self: Arc<Self>,
hashes: Vec<BlockHash>,
) -> Result<Vec<serde_json::Value>, Error>;
Expand Down Expand Up @@ -584,7 +589,7 @@ pub trait ChainStore: ChainHeadStore {
block_ptr: BlockPtr,
offset: BlockNumber,
root: Option<BlockHash>,
) -> Result<Option<(serde_json::Value, BlockPtr)>, Error>;
) -> Result<Option<(CachedBlock, BlockPtr)>, Error>;

/// Remove old blocks from the cache we maintain in the database and
/// return a pair containing the number of the oldest block retained
Expand Down
Loading