
Conversation

@jns-ps
Contributor

@jns-ps jns-ps commented Sep 16, 2025

closes #423

Summary by CodeRabbit

  • New Features

    • Added AWS S3 data-availability backend for light clients and full nodes (WORM, cross-region, legal-hold support).
    • New CLI flag --allow_mock_proofs to control mock-proof verification.
  • Refactor

    • Unified startup via LightDataAvailabilityLayer::start(); DA layers now start before syncing.
    • Added cancellation support and graceful shutdown across AWS, Celestia, and In-Memory backends; background height/produce tasks updated.
  • Documentation

    • README updated to list multiple DA backends (includes Celestia) and AWS usage examples.
  • Chores

    • Added AWS-related dependencies, feature gating, and workspace manifest updates; updated tests for new flows.

@jns-ps jns-ps requested a review from sebasti810 September 16, 2025 11:01
@jns-ps jns-ps self-assigned this Sep 16, 2025
@jns-ps jns-ps added the enhancement New feature or request label Sep 16, 2025
@vercel

vercel bot commented Sep 16, 2025

The latest updates on your projects.

1 Skipped Deployment
Project | Deployment | Preview | Comments | Updated (UTC)
prism | Ignored | Ignored | Preview | Sep 22, 2025 10:58am

@coderabbitai
Contributor

coderabbitai bot commented Sep 16, 2025

Walkthrough

Adds a feature-gated AWS S3 Data Availability backend (configs, client, light/full-node layers, WORM checks, S3 I/O); migrates DA startup to LightDataAvailabilityLayer; threads CancellationToken through DA, factory, tests, and CLI; adds --allow_mock_proofs flag and manifest dependency updates.

Changes

Cohort / File(s) Summary
AWS DA module (new)
crates/da/src/aws/mod.rs, crates/da/src/aws/client.rs, crates/da/src/aws/config.rs, crates/da/src/aws/full_node.rs, crates/da/src/aws/light_client.rs
New AWS S3 DA backend: config types, credential strategies, S3 client, WORM/Object Lock verification, upload/fetch logic, light & full-node layers, observability, and public exports (feature-gated).
DA crate root & traits
crates/da/src/lib.rs
Moved startup semantics: added async start() to LightDataAvailabilityLayer, removed start/initialize_sync_target from DataAvailabilityLayer; VerifiableStateTransition now requires Send+Sync; added aws module gating.
Factory & wiring
crates/da/src/factory.rs, crates/tests/src/lib.rs, crates/node_types/uniffi-lightclient/src/lib.rs
Added Aws variants to LightClientDAConfig/FullNodeDAConfig (feature-gated); create_*_da_layer signatures now accept CancellationToken; routes AWS/Celestia/InMemory creation with cancellation and retry/backoff.
In-memory DA
crates/da/src/memory.rs, crates/node_types/prover/src/prover/tests/mod.rs
Added CancellationToken and started flag; constructors accept CancellationToken; produce_blocks is cancellation-aware; startup moved to LightDataAvailabilityLayer::start; added subscribe_blocks.
Celestia DA refactor
crates/da/src/celestia/full_node.rs, crates/da/src/celestia/light_client.rs
Threaded CancellationToken through CelestiaConnection; moved start to LightDataAvailabilityLayer; lazy light-client node init; event channel accessor; added fetch helper with retries; removed initialize_sync_target.
Light client runtime
crates/node_types/lightclient/src/lightclient.rs
Calls da.start().await at run() start to ensure DA layer is started before sync tasks.
CLI changes
crates/cli/src/cli_args/commands.rs, crates/cli/src/apply_args/commands.rs, crates/cli/src/main.rs, crates/cli/src/tests/mod.rs
Added CLI flag --allow_mock_proofs (propagated into light client config); propagate CancellationToken into DA layer creation callsites; tests for flag/env precedence.
Manifests
Cargo.toml, crates/da/Cargo.toml, crates/events/Cargo.toml
Workspace and crate manifest updates: added aws-config, aws-sdk-s3, aws-sdk-sts (aws feature), futures and tokio-util wiring; tokio-util added to events.
Docs & misc
README.md, CLAUDE.md
README text clarifies the supported DA backends (mentions Celestia); CLAUDE.md gains a commit-guideline line.
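Several cohorts above thread a CancellationToken into DA constructors and make background tasks such as produce_blocks cancellation-aware. A rough, std-only sketch of that shutdown pattern follows; the actual code uses tokio_util::sync::CancellationToken with tokio::select!, so CancelFlag, produce_blocks, and run_demo here are illustrative stand-ins, not the PR's API:

```rust
use std::sync::{
    atomic::{AtomicBool, AtomicU64, Ordering},
    Arc,
};
use std::thread;
use std::time::Duration;

/// Std-only stand-in for tokio_util's CancellationToken, for illustration.
#[derive(Clone, Default)]
struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Cancellation-aware block producer: bumps a height counter every
/// `block_time` until the flag is cancelled, then exits cleanly.
fn produce_blocks(height: Arc<AtomicU64>, block_time: Duration, cancel: CancelFlag) {
    while !cancel.is_cancelled() {
        thread::sleep(block_time);
        if cancel.is_cancelled() {
            break; // graceful shutdown: do not produce a partial block
        }
        height.fetch_add(1, Ordering::SeqCst);
    }
}

fn run_demo() -> u64 {
    let height = Arc::new(AtomicU64::new(0));
    let cancel = CancelFlag::default();
    let (h, c) = (height.clone(), cancel.clone());
    let producer = thread::spawn(move || produce_blocks(h, Duration::from_millis(10), c));
    thread::sleep(Duration::from_millis(100));
    cancel.cancel();
    producer.join().unwrap(); // join returns because the loop observes the flag
    height.load(Ordering::SeqCst)
}
```

The key property, which the token-based version shares, is that the producer checks for cancellation on every iteration, so shutdown is bounded by one block interval.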

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant App
  participant Factory
  participant AWSLight as AwsLightDataAvailabilityLayer
  participant AWSClient as AwsDataAvailabilityClient
  participant S3

  App->>Factory: create_light_client_da_layer(cfg, cancellation_token)
  Factory->>AWSLight: AwsLightDataAvailabilityLayer::new(cfg, cancellation_token)
  AWSLight->>AWSClient: AwsDataAvailabilityClient::new_from_light_da_config(cfg)
  AWSClient->>S3: Verify buckets & Object Lock / GET metadata
  S3-->>AWSClient: OK / 404 / Errors
  AWSLight-->>App: returns layer
  App->>AWSLight: start()
  activate AWSLight
  loop every block_time (poll)
    AWSLight->>AWSClient: fetch_height()
    AWSClient->>S3: GET metadata/info.json
    S3-->>AWSClient: height | 404
    AWSClient-->>AWSLight: Option<height>
    alt height increased
      AWSLight->>App: PrismEvent::UpdateDAHeight
    end
    AWSLight->>AWSLight: check cancellation_token
  end
  deactivate AWSLight
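The polling decision in the loop above (emit PrismEvent::UpdateDAHeight only when the fetched height increased; a 404 on metadata/info.json means no update) can be sketched as a pure function. The name height_update is hypothetical:

```rust
/// Decide whether a freshly fetched DA height warrants an UpdateDAHeight
/// event. `fetched` is None when the metadata object returned 404.
fn height_update(last_seen: u64, fetched: Option<u64>) -> Option<u64> {
    match fetched {
        // Only strictly increasing heights produce an event; equal or
        // stale values (e.g. from lagging replicated reads) are ignored.
        Some(h) if h > last_seen => Some(h),
        _ => None,
    }
}
```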
sequenceDiagram
  autonumber
  participant App
  participant Factory
  participant AWSFull as AwsFullNodeDataAvailabilityLayer
  participant AWSClient as AwsDataAvailabilityClient
  participant S3

  App->>Factory: create_full_node_da_layer(cfg, cancellation_token)
  Factory->>AWSFull: AwsFullNodeDataAvailabilityLayer::new(cfg, cancellation_token)
  AWSFull->>AWSClient: AwsDataAvailabilityClient::new_from_full_da_config(cfg)
  AWSClient->>S3: Verify epochs/tx/metadata buckets + Object Lock
  S3-->>AWSClient: OK / Errors
  AWSFull-->>App: returns layer
  App->>AWSFull: start()
  activate AWSFull
  loop every block_time (produce_blocks)
    AWSFull->>AWSFull: increment height, reset tx offset
    AWSFull->>S3: PUT metadata/info.json (non-WORM), PUT epochs/tx (WORM)
    AWSFull->>App: PrismEvent::UpdateDAHeight
    AWSFull->>AWSClient: submit_finalized_epoch / submit_transactions
  end
  deactivate AWSFull

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60–90 minutes

Possibly related PRs

Suggested reviewers

  • sebasti810
  • distractedm1nd

Poem

I hop through buckets, keys, and log,
I guard each epoch in a WORM-bound bog.
I poll, I publish, a token in paw,
Uploads and heights — a rabbit's awe.
Bytes in burrows, events I log. 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)

  • Out of Scope Changes Check (⚠️ Warning)
    Explanation: The PR includes substantial cross-cutting API and behavioral changes beyond the AWS/WORM evaluation objective: it threads CancellationToken through DA constructors and callers (factory, CLI, tests, node_types), alters public trait surfaces (adding LightDataAvailabilityLayer::start, removing DataAvailabilityLayer::start and initialize_sync_target), tightens trait bounds (VerifiableStateTransition now requires Sync), and refactors the Celestia and InMemory startup flows. These changes touch many non-AWS modules and consumers; they are breaking, wide-impact refactors that are not strictly required to "evaluate AWS S3 WORM" and will force broad client updates and a larger review surface. As a result, the PR mixes a feature addition with unrelated API churn and behavioral changes.
    Resolution: Split the work: isolate the AWS S3/WORM implementation (aws/*, config, client, AWS wiring in the factory) into one focused PR, and move the cancellation-token and trait-surface refactors (Light/DataAvailability trait changes, Celestia/InMemory changes, caller updates) into a separate, clearly documented breaking-change PR with a migration guide. If splitting is infeasible, add an explicit migration section to this PR describing all public API changes, list the affected call sites, include adapter shims or deprecation paths, and run full downstream CI (including integration tests with LocalStack or AWS) before merging.
  • Docstring Coverage (⚠️ Warning)
    Docstring coverage is 0.00%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.

✅ Passed checks (3 passed)

  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): The title "feat: AWS DA" is concise, follows conventional commit style, and accurately highlights the primary change in this PR (adding an AWS S3 data-availability backend). It relates directly to the main changeset (new aws module, S3 client, light/full DA layers) without extraneous file lists or emojis, so a teammate scanning history can quickly grasp the intent.
  • Linked Issues Check (✅ Passed): The changes directly implement the S3-backed DA evaluation described in issue #423: a new aws module (crates/da/src/aws/*) provides configuration (config.rs), an S3 client that verifies Object Lock (WORM) and bucket access and performs WORM-enabled uploads (client.rs), and light and full-node DA layers that use those capabilities (light_client.rs, full_node.rs); factory wiring and README/docs expose and document the feature. Configuration supports explicit and assume-role credentials, retention/legal-hold flags, and cross-region options, and the client checks Object Lock before enabling WORM uploads, satisfying the coding-focused requirements of evaluating S3 WORM behavior. Tests and examples exercise failure modes when AWS isn't available and validate configuration paths. Overall, the PR meets the coding objectives of #423.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0fac008 and 0b3cf42.

📒 Files selected for processing (1)
  • crates/da/src/aws/full_node.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.rs: Follow Rust Coding Standards for all Rust code
Use rustfmt with project settings (merge_imports=true, imports_granularity="Crate", max_width=100)
Error handling: prefer Result types with descriptive error messages
Use the ? operator and proper error types for propagation
Naming: snake_case for functions/variables and CamelCase for types
Only add comments for private APIs when behavior is complex or non-obvious
Keep public method docstrings short and focused (brief summary, brief params/return)

Files:

  • crates/da/src/aws/full_node.rs
🧬 Code graph analysis (1)
crates/da/src/aws/full_node.rs (3)
crates/da/src/aws/light_client.rs (4)
  • new (43-56)
  • event_channel (130-132)
  • event_publisher (59-61)
  • start (115-128)
crates/da/src/memory.rs (5)
  • new (60-84)
  • event_channel (205-207)
  • produce_blocks (113-152)
  • start (163-175)
  • start (178-190)
crates/da/src/aws/client.rs (2)
  • new_from_full_da_config (112-155)
  • submit_metadata (565-568)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: build-and-push-image
  • GitHub Check: coverage
  • GitHub Check: unused dependencies
  • GitHub Check: unit-test
  • GitHub Check: integration-test
🔇 Additional comments (7)
crates/da/src/aws/full_node.rs (7)

120-150: Off-by-one error persists: use incremented height for all broadcasts and metadata.

The previous review correctly identified this issue. Lines 122-123 capture completed_height before incrementing, but then use the stale value for broadcasting (line 130), event publishing (line 134), and metadata (line 139). This creates inconsistency where get_latest_height() returns the incremented value but subscribers receive the old value.

Apply this diff to use the incremented height consistently:

-                // Take write lock to atomically increment height and prevent submissions
-                let mut height_guard = current_height.write().await;
-                let completed_height = *height_guard;
-                *height_guard += 1;
-                drop(height_guard);
-
-                // Reset transaction offset for new height
-                transaction_offset.store(0, Ordering::Relaxed);
-
-                // Broadcast height update
-                let _ = height_update_tx.send(completed_height);
-
-                // Publish event
-                event_publisher.send(PrismEvent::UpdateDAHeight {
-                    height: completed_height,
-                });
-
-                // Update metadata
-                let metadata = AwsDaMetaInfo {
-                    current_height: completed_height,
-                };
+                // Take write lock to atomically increment height and prevent submissions
+                let mut height_guard = current_height.write().await;
+                *height_guard += 1;
+                let new_height = *height_guard;
+                drop(height_guard);
+
+                // Reset transaction offset for new height
+                transaction_offset.store(0, Ordering::Relaxed);
+
+                // Broadcast height update
+                let _ = height_update_tx.send(new_height);
+
+                // Publish event
+                event_publisher.send(PrismEvent::UpdateDAHeight {
+                    height: new_height,
+                });
+
+                // Update metadata
+                let metadata = AwsDaMetaInfo {
+                    current_height: new_height,
+                };
@@
-                        info!("Completed block {}", completed_height);
+                        info!("Completed block {}", new_height);

203-214: Lock held across await and incorrect return value.

The previous review identified two issues: holding the read lock during the async upload (blocking height progression) and returning the raw height instead of the inclusion height to match memory DA semantics.

Apply this diff to drop the lock before awaiting and return inclusion height:

-    async fn submit_finalized_epoch(&self, epoch: FinalizedEpoch) -> Result<u64> {
-        // Take read lock to get consistent height and prevent increments during submission
-        let height_guard = self.current_height.read().await;
-        let current_height = *height_guard;
-
-        self.client.submit_finalized_epoch(epoch, current_height).await?;
-
-        info!("Finalized epoch submitted at height {}", current_height);
-
-        // Return the height where the epoch was published
-        Ok(current_height)
-    }
+    async fn submit_finalized_epoch(&self, epoch: FinalizedEpoch) -> Result<u64> {
+        // Take read lock to get consistent height and prevent increments during submission
+        let height_guard = self.current_height.read().await;
+        let current_height = *height_guard;
+        drop(height_guard);
+
+        self.client.submit_finalized_epoch(epoch, current_height).await?;
+
+        info!("Finalized epoch submitted at height {}", current_height);
+
+        // Return the inclusion height
+        Ok(current_height + 1)
+    }

216-229: Race condition and incorrect return value in transaction submission.

The previous review identified race conditions where concurrent submissions can double-use the same offset. The current implementation also returns the raw height instead of the inclusion height for consistency with memory DA.

Apply this diff to atomically reserve offset and return inclusion height:

-    async fn submit_transactions(&self, transactions: Vec<Transaction>) -> Result<u64> {
-        // Take read lock to get consistent height and prevent increments during submission
-        let height_guard = self.current_height.read().await;
-        let height = *height_guard;
-
-        let count = transactions.len() as u64;
-        let transaction_offset = self.transaction_offset.fetch_add(count, Ordering::AcqRel);
-
-        self.client.submit_transactions(transactions, transaction_offset, height).await?;
-
-        info!("Transactions submitted at height {}", height);
-
-        Ok(height)
-    }
+    async fn submit_transactions(&self, transactions: Vec<Transaction>) -> Result<u64> {
+        let count = transactions.len() as u64;
+        
+        // Reserve unique offset for current height under read lock
+        let height_guard = self.current_height.read().await;
+        let height = *height_guard;
+        let start_offset = self.transaction_offset.fetch_add(count, Ordering::AcqRel);
+        drop(height_guard);
+
+        // Perform upload with reserved offset
+        self.client.submit_transactions(transactions, start_offset, height).await?;
+
+        info!("Transactions submitted at height {}", height);
+
+        // Return inclusion height
+        Ok(height + 1)
+    }

58-73: Well-structured concurrent design with proper synchronization primitives.

The layer correctly uses Arc<RwLock<u64>> for height coordination, Arc<AtomicU64> for transaction offsetting, and integrates well with the cancellation token pattern. The broadcast channel setup with a reasonable buffer size (100) is appropriate for height notifications.
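The Arc<AtomicU64> mentioned above lets concurrent submitters reserve disjoint transaction index ranges without a mutex. A minimal sketch of that reservation idea (reserve_offsets is a hypothetical helper; the real layer pairs this with the height read lock):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Reserve `count` consecutive transaction slots at the current height.
/// fetch_add returns the previous value, so each caller receives a
/// unique, non-overlapping [start, start + count) range even under
/// contention from multiple submitters.
fn reserve_offsets(offset: &AtomicU64, count: u64) -> std::ops::Range<u64> {
    let start = offset.fetch_add(count, Ordering::AcqRel);
    start..start + count
}
```

Resetting the counter to 0 on each height increment (as produce_blocks does) restarts the range sequence for the new height.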


111-118: Proper cancellation handling with tokio::select!.

The implementation correctly uses tokio::select! to handle graceful shutdown via the cancellation token, which aligns with the established pattern in the codebase for async task cancellation.


180-186: Correct implementation of epoch retrieval with proper error mapping.

The get_finalized_epochs method properly maps AWS client epochs to VerifiableEpoch trait objects and converts errors to the expected anyhow::Error type, maintaining consistency with the trait interface.


231-233: Clean broadcast receiver subscription.

The subscribe_to_heights implementation correctly returns a new receiver from the broadcast channel, allowing multiple subscribers to receive height updates independently.



Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 13

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
crates/node_types/prover/src/factory.rs (1)

100-107: Default key path moved to .p8 — add migration fallback to legacy .pk8 to avoid identity breakage.

If a user relied on the previous implicit default, they may silently generate a new identity. Load a .pk8 sibling if present.

Apply in create_prover_as_prover:

-    let signing_key = SigningKey::from_pkcs8_pem_file(&config.signing_key_path)
-        .or_else(|_| {
-            info!(
-                "Signing key not found at '{}', generating new Ed25519 key pair",
-                &config.signing_key_path
-            );
-            create_ed25519_key_pair_pem_files(&config.signing_key_path)
-        })
-        .map_err(|e| anyhow!("Failed to load signing key: {}", e))?;
+    let path = Path::new(&config.signing_key_path);
+    let signing_key = match SigningKey::from_pkcs8_pem_file(&path) {
+        Ok(k) => k,
+        Err(_) => {
+            // Backward-compat: try legacy .pk8 next to the new .p8
+            let legacy = if path.extension().and_then(|s| s.to_str()) == Some("p8") {
+                Some(path.with_extension("pk8"))
+            } else {
+                None
+            };
+            if let Some(legacy_path) = legacy.filter(|p| p.exists()) {
+                info!("Found legacy signing key '{}'; loading.", legacy_path.display());
+                SigningKey::from_pkcs8_pem_file(&legacy_path)
+                    .map_err(|e| anyhow!("Failed to load legacy signing key '{}': {}", legacy_path.display(), e))?
+            } else {
+                info!(
+                    "Signing key not found at '{}', generating new Ed25519 key pair",
+                    path.display()
+                );
+                create_ed25519_key_pair_pem_files(&path)
+                    .map_err(|e| anyhow!("Failed to generate signing key at '{}': {}", path.display(), e))?
+            }
+        }
+    };
crates/da/src/factory.rs (1)

113-117: Avoid dumping entire Celestia config in logs.

If this struct ever contains tokens/keys, redact. Prefer a brief summary.

-            info!("Using celestia config: {:?}", celestia_conf);
+            info!("Using Celestia DA backend (light client)");
crates/da/src/celestia/light_client.rs (1)

294-321: Fix Redb store init: create() will fail on restart if the DB file already exists.

Use open-if-exists, create-otherwise to avoid permanent InitializationError on subsequent boots.

Apply this diff:

-                let store_path = base_path.join("lumina.redb");
-                let db = spawn_blocking(move || RedbDatabase::create(&store_path))
+                let store_path = base_path.join("lumina.redb");
+                let store_path_clone = store_path.clone();
+                let db = spawn_blocking(move || {
+                    if store_path_clone.exists() {
+                        RedbDatabase::open(&store_path_clone)
+                    } else {
+                        RedbDatabase::create(&store_path_clone)
+                    }
+                })
                     .await
                     .map_err(|e| {
                         DataAvailabilityError::InitializationError(format!(
                             "Failed to join blocking task: {}",
                             e
                         ))
                     })?
                     .map_err(|e| {
                         DataAvailabilityError::InitializationError(format!(
-                            "Failed to open database at {}: {}",
-                            path, e
+                            "Failed to create/open database at {}: {}",
+                            store_path.display(),
+                            e
                         ))
                     })?;
🧹 Nitpick comments (51)
crates/node_types/prover/src/factory.rs (5)

38-41: Validate start_height >= 1 at construction time.

Prevent invalid config (e.g., 0) early to avoid undefined sync behavior.

Apply validation in both factory fns:

 pub fn create_prover_as_full_node(
     config: &FullNodeConfig,
     db: Arc<Box<dyn Database>>,
     da: Arc<dyn DataAvailabilityLayer>,
     cancellation_token: CancellationToken,
 ) -> Result<Prover> {
+    if config.start_height == 0 {
+        return Err(anyhow!("start_height must be >= 1"));
+    }
 pub fn create_prover_as_prover(
     config: &ProverConfig,
     db: Arc<Box<dyn Database>>,
     da: Arc<dyn DataAvailabilityLayer>,
     cancellation_token: CancellationToken,
 ) -> Result<Prover> {
+    if config.start_height == 0 {
+        return Err(anyhow!("start_height must be >= 1"));
+    }

55-56: Default start_height = 1 — LGTM.

Consider documenting that heights are 1-based (genesis=1), or switch to NonZeroU64 in a later change.


80-81: Docs out of date: `.pk8` → `.p8`.

The default path in the docstring still says .pk8.

Apply:

-    /// Default: `~/.prism/prover_key.pk8`
+    /// Default: `~/.prism/prover_key.p8`

172-181: Key generation path robustness and security posture.

  • Ensure parent directory exists before writing keys to avoid ENOENT.
  • Optional: confirm private key perms are 0o600 post-write.

Apply in helper:

-fn create_ed25519_key_pair_pem_files(signing_key_path: impl AsRef<Path>) -> Result<SigningKey> {
-    let signing_key = SigningKey::new_ed25519();
-    signing_key.to_pkcs8_pem_file(&signing_key_path)?;
-
-    let verifying_key_path = signing_key_path.as_ref().with_extension("spki");
-    signing_key.verifying_key().to_spki_pem_file(verifying_key_path)?;
-
-    Ok(signing_key)
-}
+fn create_ed25519_key_pair_pem_files(signing_key_path: impl AsRef<Path>) -> Result<SigningKey> {
+    let path = signing_key_path.as_ref();
+    if let Some(dir) = path.parent() {
+        std::fs::create_dir_all(dir)?;
+    }
+    let signing_key = SigningKey::new_ed25519();
+    signing_key.to_pkcs8_pem_file(&path)?;
+    let verifying_key_path = path.with_extension("spki");
+    signing_key.verifying_key().to_spki_pem_file(verifying_key_path)?;
+    // Optionally enforce strict perms (Unix):
+    // #[cfg(unix)]
+    // std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o600))?;
+    Ok(signing_key)
+}

Would you like me to wire the Unix-only permission tightening, or does prism_keys already enforce secure perms?


261-261: Tests updated for .p8 and start_height clone — LGTM. Add two small assertions.

  • Assert defaults include start_height = 1 for both configs.

Apply:

@@ fn test_full_node_config_default() {
-        assert!(config.verifying_key_str.contains(".prism/prover_key.spki"));
+        assert!(config.verifying_key_str.contains(".prism/prover_key.spki"));
+        assert_eq!(config.start_height, 1);
         assert_eq!(config.webserver, WebServerConfig::default());
@@ fn test_prover_config_default() {
-        assert!(config.signing_key_path.contains(".prism/prover_key.p8"));
+        assert!(config.signing_key_path.contains(".prism/prover_key.p8"));
+        assert_eq!(config.start_height, 1);
         assert_eq!(config.max_epochless_gap, DEFAULT_MAX_EPOCHLESS_GAP);

Optional new test (fallback to legacy .pk8):

#[test]
fn test_create_prover_as_prover_uses_legacy_pk8_if_present() {
    let temp = TempDir::new().unwrap();
    let legacy = temp.path().join("legacy.pk8");
    let desired = temp.path().join("desired.p8");
    // Write only legacy key
    let sk = SigningKey::new_ed25519();
    sk.to_pkcs8_pem_file(&legacy).unwrap();

    let cfg = ProverConfig {
        signing_key_path: desired.to_string_lossy().to_string(),
        ..ProverConfig::default()
    };
    let db = Arc::new(Box::new(InMemoryDatabase::new()) as Box<dyn Database>);
    let da = Arc::new(InMemoryDataAvailabilityLayer::default()) as Arc<dyn DataAvailabilityLayer>;
    let ct = CancellationToken::new();

    let r = create_prover_as_prover(&cfg, db, da, ct);
    assert!(r.is_ok());
}

Also applies to: 286-292, 304-307, 321-330, 344-349, 369-375, 383-392

crates/serde/src/binary.rs (2)

7-7: Prefer std::error::Error + Send + Sync + 'static over bare Display for Error associated types.

This makes the errors composable with anyhow/thiserror, usable across threads, and object‑safe for downcasting. Display is implied by Error anyway.

-    type Error: Display;
+    type Error: std::error::Error + Send + Sync + 'static;

Apply to both traits.

Also applies to: 24-24
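A toy illustration of what the stricter bound buys (all names here are hypothetical, not the crate's actual API): with `Error + Send + Sync + 'static`, callers can erase the concrete error into `Box<dyn Error + Send + Sync>` and still downcast later, which a bare `Display` bound does not permit.

```rust
use std::error::Error;
use std::fmt;

/// Simplified analogue of the `ToBinary` trait with the proposed bound.
trait ToBinaryLike {
    type Error: Error + Send + Sync + 'static;
    fn encode(&self) -> Result<Vec<u8>, Self::Error>;
}

#[derive(Debug)]
struct EncodeError(String);

impl fmt::Display for EncodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "encode failed: {}", self.0)
    }
}

impl Error for EncodeError {}

struct Payload;

impl ToBinaryLike for Payload {
    type Error = EncodeError;
    fn encode(&self) -> Result<Vec<u8>, Self::Error> {
        Err(EncodeError("unsupported".into()))
    }
}

/// Generic caller that erases the concrete error type; this coercion
/// compiles only because the associated type implements `Error + Send
/// + Sync + 'static`.
fn encode_boxed<T: ToBinaryLike>(t: &T) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
    t.encode().map_err(|e| Box::new(e) as _)
}
```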


6-10: Add brief rustdoc to clarify format (bincode) and guarantees.

Public traits benefit from quick docs stating the encoding/decoding contract.

+/// Binary encoding using `bincode` defaults.
+/// Implemented for all `serde::Serialize` types.
 pub trait ToBinary {
+/// Decode from `bincode` bytes into `Self`.
+/// Implemented for all `serde::Deserialize` types.
 pub trait FromBinary: Sized {
clippy.toml (1)

7-7: Consider expanding valid idents for AWS terms.

Add “S3”, “STS”, “IAM”, “KMS”, “CloudTrail” to avoid false positives in doc linting across new AWS modules.

 doc-valid-idents = [
   "RocksDB",
   "InMemory",
   "WebSocket",
   "OpenTelemetry",
   "IndexedDB",
   "CloudWatch",
+  "S3",
+  "STS",
+  "IAM",
+  "KMS",
+  "CloudTrail",
 ]
README.md (1)

25-25: Clarify AWS DA availability and build flag.

Mention that the AWS S3 DA backend is feature-gated (e.g., --features aws) and non-wasm to set expectations for builders and users.

-Multiple DA backends are supported including Celestia for decentralized availability.
+Multiple DA backends are supported, including Celestia for decentralized availability. An AWS S3‑backed DA is available behind the `aws` feature (non‑wasm).
CLAUDE.md (1)

57-62: Apply the “short public docstrings” rule to new AWS config docs.

crates/da/src/aws/config.rs has very long field docs; consider moving the long explanations to module docs and keep field docs concise.

crates/da/src/aws/config.rs (5)

22-47: Right-size docs per project guidelines.

These are detailed (good), but per CLAUDE.md the long explanations should live in module docs; keep field docs concise and move WORM/requirements to mod.rs or a module-level section.


296-402: Constructor ergonomics: prefer Into; drop const.

pub const fn here adds no practical benefit and blocks Into<String> ergonomics. Switch to regular fns accepting impl Into<String> to allow &str/String.

-    pub const fn development(access_key_id: String, secret_access_key: String) -> Self {
+    pub fn development<S1: Into<String>, S2: Into<String>>(access_key_id: S1, secret_access_key: S2) -> Self {
         Self::Explicit {
-            access_key_id,
-            secret_access_key,
+            access_key_id: access_key_id.into(),
+            secret_access_key: secret_access_key.into(),
             session_token: None,
         }
     }
@@
-    pub const fn cross_account(role_arn: String, session_name: String) -> Self {
+    pub fn cross_account<S1: Into<String>, S2: Into<String>>(role_arn: S1, session_name: S2) -> Self {
         Self::AssumeRole {
-            role_arn,
-            session_name,
+            role_arn: role_arn.into(),
+            session_name: session_name.into(),
             external_id: None,
             session_duration: Duration::from_secs(3600), // 1 hour default
         }
     }
@@
-    pub const fn profile(profile_name: String) -> Self {
+    pub fn profile<S: Into<String>>(profile_name: S) -> Self {
         Self::Default {
-            profile: Some(profile_name),
+            profile: Some(profile_name.into()),
         }
     }

493-616: Add serde round‑trip tests to lock in defaults.

Please add deserialization tests (partial configs) to ensure field defaults are applied as intended.

@@
     fn test_light_client_config_structure() {
         let config = create_test_light_client_config();
@@
     }
+
+    #[test]
+    fn test_light_client_serde_defaults() {
+        let json = r#"{"epochs_bucket":"eb","metadata_bucket":"mb"}"#;
+        let cfg: AwsLightClientDAConfig = serde_json::from_str(json).unwrap();
+        assert_eq!(cfg.region, DEFAULT_AWS_REGION);
+        assert_eq!(cfg.epochs_bucket, "eb");
+        assert_eq!(cfg.metadata_bucket, "mb");
+        assert_eq!(cfg.max_timeout, DEFAULT_S3_MAX_TIMEOUT);
+        assert_eq!(cfg.max_retries, DEFAULT_S3_MAX_RETRIES);
+        assert_eq!(cfg.block_time, Duration::from_secs(10));
+    }

710-723: Test naming/comment is misleading vs. actual default.

This test sets max_concurrent_uploads = 5 in the helper, but then asserts “default … 5”. Either assert the struct default (10) or reword the comment.

-        // Test default concurrent uploads
-        assert_eq!(config.max_concurrent_uploads, 5);
+        // Test helper-initialized concurrent uploads (not struct default)
+        assert_eq!(config.max_concurrent_uploads, 5);

67-94: Update S3 bucket naming comment to match AWS rules

Consecutive hyphens are allowed; periods are allowed but cannot be adjacent ("..") or adjacent to hyphens ("-." / ".-"); bucket must start and end with a letter or number; periods also have operational caveats (break virtual-hosted HTTPS wildcards and disable Transfer Acceleration). Replace the current hard rule list with a concise summary and add a link to the official AWS S3 bucket-naming docs.
Location: crates/da/src/aws/config.rs (lines 67–94)
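A hedged sketch of a validator for the summarized core rules (is_valid_bucket_name is a hypothetical helper; the official AWS docs list further restrictions, e.g. IP-address-shaped names and reserved prefixes, which are out of scope here):

```rust
/// Check an S3 general-purpose bucket name against the core rules:
/// 3-63 characters drawn from lowercase letters, digits, hyphens, and
/// periods; must start and end with a letter or digit; a period may
/// not be adjacent to another period or a hyphen. Consecutive hyphens
/// are allowed.
fn is_valid_bucket_name(name: &str) -> bool {
    let bytes = name.as_bytes();
    if !(3..=63).contains(&bytes.len()) {
        return false;
    }
    let alnum = |b: u8| b.is_ascii_lowercase() || b.is_ascii_digit();
    // Must start and end with a letter or number.
    if !alnum(bytes[0]) || !alnum(bytes[bytes.len() - 1]) {
        return false;
    }
    // Reject "..", "-.", and ".-" sequences.
    for w in bytes.windows(2) {
        if (w[0] == b'.' || w[0] == b'-') && w[1] == b'.' || w[0] == b'.' && w[1] == b'-' {
            return false;
        }
    }
    // Only the allowed character classes anywhere in the name.
    bytes.iter().all(|&b| alnum(b) || b == b'-' || b == b'.')
}
```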

crates/cli/src/apply_args/commands.rs (2)

65-67: Mirror validation for prover.start_height.

Apply the same bounds/checks here to keep behavior consistent across node types.


41-43: Enforce start_height >= 1 and document semantics

start_height is a u64 with Default: 1 (see crates/node_types/prover/src/factory.rs and crates/cli/src/cli_args/commands.rs); the CLI currently assigns overrides without validation — validate CLI input and reject 0 (or clamp) and add a short doc: "DA-layer block height of the first prism block; units = blocks; default = 1". Apply validation at crates/cli/src/apply_args/commands.rs (lines 41–43 and 65–67).

crates/presets/src/lib.rs (1)

10-12: Add brief doc on Development preset trade‑offs.

Note that it may enable insecure shortcuts (e.g., mock proofs) to set expectations for users reading generated docs/serde.

crates/node_types/lightclient/src/lightclient.rs (2)

251-253: Fix log message: height placeholder uses error value.

The format string says “at height {}” but passes the error. Use both height and error.

Apply this diff:

-            Err(e) => {
-                error!("failed to fetch data at height {}", e)
-            }
+            Err(e) => {
+                error!("failed to fetch data at height {}: {}", height, e)
+            }

199-205: Metric name is Celestia‑specific.

Recording celestia_synced_height under non‑Celestia DA (e.g., AWS) is misleading. Consider a generic metric or DA‑specific labels.

crates/cli/src/main.rs (1)

100-106: Make FullNode ordering consistent (init telemetry before DB/DA).

Align with LightClient/Prover to fail fast and avoid unnecessary resource init when telemetry setup fails.

Proposed diff:

-            let db = create_storage(&config.db).await?;
-            let da = create_full_node_da_layer(&config.da).await?;
             let telemetry = create_telemetry(
                 &config.telemetry,
                 vec![("node_type".to_string(), "fullnode".to_string())],
             )?;
+            let db = create_storage(&config.db).await?;
+            let da = create_full_node_da_layer(&config.da).await?;
crates/node_types/lightclient/src/factory.rs (1)

83-89: Consider bypassing verifying key parsing when mock proofs are enabled.

Today we still parse the key and may fail before tests/dev runs. Optionally skip parsing or accept a placeholder when allow_mock_proofs is true.

Do you want a patch that gates key parsing on allow_mock_proofs to simplify dev UX?

crates/cli/src/cli_args/commands.rs (3)

24-27: Add clap conflicts between dev and specter for LightClient args.

FullNode/Prover guard these flags, but LightClient doesn’t. Prevent ambiguous presets by mirroring the conflicts.

Apply this diff:

-    #[arg(long)]
+    #[arg(long, conflicts_with = "specter")]
     /// Start light client in development mode
     pub dev: bool,

-    #[arg(long)]
+    #[arg(long, conflicts_with = "dev")]
     /// Start light client with connection to specter testnet
     pub specter: bool,

Also applies to: 28-31


81-85: Validate start_height at parse time (u64 ≥ 1).

Avoid propagating invalid heights by constraining the clap parser.

Apply this diff:

-    #[arg(long)]
+    #[arg(long, value_parser = clap::value_parser!(u64).range(1..))]
     /// The height of the first prism block to consider
     /// Default: 1
     pub start_height: Option<u64>,

136-140: Mirror start_height validation for Prover.

Keep semantics consistent across binaries.

Apply this diff:

-    #[arg(long)]
+    #[arg(long, value_parser = clap::value_parser!(u64).range(1..))]
     /// The height of the first prism block to consider
     /// Default: 1
     pub start_height: Option<u64>,
crates/da/src/lib.rs (1)

28-33: AWS S3 doc tweak: call out Object Lock prerequisites.

Mention S3 Object Lock requires bucket versioning and a retention mode (governance/compliance) to actually be WORM.
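For context, WORM behavior requires the bucket to be created with Object Lock enabled (which forces versioning on) and a retention rule; a default retention configuration passed to `aws s3api put-object-lock-configuration` looks roughly like this (values are illustrative):

```json
{
  "ObjectLockEnabled": "Enabled",
  "Rule": {
    "DefaultRetention": {
      "Mode": "COMPLIANCE",
      "Days": 30
    }
  }
}
```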

crates/da/src/celestia/full_node.rs (1)

272-292: Improve parsing log message and retry semantics.

  • Log says “epoch json” but decoding isn’t JSON; adjust wording.
  • With fetch_max_retries == 0, try_fetch_blobs currently performs 0 attempts. If the intent is “0 = no retries but 1 attempt,” change loop to 0..=self.fetch_max_retries.

Apply this diff (in two places):

@@
-                        warn!(
-                            "Ignoring blob: marshalling blob from height {} to epoch json failed with error {}: {:?}",
-                            height, e, &blob
-                        );
+                        warn!(
+                            "Ignoring blob: decoding blob at height {} into FinalizedEpoch failed: {} (blob: {:?})",
+                            height, e, &blob
+                        );

And in try_fetch_blobs:

-        for attempt in 0..self.fetch_max_retries {
+        // attempt index is 0-based; 0 means the first (non-retry) attempt
+        for attempt in 0..=self.fetch_max_retries {
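The off-by-one is easy to see in isolation: an exclusive range performs zero attempts when `max_retries == 0`, while the inclusive form gives "n retries" = "n + 1 attempts":

```rust
// Exclusive range: zero attempts when max_retries == 0.
fn attempts_exclusive(max_retries: u32) -> usize {
    (0..max_retries).count()
}

// Inclusive range: always one initial attempt plus max_retries retries.
fn attempts_inclusive(max_retries: u32) -> usize {
    (0..=max_retries).count()
}
```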
crates/da/src/factory.rs (1)

236-237: Remove unreachable!.

The function always returns inside the loop; unreachable!() is noisy in logs and coverage.

-            unreachable!() // This line should never be reached due to the return in the last iteration
+            // No-op: all code paths return inside the loop.

Also applies to: 261-262

crates/da/src/aws/light_client.rs (2)

55-81: Background task handle is dropped; no way to stop or prevent double start.

Store the JoinHandle and make start() idempotent to avoid multiple polling loops and allow graceful shutdown later.

-use std::{sync::Arc, time::Duration};
+use std::{sync::Arc, time::Duration};
+use tokio::sync::OnceCell;

 #[derive(Clone)]
 pub struct AwsLightDataAvailabilityLayer {
     /// AWS S3 client for data operations
     client: AwsDataAvailabilityClient,

     /// Event channel for publishing data availability events
     event_channel: Arc<EventChannel>,

     block_time: Duration,
+    height_monitor: Arc<OnceCell<tokio::task::JoinHandle<()>>>,
 }

 impl AwsLightDataAvailabilityLayer {
     /// Creates a new AWS light data availability layer.
     pub async fn new(config: &AwsLightClientDAConfig) -> Result<Self, DataAvailabilityError> {
         let client = AwsDataAvailabilityClient::new_from_light_da_config(config.clone()).await?;

         Ok(Self {
             client,
             event_channel: Arc::new(EventChannel::new()),
             block_time: config.block_time,
+            height_monitor: Arc::new(OnceCell::new()),
         })
     }
@@
-        let _handle = tokio::spawn(async move {
+        let handle = tokio::spawn(async move {
             let mut last_height = 0u64;
             loop {
                 // ...
             }
         });
-
+        // Best-effort; ignore if already set
+        let _ = self.height_monitor.set(handle);
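The idempotent-start pattern can be sketched with std types standing in for the async ones (`OnceLock` for `tokio::sync::OnceCell`, a thread for the spawned task; names are illustrative, not from this PR):

```rust
use std::sync::OnceLock;
use std::thread::{self, JoinHandle};

struct Monitor {
    handle: OnceLock<JoinHandle<()>>,
}

impl Monitor {
    /// Returns true only for the call that actually started the loop.
    fn start(&self) -> bool {
        if self.handle.get().is_some() {
            return false; // already running
        }
        let h = thread::spawn(|| { /* polling loop would live here */ });
        match self.handle.set(h) {
            Ok(()) => true,
            Err(h) => {
                // Lost a start/start race: reap the extra thread.
                let _ = h.join();
                false
            }
        }
    }
}
```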

99-105: Preserve error context; avoid to_string() in map_err.

Use Into<anyhow::Error> to retain backtraces and sources.

-            .map_err(|e| anyhow!(e.to_string()))
+            .map_err(Into::into)
crates/cli/src/tests/mod.rs (3)

21-27: Avoid mutating the environment while iterating over it.

Collect the matching keys first to avoid iterator surprises. Note that on the 2024 edition, env::remove_var is an unsafe fn (sound only while no other thread reads the environment), so keep the unsafe block if the crate targets that edition and serialize env-mutating tests.

-fn clear_env_vars() {
-    for (key, _) in env::vars() {
-        if key.starts_with("PRISM_") {
-            unsafe { env::remove_var(key) };
-        }
-    }
-}
+fn clear_env_vars() {
+    let keys: Vec<String> =
+        env::vars().map(|(k, _)| k).filter(|k| k.starts_with("PRISM_")).collect();
+    for key in keys {
+        // SAFETY: env-mutating tests run serially, so no concurrent readers
+        unsafe { env::remove_var(key) };
+    }
+}

42-43: Audit the unsafe blocks around env::set_var.

On the 2024 edition env::set_var is an unsafe fn, so these blocks are required; on edition 2021 and earlier they are redundant and can be dropped. Either way, document why the call is sound:

-    unsafe { env::set_var("PRISM__LIGHT_CLIENT__VERIFYING_KEY_STR", "env_key") };
+    // SAFETY: env-mutating tests run serially, so no other thread touches the env
+    unsafe { env::set_var("PRISM__LIGHT_CLIENT__VERIFYING_KEY_STR", "env_key") };

Apply similarly at the other occurrences in this file.

Also applies to: 73-74, 154-160, 216-220, 264-267, 323-326


29-60: Tests mutate global env; make them serial to avoid flakiness.

Parallel test execution can interleave env changes. Mark tests serial (or run with --test-threads=1).

  • Add to dev-dependencies: serial_test = "3"
  • Annotate each env-mutating test with #[serial_test::serial]

Also applies to: 62-90, 92-111, 112-132, 200-242, 309-351

crates/da/src/aws/mod.rs (1)

140-158: API exposure LGTM; docs are thorough.

Nice module docs and correct conditional exports. Consider adding a brief note that full-node support is not available on wasm32.

crates/da/src/aws/full_node.rs (5)

95-135: Spawned task handle not stored; field exists but unused.

_height_monitor_handle is never set. Store the handle and make start idempotent to avoid duplicate producers. Since start takes &self, use sound interior mutability (tokio::sync::OnceCell<JoinHandle<()>>) rather than casting away the shared reference, which would be undefined behavior:

-        let _handle = tokio::spawn(async move {
+        let handle = tokio::spawn(async move {
             loop {
                 // ...
             }
         });
-
-        debug!("Started height monitoring task with race condition protection");
+        // Best-effort: a second start() is a no-op if a monitor is already running
+        let _ = self.height_monitor_handle.set(handle);
+        debug!("Started height monitoring task with race condition protection");

This assumes the field is changed to height_monitor_handle: tokio::sync::OnceCell<JoinHandle<()>>.


177-188: Holding an RwLock read guard across .await.

This blocks block production for the entire network operation and can deadlock in complex flows. Prefer a dedicated submission mutex or capture the height then coordinate with a barrier.

  • Option A: Add submission_lock: tokio::sync::Mutex<()> and lock it in both produce_blocks (around the write) and submission paths (around the network call), releasing the height lock before .await.
  • Option B: Snapshot height, release the lock, submit, then verify height hasn’t advanced; if it has, retry/abort.
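Option B can be sketched with std types (a `Mutex` and a synchronous closure standing in for the async submission path; the names are hypothetical):

```rust
use std::sync::Mutex;

/// Option B in miniature: snapshot the height, drop the lock, do the slow
/// submission, then commit only if the height did not advance meanwhile.
/// `slow_submit` is a stand-in for the S3 network call.
fn submit_at_snapshot(height: &Mutex<u64>, slow_submit: impl Fn(u64) -> bool) -> bool {
    let snapshot = *height.lock().unwrap(); // guard dropped at end of statement
    if !slow_submit(snapshot) {
        return false;
    }
    // Re-check after the await-equivalent: abort if another producer advanced.
    *height.lock().unwrap() == snapshot
}
```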

154-160: Preserve error context; avoid .to_string() in map_err.

Use Into<anyhow::Error> for richer errors.

-            .map_err(|e| anyhow!(e.to_string()))
+            .map_err(Into::into)
-        self.client.fetch_transactions(height).await.map_err(|e| anyhow!(e.to_string()))
+        self.client.fetch_transactions(height).await.map_err(Into::into)

Also applies to: 173-175


169-171: Height semantics may be off-by-one vs. broadcasts.

You broadcast completed_height but get_latest_height returns the incremented value. Clarify intended contract: should latest be “produced” or “next”? Align both or rename for clarity.

Also applies to: 112-119


72-93: Consider seeding height from persisted AWS metadata.

Starting from 1 ignores existing chain state. Query S3 (e.g., fetch_height()) and initialize current_height accordingly.

crates/da/src/celestia/light_client.rs (4)

393-410: Improve bootnode parsing diagnostics; log invalid addresses explicitly.

Current warning prints only valid multiaddrs, not which inputs failed.

Apply this diff:

-        if !self.config.bootnodes.is_empty() {
-            let multiaddrs: Vec<Multiaddr> = self
-                .config
-                .bootnodes
-                .clone()
-                .into_iter()
-                .filter_map(|addr| Multiaddr::from_str(&addr).ok())
-                .collect();
-
-            if multiaddrs.len() != self.config.bootnodes.len() {
-                warn!(
-                    "Some bootnodes failed to parse to libp2p multiaddrs. Valid addresses contain: {:#?}",
-                    multiaddrs
-                );
-            }
-
-            node = node.bootnodes(multiaddrs);
-        }
+        if !self.config.bootnodes.is_empty() {
+            let mut multiaddrs = Vec::new();
+            let mut invalid = Vec::new();
+            for addr in &self.config.bootnodes {
+                match Multiaddr::from_str(addr) {
+                    Ok(m) => multiaddrs.push(m),
+                    Err(_) => invalid.push(addr.clone()),
+                }
+            }
+            if !invalid.is_empty() {
+                warn!("Failed to parse bootnodes to libp2p multiaddrs: {:?}", invalid);
+            }
+            node = node.bootnodes(multiaddrs);
+        }

442-469: Retries: use 1-based attempt counter and add simple backoff to avoid tight loop.

Apply this diff:

-        for attempt in 0..self.fetch_max_retries {
+        for attempt in 1..=self.fetch_max_retries {
             match node
                 .request_all_blobs(self.snark_namespace, height, Some(self.fetch_timeout))
                 .await
             {
                 Ok(blobs) => {
@@
                 }
                 Err(e) => {
                     warn!(
                         "failed to fetch data on attempt {} with error: {}.",
                         attempt, e
                     );
+                    if attempt < self.fetch_max_retries {
+                        // linear backoff bounded by fetch_timeout
+                        let delay_ms = (250 * attempt as u64).min(self.fetch_timeout.as_millis() as u64);
+                        tokio::time::sleep(Duration::from_millis(delay_ms)).await;
+                    }
                 }
             }
         }

8-8: Remove the redundant std::self import.

Tiny cleanup; use std::{self, ...} re-imports the std crate name, which is already in scope.

-use std::{self, str::FromStr, sync::Arc, time::Duration};
+use std::{str::FromStr, sync::Arc, time::Duration};

238-249: Add rustdoc to new specter preset helpers for discoverability.

-impl CelestiaLightClientDAConfig {
-    pub fn new_for_specter() -> std::result::Result<Self, PresetError> {
+impl CelestiaLightClientDAConfig {
+    /// Construct a default config and apply the Specter preset (devnet-friendly).
+    pub fn new_for_specter() -> std::result::Result<Self, PresetError> {
         let mut config = Self::default();
         config.apply_specter_preset()?;
         Ok(config)
     }
 
-    pub fn apply_specter_preset(&mut self) -> std::result::Result<(), PresetError> {
+    /// Mutate this config to use Specter defaults (network + namespaces).
+    pub fn apply_specter_preset(&mut self) -> std::result::Result<(), PresetError> {
         self.celestia_network = CelestiaNetwork::Mocha;
         self.snark_namespace_id = DEVNET_SPECTER_SNARK_NAMESPACE_ID.to_string();
         Ok(())
     }
 }
crates/da/src/aws/client.rs (8)

512-525: Avoid string-matching AWS errors; handle typed NoSuchKey.

String contains("NoSuchKey"/"404") is brittle and locale-dependent.

Apply this diff (adjust if your SDK module paths differ):

-            Err(e) => {
-                // Check if this is a "not found" error
-                let error_string = e.to_string();
-                if error_string.contains("NoSuchKey") || error_string.contains("404") {
-                    trace!("Key '{}' not found in bucket '{}'", key, bucket);
-                    Ok(None)
-                } else {
-                    Err(AwsDaClientError::RequestFailed(format!(
-                        "Failed to get object for key '{}' in bucket '{}': {}",
-                        key, bucket, e
-                    )))
-                }
-            }
+            Err(e) => {
+                // `as_service_error()` yields Some(&GetObjectError) for service-level failures
+                if e.as_service_error().is_some_and(|se| se.is_no_such_key()) {
+                    trace!("Key '{}' not found in bucket '{}'", key, bucket);
+                    Ok(None)
+                } else {
+                    Err(AwsDaClientError::RequestFailed(format!(
+                        "Failed to get object for key '{}' in bucket '{}': {}",
+                        key, bucket, e
+                    )))
+                }
+            }

613-616: Retention must be in the future for Object Lock; clamp 0 to minimum (1 day).

S3 rejects retain_until set to “now”. If retention_days is 0 this will likely fail.

Apply this diff:

-        let retain_until = DateTime::from(
-            SystemTime::now() + Duration::from_secs(self.retention_days as u64 * 24 * 3600),
-        );
+        let retention_days = self.retention_days.max(1);
+        let retain_until =
+            DateTime::from(SystemTime::now() + Duration::from_secs(retention_days as u64 * 24 * 3600));

560-568: Extension mismatch: writing binary data to a “.json” key.

submit_metadata uses binary codec but stores to info.json. Rename to .bin or switch to JSON.

Binary rename (minimal):

-        let metadata_key = format!("{}metadata/info.json", self.key_prefix);
+        let metadata_key = format!("{}metadata/info.bin", self.key_prefix);

Make the same change in fetch_metadata.

-        let metadata_key = format!("{}metadata/info.json", self.key_prefix);
+        let metadata_key = format!("{}metadata/info.bin", self.key_prefix);

528-556: Parallelize S3 downloads under a small cap to reduce tail latency.

Current loop is fully sequential; can be slow with many objects.

Outline (not full diff): use FuturesUnordered with a semaphore, similar to your upload path:

  • Build tasks: fetch_from_s3(bucket, key) for each key
  • Gate with Semaphore
  • Collect Ok(Some(_)) results, log others

384-396: List permissions may be restricted; consider a softer access check.

ListObjectsV2 can fail even when GetObject/PutObject are permitted. Optionally probe a known key or use HeadBucket.


237-241: Avoid unwrap() on semaphore acquire in uploads.

Prefer handling closed semaphore gracefully to avoid panics on shutdown paths.

-                    let _permit = sem.acquire().await.unwrap();
+                    let _permit = sem.acquire().await.map_err(|_| {
+                        AwsDaClientError::UploadFailed("Upload semaphore closed".into())
+                    })?;

278-281: Replace deprecated BehaviorVersion::v2025_01_17() with BehaviorVersion::latest() (or BehaviorVersion::v2025_08_07)

BehaviorVersion::v2025_01_17() is valid but deprecated; as of September 16, 2025, latest() maps to BehaviorVersion::v2025_08_07 — prefer latest() unless you intentionally need to pin the older behavior.
crates/da/src/aws/client.rs — lines 278–281 and 331–334.



-        let mut config_loader = aws_config::defaults(BehaviorVersion::v2025_01_17())
+        let mut config_loader = aws_config::defaults(BehaviorVersion::latest())
             .region(region.clone())
             .retry_config(retry_config);
-                let base_config = aws_config::defaults(BehaviorVersion::v2025_01_17())
+                let base_config = aws_config::defaults(BehaviorVersion::latest())
                     .region(region.clone())
                     .load()
                     .await;
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 50f3d2f and 847fd90.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (25)
  • CLAUDE.md (1 hunks)
  • Cargo.toml (2 hunks)
  • README.md (1 hunks)
  • clippy.toml (1 hunks)
  • crates/cli/src/apply_args/commands.rs (2 hunks)
  • crates/cli/src/cli_args/commands.rs (4 hunks)
  • crates/cli/src/main.rs (2 hunks)
  • crates/cli/src/tests/mod.rs (25 hunks)
  • crates/da/Cargo.toml (2 hunks)
  • crates/da/src/aws/client.rs (1 hunks)
  • crates/da/src/aws/config.rs (1 hunks)
  • crates/da/src/aws/full_node.rs (1 hunks)
  • crates/da/src/aws/light_client.rs (1 hunks)
  • crates/da/src/aws/mod.rs (1 hunks)
  • crates/da/src/celestia/full_node.rs (1 hunks)
  • crates/da/src/celestia/light_client.rs (5 hunks)
  • crates/da/src/factory.rs (7 hunks)
  • crates/da/src/lib.rs (4 hunks)
  • crates/da/src/memory.rs (2 hunks)
  • crates/node_types/lightclient/src/factory.rs (5 hunks)
  • crates/node_types/lightclient/src/lightclient.rs (2 hunks)
  • crates/node_types/prover/src/factory.rs (13 hunks)
  • crates/node_types/prover/src/lib.rs (1 hunks)
  • crates/presets/src/lib.rs (2 hunks)
  • crates/serde/src/binary.rs (2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.rs: Follow Rust Coding Standards
Use rustfmt for formatting across the codebase

Files:

  • crates/node_types/lightclient/src/lightclient.rs
  • crates/node_types/prover/src/lib.rs
  • crates/presets/src/lib.rs
  • crates/da/src/aws/mod.rs
  • crates/serde/src/binary.rs
  • crates/da/src/aws/config.rs
  • crates/cli/src/apply_args/commands.rs
  • crates/cli/src/cli_args/commands.rs
  • crates/da/src/aws/full_node.rs
  • crates/da/src/aws/client.rs
  • crates/node_types/lightclient/src/factory.rs
  • crates/da/src/aws/light_client.rs
  • crates/cli/src/main.rs
  • crates/da/src/celestia/full_node.rs
  • crates/da/src/memory.rs
  • crates/node_types/prover/src/factory.rs
  • crates/da/src/lib.rs
  • crates/da/src/factory.rs
  • crates/da/src/celestia/light_client.rs
  • crates/cli/src/tests/mod.rs
crates/**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

crates/**/*.rs: Naming: snake_case for functions/variables, CamelCase for types
Error handling: Use Result types with descriptive error messages
Handle errors with the ? operator and proper error types
Documentation: Add comments for public APIs and complex logic
File organization: Group related functionality in modules
Avoid allocations in hot paths by using references/borrowing
Use tokio for I/O-bound async/await operations

Files:

  • crates/node_types/lightclient/src/lightclient.rs
  • crates/node_types/prover/src/lib.rs
  • crates/presets/src/lib.rs
  • crates/da/src/aws/mod.rs
  • crates/serde/src/binary.rs
  • crates/da/src/aws/config.rs
  • crates/cli/src/apply_args/commands.rs
  • crates/cli/src/cli_args/commands.rs
  • crates/da/src/aws/full_node.rs
  • crates/da/src/aws/client.rs
  • crates/node_types/lightclient/src/factory.rs
  • crates/da/src/aws/light_client.rs
  • crates/cli/src/main.rs
  • crates/da/src/celestia/full_node.rs
  • crates/da/src/memory.rs
  • crates/node_types/prover/src/factory.rs
  • crates/da/src/lib.rs
  • crates/da/src/factory.rs
  • crates/da/src/celestia/light_client.rs
  • crates/cli/src/tests/mod.rs
🧠 Learnings (4)
📚 Learning: 2025-08-25T12:23:29.763Z
Learnt from: CR
PR: deltadevsde/prism#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-25T12:23:29.763Z
Learning: Applies to crates/**/*.rs : Documentation: Add comments for public APIs and complex logic

Applied to files:

  • CLAUDE.md
📚 Learning: 2025-08-25T12:23:29.763Z
Learnt from: CR
PR: deltadevsde/prism#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-25T12:23:29.763Z
Learning: Applies to **/*.rs : Follow Rust Coding Standards

Applied to files:

  • CLAUDE.md
📚 Learning: 2025-08-25T12:23:29.763Z
Learnt from: CR
PR: deltadevsde/prism#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-25T12:23:29.763Z
Learning: Applies to crates/**/*.rs : File organization: Group related functionality in modules

Applied to files:

  • CLAUDE.md
📚 Learning: 2025-06-17T07:14:03.223Z
Learnt from: sebasti810
PR: deltadevsde/prism#342
File: crates/cli/src/main.rs:152-155
Timestamp: 2025-06-17T07:14:03.223Z
Learning: In the Prism codebase, the verifying_key field in NetworkConfig was changed from Option<VerifyingKey> to VerifyingKey because it's always required by all node types (LightClient, Prover, FullNode) and a sensible default (generated key) can be provided when not explicitly configured.

Applied to files:

  • crates/node_types/lightclient/src/factory.rs
  • crates/node_types/prover/src/factory.rs
🧬 Code graph analysis (13)
crates/serde/src/binary.rs (3)
crates/da/src/lib.rs (1)
  • fmt (147-154)
crates/keys/src/verifying_keys.rs (1)
  • fmt (355-358)
crates/common/src/digest.rs (2)
  • fmt (95-97)
  • fmt (101-103)
crates/da/src/aws/full_node.rs (1)
crates/da/src/aws/client.rs (1)
  • new_from_full_da_config (112-155)
crates/da/src/aws/client.rs (1)
crates/da/src/aws/config.rs (1)
  • profile (433-437)
crates/node_types/lightclient/src/factory.rs (3)
crates/presets/src/lib.rs (1)
  • apply_preset (94-94)
crates/cli/src/config.rs (3)
  • apply_preset (20-24)
  • apply_preset (39-44)
  • apply_preset (59-64)
crates/node_types/lightclient/src/lightclient.rs (1)
  • new (109-133)
crates/da/src/aws/light_client.rs (1)
crates/da/src/aws/client.rs (1)
  • new_from_light_da_config (72-110)
crates/cli/src/main.rs (2)
crates/da/src/factory.rs (2)
  • create_light_client_da_layer (108-130)
  • create_full_node_da_layer (209-269)
crates/storage/src/factory.rs (1)
  • create_storage (62-73)
crates/da/src/celestia/full_node.rs (3)
crates/da/src/celestia/light_client.rs (3)
  • get_finalized_epochs (431-475)
  • new (357-366)
  • event_channel (477-479)
crates/da/src/lib.rs (7)
  • get_finalized_epochs (388-388)
  • height (213-213)
  • height (254-256)
  • try_from (170-176)
  • try_from (376-380)
  • new (187-189)
  • event_channel (389-389)
crates/da/src/memory.rs (3)
  • get_finalized_epochs (157-168)
  • new (50-71)
  • event_channel (170-172)
crates/da/src/memory.rs (3)
crates/da/src/lib.rs (2)
  • start (387-387)
  • new (187-189)
crates/node_types/prover/src/syncer.rs (2)
  • start (59-62)
  • new (30-53)
crates/node_types/lightclient/src/lightclient.rs (1)
  • new (109-133)
crates/node_types/prover/src/factory.rs (2)
crates/node_types/lightclient/src/factory.rs (1)
  • default (38-52)
crates/node_types/prover/src/prover/mod.rs (1)
  • default (69-88)
crates/da/src/lib.rs (6)
crates/da/src/celestia/full_node.rs (1)
  • start (242-270)
crates/da/src/celestia/light_client.rs (1)
  • start (376-429)
crates/da/src/memory.rs (2)
  • start (140-146)
  • start (149-155)
crates/cli/src/node_types.rs (3)
  • start (9-9)
  • start (15-17)
  • start (22-24)
crates/node_types/prover/src/syncer.rs (1)
  • start (59-62)
crates/node_types/uniffi-lightclient/src/lib.rs (1)
  • start (73-76)
crates/da/src/factory.rs (3)
crates/da/src/celestia/full_node.rs (3)
  • new_for_specter (132-136)
  • new (160-190)
  • default (119-128)
crates/da/src/celestia/light_client.rs (3)
  • new_for_specter (239-243)
  • new (357-366)
  • default (214-235)
crates/da/src/aws/config.rs (1)
  • development (414-420)
crates/da/src/celestia/light_client.rs (4)
crates/da/src/celestia/full_node.rs (7)
  • new_for_specter (132-136)
  • default (119-128)
  • apply_specter_preset (138-144)
  • new (160-190)
  • start (242-270)
  • get_finalized_epochs (272-292)
  • event_channel (294-296)
crates/node_types/lightclient/src/lightclient.rs (1)
  • new (109-133)
crates/da/src/lib.rs (6)
  • new (187-189)
  • start (387-387)
  • get_finalized_epochs (388-388)
  • height (213-213)
  • height (254-256)
  • event_channel (389-389)
crates/node_types/prover/src/syncer.rs (2)
  • new (30-53)
  • start (59-62)
crates/cli/src/tests/mod.rs (2)
crates/cli/src/cli_args/commands.rs (3)
  • config_path (48-50)
  • config_path (99-101)
  • config_path (158-160)
crates/cli/src/apply_args/traits.rs (1)
  • config_path (13-13)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: build-and-push-image
  • GitHub Check: clippy
  • GitHub Check: wasm-test
  • GitHub Check: integration-test
  • GitHub Check: unit-test
  • GitHub Check: coverage
  • GitHub Check: unused dependencies
🔇 Additional comments (24)
crates/node_types/prover/src/lib.rs (1)

61-61: Doc sample path switched to .p8 — good. Please ensure consistency across docs.

This matches the new default. Double-check factory.rs ProverConfig docs (Line 80) still mention .pk8; update to .p8 for consistency.

crates/node_types/prover/src/factory.rs (2)

143-144: Start height now flows from config — LGTM.

This unblocks custom sync start positions.


188-189: Config-driven start_height for prover — LGTM.

Matches the full-node path; consistent behavior.

crates/serde/src/binary.rs (1)

1-1: LGTM: Display import aligns with the new bound.

No issues with this import given the bound usage.

Cargo.toml (1)

148-148: Good call on adding backon.

Backoff/retry integration will be useful around S3 transient errors. No issues.

crates/da/src/aws/config.rs (1)

281-295: Default impls are fine but won’t affect serde.

Keep these, but correctness hinges on adding per‑field serde defaults as suggested.

crates/presets/src/lib.rs (1)

19-25: FromStr mapping LGTM; keep alias list in one place.

Looks correct and matches other presets. Consider a unit test asserting "dev" and "development" both parse.

crates/cli/src/main.rs (2)

64-64: Good: DA init moved after telemetry (light client).

Early telemetry setup before heavy resources is a sane ordering.


82-84: Good: DB/DA init moved after telemetry (prover).

Matches the intended startup sequence.

crates/node_types/lightclient/src/factory.rs (3)

31-35: Clearly labeled “mock proofs” flag is good; keep it visible in docs/CLI help.

Public field + docstring are appropriate.

Ensure CLI help and config docs warn prominently about bypassing verification.


50-50: Default false is correct.

Safe default minimizes risk.


131-131: Tests: using ..Default::default() is correct after adding the new field.

Keeps backward compatibility in test constructors.

Also applies to: 144-144

crates/node_types/lightclient/src/lightclient.rs (1)

156-157: Verification script used to inspect lightclient.rs and locate the DA trait's start()/subscribe call sites:

```shell
#!/bin/bash
set -euo pipefail
echo "PWD: $(pwd)"
echo "---- show lightclient.rs (1-260) ----"
if [ -f crates/node_types/lightclient/src/lightclient.rs ]; then
  sed -n '1,260p' crates/node_types/lightclient/src/lightclient.rs
else
  echo "File not found: crates/node_types/lightclient/src/lightclient.rs"
fi
echo "---- rg UpdateDAHeight ----"
rg -n --hidden --no-ignore -S 'UpdateDAHeight' || true
echo "---- rg UpdateDA ----"
rg -n --hidden --no-ignore -S 'UpdateDA' || true
echo "---- rg Update.*DA ----"
rg -n --hidden --no-ignore -S 'Update.*DA|DA.*Update' || true
echo "---- rg .start().await ----"
rg -n --hidden --no-ignore -S '\.start\(\)\.await' || true
echo "---- rg subscribe( ----"
rg -n --hidden --no-ignore -S 'subscribe\(' || true
echo '---- rg "async fn start" ----'
rg -n --hidden --no-ignore -S 'async fn start' || true
echo "---- rg DA trait/struct definitions ----"
rg -n --hidden --no-ignore -S 'trait .*Da|trait .*DA|trait .*DataAvailability|struct .*DataAvailability|struct .*DaClient' || true
echo "---- rg word boundary da ----"
rg -n --hidden --no-ignore -S '\bda\b' --glob '!target/**' || true
```

crates/cli/src/cli_args/commands.rs (1)

53-59: Preset resolution logic looks good.

Dev takes precedence over Specter, else None. Matches presets' intent.

crates/da/src/lib.rs (2)

98-99: Feature-gated aws module is correctly wired.

Module visibility matches Cargo features.

207-207: Resolved: the Send+Sync bound is safe, since only FinalizedEpoch implements the trait.

Repo search shows a single impl (crates/da/src/lib.rs: impl VerifiableStateTransition for FinalizedEpoch). FinalizedEpoch's fields are Digest, SuccinctProof, Option<String>, and u64, and no Rc/RefCell/Cell/UnsafeCell usages were found in crates/da.

crates/da/Cargo.toml (2)

14-14: Feature composition looks reasonable.

The aws feature correctly pulls in only optional deps.

55-55: wasm-bindgen-futures inclusion is correct for wasm32.

Matches the new wasm start path in memory DA.

crates/da/src/memory.rs (1)

15-16: wasm import is fine.

Correctly guarded behind the target_arch gate.

</blockquote></details>
<details>
<summary>crates/da/src/celestia/full_node.rs (3)</summary><blockquote>

`294-297`: **event_channel() exposure is correct.**

Matches other DA backends and enables uniform event wiring.

---

`240-270`: **Ensure StreamExt is in scope for header_sub.next().await.**

If `.next()` isn’t inherent on the subscription type, import the extension trait.

Apply this import if needed:

```diff
+use futures::StreamExt;
```

If that triggers a build error due to optional `futures`, switch to `tokio-stream` or make `futures` non-optional in Cargo.toml.


299-300: DA trait split adoption looks correct.

Light-only start moved here; DataAvailabilityLayer keeps R/W + subscriptions. Good separation.

crates/da/src/factory.rs (1)

18-27: AWS light-client is intentionally un-gated for wasm32; full-node is gated
factory.rs imports AwsFullNode under #[cfg(all(feature = "aws", not(target_arch = "wasm32")))] while AwsLight is imported under #[cfg(feature = "aws")]; crates/da/src/aws/mod.rs re-exports full_node with #[cfg(not(target_arch = "wasm32"))] and exports light_client unconditionally. The gating is intentional — no change required.

Likely an incorrect or invalid review comment.

crates/da/src/aws/client.rs (1)

263-266: Epoch key derivation: index source looks suspect.

Using epoch.height as the per-height “epoch index” is likely wrong unless height==index by design. Please confirm the intended index field and path contract.

If an index field exists (e.g., epoch.index), prefer:

```rust
let epoch_key = self.epoch_key(height, epoch.index);
```

Otherwise, consider passing the index explicitly to submit_finalized_epoch.

…nitialize_sync_target()

- Move start() method from DataAvailabilityLayer to LightDataAvailabilityLayer trait
- Remove initialize_sync_target() method from DataAvailabilityLayer trait
- Add Sync bound to VerifiableStateTransition trait
- Update Celestia implementations to match new trait structure
- Update InMemory implementation with proper WASM support for start() method
- Add aws feature flag with AWS SDK dependencies
- Add optional dependencies: aws-config, aws-sdk-s3, aws-sdk-sts
- Add supporting dependencies: futures, serde_json, thiserror, wasm-bindgen-futures
- Add AwsClient with S3 operations for epochs and transactions
- Add comprehensive configuration types for light client and full node
- Support multiple credential types: default, profile, static, and STS
- Add WORM compliance features: retention policies and legal holds
- Add cross-region replication and disaster recovery configuration
- Add AwsLightDataAvailabilityLayer for read-only epoch access
- Support efficient retrieval of finalized epochs from S3
- Add proper error handling and retry logic
- Support configurable timeouts and retry counts
- Implement LightDataAvailabilityLayer trait
- Add AwsFullNodeDataAvailabilityLayer with complete read-write functionality
- Support publishing epochs and transactions with WORM compliance
- Implement automatic Object Lock with configurable retention periods
- Add support for legal holds and cross-region replication
- Support concurrent uploads with proper synchronization
- Implement both DataAvailabilityLayer and LightDataAvailabilityLayer traits
- Add AWS module exports with proper feature gating
- Add extensive documentation covering WORM compliance features
- Include usage examples for both light client and full node
- Document security considerations and IAM permissions
- Provide data organization and architecture overview
- Add AWS variants to LightClientDAConfig and FullNodeDAConfig enums
- Implement factory functions for AWS DA layer creation
- Add proper error handling and retry logic for AWS connections
- Support feature-gated compilation for AWS backend
- Add aws-config, aws-sdk-s3, aws-sdk-sts dependencies
- Add wasm-bindgen-futures for WASM compatibility
- Add futures and thiserror for async support
- Call start() on DA layer in both WASM and non-WASM light client run methods
- Ensures DA layer is properly initialized before sync operations begin
- Replace 'in planning' language with current AWS S3 support
- Document multiple DA backend support including Celestia and AWS S3
- Clarify WORM-compliant enterprise deployment options
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
crates/da/src/lib.rs (1)

42-59: Docs: Light client example should call start() now.

The example omits start(), but Celestia light and in-memory paths require it before reads.

Apply this diff to the doc example:

```diff
     let config = LightClientDAConfig::InMemory;
     let da = create_light_client_da_layer(&config).await?;
+    da.start().await?;
 
     let epochs = da.get_finalized_epochs(100).await?;
```
crates/da/src/aws/full_node.rs (1)

190-201: Transaction offset never advances; repeated submissions may clash.

Increment the offset after successful publish so subsequent calls append.

Apply this diff to fix the logic:

```diff
 async fn submit_transactions(&self, transactions: Vec<Transaction>) -> Result<u64> {
     // Take read lock to get consistent height and prevent increments during submission
     let height_guard = self.current_height.read().await;
     let height = *height_guard;
     let transaction_offset = self.transaction_offset.load(Ordering::Relaxed);
+    let count = transactions.len() as u64;
 
     self.client.submit_transactions(transactions, transaction_offset, height).await?;
+    
+    // Advance offset only on success
+    self.transaction_offset.fetch_add(count, Ordering::Relaxed);
 
     info!("Transactions submitted at height {}", height);
 
     Ok(height)
 }
```
crates/da/src/aws/client.rs (1)

399-443: Bucket-name validation is too strict; it rejects valid names (dots allowed, consecutive hyphens allowed).

This will incorrectly block many legitimate S3 buckets.

Apply this diff to align with AWS rules (3–63 chars, lowercase letters, digits, hyphens, dots; start/end with alnum; no IP address; no consecutive dots):

 fn verify_bucket_name(bucket_name: &str) -> Result<(), AwsDaClientError> {
     if bucket_name.len() < 3 || bucket_name.len() > 63 {
         return Err(AwsDaClientError::InitializationError(format!(
             "Bucket name '{}' must be between 3 and 63 characters",
             bucket_name
         )));
     }
 
     // Check if bucket name looks like an IP address first (before character validation)
     if bucket_name
         .split('.')
         .map(|part| part.parse::<u8>())
         .collect::<Result<Vec<_>, _>>()
         .is_ok()
         && bucket_name.split('.').count() == 4
     {
         return Err(AwsDaClientError::InitializationError(format!(
             "Bucket name '{}' cannot be formatted as an IP address",
             bucket_name
         )));
     }
 
-    if !bucket_name.chars().all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-') {
+    if !bucket_name
+        .chars()
+        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '.')
+    {
         return Err(AwsDaClientError::InitializationError(format!(
-            "Bucket name '{}' can only contain lowercase letters, numbers, and hyphens",
+            "Bucket name '{}' can only contain lowercase letters, numbers, hyphens, and dots",
             bucket_name
         )));
     }
 
-    if bucket_name.starts_with('-') || bucket_name.ends_with('-') {
+    if !bucket_name.chars().next().map_or(false, |c| c.is_ascii_lowercase() || c.is_ascii_digit())
+        || !bucket_name.chars().last().map_or(false, |c| c.is_ascii_lowercase() || c.is_ascii_digit())
+    {
         return Err(AwsDaClientError::InitializationError(format!(
-            "Bucket name '{}' cannot start or end with a hyphen",
+            "Bucket name '{}' must start and end with a letter or number",
             bucket_name
         )));
     }
 
-    if bucket_name.contains("--") {
+    if bucket_name.contains("..") {
         return Err(AwsDaClientError::InitializationError(format!(
-            "Bucket name '{}' cannot contain consecutive hyphens",
+            "Bucket name '{}' cannot contain consecutive dots",
             bucket_name
         )));
     }
 
     Ok(())
 }
🧹 Nitpick comments (5)
crates/da/src/aws/light_client.rs (3)

1-1: Consider using anyhow::Context for better error context.

Instead of converting errors to strings, consider using anyhow::Context to preserve the error chain while adding context.

```rust
use anyhow::{Result, Context};
```

55-80: Consider graceful shutdown for the background task.

The spawned task runs indefinitely without a shutdown mechanism. Consider adding a cancellation token or shutdown channel for clean termination.

Here's a suggested implementation with proper shutdown handling:

 async fn start_height_monitoring(&self) -> Result<(), DataAvailabilityError> {
     let client = Arc::new(self.client.clone());
     let event_publisher = self.event_channel.publisher();
     let poll_interval = self.block_time;
+    let shutdown = self.shutdown_tx.subscribe();
 
     let _handle = tokio::spawn(async move {
         let mut last_height = 0u64;
+        tokio::pin!(shutdown);
 
         loop {
-            match client.fetch_height().await {
-                Ok(Some(max_height)) => {
-                    if max_height > last_height {
-                        last_height = max_height;
-
-                        // Publish event
-                        event_publisher.send(PrismEvent::UpdateDAHeight { height: max_height });
-
-                        info!("Height updated to {}", max_height);
+            tokio::select! {
+                _ = &mut shutdown => {
+                    info!("Height monitoring task shutting down");
+                    break;
+                }
+                _ = tokio::time::sleep(poll_interval) => {
+                    match client.fetch_height().await {
+                        Ok(Some(max_height)) => {
+                            if max_height > last_height {
+                                last_height = max_height;
+
+                                // Publish event
+                                event_publisher.send(PrismEvent::UpdateDAHeight { height: max_height });
+
+                                info!("Height updated to {}", max_height);
+                            }
+                        }
+                        Ok(None) => {
+                            debug!("No height metadata available yet");
+                        }
+                        Err(e) => {
+                            warn!("Failed to check for height updates: {}", e);
+                        }
                     }
                 }
-                Ok(None) => {
-                    debug!("No height metadata available yet");
-                }
-                Err(e) => {
-                    warn!("Failed to check for height updates: {}", e);
-                }
             }
-
-            tokio::time::sleep(poll_interval).await;
         }
     });

You'll also need to add a shutdown channel to the struct:

```diff
 pub struct AwsLightDataAvailabilityLayer {
     /// AWS S3 client for data operations
     client: AwsDataAvailabilityClient,
 
     /// Event channel for publishing data availability events
     event_channel: Arc<EventChannel>,
 
     block_time: Duration,
+
+    /// Shutdown signal for background tasks
+    shutdown_tx: broadcast::Sender<()>,
 }
```

99-105: Use Context extension for better error context.

Consider using .context() to preserve the original error while adding context.

```diff
 async fn get_finalized_epochs(&self, height: u64) -> Result<Vec<VerifiableEpoch>> {
     self.client
         .fetch_epochs(height)
         .await
         .map(|epochs| epochs.into_iter().map(|e| Box::new(e) as VerifiableEpoch).collect())
-        .map_err(|e| anyhow!(e.to_string()))
+        .context(format!("Failed to fetch epochs at height {}", height))
 }
```
crates/da/src/aws/full_node.rs (1)

95-135: Consider adding shutdown mechanism for the background task.

Similar to the light client, the produce_blocks task runs indefinitely without a clean shutdown path.

Consider implementing a shutdown mechanism using cancellation tokens or channels for graceful termination during node shutdown. This would prevent orphaned tasks and ensure clean resource cleanup.

crates/da/src/aws/client.rs (1)

613-614: Consider making retention period calculation more explicit.

The retention calculation could overflow for very large values of retention_days.

```diff
-    let retain_until = DateTime::from(
-        SystemTime::now() + Duration::from_secs(self.retention_days as u64 * 24 * 3600),
-    );
+    let retain_until = DateTime::from(
+        SystemTime::now()
+            .checked_add(Duration::from_secs(
+                (self.retention_days as u64).saturating_mul(24 * 3600)
+            ))
+            .ok_or_else(|| AwsDaClientError::UploadFailed(
+                "Retention period calculation overflow".to_string()
+            ))?,
+    );
```
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 847fd90 and 198d494.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
  • Cargo.toml (2 hunks)
  • README.md (1 hunks)
  • crates/da/Cargo.toml (2 hunks)
  • crates/da/src/aws/client.rs (1 hunks)
  • crates/da/src/aws/config.rs (1 hunks)
  • crates/da/src/aws/full_node.rs (1 hunks)
  • crates/da/src/aws/light_client.rs (1 hunks)
  • crates/da/src/aws/mod.rs (1 hunks)
  • crates/da/src/celestia/full_node.rs (1 hunks)
  • crates/da/src/celestia/light_client.rs (4 hunks)
  • crates/da/src/factory.rs (6 hunks)
  • crates/da/src/lib.rs (4 hunks)
  • crates/da/src/memory.rs (2 hunks)
  • crates/node_types/lightclient/src/lightclient.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (10)
  • crates/da/src/aws/mod.rs
  • crates/da/src/factory.rs
  • crates/node_types/lightclient/src/lightclient.rs
  • Cargo.toml
  • README.md
  • crates/da/Cargo.toml
  • crates/da/src/memory.rs
  • crates/da/src/celestia/light_client.rs
  • crates/da/src/aws/config.rs
  • crates/da/src/celestia/full_node.rs
🧰 Additional context used
📓 Path-based instructions (3)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.rs: Follow Rust Coding Standards for all Rust code
Use rustfmt with project settings (merge_imports=true, imports_granularity="Crate", max_width=100)
Error handling: prefer Result types with descriptive error messages
Use the ? operator and proper error types for propagation
Naming: snake_case for functions/variables and CamelCase for types
Only add comments for private APIs when behavior is complex or non-obvious
Keep public method docstrings short and focused (brief summary, brief params/return)

Files:

  • crates/da/src/aws/light_client.rs
  • crates/da/src/aws/client.rs
  • crates/da/src/lib.rs
  • crates/da/src/aws/full_node.rs
crates/**/src/lib.rs

📄 CodeRabbit inference engine (CLAUDE.md)

Add comments primarily for public APIs exposed at the crate level

Files:

  • crates/da/src/lib.rs
crates/**/src/@(lib|mod).rs

📄 CodeRabbit inference engine (CLAUDE.md)

Use longer examples only in module root files (mod.rs or lib.rs)

Files:

  • crates/da/src/lib.rs
🧬 Code graph analysis (4)
crates/da/src/aws/light_client.rs (1)
crates/da/src/aws/client.rs (1)
  • new_from_light_da_config (72-110)
crates/da/src/aws/client.rs (1)
crates/da/src/aws/config.rs (1)
  • profile (433-437)
crates/da/src/lib.rs (5)
crates/da/src/celestia/full_node.rs (1)
  • start (242-270)
crates/da/src/celestia/light_client.rs (1)
  • start (376-429)
crates/da/src/memory.rs (2)
  • start (140-146)
  • start (149-155)
crates/node_types/prover/src/syncer.rs (1)
  • start (63-66)
crates/node_types/uniffi-lightclient/src/lib.rs (1)
  • start (73-76)
crates/da/src/aws/full_node.rs (1)
crates/da/src/aws/client.rs (1)
  • new_from_full_da_config (112-155)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: unit-test
  • GitHub Check: clippy
  • GitHub Check: integration-test
  • GitHub Check: unused dependencies
  • GitHub Check: coverage
  • GitHub Check: build-and-push-image
🔇 Additional comments (6)
crates/da/src/lib.rs (4)

28-33: LGTM! AWS S3 DA backend documentation is well-structured.

The new documentation clearly explains the AWS S3 backend's WORM compliance model, configuration options, and support for both light client and full node protocols.


98-99: LGTM! AWS module properly feature-gated.

The AWS module is correctly exposed with the aws feature flag, following the same pattern as other optional modules.


207-207: LGTM! Appropriate trait bounds for VerifiableStateTransition.

Adding Sync to complement the existing Send bound ensures thread-safe usage across async boundaries.


387-387: LGTM! Startup lifecycle correctly moved to LightDataAvailabilityLayer.

Moving the start method to the light layer aligns with the architectural change where light clients now own their initialization sequence, as evidenced by the implementations across Celestia, AWS, and in-memory backends.

crates/da/src/aws/client.rs (2)

268-382: LGTM! Comprehensive S3 client creation with multiple credential strategies.

The implementation properly handles default credentials, explicit credentials, and STS role assumption with appropriate error handling and configuration options.


364-365: Session token conversion — no change needed.

On re-inspection, the code already converts the `&str` returned by `session_token()` with `.to_string()` before passing it to `Credentials::new`, so there is nothing to fix here; this comment is retracted.

As AWS Rust SDK does not support WASM, we are disabling the module for that architecture.
- Add allow_mock_proofs CLI argument to light client commands
- Implement configuration override handling in apply_args
- Add comprehensive tests for CLI, env, and config file precedence
- Add atomic boolean guards to prevent duplicate start calls
- Implement idempotent start behavior across all DA implementations
- Fix potential race conditions in concurrent start scenarios
- Fix epoch key naming format to use underscore separator
- Add proper WASM32 target exclusion for AWS features
- Implement transaction offset management and reset logic
- Remove verbose debug logging from factory initialization
- Add missing start() call in library usage example
- Update AWS module documentation with correct file structure
- Improve bucket layout examples with proper naming conventions
- Clean up redundant section headers in documentation
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (2)
crates/da/Cargo.toml (1)

14-14: Build can fail without --features aws: futures is optional while .next().await is used elsewhere.

Make futures non-optional (or add tokio-stream + import) so StreamExt is always available.

Apply this minimal fix (Option A):

```diff
 [features]
-default = []
-aws = ["dep:aws-config", "dep:aws-sdk-s3", "dep:aws-sdk-sts", "dep:futures"]
+default = []
+aws = ["dep:aws-config", "dep:aws-sdk-s3", "dep:aws-sdk-sts"]
 
 [dependencies]
@@
-serde_json = { workspace = true }
+serde_json = { workspace = true }
@@
-thiserror = { workspace = true }
+thiserror = { workspace = true }
+futures = { workspace = true }
@@
 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
@@
-aws-config = { workspace = true, optional = true }
-aws-sdk-s3 = { workspace = true, optional = true }
-aws-sdk-sts = { workspace = true, optional = true }
-futures = { workspace = true, optional = true }
+aws-config = { workspace = true, optional = true }
+aws-sdk-s3 = { workspace = true, optional = true }
+aws-sdk-sts = { workspace = true, optional = true }
```

Alternatively (Option B): keep futures optional and add tokio-stream as non-optional plus use tokio_stream::StreamExt; at Celestia call sites.

```bash
#!/bin/bash
set -euo pipefail
echo "=== Call sites of .next().await and StreamExt imports in DA crate ==="
rg -nP -C2 '\.next\(\)\.await' crates/da/src || true
rg -nP -C2 '\b(StreamExt)\b' crates/da/src || true
```

Also applies to: 45-49

crates/da/src/aws/client.rs (1)

399-443: Bucket‑name validation is too strict (disallows dots, rejects many valid names). Align with AWS rules.

Allow dots, require start/end alnum, and forbid consecutive dots; keep IP‑address check. This was flagged previously.

-    fn verify_bucket_name(bucket_name: &str) -> Result<(), AwsDaClientError> {
+    fn verify_bucket_name(bucket_name: &str) -> Result<(), AwsDaClientError> {
         if bucket_name.len() < 3 || bucket_name.len() > 63 {
             return Err(AwsDaClientError::InitializationError(format!(
                 "Bucket name '{}' must be between 3 and 63 characters",
                 bucket_name
             )));
         }
 
         // Check if bucket name looks like an IP address first (before character validation)
         if bucket_name
             .split('.')
             .map(|part| part.parse::<u8>())
             .collect::<Result<Vec<_>, _>>()
             .is_ok()
             && bucket_name.split('.').count() == 4
         {
             return Err(AwsDaClientError::InitializationError(format!(
                 "Bucket name '{}' cannot be formatted as an IP address",
                 bucket_name
             )));
         }
 
-        if !bucket_name.chars().all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-') {
+        if !bucket_name
+            .chars()
+            .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '.')
+        {
             return Err(AwsDaClientError::InitializationError(format!(
-                "Bucket name '{}' can only contain lowercase letters, numbers, and hyphens",
+                "Bucket name '{}' can only contain lowercase letters, numbers, hyphens, and dots",
                 bucket_name
             )));
         }
 
-        if bucket_name.starts_with('-') || bucket_name.ends_with('-') {
+        // No consecutive dots
+        if bucket_name.contains("..") {
             return Err(AwsDaClientError::InitializationError(format!(
-                "Bucket name '{}' cannot start or end with a hyphen",
+                "Bucket name '{}' cannot contain consecutive dots",
                 bucket_name
             )));
         }
 
-        if bucket_name.contains("--") {
+        // Must start and end with alphanumeric
+        let first = bucket_name.as_bytes().first().copied();
+        let last = bucket_name.as_bytes().last().copied();
+        let is_alnum = |b: u8| (b'a'..=b'z').contains(&b) || (b'0'..=b'9').contains(&b);
+        if !first.map(is_alnum).unwrap_or(false) || !last.map(is_alnum).unwrap_or(false) {
             return Err(AwsDaClientError::InitializationError(format!(
-                "Bucket name '{}' cannot contain consecutive hyphens",
+                "Bucket name '{}' must start and end with a letter or number",
                 bucket_name
             )));
         }
 
         Ok(())
     }
🧹 Nitpick comments (13)
crates/cli/src/cli_args/commands.rs (1)

41-45: Make --allow-mock-proofs a friendlier tri-state flag (accepting zero or one value).

With Option<bool> the flag currently requires an explicit value (--allow-mock-proofs true). Allow bare usage while still supporting =false.

Apply this diff:

```diff
-    #[arg(long)]
-    /// Whether to allow the verification of mock proofs
-    /// Default: false
+    #[arg(long, num_args = 0..=1, default_missing_value = "true")]
+    /// Allow accepting mock proofs (testing only).
+    /// Default: false
     pub allow_mock_proofs: Option<bool>,
```
crates/da/Cargo.toml (1)

16-18: Duplicate blockstore declaration.

blockstore appears in both general and non-wasm sections. Remove the target-specific one unless you intend to gate it.

```diff
 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
@@
-tokio = { workspace = true }
-blockstore = { workspace = true }
+tokio = { workspace = true }
```

Please confirm whether blockstore compiles for wasm; if not, move it exclusively into the non‑wasm section instead.

Also applies to: 44-44

crates/da/src/memory.rs (2)

147-160: Non‑wasm start() made idempotent and spawns producer once.

Consider exposing a shutdown handle (e.g., CancellationToken) to stop the infinite loop cleanly in tests.


162-175: WASM start() path mirrors non‑wasm with spawn_local.

Same optional shutdown handle consideration for symmetry.

crates/da/src/aws/light_client.rs (2)

65-90: Background task lacks shutdown; consider a cancellation token.

Add a CancellationToken or similar to support graceful shutdown and tests.


119-125: Avoid erasing error context when mapping to anyhow::Error.

Use Into::into to preserve the source error chain.

```diff
-            .map(|epochs| epochs.into_iter().map(|e| Box::new(e) as VerifiableEpoch).collect())
-            .map_err(|e| anyhow!(e.to_string()))
+            .map(|epochs| epochs.into_iter().map(|e| Box::new(e) as VerifiableEpoch).collect())
+            .map_err(anyhow::Error::from)
```
crates/da/src/aws/full_node.rs (3)

84-92: Initialize current_height to 0 for consistency with in‑memory DA semantics.

Memory DA increments from 0 → 1 and broadcasts 1; this layer should mirror that to avoid off‑by‑one across implementations.

```diff
-            current_height: Arc::new(RwLock::new(1)),
+            current_height: Arc::new(RwLock::new(0)),
```

Please confirm there are no callers relying on a non‑zero initial height.


101-106: Use tokio::time::interval to avoid timer drift from repeated sleep.

interval provides steadier ticks under load.

```diff
-            loop {
-                tokio::time::sleep(block_time).await;
+            let mut ticker = tokio::time::interval(block_time);
+            loop {
+                ticker.tick().await;
```

12-16: Remove unused JoinHandle import.

It’s not used and may trigger warnings under #![deny(warnings)].

```diff
-use tokio::{
-    sync::{RwLock, broadcast},
-    task::JoinHandle,
-    time::Duration,
-};
+use tokio::{
+    sync::{RwLock, broadcast},
+    time::Duration,
+};
```
crates/da/src/aws/client.rs (4)

672-678: Map list‑objects failures to RequestFailed, not InitializationError.

This is an operational error during listing, not a config bootstrap failure.

```diff
-            let response = request.send().await.map_err(|e| {
-                AwsDaClientError::InitializationError(format!(
-                    "Failed to list objects in bucket '{}' with prefix '{}': {}",
-                    bucket, prefix, e
-                ))
-            })?;
+            let response = request.send().await.map_err(|e| {
+                AwsDaClientError::RequestFailed(format!(
+                    "Failed to list objects in bucket '{}' with prefix '{}': {}",
+                    bucket, prefix, e
+                ))
+            })?;
```

512-524: Avoid string‑matching for 404/NoSuchKey; match typed SDK errors.

Use the generated error types (e.g., aws_sdk_s3::operation::get_object::GetObjectError and its kind()) to detect not‑found, instead of to_string() checks which are brittle across SDK versions.

Would you like me to propose a typed error‑matching patch once we confirm the exact SDK version in Cargo.lock?


448-475: Similarly, detect Object Lock config absence via typed errors instead of substring checks.

Prefer matching the service error code rather than contains("ObjectLockConfigurationNotFoundError").


233-241: Avoid unwrap() on semaphore acquisition; use owned permits and propagate errors.

This prevents panics if the semaphore gets closed.

```diff
-                async move {
-                    let _permit = sem.acquire().await.unwrap();
-                    task.await
-                }
+                async move {
+                    let _permit = sem
+                        .acquire_owned()
+                        .await
+                        .map_err(|_| AwsDaClientError::UploadFailed("Semaphore closed".into()))?;
+                    task.await
+                }
```
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 198d494 and 582b787.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (12)
  • Cargo.toml (1 hunks)
  • crates/cli/src/apply_args/commands.rs (1 hunks)
  • crates/cli/src/cli_args/commands.rs (1 hunks)
  • crates/cli/src/tests/mod.rs (4 hunks)
  • crates/da/Cargo.toml (2 hunks)
  • crates/da/src/aws/client.rs (1 hunks)
  • crates/da/src/aws/full_node.rs (1 hunks)
  • crates/da/src/aws/light_client.rs (1 hunks)
  • crates/da/src/aws/mod.rs (1 hunks)
  • crates/da/src/factory.rs (6 hunks)
  • crates/da/src/lib.rs (5 hunks)
  • crates/da/src/memory.rs (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • crates/da/src/factory.rs
  • crates/da/src/aws/mod.rs
🧰 Additional context used
📓 Path-based instructions (3)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.rs: Follow Rust Coding Standards for all Rust code
Use rustfmt with project settings (merge_imports=true, imports_granularity="Crate", max_width=100)
Error handling: prefer Result types with descriptive error messages
Use the ? operator and proper error types for propagation
Naming: snake_case for functions/variables and CamelCase for types
Only add comments for private APIs when behavior is complex or non-obvious
Keep public method docstrings short and focused (brief summary, brief params/return)

Files:

  • crates/cli/src/apply_args/commands.rs
  • crates/cli/src/cli_args/commands.rs
  • crates/da/src/lib.rs
  • crates/cli/src/tests/mod.rs
  • crates/da/src/aws/full_node.rs
  • crates/da/src/aws/light_client.rs
  • crates/da/src/aws/client.rs
  • crates/da/src/memory.rs
crates/**/src/lib.rs

📄 CodeRabbit inference engine (CLAUDE.md)

Add comments primarily for public APIs exposed at the crate level

Files:

  • crates/da/src/lib.rs
crates/**/src/@(lib|mod).rs

📄 CodeRabbit inference engine (CLAUDE.md)

Use longer examples only in module root files (mod.rs or lib.rs)

Files:

  • crates/da/src/lib.rs
🧠 Learnings (2)
📚 Learning: 2025-09-16T11:08:31.029Z
Learnt from: jns-ps
PR: deltadevsde/prism#428
File: crates/node_types/lightclient/src/factory.rs:83-89
Timestamp: 2025-09-16T11:08:31.029Z
Learning: The `enable_mock_proof_verification()` method in the LightClient struct already contains appropriate warnings about mock proof usage, so additional warnings at call sites are not needed.

Applied to files:

  • crates/cli/src/apply_args/commands.rs
  • crates/cli/src/cli_args/commands.rs
  • crates/cli/src/tests/mod.rs
📚 Learning: 2025-09-16T11:08:31.029Z
Learnt from: jns-ps
PR: deltadevsde/prism#428
File: crates/node_types/lightclient/src/factory.rs:83-89
Timestamp: 2025-09-16T11:08:31.029Z
Learning: The `enable_mock_proof_verification()` method in the LightClient struct already contains an appropriate error-level warning message ("PROOF VERIFICATION IS DISABLED - FOR TESTING ONLY"), so additional warnings at call sites are redundant.

Applied to files:

  • crates/cli/src/apply_args/commands.rs
  • crates/cli/src/cli_args/commands.rs
  • crates/cli/src/tests/mod.rs
🧬 Code graph analysis (6)
crates/da/src/lib.rs (5)
crates/da/src/memory.rs (2)
  • start (148-160)
  • start (163-175)
crates/da/src/celestia/full_node.rs (1)
  • start (242-270)
crates/da/src/celestia/light_client.rs (1)
  • start (376-429)
crates/node_types/prover/src/syncer.rs (1)
  • start (63-66)
crates/node_types/uniffi-lightclient/src/lib.rs (1)
  • start (73-76)
crates/cli/src/tests/mod.rs (2)
crates/node_types/lightclient/src/factory.rs (1)
  • default (38-52)
crates/cli/src/apply_args/traits.rs (2)
  • load (24-37)
  • config_path (13-13)
crates/da/src/aws/full_node.rs (3)
crates/da/src/aws/light_client.rs (5)
  • new (39-48)
  • event_channel (115-117)
  • event_publisher (51-53)
  • start (100-113)
  • get_finalized_epochs (119-125)
crates/da/src/memory.rs (11)
  • new (56-78)
  • event_channel (190-192)
  • produce_blocks (105-137)
  • start (148-160)
  • start (163-175)
  • get_finalized_epochs (177-188)
  • get_latest_height (202-204)
  • get_transactions (218-224)
  • submit_finalized_epoch (206-216)
  • submit_transactions (226-231)
  • subscribe_to_heights (198-200)
crates/da/src/aws/client.rs (1)
  • new_from_full_da_config (112-155)
crates/da/src/aws/light_client.rs (3)
crates/da/src/aws/full_node.rs (4)
  • new (73-93)
  • start (144-163)
  • event_channel (173-175)
  • get_finalized_epochs (165-171)
crates/da/src/memory.rs (5)
  • new (56-78)
  • start (148-160)
  • start (163-175)
  • event_channel (190-192)
  • get_finalized_epochs (177-188)
crates/da/src/aws/client.rs (1)
  • new_from_light_da_config (72-110)
crates/da/src/aws/client.rs (1)
crates/da/src/aws/config.rs (1)
  • profile (433-437)
crates/da/src/memory.rs (3)
crates/da/src/lib.rs (2)
  • new (188-190)
  • start (388-388)
crates/da/src/aws/full_node.rs (2)
  • new (73-93)
  • start (144-163)
crates/da/src/aws/light_client.rs (2)
  • new (39-48)
  • start (100-113)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: build-and-push-image
  • GitHub Check: unused dependencies
  • GitHub Check: coverage
  • GitHub Check: integration-test
  • GitHub Check: unit-test
🔇 Additional comments (16)
crates/cli/src/apply_args/commands.rs (1)

25-27: Propagation looks correct.

Tri-state semantics preserved: only overrides when CLI provided.

crates/cli/src/tests/mod.rs (4)

50-59: Good: asserts CLI precedence for allow_mock_proofs.

Covers the CLI→config path.


107-115: Good: default remains false when unspecified.

Covers the missing-file fallback path.


131-131: Good: preset path unaffected.

Explicitly leaving allow_mock_proofs unset here is correct.


75-92: Verify namespaced env var handling and add a test.

Add a companion test in crates/cli/src/tests/mod.rs (next to the existing env-override test around lines ~74–92) that sets PRISM__LIGHT_CLIENT__ALLOW_MOCK_PROOFS=true and asserts config.light_client.allow_mock_proofs == true; if the loader doesn't map PRISM__LIGHT_CLIENT__* namespaced keys, either update the loader to support them or document that only PRISM__ALLOW_MOCK_PROOFS is supported.
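For illustration, the double-underscore namespacing the loader would need can be sketched in a few lines. The `PRISM__` prefix and `__` separator are taken from the env var named above; `env_key_to_path` is a hypothetical helper, not the project's actual loader:

```rust
// Sketch: map PRISM__LIGHT_CLIENT__ALLOW_MOCK_PROOFS into nested config keys.
// The prefix and separator are assumptions for illustration only.
fn env_key_to_path(key: &str) -> Option<Vec<String>> {
    let rest = key.strip_prefix("PRISM__")?;
    Some(rest.split("__").map(|part| part.to_lowercase()).collect())
}

fn main() {
    let path = env_key_to_path("PRISM__LIGHT_CLIENT__ALLOW_MOCK_PROOFS").unwrap();
    // The test would assert config.light_client.allow_mock_proofs == true
    // after the loader applies this path.
    assert_eq!(path, vec!["light_client", "allow_mock_proofs"]);
    println!("{:?}", path);
}
```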

crates/da/Cargo.toml (2)

39-49: Good gating: AWS SDKs under non-wasm and optional.


50-56: WASM path wired correctly with wasm-bindgen-futures.

crates/da/src/lib.rs (5)

28-33: Docs: Clear AWS section added.


99-100: Module gating tightened to all(feature = "aws", not(wasm32)). Good.


51-52: Doc example now calls da.start().await?. Good.


387-389: Trait change verified — all impls implement start().
Implementations found: crates/da/src/memory.rs, crates/da/src/celestia/full_node.rs, crates/da/src/celestia/light_client.rs, crates/da/src/aws/light_client.rs, crates/da/src/aws/full_node.rs.


208-208: Added Sync bound — verify implementors are Sync

Looks correct. Only implementor found: FinalizedEpoch (crates/da/src/lib.rs:254). Confirm FinalizedEpoch and all types it contains are Sync (no Rc/RefCell/Cell/UnsafeCell or other non‑Sync types). Paste the struct here or run: sed -n '230,300p' crates/da/src/lib.rs to show its definition.

crates/da/src/memory.rs (2)

9-12: Imports for AtomicBool look good.


45-46: Idempotent startup flag added and initialized. Good.

Also applies to: 73-73, 98-98

crates/da/src/aws/light_client.rs (1)

100-113: Idempotent start() with background monitor: LGTM.

Cargo.toml (1)

132-136: Approve — AWS workspace deps confined to DA crate

aws-config / aws-sdk-s3 / aws-sdk-sts are declared only in crates/da/Cargo.toml as optional behind the "aws" feature and gated under target.'cfg(not(target_arch = "wasm32"))'; ripgrep shows AWS usage only under crates/da/src/aws and no matches in wasm crates (e.g., crates/node_types/wasm-lightclient).

Comment on lines +190 to +199
let height_guard = self.current_height.read().await;
let current_height = *height_guard;

self.client.submit_finalized_epoch(epoch, current_height).await?;

info!("Finalized epoch submitted at height {}", current_height);

// Return the height where the epoch was published
Ok(current_height)
}
@coderabbitai coderabbitai bot (Contributor) Sep 18, 2025

⚠️ Potential issue

Don’t hold the read lock across .await; also return the inclusion height (height + 1) to match memory DA.

Holding the read lock during the async upload blocks height progression. Drop it before awaiting. Memory DA returns latest + 1; align for cross‑backend consistency.

-        let height_guard = self.current_height.read().await;
-        let current_height = *height_guard;
-
-        self.client.submit_finalized_epoch(epoch, current_height).await?;
+        let height_guard = self.current_height.read().await;
+        let current_height = *height_guard;
+        drop(height_guard);
+
+        self.client
+            .submit_finalized_epoch(epoch, current_height)
+            .await?;
@@
-        Ok(current_height)
+        Ok(current_height + 1)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let height_guard = self.current_height.read().await;
let current_height = *height_guard;
self.client.submit_finalized_epoch(epoch, current_height).await?;
info!("Finalized epoch submitted at height {}", current_height);
// Return the height where the epoch was published
Ok(current_height)
}
let height_guard = self.current_height.read().await;
let current_height = *height_guard;
drop(height_guard);
self.client
.submit_finalized_epoch(epoch, current_height)
.await?;
info!("Finalized epoch submitted at height {}", current_height);
// Return the height where the epoch was published
Ok(current_height + 1)
}
🤖 Prompt for AI Agents
In crates/da/src/aws/full_node.rs around lines 190 to 199, you're holding the
read lock across an .await and returning the raw height; to fix: read and copy
the current height into a local variable, explicitly drop the read guard before
calling any async methods (so the lock isn't held across await), then call
self.client.submit_finalized_epoch with the copied height, and finally return
the inclusion height by returning current_height + 1 to match Memory DA
semantics.
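The copy-then-drop pattern behind this fix can be shown with std's synchronous `RwLock` standing in for tokio's (a sketch, not the crate's code; the slow section stands in for the awaited upload):

```rust
use std::sync::RwLock;

// Copy the value out, drop the guard, then do the slow work.
fn submit(current_height: &RwLock<u64>) -> u64 {
    let guard = current_height.read().unwrap();
    let height = *guard;
    drop(guard); // release before any long-running/awaited operation

    // ... slow upload would happen here, without blocking writers ...

    height + 1 // inclusion height, matching memory DA semantics
}

fn main() {
    let h = RwLock::new(5u64);
    let inclusion = submit(&h);
    // Writers are not blocked during the slow section.
    *h.write().unwrap() += 1;
    assert_eq!(inclusion, 6);
    println!("{}", inclusion);
}
```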

@distractedm1nd (Contributor)
yes

coderabbitai bot (Contributor)

@distractedm1nd Perfect! Thanks for confirming the fix. The explicit lock drop and returning height + 1 will ensure proper async behavior and cross-backend consistency with the memory DA implementation.



@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/da/src/factory.rs (1)

109-129: API change: require CancellationToken for DA layer — update remaining call sites

Update the remaining call sites and doc examples to pass a CancellationToken (e.g. cancellation_token.clone() or CancellationToken::new()):

  • crates/node_types/wasm-lightclient/src/worker.rs:68 — create_light_client_da_layer(&config.da).await
  • crates/da/src/factory.rs:421 — create_light_client_da_layer(&config).await
  • crates/da/src/factory.rs:469 — create_full_node_da_layer(&config).await
  • Doc examples to update:
    • crates/da/src/lib.rs:50 (light-client example)
    • crates/da/src/lib.rs:82 (full-node example)
    • crates/node_types/lightclient/src/lib.rs:35 (light-client example)
    • crates/node_types/prover/src/lib.rs:57 and 112 (full-node examples)
♻️ Duplicate comments (4)
crates/da/src/memory.rs (1)

46-51: Good: start idempotency and cancellation wired in.

Adding started and a CancellationToken addresses runaway producer tasks and enables graceful shutdown. This also resolves the prior idempotency concern.

Also applies to: 55-56, 62-63, 78-80

crates/da/src/aws/full_node.rs (3)

231-232: Return inclusion height from submit_transactions for cross-backend consistency.

-        Ok(height)
+        Ok(height + 1)

If Memory/Celestia semantics differ, keep consistent with those; otherwise adopt inclusion height.


121-148: Fix off-by-one: broadcast/publish/metadata should use the incremented height.

Current code notifies with the pre-increment height, diverging from get_latest_height and memory DA semantics.

-                // Take write lock to atomically increment height and prevent submissions
-                let mut height_guard = current_height.write().await;
-                let completed_height = *height_guard;
-                *height_guard += 1;
-                drop(height_guard);
+                // Increment height and notify using the new height
+                let mut height_guard = current_height.write().await;
+                *height_guard += 1;
+                let new_height = *height_guard;
+                drop(height_guard);
@@
-                // Broadcast height update
-                let _ = height_update_tx.send(completed_height);
+                // Broadcast height update
+                let _ = height_update_tx.send(new_height);
@@
-                event_publisher.send(PrismEvent::UpdateDAHeight {
-                    height: completed_height,
-                });
+                event_publisher.send(PrismEvent::UpdateDAHeight { height: new_height });
@@
-                let metadata = AwsDaMetaInfo {
-                    current_height: completed_height,
-                };
+                let metadata = AwsDaMetaInfo { current_height: new_height };
@@
-                        info!("Completed block {}", completed_height);
+                        info!("Completed block {}", new_height);

Also applies to: 138-147


206-215: Release read lock before await; return inclusion height to match Memory DA.

Holding the lock across await can stall block production; returning height+1 aligns with other backends.

         // Take read lock to get consistent height and prevent increments during submission
         let height_guard = self.current_height.read().await;
         let current_height = *height_guard;
+        drop(height_guard);
 
         self.client.submit_finalized_epoch(epoch, current_height).await?;
@@
-        // Return the height where the epoch was published
-        Ok(current_height)
+        // Return inclusion height
+        Ok(current_height + 1)
🧹 Nitpick comments (12)
crates/node_types/lightclient/src/tests/mod.rs (1)

33-36: Constrain mock start() to a single invocation

Good to set the expectation in the macro so all tests inherit it. Also assert exactly one call to catch accidental retries.

-        mock_da.expect_start().returning(|| Ok(()));
+        mock_da.expect_start().times(1).returning(|| Ok(()));
crates/da/src/memory.rs (1)

123-149: Drop write-locks before broadcasting to reduce contention.

Minor: send() while holding blocks and latest_height write-locks can delay readers. Drop guards first.

Apply this diff inside produce_blocks after pushing the block and before the sends:

-                blocks.push(new_block.clone());
-
-                // Notify subscribers of the new height and block
-                let _ = self.height_update_tx.send(*latest_height);
-                let _ = self.block_update_tx.send(new_block);
+                blocks.push(new_block.clone());
+
+                // Release locks before broadcasting
+                drop(blocks);
+                drop(pending_transactions);
+                drop(pending_epochs);
+                drop(latest_height);
+
+                // Notify subscribers of the new height and block
+                let _ = self.height_update_tx.send(new_block.height);
+                let _ = self.block_update_tx.send(new_block);
crates/cli/src/main.rs (1)

64-65: LGTM: CancellationToken propagated to DA constructors.

Optional: add a ctrl_c fallback for non-Unix targets to cancel the token on Windows.

@@
-    tokio::spawn(async move {
-        use tokio::signal::unix::{SignalKind, signal};
+    tokio::spawn(async move {
+        #[cfg(unix)]
+        {
+            use tokio::signal::unix::{SignalKind, signal};
@@
-        cancellation_for_signal.cancel();
+            cancellation_for_signal.cancel();
+        }
+        #[cfg(not(unix))]
+        {
+            let _ = tokio::signal::ctrl_c().await;
+            cancellation_for_signal.cancel();
+        }
     });

Also applies to: 83-84, 101-102

crates/da/src/aws/light_client.rs (4)

135-140: Avoid lossy error mapping: pass the source error to anyhow directly.

Prevents losing context and backtraces.

-            .map_err(|e| anyhow!(e.to_string()))
+            .map_err(|e| anyhow!(e))

69-73: Unnecessary Arc around client.

Clone and move the client into the task; Arc adds overhead without benefit here.

-        let client = Arc::new(self.client.clone());
+        let client = self.client.clone();

74-85: Use tokio::time::interval to avoid drift and get predictable ticks.

sleep in a loop accumulates delay; interval is the idiomatic timer.

-        let _handle = tokio::spawn(async move {
-            let mut last_height = 0u64;
+        let _handle = tokio::spawn(async move {
+            let mut last_height = 0u64;
+            let mut interval = tokio::time::interval(poll_interval);
@@
-                    _ = tokio::time::sleep(poll_interval) => {
+                    _ = interval.tick() => {

Also applies to: 83-101


74-76: Consider tracking the spawned task for orderly shutdown.

Right now the JoinHandle is dropped; unexpected panics are invisible and shutdown relies solely on the token. Store the handle (or add a Drop that cancels the token) if you need stronger guarantees.
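The hazard is the same one std threads have: a sketch with `std::thread` (standing in for `tokio::spawn`) shows that a panic only surfaces if the handle is kept and joined:

```rust
use std::thread;

// Keeping the JoinHandle makes panics observable at shutdown;
// dropping it silently detaches the task (same hazard as tokio::spawn).
fn main() {
    let handle = thread::spawn(|| {
        panic!("background task failed");
    });
    let result = handle.join();
    // The panic is visible here only because we kept the handle.
    assert!(result.is_err());
    println!("observed panic: {}", result.is_err());
}
```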

crates/da/src/aws/full_node.rs (5)

181-187: Avoid lossy error mapping in get_finalized_epochs.

-            .map_err(|e| anyhow!(e.to_string()))
+            .map_err(|e| anyhow!(e))

200-202: Avoid lossy error mapping in get_transactions.

-        self.client.fetch_transactions(height).await.map_err(|e| anyhow!(e.to_string()))
+        self.client.fetch_transactions(height).await.map_err(|e| anyhow!(e))

81-101: Seed current_height from persisted metadata (resume on restart).

Starting at 1 can regress height after restarts. Read metadata and fall back gracefully.

     ) -> Result<Self, DataAvailabilityError> {
         let client = AwsDataAvailabilityClient::new_from_full_da_config(config.clone()).await?;
 
-        let (height_update_tx, _) = broadcast::channel(100);
+        let (height_update_tx, _) = broadcast::channel(100);
         let event_channel = Arc::new(EventChannel::new());
 
+        // Initialize height from metadata if present
+        let initial_height = match client.fetch_height().await {
+            Ok(Some(h)) => h,
+            Ok(None) => 0,
+            Err(e) => {
+                warn!("Failed to fetch initial height from metadata: {}", e);
+                0
+            }
+        };
+
         debug!(
             "AWS full node initialized for region '{}', epochs bucket '{}', retention {} days",
             config.light_client.region, config.light_client.epochs_bucket, config.retention_days
         );
 
         Ok(Self {
             client: Arc::new(client),
-            current_height: Arc::new(RwLock::new(1)),
+            current_height: Arc::new(RwLock::new(initial_height)),
             transaction_offset: Arc::new(AtomicU64::new(0)),
             block_time: config.light_client.block_time,
             height_update_tx,
             event_channel,
             started: Arc::new(AtomicBool::new(false)),
             cancellation_token,
         })

Also applies to: 91-97


12-16: Remove unused import.

JoinHandle isn’t used.

 use tokio::{
     sync::{RwLock, broadcast},
-    task::JoinHandle,
     time::Duration,
 };

119-120: Optional: prefer interval over sleep in block production.

Reduces drift and gives predictable ticks. Create the interval once before the loop; constructing it inside the select! arm would fire immediately on every iteration.

+                let mut interval = tokio::time::interval(block_time);
@@
-                    _ = tokio::time::sleep(block_time) => {
+                    _ = interval.tick() => {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 582b787 and 438a75c.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
  • CLAUDE.md (1 hunks)
  • crates/cli/src/main.rs (3 hunks)
  • crates/da/Cargo.toml (2 hunks)
  • crates/da/src/aws/full_node.rs (1 hunks)
  • crates/da/src/aws/light_client.rs (1 hunks)
  • crates/da/src/celestia/full_node.rs (4 hunks)
  • crates/da/src/factory.rs (9 hunks)
  • crates/da/src/memory.rs (7 hunks)
  • crates/events/Cargo.toml (1 hunks)
  • crates/node_types/lightclient/src/tests/mod.rs (2 hunks)
  • crates/node_types/prover/src/prover/tests/mod.rs (7 hunks)
  • crates/node_types/uniffi-lightclient/src/lib.rs (1 hunks)
  • crates/node_types/wasm-lightclient/src/tests/mod.rs (5 hunks)
  • crates/tests/src/lib.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • CLAUDE.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • crates/da/Cargo.toml
🧰 Additional context used
📓 Path-based instructions (3)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.rs: Follow Rust Coding Standards for all Rust code
Use rustfmt with project settings (merge_imports=true, imports_granularity="Crate", max_width=100)
Error handling: prefer Result types with descriptive error messages
Use the ? operator and proper error types for propagation
Naming: snake_case for functions/variables and CamelCase for types
Only add comments for private APIs when behavior is complex or non-obvious
Keep public method docstrings short and focused (brief summary, brief params/return)

Files:

  • crates/node_types/lightclient/src/tests/mod.rs
  • crates/node_types/uniffi-lightclient/src/lib.rs
  • crates/tests/src/lib.rs
  • crates/node_types/wasm-lightclient/src/tests/mod.rs
  • crates/da/src/aws/full_node.rs
  • crates/da/src/aws/light_client.rs
  • crates/da/src/memory.rs
  • crates/node_types/prover/src/prover/tests/mod.rs
  • crates/da/src/celestia/full_node.rs
  • crates/da/src/factory.rs
  • crates/cli/src/main.rs
crates/**/src/lib.rs

📄 CodeRabbit inference engine (CLAUDE.md)

Add comments primarily for public APIs exposed at the crate level

Files:

  • crates/node_types/uniffi-lightclient/src/lib.rs
  • crates/tests/src/lib.rs
crates/**/src/@(lib|mod).rs

📄 CodeRabbit inference engine (CLAUDE.md)

Use longer examples only in module root files (mod.rs or lib.rs)

Files:

  • crates/node_types/uniffi-lightclient/src/lib.rs
  • crates/tests/src/lib.rs
🧠 Learnings (1)
📚 Learning: 2025-09-16T14:44:17.678Z
Learnt from: CR
PR: deltadevsde/prism#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-16T14:44:17.678Z
Learning: Use tokio for I/O-bound async operations

Applied to files:

  • crates/da/src/celestia/full_node.rs
🧬 Code graph analysis (9)
crates/node_types/uniffi-lightclient/src/lib.rs (3)
crates/node_types/lightclient/src/lightclient.rs (1)
  • new (109-133)
crates/da/src/factory.rs (1)
  • create_light_client_da_layer (109-132)
crates/node_types/lightclient/src/factory.rs (1)
  • create_light_client (76-89)
crates/tests/src/lib.rs (4)
crates/da/src/aws/full_node.rs (1)
  • new (77-101)
crates/da/src/celestia/full_node.rs (1)
  • new (162-197)
crates/da/src/memory.rs (1)
  • new (60-84)
crates/da/src/factory.rs (2)
  • create_full_node_da_layer (211-274)
  • create_light_client_da_layer (109-132)
crates/da/src/aws/full_node.rs (4)
crates/da/src/aws/light_client.rs (5)
  • new (43-56)
  • event_channel (130-132)
  • event_publisher (59-61)
  • start (115-128)
  • get_finalized_epochs (134-140)
crates/da/src/celestia/full_node.rs (9)
  • new (162-197)
  • event_channel (314-316)
  • start (249-290)
  • get_finalized_epochs (292-312)
  • get_latest_height (325-327)
  • get_transactions (355-377)
  • submit_finalized_epoch (329-353)
  • submit_transactions (379-416)
  • subscribe_to_heights (321-323)
crates/da/src/memory.rs (11)
  • new (60-84)
  • event_channel (205-207)
  • produce_blocks (113-152)
  • start (163-175)
  • start (178-190)
  • get_finalized_epochs (192-203)
  • get_latest_height (217-219)
  • get_transactions (233-239)
  • submit_finalized_epoch (221-231)
  • submit_transactions (241-246)
  • subscribe_to_heights (213-215)
crates/da/src/aws/client.rs (2)
  • new_from_full_da_config (112-155)
  • submit_metadata (565-568)
crates/da/src/aws/light_client.rs (1)
crates/da/src/aws/client.rs (2)
  • new_from_light_da_config (72-110)
  • fetch_height (158-161)
crates/da/src/memory.rs (4)
crates/da/src/celestia/full_node.rs (2)
  • new (162-197)
  • start (249-290)
crates/da/src/aws/full_node.rs (2)
  • new (77-101)
  • start (160-179)
crates/da/src/aws/light_client.rs (3)
  • new (43-56)
  • event_publisher (59-61)
  • start (115-128)
crates/da/src/lib.rs (4)
  • new (188-190)
  • height (214-214)
  • height (255-257)
  • start (388-388)
crates/node_types/prover/src/prover/tests/mod.rs (2)
crates/da/src/memory.rs (2)
  • new (60-84)
  • new_with_epoch_delay (86-111)
crates/node_types/prover/src/prover/mod.rs (1)
  • new (145-153)
crates/da/src/celestia/full_node.rs (6)
crates/da/src/aws/full_node.rs (2)
  • new (77-101)
  • start (160-179)
crates/da/src/aws/light_client.rs (2)
  • new (43-56)
  • start (115-128)
crates/da/src/memory.rs (3)
  • new (60-84)
  • start (163-175)
  • start (178-190)
crates/da/src/lib.rs (4)
  • new (188-190)
  • start (388-388)
  • height (214-214)
  • height (255-257)
crates/node_types/lightclient/src/lightclient.rs (1)
  • new (109-133)
crates/node_types/prover/src/syncer.rs (2)
  • new (30-57)
  • start (63-66)
crates/da/src/factory.rs (7)
crates/da/src/aws/full_node.rs (1)
  • new (77-101)
crates/da/src/aws/light_client.rs (1)
  • new (43-56)
crates/da/src/celestia/full_node.rs (2)
  • new (162-197)
  • default (120-129)
crates/da/src/memory.rs (2)
  • new (60-84)
  • default (54-56)
crates/node_types/uniffi-lightclient/src/lib.rs (1)
  • new (36-73)
crates/da/src/celestia/light_client.rs (2)
  • new (357-366)
  • default (214-235)
crates/node_types/wasm-lightclient/src/worker.rs (1)
  • new (52-88)
crates/cli/src/main.rs (1)
crates/da/src/factory.rs (2)
  • create_light_client_da_layer (109-132)
  • create_full_node_da_layer (211-274)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: coverage
  • GitHub Check: build-and-push-image
  • GitHub Check: unused dependencies
  • GitHub Check: unit-test
  • GitHub Check: integration-test
🔇 Additional comments (8)
crates/node_types/uniffi-lightclient/src/lib.rs (2)

61-61: Threading the same token into create_light_client is spot on.

This ensures DA and LC share a single cancellation domain.


49-57: Export a UniFFI cancel/stop handle — keep a CancellationToken clone on LightClient

CancellationToken is moved into create_light_client; retain a clone on the LightClient and export a UniFFI cancel() so mobile can terminate the client without relying on Drop.

         })?;
 
-        let cancellation_token = CancellationToken::new();
+        let cancellation_token = CancellationToken::new();
+        // Keep a handle for UniFFI-side shutdown.
+        let ffi_token = cancellation_token.clone();
         let da = create_light_client_da_layer(&config.da, cancellation_token.clone())
             .await
             .map_err(|e| {
                 LightClientError::initialization_error(format!(
                     "Failed to create light client DA: {}",
                     e
                 ))
             })?;
 pub struct LightClient {
     inner: Arc<CoreLightClient>,
     event_subscriber: Mutex<EventSubscriber>,
+    cancellation_token: CancellationToken,
 }
         Ok(Self {
             inner: Arc::new(light_client),
             event_subscriber: Mutex::new(event_sub),
+            cancellation_token: ffi_token,
         })
#[uniffi::export]
pub fn cancel(&self) {
    self.cancellation_token.cancel();
}

Verify that core tasks observe CancellationToken::cancel() (or that Drop triggers shutdown) before merging.

crates/events/Cargo.toml (1)

12-12: LGTM: dependency addition is fine.

tokio-util at workspace scope is consistent with CancellationToken usage across the codebase.

crates/da/src/memory.rs (1)

162-175: Good: start() now spawns once per instance per target.

Atomic CAS + per-target spawn looks correct.

Also applies to: 177-190

crates/node_types/prover/src/prover/tests/mod.rs (1)

33-36: LGTM: tests now thread a shared CancellationToken.

Clean updates; shutdown paths in long-running tests will be more reliable.

Also applies to: 48-49, 298-314, 372-377, 388-395, 514-516, 528-529

crates/tests/src/lib.rs (1)

80-85: LGTM: shared cancellation across DA init.

Keeps both light/full DA layers aligned for shutdown in tests.

crates/da/src/factory.rs (1)

211-273: API change: require CancellationToken for full node DA — retry loop looks good.

Retry/backoff and error mapping look fine; just ensure all call sites pass the token.

(Use the script above.)

crates/da/src/aws/full_node.rs (1)

176-179: Start behavior LGTM (idempotent and spawns producer).

Please confirm that cancellation_token is wired to a process-wide shutdown path so the spawned task exits reliably.

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (7)
crates/da/src/factory.rs (4)

120-125: Do not log AWS configs; credentials can leak.

{:?} on AwsLightClientDAConfig will include credentials. Replace with a redacted/summarized log.

-            info!("Using AWS config: {:?}", aws_config);
+            info!(
+                "Using AWS light client DA (region={}, endpoint={:?}, epochs_bucket={}, metadata_bucket={})",
+                aws_config.region,
+                aws_config.endpoint,
+                aws_config.epochs_bucket,
+                aws_config.metadata_bucket,
+            );

242-267: Do not log AWS configs; credentials can leak.

Same issue for the full node path. Log a summary without secrets.

-            info!("Using AWS config: {:?}", aws_config);
+            info!(
+                "Using AWS full node DA (region={}, endpoint={:?}, epochs_bucket={}, tx_bucket={}, retention_days={})",
+                aws_config.light_client.region,
+                aws_config.light_client.endpoint,
+                aws_config.light_client.epochs_bucket,
+                aws_config.transactions_bucket,
+                aws_config.retention_days,
+            );

421-421: Fix tests: pass CancellationToken to DA factory fns.

These calls will not compile after the signature change.

Apply this diff:

-        let result = create_light_client_da_layer(&config).await;
+        let result = create_light_client_da_layer(&config, CancellationToken::new()).await;

469-469: Fix tests: pass CancellationToken to DA factory fns.

These calls will not compile after the signature change.

Apply this diff:

-        let result = create_full_node_da_layer(&config).await;
+        let result = create_full_node_da_layer(&config, CancellationToken::new()).await;
crates/da/src/aws/full_node.rs (2)

121-150: Fix off‑by‑one: broadcast/publish/metadata should use the incremented height (align with memory DA).

Currently you broadcast completed_height (pre‑increment), while current_height has already been incremented, causing inconsistency across subscribers and get_latest_height(). Use the incremented height everywhere.

-                // Take write lock to atomically increment height and prevent submissions
-                let mut height_guard = current_height.write().await;
-                let completed_height = *height_guard;
-                *height_guard += 1;
-                drop(height_guard);
-
-                // Reset transaction offset for new height
-                transaction_offset.store(0, Ordering::Relaxed);
-
-                // Broadcast height update
-                let _ = height_update_tx.send(completed_height);
-
-                // Publish event
-                event_publisher.send(PrismEvent::UpdateDAHeight {
-                    height: completed_height,
-                });
-
-                // Update metadata
-                let metadata = AwsDaMetaInfo {
-                    current_height: completed_height,
-                };
+                // Take write lock to atomically increment height and notify using the new height
+                let mut height_guard = current_height.write().await;
+                *height_guard += 1;
+                let new_height = *height_guard;
+                drop(height_guard);
+
+                // Reset transaction offset for new height
+                transaction_offset.store(0, Ordering::Relaxed);
+
+                // Broadcast height update
+                let _ = height_update_tx.send(new_height);
+
+                // Publish event
+                event_publisher.send(PrismEvent::UpdateDAHeight { height: new_height });
+
+                // Update metadata
+                let metadata = AwsDaMetaInfo { current_height: new_height };
-                info!("Completed block {}", completed_height);
+                info!("Completed block {}", new_height);

204-215: Don't hold the read lock across .await; also return the inclusion height (height + 1) to match memory DA.

Holding the read lock during the async upload blocks height progression. Drop it before awaiting. Memory DA returns latest + 1; align for cross‑backend consistency.

-        let height_guard = self.current_height.read().await;
-        let current_height = *height_guard;
-
-        self.client.submit_finalized_epoch(epoch, current_height).await?;
+        let height_guard = self.current_height.read().await;
+        let current_height = *height_guard;
+        drop(height_guard);
+
+        self.client
+            .submit_finalized_epoch(epoch, current_height)
+            .await?;
-        Ok(current_height)
+        Ok(current_height + 1)
crates/da/src/memory.rs (1)

46-51: Idempotent start + cooperative cancellation: resolved.

The started flag and CancellationToken integration address duplicate producer loops and allow clean shutdown. This also resolves the prior “make start() idempotent” feedback.

Also applies to: 117-121, 162-175, 177-190
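The CAS-gated start pattern praised here can be sketched with std only (the `Layer` type and its `start` method below are illustrative stand-ins, not the crate's actual API):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

struct Layer {
    started: AtomicBool,
}

impl Layer {
    /// Idempotent start: only the first caller wins the CAS; later calls
    /// return false and must not spawn a second producer loop.
    fn start(&self) -> bool {
        self.started
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}

fn main() {
    let layer = Layer { started: AtomicBool::new(false) };
    assert!(layer.start()); // first call proceeds and would spawn the producer
    assert!(!layer.start()); // second call is a no-op
}
```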

🧹 Nitpick comments (8)
crates/da/src/aws/full_node.rs (1)

217-232: Return height + 1 to match memory DA inclusion height semantics.

The lock is properly dropped before the await (good), and the transaction offset is correctly advanced. However, the return value should be the inclusion height.

-        Ok(height)
+        Ok(height + 1)
crates/events/Cargo.toml (1)

12-12: Remove unused tokio-util from prism-events Cargo.toml

No occurrences of tokio-util were found in crates/events/src (lib.rs, utils.rs). Remove tokio-util = { workspace = true } from crates/events/Cargo.toml to avoid unnecessary bloat.

crates/da/src/memory.rs (6)

53-57: Expose a way to cancel defaults/instances created here.

Default::default() creates an internal token that callers can’t cancel; spawned tasks may outlive tests. Expose the token or add a shutdown method.

Apply this diff to add both a getter and a shutdown:

@@
 impl InMemoryDataAvailabilityLayer {
@@
     pub fn subscribe_blocks(&self) -> broadcast::Receiver<Block> {
         self.block_update_tx.subscribe()
     }
+
+    /// Returns a clone of the cancellation token for external shutdown orchestration.
+    pub fn cancellation_token(&self) -> CancellationToken {
+        self.cancellation_token.clone()
+    }
+
+    /// Signals the background producer to stop.
+    pub fn shutdown(&self) {
+        self.cancellation_token.cancel();
+    }
 }

Also applies to: 59-64, 78-80, 86-93, 105-107


164-168: Log when start() is a no‑op for consistency with other DA layers.

AWS layers log this; mirror it here for parity.

-        if self.started.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire).is_err()
-        {
-            return Ok(());
-        }
+        if self.started.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire).is_err() {
+            debug!("InMemory DA already started; start() is a no-op");
+            return Ok(());
+        }

Repeat the same change in the wasm32 block.

Also applies to: 180-183


123-149: Reduce lock hold time in the hot path.

Hold the blocks write lock only for push; take pending items and bump height under their own locks. This lowers contention.

-                let mut blocks = self.blocks.write().await;
-                let mut pending_transactions = self.pending_transactions.write().await;
-                let mut pending_epochs = self.pending_epochs.write().await;
-                let mut latest_height = self.latest_height.write().await;
-
-                *latest_height += 1;
-                let new_block = Block {
-                    height: *latest_height,
-                    transactions: std::mem::take(&mut *pending_transactions),
-                    epochs: std::mem::take(&mut *pending_epochs),
-                };
+                // Take pending items without holding the blocks lock
+                let transactions = {
+                    let mut pending_transactions = self.pending_transactions.write().await;
+                    std::mem::take(&mut *pending_transactions)
+                };
+                let epochs = {
+                    let mut pending_epochs = self.pending_epochs.write().await;
+                    std::mem::take(&mut *pending_epochs)
+                };
+                // Increment height
+                let height = {
+                    let mut latest_height = self.latest_height.write().await;
+                    *latest_height += 1;
+                    *latest_height
+                };
+                let new_block = Block { height, transactions, epochs };
                 debug!(
                     "new block produced at height {} with {} transactions",
                     new_block.height,
                     new_block.transactions.len(),
                 );
-                blocks.push(new_block.clone());
+                {
+                    let mut blocks = self.blocks.write().await;
+                    blocks.push(new_block.clone());
+                }
 
                 // Notify subscribers of the new height and block
-                let _ = self.height_update_tx.send(*latest_height);
+                let _ = self.height_update_tx.send(height);
                 let _ = self.block_update_tx.send(new_block);
 
                 // Publish UpdateDAHeight event
-                event_publisher.send(PrismEvent::UpdateDAHeight {
-                    height: *latest_height,
-                });
+                event_publisher.send(PrismEvent::UpdateDAHeight { height });

64-66: Avoid magic numbers for channel capacity.

Centralize channel capacity for easier tuning.

@@
-const IN_MEMORY_DEFAULT_BLOCK_TIME: Duration = Duration::from_secs(15);
+const IN_MEMORY_DEFAULT_BLOCK_TIME: Duration = Duration::from_secs(15);
+const BROADCAST_CAPACITY: usize = 100;
@@
-        let (height_tx, height_rx) = broadcast::channel(100);
-        let (block_tx, block_rx) = broadcast::channel(100);
+        let (height_tx, height_rx) = broadcast::channel(BROADCAST_CAPACITY);
+        let (block_tx, block_rx) = broadcast::channel(BROADCAST_CAPACITY);
@@
-        let (height_tx, height_rx) = broadcast::channel(100);
-        let (block_tx, block_rx) = broadcast::channel(100);
+        let (height_tx, height_rx) = broadcast::channel(BROADCAST_CAPACITY);
+        let (block_tx, block_rx) = broadcast::channel(BROADCAST_CAPACITY);

Also applies to: 91-93, 22-23


193-203: Height 0 semantics: avoid indexing the first block for height=0.

With saturating_sub(1), passing height=0 returns block 0, which is likely unintended. Guard early.

     async fn get_finalized_epochs(&self, height: u64) -> Result<Vec<VerifiableEpoch>> {
+        if height == 0 {
+            return Ok(vec![]);
+        }
         let blocks = self.blocks.read().await;
-        match blocks.get(height.saturating_sub(1) as usize) {
+        match blocks.get((height - 1) as usize) {
             Some(block) => Ok(block
@@
     async fn get_transactions(&self, height: u64) -> Result<Vec<Transaction>> {
+        if height == 0 {
+            return Ok(vec![]);
+        }
         let blocks = self.blocks.read().await;
-        match blocks.get(height.saturating_sub(1) as usize) {
+        match blocks.get((height - 1) as usize) {
             Some(block) => Ok(block.transactions.clone()),
             None => Ok(vec![]),
         }
     }

Also applies to: 233-239


38-39: Optional: broadcast Arc to avoid cloning large blocks.

If blocks may be large, switch to broadcast::Sender<Arc<Block>> and publish Arc::new(new_block) to cut copying.

Also applies to: 154-156, 139-144
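A minimal illustration of why `Arc` payloads make broadcast cheap: each receiver clones a pointer, not the block. (`Block` here is a stand-in struct, not the crate's type.)

```rust
use std::sync::Arc;

struct Block {
    height: u64,
    payload: Vec<u8>,
}

fn main() {
    let block = Arc::new(Block { height: 7, payload: vec![0u8; 1 << 20] });
    // Each subscriber clone is pointer-sized; the 1 MiB payload is shared.
    let for_subscriber = Arc::clone(&block);
    assert!(Arc::ptr_eq(&block, &for_subscriber));
    assert_eq!(for_subscriber.height, 7);
    assert_eq!(block.payload.len(), 1 << 20);
}
```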

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 438a75c and 4e95a9e.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (12)
  • CLAUDE.md (1 hunks)
  • crates/cli/src/main.rs (3 hunks)
  • crates/da/Cargo.toml (2 hunks)
  • crates/da/src/aws/full_node.rs (1 hunks)
  • crates/da/src/aws/light_client.rs (1 hunks)
  • crates/da/src/celestia/full_node.rs (4 hunks)
  • crates/da/src/factory.rs (9 hunks)
  • crates/da/src/memory.rs (7 hunks)
  • crates/events/Cargo.toml (1 hunks)
  • crates/node_types/prover/src/prover/tests/mod.rs (7 hunks)
  • crates/node_types/uniffi-lightclient/src/lib.rs (1 hunks)
  • crates/tests/src/lib.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • CLAUDE.md
  • crates/tests/src/lib.rs
  • crates/da/src/aws/light_client.rs
  • crates/da/Cargo.toml
  • crates/da/src/celestia/full_node.rs
  • crates/node_types/prover/src/prover/tests/mod.rs
🧰 Additional context used
📓 Path-based instructions (3)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.rs: Follow Rust Coding Standards for all Rust code
Use rustfmt with project settings (merge_imports=true, imports_granularity="Crate", max_width=100)
Error handling: prefer Result types with descriptive error messages
Use the ? operator and proper error types for propagation
Naming: snake_case for functions/variables and CamelCase for types
Only add comments for private APIs when behavior is complex or non-obvious
Keep public method docstrings short and focused (brief summary, brief params/return)

Files:

  • crates/da/src/aws/full_node.rs
  • crates/da/src/factory.rs
  • crates/da/src/memory.rs
  • crates/cli/src/main.rs
  • crates/node_types/uniffi-lightclient/src/lib.rs
crates/**/src/lib.rs

📄 CodeRabbit inference engine (CLAUDE.md)

Add comments primarily for public APIs exposed at the crate level

Files:

  • crates/node_types/uniffi-lightclient/src/lib.rs
crates/**/src/@(lib|mod).rs

📄 CodeRabbit inference engine (CLAUDE.md)

Use longer examples only in module root files (mod.rs or lib.rs)

Files:

  • crates/node_types/uniffi-lightclient/src/lib.rs
🧬 Code graph analysis (5)
crates/da/src/aws/full_node.rs (4)
crates/da/src/aws/light_client.rs (5)
  • new (43-56)
  • event_channel (130-132)
  • event_publisher (59-61)
  • start (115-128)
  • get_finalized_epochs (134-140)
crates/da/src/celestia/full_node.rs (6)
  • new (162-197)
  • event_channel (314-316)
  • start (249-290)
  • get_finalized_epochs (292-312)
  • get_transactions (355-377)
  • subscribe_to_heights (321-323)
crates/da/src/memory.rs (8)
  • new (60-84)
  • event_channel (205-207)
  • produce_blocks (113-152)
  • start (163-175)
  • start (178-190)
  • get_finalized_epochs (192-203)
  • get_transactions (233-239)
  • subscribe_to_heights (213-215)
crates/da/src/aws/client.rs (2)
  • new_from_full_da_config (112-155)
  • submit_metadata (565-568)
crates/da/src/factory.rs (6)
crates/da/src/aws/full_node.rs (1)
  • new (77-101)
crates/da/src/aws/light_client.rs (1)
  • new (43-56)
crates/da/src/celestia/full_node.rs (2)
  • new (162-197)
  • default (120-129)
crates/da/src/memory.rs (2)
  • new (60-84)
  • default (54-56)
crates/node_types/uniffi-lightclient/src/lib.rs (1)
  • new (36-73)
crates/node_types/wasm-lightclient/src/worker.rs (1)
  • new (52-88)
crates/da/src/memory.rs (6)
crates/da/src/celestia/full_node.rs (3)
  • default (120-129)
  • new (162-197)
  • start (249-290)
crates/da/src/aws/full_node.rs (2)
  • new (77-101)
  • start (160-179)
crates/da/src/aws/light_client.rs (3)
  • new (43-56)
  • event_publisher (59-61)
  • start (115-128)
crates/da/src/lib.rs (4)
  • new (188-190)
  • height (214-214)
  • height (255-257)
  • start (388-388)
crates/node_types/lightclient/src/lightclient.rs (1)
  • new (109-133)
crates/node_types/wasm-lightclient/src/worker.rs (1)
  • new (52-88)
crates/cli/src/main.rs (1)
crates/da/src/factory.rs (2)
  • create_light_client_da_layer (109-132)
  • create_full_node_da_layer (211-274)
crates/node_types/uniffi-lightclient/src/lib.rs (8)
crates/da/src/aws/full_node.rs (1)
  • new (77-101)
crates/da/src/aws/light_client.rs (1)
  • new (43-56)
crates/da/src/celestia/full_node.rs (1)
  • new (162-197)
crates/da/src/memory.rs (1)
  • new (60-84)
crates/node_types/lightclient/src/lightclient.rs (1)
  • new (109-133)
crates/node_types/wasm-lightclient/src/worker.rs (1)
  • new (52-88)
crates/da/src/factory.rs (1)
  • create_light_client_da_layer (109-132)
crates/node_types/lightclient/src/factory.rs (1)
  • create_light_client (76-89)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: build-and-push-image
  • GitHub Check: coverage
  • GitHub Check: integration-test
  • GitHub Check: unit-test
  • GitHub Check: unused dependencies
🔇 Additional comments (16)
crates/node_types/uniffi-lightclient/src/lib.rs (2)

49-57: LGTM! Proper cancellation token propagation.

The change correctly creates a single cancellation token and propagates it to both the DA layer and light client creation, ensuring coordinated shutdown across components.


61-61: LGTM! Consistent token usage.

Using the same cancellation token for light client creation ensures all components share the same shutdown signal.

crates/da/src/factory.rs (7)

109-112: LGTM! Proper cancellation token parameter added.

The function signature correctly adds the cancellation_token parameter to support graceful shutdown across all DA backends.


127-129: LGTM! Proper cancellation token propagation to InMemory DA.

The cancellation token is correctly passed to the InMemoryDataAvailabilityLayer constructor.


210-213: LGTM! Proper cancellation token parameter added to full node factory.

The function signature correctly adds the cancellation_token parameter for full node DA layers.


220-221: LGTM! Proper cancellation token propagation to Celestia full node.

The cancellation token is correctly cloned and passed to CelestiaConnection::new.


269-271: LGTM! Proper cancellation token propagation to InMemory full node.

The cancellation token is correctly passed to the InMemoryDataAvailabilityLayer constructor.


339-339: LGTM! Test correctly updated with cancellation token.

Test properly passes a CancellationToken to the factory function.


397-397: LGTM! Test correctly updated with cancellation token.

Test properly passes a CancellationToken to the factory function.

crates/cli/src/main.rs (3)

64-64: LGTM! Proper cancellation token propagation for light client DA.

The cancellation token is correctly cloned and passed to create_light_client_da_layer.


83-83: LGTM! Proper cancellation token propagation for prover DA.

The cancellation token is correctly cloned and passed to create_full_node_da_layer.


101-101: LGTM! Proper cancellation token propagation for full node DA.

The cancellation token is correctly cloned and passed to create_full_node_da_layer.

crates/da/src/aws/full_node.rs (3)

77-101: LGTM! Well-structured constructor with proper initialization.

The constructor properly initializes all necessary components including the AWS client, channels, and atomic counters. The debug logging appropriately logs only non-sensitive configuration details.


160-179: LGTM! Proper start implementation with idempotency check.

The start method correctly implements idempotent startup using atomic compare-and-exchange and properly logs the current height before starting block production.


181-187: LGTM! Proper epoch retrieval implementation.

The get_finalized_epochs method correctly delegates to the AWS client and properly transforms the results to VerifiableEpoch trait objects.

crates/da/src/memory.rs (1)

59-64: Audit complete — constructor call sites already pass a CancellationToken

Found and verified usages at:

  • crates/da/src/factory.rs:128, crates/da/src/factory.rs:270
  • crates/node_types/prover/src/prover/tests/mod.rs:35, 300, 516
  • new_with_epoch_delay at crates/node_types/prover/src/prover/tests/mod.rs:373–375

Each invocation supplies a CancellationToken (tests use .clone()).

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
crates/da/src/aws/full_node.rs (3)

26-49: Doc claims exceed guarantees made by this module

"Cross-Region Replication" and "Automatic Retry" are stated, but this file itself does not enforce them. If these are entirely handled by AwsDataAvailabilityClient or S3 policies, clarify that; otherwise, trim the claims.


152-154: Prefer interval over chained sleep to avoid drift

tokio::time::interval(self.block_time) keeps cadence stable across iterations and simplifies cancellation.


223-227: u64 overflow edge case (extreme) on offset math

If offsets approach u64::MAX, fetch_add can wrap. Unlikely, but add a checked guard to fail fast.
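A checked guard for the offset math might look like this (`reserve_end` is a hypothetical helper, not part of the crate):

```rust
/// Fail fast instead of silently wrapping when computing the end of a
/// reserved transaction range.
fn reserve_end(start: u64, count: u64) -> Result<u64, String> {
    start
        .checked_add(count)
        .ok_or_else(|| format!("transaction offset overflow: {} + {}", start, count))
}

fn main() {
    assert_eq!(reserve_end(10, 5), Ok(15));
    assert!(reserve_end(u64::MAX, 1).is_err());
}
```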

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4e95a9e and 0fac008.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (2)
  • crates/da/Cargo.toml (3 hunks)
  • crates/da/src/aws/full_node.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • crates/da/Cargo.toml
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.rs: Follow Rust Coding Standards for all Rust code
Use rustfmt with project settings (merge_imports=true, imports_granularity="Crate", max_width=100)
Error handling: prefer Result types with descriptive error messages
Use the ? operator and proper error types for propagation
Naming: snake_case for functions/variables and CamelCase for types
Only add comments for private APIs when behavior is complex or non-obvious
Keep public method docstrings short and focused (brief summary, brief params/return)

Files:

  • crates/da/src/aws/full_node.rs
🧬 Code graph analysis (1)
crates/da/src/aws/full_node.rs (3)
crates/da/src/aws/light_client.rs (5)
  • new (43-56)
  • event_channel (130-132)
  • event_publisher (59-61)
  • start (115-128)
  • get_finalized_epochs (134-140)
crates/da/src/memory.rs (6)
  • new (60-84)
  • event_channel (205-207)
  • produce_blocks (113-152)
  • start (163-175)
  • start (178-190)
  • get_finalized_epochs (192-203)
crates/da/src/aws/client.rs (2)
  • new_from_full_da_config (112-155)
  • submit_metadata (565-568)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: build-and-push-image
  • GitHub Check: clippy
  • GitHub Check: unit-test
  • GitHub Check: coverage
  • GitHub Check: unused dependencies
  • GitHub Check: integration-test
🔇 Additional comments (7)
crates/da/src/aws/full_node.rs (7)

76-100: Confirm initial height semantics (starts at 1)

Memory DA starts from 0 and broadcasts the incremented height. Starting AWS at 1 is fine if you always broadcast the incremented height (see next comment). Please confirm cross‑backend expectations.


159-178: Start logic LGTM

Atomic start gate + spawning producer is correct.


180-186: Epoch fetch mapping LGTM

Error mapping and boxing to VerifiableEpoch look good.


195-201: Read paths LGTM

get_latest_height/get_transactions are straightforward and consistent.


233-236: Heights subscription LGTM

Broadcast subscribe is correct; ignoring send errors earlier is fine when no subscribers.


111-151: Off‑by‑one: broadcast/publish/metadata/log should use the incremented height

You compute completed_height before increment, then notify with it while current_height has already advanced. Use the post‑increment value everywhere to align with Memory DA.

Apply:

-                // Take write lock to atomically increment height and prevent submissions
-                let mut height_guard = current_height.write().await;
-                let completed_height = *height_guard;
-                *height_guard += 1;
-                drop(height_guard);
+                // Take write lock to atomically increment height and notify using the new height
+                let mut height_guard = current_height.write().await;
+                *height_guard += 1;
+                let new_height = *height_guard;
+                drop(height_guard);
@@
-                // Broadcast height update
-                let _ = height_update_tx.send(completed_height);
+                // Broadcast height update
+                let _ = height_update_tx.send(new_height);
@@
-                event_publisher.send(PrismEvent::UpdateDAHeight {
-                    height: completed_height,
-                });
+                event_publisher.send(PrismEvent::UpdateDAHeight { height: new_height });
@@
-                let metadata = AwsDaMetaInfo {
-                    current_height: completed_height,
-                };
+                let metadata = AwsDaMetaInfo { current_height: new_height };
@@
-                        info!("Completed block {}", completed_height);
+                        info!("Completed block {}", new_height);

203-214: Release read lock before await and return inclusion height

Holding the read lock across the await can stall block production; also return height + 1 to match Memory DA’s “inclusion height”.

     async fn submit_finalized_epoch(&self, epoch: FinalizedEpoch) -> Result<u64> {
-        // Take read lock to get consistent height and prevent increments during submission
-        let height_guard = self.current_height.read().await;
-        let current_height = *height_guard;
-
-        self.client.submit_finalized_epoch(epoch, current_height).await?;
+        // Snapshot height, then release lock before I/O
+        let height_guard = self.current_height.read().await;
+        let current_height = *height_guard;
+        drop(height_guard);
+
+        self.client
+            .submit_finalized_epoch(epoch, current_height)
+            .await?;
@@
-        // Return the height where the epoch was published
-        Ok(current_height)
+        // Return inclusion height
+        Ok(current_height + 1)
     }

Comment on lines 216 to 231
    async fn submit_transactions(&self, transactions: Vec<Transaction>) -> Result<u64> {
        // Take read lock to get consistent height and prevent increments during submission
        let height_guard = self.current_height.read().await;
        let height = *height_guard;
        drop(height_guard);
        let transaction_offset = self.transaction_offset.load(Ordering::Relaxed);

        let count = transactions.len() as u64;
        self.client.submit_transactions(transactions, transaction_offset, height).await?;
        // Advance offset on success
        self.transaction_offset.fetch_add(count, Ordering::Relaxed);

        info!("Transactions submitted at height {}", height);

        Ok(height)
    }
Contributor

⚠️ Potential issue

Race + collision risk on transaction_offset; reserve before upload, then return inclusion height

Current flow can double‑use the same offset across concurrent submissions and misalign offset with height when a rollover happens during upload. Reserve a unique offset atomically under the height read lock, then upload with that reserved start. Finally, return height + 1.

-    async fn submit_transactions(&self, transactions: Vec<Transaction>) -> Result<u64> {
-        // Take read lock to get consistent height and prevent increments during submission
-        let height_guard = self.current_height.read().await;
-        let height = *height_guard;
-        drop(height_guard);
-        let transaction_offset = self.transaction_offset.load(Ordering::Relaxed);
-
-        let count = transactions.len() as u64;
-        self.client.submit_transactions(transactions, transaction_offset, height).await?;
-        // Advance offset on success
-        self.transaction_offset.fetch_add(count, Ordering::Relaxed);
-
-        info!("Transactions submitted at height {}", height);
-
-        Ok(height)
-    }
+    async fn submit_transactions(&self, transactions: Vec<Transaction>) -> Result<u64> {
+        let count = transactions.len() as u64;
+
+        // Reserve a unique offset for the current height under read lock
+        let height_guard = self.current_height.read().await;
+        let height = *height_guard;
+        let start_offset = self.transaction_offset.fetch_add(count, Ordering::AcqRel);
+        drop(height_guard);
+
+        // Perform upload with reserved offset
+        self.client
+            .submit_transactions(transactions, start_offset, height)
+            .await?;
+
+        info!("Transactions submitted at height {}", height);
+
+        // Return inclusion height
+        Ok(height + 1)
+    }

Note: This may leave gaps if an upload fails, which is acceptable for uniqueness and WORM safety. If gaps are undesirable, we can add a compensating metric.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
    async fn submit_transactions(&self, transactions: Vec<Transaction>) -> Result<u64> {
        // Take read lock to get consistent height and prevent increments during submission
        let height_guard = self.current_height.read().await;
        let height = *height_guard;
        drop(height_guard);
        let transaction_offset = self.transaction_offset.load(Ordering::Relaxed);

        let count = transactions.len() as u64;
        self.client.submit_transactions(transactions, transaction_offset, height).await?;
        // Advance offset on success
        self.transaction_offset.fetch_add(count, Ordering::Relaxed);

        info!("Transactions submitted at height {}", height);

        Ok(height)
    }

    async fn submit_transactions(&self, transactions: Vec<Transaction>) -> Result<u64> {
        let count = transactions.len() as u64;

        // Reserve a unique offset for the current height under read lock
        let height_guard = self.current_height.read().await;
        let height = *height_guard;
        let start_offset = self.transaction_offset.fetch_add(count, Ordering::AcqRel);
        drop(height_guard);

        // Perform upload with reserved offset
        self.client
            .submit_transactions(transactions, start_offset, height)
            .await?;

        info!("Transactions submitted at height {}", height);

        // Return inclusion height
        Ok(height + 1)
    }
🤖 Prompt for AI Agents
In crates/da/src/aws/full_node.rs around lines 216-231, the current logic reads
height, then reads transaction_offset separately which allows concurrent
submissions to reserve the same offset and misalign with height on rollovers; to
fix, while holding the current_height read lock atomically reserve a unique
start offset by fetching and adding the count to transaction_offset (or
otherwise atomically increment by count) so the reserved range is tied to the
observed height, release the lock, upload using that reserved start offset, on
success advance the offset only if not already advanced by the reservation step
(or simply use the reservation as the advancement), and return height + 1 as the
inclusion height; accept that failed uploads may leave gaps (WORM-safe) and
optionally emit a metric for gaps if needed.
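The reserve-before-upload fix rests on `fetch_add` handing each caller a disjoint range; a std-only demonstration with threads (no AWS types involved):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let offset = Arc::new(AtomicU64::new(0));
    let mut handles = Vec::new();
    for batch in [3u64, 5, 2] {
        let offset = Arc::clone(&offset);
        handles.push(thread::spawn(move || {
            // Reserve [start, start + batch) atomically before any upload I/O.
            let start = offset.fetch_add(batch, Ordering::AcqRel);
            (start, batch)
        }));
    }
    let mut ranges: Vec<(u64, u64)> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    ranges.sort();
    // Reserved ranges are disjoint regardless of thread interleaving.
    for pair in ranges.windows(2) {
        assert!(pair[0].0 + pair[0].1 <= pair[1].0);
    }
    assert_eq!(offset.load(Ordering::Acquire), 10);
}
```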

///
/// See the crate-level documentation for usage examples and integration patterns.
#[cfg(not(target_arch = "wasm32"))]
pub async fn create_full_node_da_layer(
Contributor Author

As we have moved almost all (?) of the connection logic out of the constructor, we shouldn't be retrying the creation of that connection any more.
If we want retries here, we could move them to the start method, or incorporate them into the DA layer implementations themselves, as with AWS.

Contributor

@distractedm1nd distractedm1nd left a comment

Very nice PR sir

Prism is a decentralized key transparency protocol, first inspired by the paper [Tzialla et. al](https://eprint.iacr.org/2021/1263.pdf), leveraging zkSNARKs and DA solutions to enable trust-minimized verification of E2EE services via WASM/Native light clients. This eliminates the possibility for hidden backdoors in E2EE services through a user-verifiable key management system. It uses transparency dictionaries under the hood, offering a generalized solution for managing a label-value map in environments where the service maintaining the map is not completely trusted.

Prism provides the first key-transparency solution to enable automatic verification of the service provider. This is achieved by providing constant size succinct proofs to WASM/Native light clients over a [data availability layer](https://arxiv.org/abs/1809.09044). Alternative DA solutions are in planning, allowing for a less trust-minimized client where a p2p node cannot be embedded.
Prism provides the first key-transparency solution to enable automatic verification of the service provider. This is achieved by providing constant size succinct proofs to WASM/Native light clients over a [data availability layer](https://arxiv.org/abs/1809.09044). Multiple DA backends are supported including Celestia for decentralized availability.
Contributor

Suggested change
Prism provides the first key-transparency solution to enable automatic verification of the service provider. This is achieved by providing constant size succinct proofs to WASM/Native light clients over a [data availability layer](https://arxiv.org/abs/1809.09044). Multiple DA backends are supported including Celestia for decentralized availability.
Prism provides the first key-transparency solution to enable automatic verification of the service provider. This is achieved by providing constant size succinct proofs to WASM/Native light clients over a [data availability layer](https://arxiv.org/abs/1809.09044). Multiple DA backends are supported including Celestia for decentralized retrieval.


#[arg(long)]
/// Whether to allow the verification of mock proofs
/// Default: false
Contributor

Isn't default false shown automatically anyways on clap help output?

Contributor Author

Only if we use clap's annotations for that. But then the CLI args will never return None, because clap will put in a value when the user doesn't. But in order to use conf file/ENV/default when the user doesn't supply a cli arg for something, we can't use that clap feature and have to document those defaults ourselves.

metadata_bucket: config.metadata_bucket,
key_prefix: config.key_prefix,
retention_days: 0,
enable_legal_holds: false,
Contributor

why is default here false? and what does retention_days do?

Contributor Author

For legal hold: No specific reason. We should change it to true.
Retention days: Quoting something from the AWS doc

Legal holds

With Object Lock, you can also place a legal hold on an object version. Like a retention period, a legal hold prevents an object version from being overwritten or deleted. However, a legal hold doesn't have an associated fixed amount of time and remains in effect until removed. Legal holds can be freely placed and removed by any user who has the s3:PutObjectLegalHold permission.

Legal holds are independent from retention periods. Placing a legal hold on an object version doesn't affect the retention mode or retention period for that object version.

For example, suppose that you place a legal hold on an object version and that object version is also protected by a retention period. If the retention period expires, the object doesn't lose its WORM protection. Rather, the legal hold continues to protect the object until an authorized user explicitly removes the legal hold. Similarly, if you remove a legal hold while an object version has a retention period in effect, the object version remains protected until the retention period expires.


/// Fetches and parses epoch data from S3.
pub async fn fetch_epochs(&self, height: u64) -> Result<Vec<FinalizedEpoch>, AwsDaClientError> {
let padded_height = format!("{:012}", height);
Contributor

I would document the decision for this format here.
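For reference, the property worth documenting: fixed-width zero padding makes S3's lexicographic key ordering agree with numeric height ordering. A sketch; `epoch_key` is a hypothetical helper, not from the PR.

```rust
// 12-digit zero padding => lexicographic order of keys matches numeric order
// of heights, so an S3 ListObjectsV2 walk returns epochs in height order.
fn epoch_key(prefix: &str, height: u64) -> String {
    format!("{}/{:012}", prefix, height)
}
```

Without the padding, "epochs/9" would sort after "epochs/10".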

let transaction_offset = self.transaction_offset.clone();
let cancellation_token = self.cancellation_token.clone();

let _handle = tokio::spawn(async move {
Contributor:

Don't throw away the handle; if we are doing this in the other concurrent components, we shouldn't be. Fixing this probably requires moving the select statement higher up and having some sort of main loop or listener for errored/panicked handles. If this method panics, for example, the application will neither quit as intended nor cascade into a graceful shutdown.
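One shape the suggested fix could take, sketched with std threads standing in for tokio tasks; the supervision principle is the same: keep the JoinHandle so a panic becomes a shutdown signal instead of vanishing.

```rust
use std::thread;

// Keep the handle: joining surfaces both the task's own Err and a panic,
// so a supervisor can cascade into a graceful shutdown.
fn run_supervised() -> Result<(), String> {
    let handle = thread::spawn(|| -> Result<(), String> {
        // ... background work that may fail or panic ...
        Err("channel closed".to_string())
    });
    match handle.join() {
        Ok(result) => result,                  // task completed, propagate its Result
        Err(_) => Err("task panicked".into()), // panic observed, trigger shutdown
    }
}
```

In tokio the equivalent is awaiting the stored `JoinHandle` (or collecting tasks in a `JoinSet`) in a main loop instead of binding it to `_handle`.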

Comment on lines +190 to +199
let height_guard = self.current_height.read().await;
let current_height = *height_guard;

self.client.submit_finalized_epoch(epoch, current_height).await?;

info!("Finalized epoch submitted at height {}", current_height);

// Return the height where the epoch was published
Ok(current_height)
}
Contributor:

yes

info!("AWS height monitoring task cancelled");
break;
}
_ = tokio::time::sleep(poll_interval) => {
Contributor:

this is what timers/tickers are for :)
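The drift that motivates a ticker can be seen with simple arithmetic (a sketch; the millisecond figures are illustrative): sleeping for the poll interval after each iteration stretches the observed period by the work time, while a ticker such as `tokio::time::interval` anchors each deadline to the previous one.

```rust
// Total elapsed time for N iterations, in milliseconds.
fn sleep_loop_elapsed(ticks: u64, period_ms: u64, work_ms: u64) -> u64 {
    ticks * (period_ms + work_ms) // sleep-after-work: drift accumulates each iteration
}

fn ticker_elapsed(ticks: u64, period_ms: u64, work_ms: u64) -> u64 {
    assert!(work_ms < period_ms); // ticks stay on the grid while work fits in a period
    ticks * period_ms
}
```

Over ten one-second polls with 50 ms of work each, the sleep loop is already half a second behind the ticker.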

let poll_interval = self.block_time;
let cancellation_token = self.cancellation_token.clone();

let _handle = tokio::spawn(async move {
Contributor:

don't throw away handle

Some(Ok(extended_header)) => {
let height = extended_header.header.height.value();
sync_target.store(height, Ordering::Relaxed);
// todo: correct error handling
Contributor:

Can you address this? I know it's just moved code, but I think it's an easy fix.

Contributor:

This can be an error log. I'm relatively sure the only error case here is the channel being full or already closed.
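A minimal version of the suggested change, with a bounded std channel standing in for the actual one (`forward_height` is a hypothetical helper; the two error arms correspond to the full/closed cases mentioned above):

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

// Replace the `// todo` with an explicit error: a bounded channel can only
// fail to send because it is full or because the receiver is gone.
fn forward_height(tx: &SyncSender<u64>, height: u64) -> Result<(), String> {
    tx.try_send(height).map_err(|e| match e {
        TrySendError::Full(_) => format!("sync-target channel full, dropping height {height}"),
        TrySendError::Disconnected(_) => format!("sync-target channel closed at height {height}"),
    })
}
```

The caller would then log the `Err` with `error!` instead of silently ignoring it.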


Labels: enhancement (New feature or request)
Linked issue: Add AWS S3 WORM as DA layer
3 participants