From b2fce2bea41c80c86c5c1c38f48881d6a97e972c Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Fri, 22 May 2026 12:18:40 +0200 Subject: [PATCH 1/4] Improve the deadline api --- crates/core/src/deadline/mod.rs | 91 +++++++------------ crates/core/src/dutydb/memory.rs | 55 ++++++----- crates/core/src/parsigdb/memory.rs | 20 ++-- .../core/src/parsigdb/memory_internal_test.rs | 19 ++-- crates/dkg/src/exchanger.rs | 4 - 5 files changed, 78 insertions(+), 111 deletions(-) diff --git a/crates/core/src/deadline/mod.rs b/crates/core/src/deadline/mod.rs index 45be4055..e6ffac19 100644 --- a/crates/core/src/deadline/mod.rs +++ b/crates/core/src/deadline/mod.rs @@ -8,25 +8,22 @@ //! //! ```no_run //! use pluto_core::{ -//! deadline::{DeadlinerTask, DutyDeadlineCalculator}, +//! deadline::{Deadliner, DeadlinerTask, DutyDeadlineCalculator}, //! types::{Duty, SlotNumber}, //! }; //! use pluto_eth2api::EthBeaconNodeApiClient; -//! use std::sync::Arc; //! use tokio_util::sync::CancellationToken; //! //! # async fn example(client: &EthBeaconNodeApiClient) -> anyhow::Result<()> { //! let cancel_token = CancellationToken::new(); //! let calculator = DutyDeadlineCalculator::from_client(client).await?; -//! let deadliner = DeadlinerTask::start(cancel_token, "example", calculator); +//! let (deadliner, mut rx) = DeadlinerTask::start(cancel_token, "example", calculator); //! //! let duty = Duty::new_attester_duty(SlotNumber::new(1)); //! let added = deadliner.add(duty).await; //! -//! if let Some(mut rx) = deadliner.c() { -//! while let Some(expired_duty) = rx.recv().await { -//! println!("Duty expired: {}", expired_duty); -//! } +//! while let Some(expired_duty) = rx.recv().await { +//! println!("Duty expired: {}", expired_duty); //! } //! # Ok(()) //! # } @@ -41,11 +38,7 @@ use crate::types::{Duty, DutyType, SlotNumber}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use pluto_eth2api::EthBeaconNodeApiClientError; -use std::{ - collections::HashSet, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{collections::HashSet, time::Duration}; use tokio::{ sync::{mpsc, oneshot}, time::sleep, @@ -87,10 +80,9 @@ fn to_chrono_duration(duration: Duration) -> Result { /// Deadliner provides duty deadline functionality. /// -/// The `c()` method returns a channel for receiving expired duties. -/// It may only be called once and the returned channel should be used -/// by a single task. Multiple instances are required for different -/// components and use cases. +/// Producers submit duties via [`add`](Self::add). Expired duties are +/// delivered on the receiver paired with the handle at +/// [`DeadlinerTask::start`]. #[async_trait] pub trait Deadliner: Send + Sync { /// Adds a duty for deadline scheduling. @@ -104,12 +96,6 @@ pub trait Deadliner: Send + Sync { /// - The calculator reports the duty has no deadline (`Ok(None)`) /// - The calculator failed to compute the deadline (`Err(_)`) async fn add(&self, duty: Duty) -> bool; - - /// Returns the channel for receiving deadlined duties. - /// - /// This method may only be called once and returns `None` on subsequent - /// calls. The returned channel should only be used by a single task. - fn c(&self) -> Option>; } /// Internal message type for adding duties to the deadliner. @@ -118,13 +104,13 @@ struct DeadlineInput { response_tx: oneshot::Sender, } -/// Public-facing handle: the `Arc` returned by -/// [`DeadlinerTask::start`] wraps this. Holds the input channel, the -/// take-once output receiver, and the cancellation token. -struct DeadlinerHandle { +/// Public-facing handle returned (paired with the expired-duty receiver) by +/// [`DeadlinerTask::start`]. Cloning is cheap and shares the same background +/// task — share it freely across producers inside one service. +#[derive(Clone)] +pub struct DeadlinerHandle { cancel_token: CancellationToken, input_tx: mpsc::Sender, - output_rx: Mutex>>, } #[async_trait] @@ -146,18 +132,11 @@ impl Deadliner for DeadlinerHandle { // Wait for response response_rx.await.unwrap_or(false) } - - fn c(&self) -> Option> { - self.output_rx - .lock() - .ok() - .and_then(|mut guard| guard.take()) - } } /// Owned state of the background task that drives a [`DeadlinerHandle`]'s /// duty timers. Held exclusively by the spawned task — that's why it lives -/// outside the `Arc` and `run_task` can take `mut self`. +/// outside the public handle and `run_task` can take `mut self`. /// Constructed and spawned via [`DeadlinerTask::start`]. pub struct DeadlinerTask { cancel_token: CancellationToken, @@ -172,14 +151,15 @@ pub struct DeadlinerTask { } impl DeadlinerTask { - /// Builds the public-facing [`Deadliner`] handle and spawns the background - /// task that drives it. The background loop exits when `cancel_token` is - /// cancelled. + /// Spawns the background task and returns a `(handle, expired_rx)` pair. + /// The cloneable `handle` is for adding duties from any number of + /// producers; `expired_rx` is the single consumer's receiver of expired + /// duties. The background loop exits when `cancel_token` is cancelled. pub fn start( cancel_token: CancellationToken, label: impl Into, calculator: C, - ) -> Arc { + ) -> (DeadlinerHandle, mpsc::Receiver) { // Matches Charon's `outputBuffer = 10` — big enough for all duty // types expiring simultaneously while the consumer drains synchronously. const OUTPUT_BUFFER: usize = 10; @@ -204,13 +184,12 @@ impl DeadlinerTask { }; tokio::spawn(task.run_task()); - let link = DeadlinerHandle { + let handle = DeadlinerHandle { cancel_token, input_tx, - output_rx: Mutex::new(Some(output_rx)), }; - Arc::new(link) + (handle, output_rx) } /// Background task that manages duty deadlines. @@ -407,7 +386,7 @@ mod tests { /// channel. async fn add_duties( duties: Vec, - deadliner: Arc, + deadliner: DeadlinerHandle, result_tx: mpsc::Sender, ) { for duty in duties { @@ -461,9 +440,8 @@ mod tests { }; let cancel_token = CancellationToken::new(); - let deadliner = DeadlinerTask::start(cancel_token.clone(), "test", calculator); - - let mut output_rx = deadliner.c().context("output receiver already taken")?; + let (deadliner, mut output_rx) = + DeadlinerTask::start(cancel_token.clone(), "test", calculator); let (expired_tx, mut expired_rx) = mpsc::channel(100); let (non_expired_tx, mut non_expired_rx) = mpsc::channel(100); @@ -472,21 +450,15 @@ mod tests { let non_expired_len = non_expired_duties.len(); let future_duties_len = future_duties.len(); - let handler_expired = tokio::spawn(add_duties( - expired_duties, - Arc::clone(&deadliner), - expired_tx, - )); + let handler_expired = + tokio::spawn(add_duties(expired_duties, deadliner.clone(), expired_tx)); let handler_non_expired = tokio::spawn(add_duties( non_expired_duties.clone(), - Arc::clone(&deadliner), + deadliner.clone(), non_expired_tx.clone(), )); - let handler_future_duties = tokio::spawn(add_duties( - future_duties, - Arc::clone(&deadliner), - non_expired_tx, - )); + let handler_future_duties = + tokio::spawn(add_duties(future_duties, deadliner.clone(), non_expired_tx)); let (result_expired, result_non_expired, result_future_duties) = tokio::join!(handler_expired, handler_non_expired, handler_future_duties); @@ -545,9 +517,8 @@ mod tests { }; let cancel_token = CancellationToken::new(); - let deadliner = DeadlinerTask::start(cancel_token.clone(), "order-test", calculator); - - let mut output_rx = deadliner.c().context("output receiver already taken")?; + let (deadliner, mut output_rx) = + DeadlinerTask::start(cancel_token.clone(), "order-test", calculator); // TestCalculator: deadline = start_time + slot * 500ms. // Insert the later one first to make sure ordering is by deadline, diff --git a/crates/core/src/dutydb/memory.rs b/crates/core/src/dutydb/memory.rs index 94dbc98d..865bcf52 100644 --- a/crates/core/src/dutydb/memory.rs +++ b/crates/core/src/dutydb/memory.rs @@ -8,7 +8,7 @@ use pluto_eth2api::{ spec::{altair, phase0}, versioned, }; -use tokio::sync::{Notify, RwLock}; +use tokio::sync::{Notify, RwLock, mpsc}; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use tree_hash::TreeHash; @@ -203,11 +203,13 @@ pub struct MemDB { } impl MemDB { - /// Creates a new in-memory DutyDB. - pub fn new(deadliner: Arc, cancel: &CancellationToken) -> Self { - let deadliner_rx = deadliner.c().expect( - "Deadliner::c() returned None — the receiver was already consumed. Each MemDB must use a fresh Deadliner.", - ); + /// Creates a new in-memory DutyDB. `deadliner_rx` is the receiver paired + /// with `deadliner` (typically from `DeadlinerTask::start`). + pub fn new( + deadliner: Arc, + deadliner_rx: mpsc::Receiver, + cancel: &CancellationToken, + ) -> Self { Self { state: RwLock::new(State { attestation_duties: HashMap::new(), @@ -620,11 +622,12 @@ mod tests { async fn add(&self, _duty: Duty) -> bool { true } + } - fn c(&self) -> Option> { - let (_, rx) = channel(1); - Some(rx) - } + /// Builds a never-firing receiver for tests that don't exercise eviction. + pub(crate) fn noop_deadliner_rx() -> Receiver { + let (_, rx) = channel(1); + rx } /// Deadliner that collects duties and can flush them to a channel on @@ -632,17 +635,18 @@ mod tests { pub(crate) struct TestDeadliner { added: Mutex>, tx: Sender, - rx: Mutex>>, } impl TestDeadliner { - pub(crate) fn new() -> Arc { + /// Returns the test deadliner alongside the matching expiry receiver + /// — call `expire()` on the deadliner to drive `rx`. + pub(crate) fn new() -> (Arc, Receiver) { let (tx, rx) = channel(64); - Arc::new(Self { + let deadliner = Arc::new(Self { added: Mutex::new(Vec::new()), tx, - rx: Mutex::new(Some(rx)), - }) + }); + (deadliner, rx) } /// Send all collected duties to the expiry channel. @@ -663,18 +667,21 @@ mod tests { self.added.lock().unwrap().push(duty); true } - - fn c(&self) -> Option> { - self.rx.lock().unwrap().take() - } } fn make_db() -> MemDB { - MemDB::new(Arc::new(NoopDeadliner), &CancellationToken::new()) + MemDB::new( + Arc::new(NoopDeadliner), + noop_deadliner_rx(), + &CancellationToken::new(), + ) } - fn make_db_with_deadliner(deadliner: Arc) -> MemDB { - MemDB::new(deadliner, &CancellationToken::new()) + fn make_db_with_deadliner( + deadliner: Arc, + deadliner_rx: Receiver, + ) -> MemDB { + MemDB::new(deadliner, deadliner_rx, &CancellationToken::new()) } fn att_data(slot: u64, committee_index: u64, validator_index: u64) -> AttestationData { @@ -1104,8 +1111,8 @@ mod tests { #[tokio::test] async fn duty_expiry() { - let deadliner = TestDeadliner::new(); - let db = make_db_with_deadliner(Arc::clone(&deadliner) as Arc); + let (deadliner, deadliner_rx) = TestDeadliner::new(); + let db = make_db_with_deadliner(deadliner.clone(), deadliner_rx); const SLOT: u64 = 123; diff --git a/crates/core/src/parsigdb/memory.rs b/crates/core/src/parsigdb/memory.rs index 44ffa9f8..efe0dd32 100644 --- a/crates/core/src/parsigdb/memory.rs +++ b/crates/core/src/parsigdb/memory.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, error::Error as StdError, future::Future, pin::Pin, sync::Arc}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, mpsc}; use tokio_util::sync::CancellationToken; -use tracing::{debug, warn}; +use tracing::debug; use crate::{ deadline::Deadliner, @@ -300,16 +300,12 @@ impl MemDB { /// Trims expired duties from the database. /// - /// This method runs in a loop, listening for expired duties from the - /// deadliner and removing their associated data from the database. It - /// should be spawned as a background task and will run until the - /// cancellation token is triggered. - pub async fn trim(&self) { - let Some(mut deadliner_rx) = self.deadliner.c() else { - warn!("Deadliner channel is not available"); - return; - }; - + /// Runs in a loop, listening on `deadliner_rx` for expired duties and + /// removing their associated data. Should be spawned as a background task; + /// returns when the cancellation token is triggered or the receiver + /// closes. The receiver is the one paired with the [`Deadliner`] handle at + /// `DeadlinerTask::start`. + pub async fn trim(&self, mut deadliner_rx: mpsc::Receiver) { loop { tokio::select! { biased; diff --git a/crates/core/src/parsigdb/memory_internal_test.rs b/crates/core/src/parsigdb/memory_internal_test.rs index d5a932af..8eb32cfe 100644 --- a/crates/core/src/parsigdb/memory_internal_test.rs +++ b/crates/core/src/parsigdb/memory_internal_test.rs @@ -103,14 +103,15 @@ async fn memdb_threshold() { const THRESHOLD: u64 = 7; const N: usize = 10; - let deadliner = Arc::new(TestDeadliner::new()); + let (deadliner, deadliner_rx) = TestDeadliner::new(); + let deadliner = Arc::new(deadliner); let cancel = CancellationToken::new(); let db = Arc::new(MemDB::new(cancel.clone(), THRESHOLD, deadliner.clone())); let trim_handle = tokio::spawn({ let db = db.clone(); async move { - db.trim().await; + db.trim(deadliner_rx).await; } }); @@ -166,17 +167,17 @@ async fn memdb_threshold() { struct TestDeadliner { added: StdMutex>, tx: mpsc::Sender, - rx: StdMutex>>, } impl TestDeadliner { - fn new() -> Self { + /// Returns the test deadliner with the matching expiry receiver. + fn new() -> (Self, mpsc::Receiver) { let (tx, rx) = mpsc::channel(32); - Self { + let deadliner = Self { added: StdMutex::new(Vec::new()), tx, - rx: StdMutex::new(Some(rx)), - } + }; + (deadliner, rx) } async fn expire(&self) -> bool { @@ -204,8 +205,4 @@ impl Deadliner for TestDeadliner { .push(duty); true } - - fn c(&self) -> Option> { - self.rx.lock().expect("test deadliner lock poisoned").take() - } } diff --git a/crates/dkg/src/exchanger.rs b/crates/dkg/src/exchanger.rs index 26a33beb..735bbe7d 100644 --- a/crates/dkg/src/exchanger.rs +++ b/crates/dkg/src/exchanger.rs @@ -143,10 +143,6 @@ impl Deadliner for NoopDeadliner { async fn add(&self, _duty: Duty) -> bool { true } - - fn c(&self) -> Option> { - None - } } impl Exchanger { From 53f0d7cb54b304a36f1bcf9084ca4fd0d9932335 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Fri, 22 May 2026 14:17:59 +0200 Subject: [PATCH 2/4] Add outcomes --- crates/core/src/deadline/mod.rs | 108 ++++++++++++------ crates/core/src/dutydb/memory.rs | 11 +- .../core/src/parsigdb/memory_internal_test.rs | 6 +- crates/dkg/src/exchanger.rs | 6 +- 4 files changed, 85 insertions(+), 46 deletions(-) diff --git a/crates/core/src/deadline/mod.rs b/crates/core/src/deadline/mod.rs index e6ffac19..d6793e50 100644 --- a/crates/core/src/deadline/mod.rs +++ b/crates/core/src/deadline/mod.rs @@ -8,7 +8,7 @@ //! //! ```no_run //! use pluto_core::{ -//! deadline::{Deadliner, DeadlinerTask, DutyDeadlineCalculator}, +//! deadline::{AddOutcome, Deadliner, DeadlinerTask, DutyDeadlineCalculator}, //! types::{Duty, SlotNumber}, //! }; //! use pluto_eth2api::EthBeaconNodeApiClient; @@ -20,7 +20,12 @@ //! let (deadliner, mut rx) = DeadlinerTask::start(cancel_token, "example", calculator); //! //! let duty = Duty::new_attester_duty(SlotNumber::new(1)); -//! let added = deadliner.add(duty).await; +//! match deadliner.add(duty).await { +//! AddOutcome::Scheduled => {} +//! AddOutcome::AlreadyExpired => eprintln!("duty already expired — skipped"), +//! AddOutcome::NoDeadline => {} +//! AddOutcome::FailedToCompute => eprintln!("deadline calculation failed"), +//! } //! //! while let Some(expired_duty) = rx.recv().await { //! println!("Duty expired: {}", expired_duty); @@ -78,6 +83,32 @@ fn to_chrono_duration(duration: Duration) -> Result { chrono::Duration::from_std(duration).map_err(|_| DeadlineError::DurationConversion) } +/// Outcome of [`Deadliner::add`]. +/// +/// Spells out the four distinct cases the previous `bool` return value +/// conflated, so callers can react specifically (e.g. drop a duty that +/// already expired vs. log a calculator error). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AddOutcome { + /// The duty was accepted and a timer is now armed for its deadline. + Scheduled, + /// The duty's deadline is already in the past — nothing scheduled. + AlreadyExpired, + /// The calculator reports this duty type has no deadline (e.g. Exit, + /// BuilderRegistration). Not an error — just not tracked. + NoDeadline, + /// The calculator returned an error while computing the deadline. + FailedToCompute, +} + +impl AddOutcome { + /// `true` only for [`AddOutcome::Scheduled`] — convenient for callers + /// that still treat "added" as a yes/no question. + pub fn is_scheduled(self) -> bool { + matches!(self, AddOutcome::Scheduled) + } +} + /// Deadliner provides duty deadline functionality. /// /// Producers submit duties via [`add`](Self::add). Expired duties are @@ -87,21 +118,16 @@ fn to_chrono_duration(duration: Duration) -> Result { pub trait Deadliner: Send + Sync { /// Adds a duty for deadline scheduling. /// - /// Returns `true` if the duty was added for future deadline scheduling. - /// This method is idempotent and returns `true` if the duty was previously - /// added and still awaits deadline scheduling. - /// - /// Returns `false` if: - /// - The duty has already expired and cannot be scheduled - /// - The calculator reports the duty has no deadline (`Ok(None)`) - /// - The calculator failed to compute the deadline (`Err(_)`) - async fn add(&self, duty: Duty) -> bool; + /// Idempotent: re-adding a duty already tracked returns + /// [`AddOutcome::Scheduled`] again. See [`AddOutcome`] for the meaning of + /// each variant. + async fn add(&self, duty: Duty) -> AddOutcome; } /// Internal message type for adding duties to the deadliner. struct DeadlineInput { duty: Duty, - response_tx: oneshot::Sender, + response_tx: oneshot::Sender, } /// Public-facing handle returned (paired with the expired-duty receiver) by @@ -115,10 +141,10 @@ pub struct DeadlinerHandle { #[async_trait] impl Deadliner for DeadlinerHandle { - async fn add(&self, duty: Duty) -> bool { + async fn add(&self, duty: Duty) -> AddOutcome { // Check if shut down if self.cancel_token.is_cancelled() { - return false; + return AddOutcome::FailedToCompute; } let (response_tx, response_rx) = oneshot::channel(); @@ -126,11 +152,12 @@ impl Deadliner for DeadlinerHandle { // Send the duty to the background task if self.input_tx.send(input).await.is_err() { - return false; + return AddOutcome::FailedToCompute; } - // Wait for response - response_rx.await.unwrap_or(false) + // Wait for response — `FailedToCompute` if the task dropped the + // sender (shutdown race). + response_rx.await.unwrap_or(AddOutcome::FailedToCompute) } } @@ -274,11 +301,11 @@ impl DeadlinerTask { let duty = input.duty; match self.calculator.deadline(&duty) { Ok(Some(deadline)) => { - let expired = deadline < Utc::now(); - let _ = input.response_tx.send(!expired); - if expired { + if deadline < Utc::now() { + let _ = input.response_tx.send(AddOutcome::AlreadyExpired); return None; } + let _ = input.response_tx.send(AddOutcome::Scheduled); self.duties.insert(duty); if deadline < self.curr_deadline { self.recompute_curr(); @@ -294,12 +321,13 @@ impl DeadlinerTask { error = %err, "Failed to compute deadline for duty" ); - let _ = input.response_tx.send(false); + let _ = input.response_tx.send(AddOutcome::FailedToCompute); None } Ok(None) => { - // Drop duties that never expire - let _ = input.response_tx.send(false); + // Duty type has no deadline (Exit, BuilderRegistration) — + // not tracked. + let _ = input.response_tx.send(AddOutcome::NoDeadline); None } } @@ -340,7 +368,7 @@ impl DeadlinerTask { mod tests { use super::{msecs::Msecs, *}; use crate::types::SlotNumber; - use anyhow::{Context, Result, bail}; + use anyhow::{Context, Result, bail, ensure}; use pluto_testutil::BeaconMock; use tokio::time::timeout; @@ -387,11 +415,11 @@ mod tests { async fn add_duties( duties: Vec, deadliner: DeadlinerHandle, - result_tx: mpsc::Sender, + result_tx: mpsc::Sender, ) { for duty in duties { - let added = deadliner.add(duty).await; - let _ = result_tx.send(added).await; + let outcome = deadliner.add(duty).await; + let _ = result_tx.send(outcome).await; } } @@ -467,19 +495,25 @@ mod tests { result_future_duties?; for _ in 0..expired_len { - let result = expired_rx.recv().await.context("expected expired ack")?; - assert!(!result, "expired duties should return false"); + let outcome = expired_rx.recv().await.context("expected expired ack")?; + ensure!( + outcome == AddOutcome::AlreadyExpired, + "expired duties should report AlreadyExpired, got {outcome:?}" + ); } let added_count = non_expired_len .checked_add(future_duties_len) .context("added_count overflow")?; for _ in 0..added_count { - let result = non_expired_rx + let outcome = non_expired_rx .recv() .await .context("expected non-expired ack")?; - assert!(result, "non-expired duties should return true"); + ensure!( + outcome == AddOutcome::Scheduled, + "non-expired duties should be Scheduled, got {outcome:?}" + ); } // Collect expired duties from output channel. @@ -508,8 +542,6 @@ mod tests { /// deadliner. #[tokio::test] async fn expired_duties_arrive_in_deadline_order() -> Result<()> { - use anyhow::ensure; - let start_time = Utc::now(); let calculator = TestCalculator { start_time, @@ -527,9 +559,15 @@ mod tests { let earlier = Duty::new_attester_duty(SlotNumber::new(1)); let added_later = deadliner.add(later.clone()).await; - ensure!(added_later, "later duty should be added"); + ensure!( + added_later == AddOutcome::Scheduled, + "later duty should be Scheduled, got {added_later:?}" + ); let added_earlier = deadliner.add(earlier.clone()).await; - ensure!(added_earlier, "earlier duty should be added"); + ensure!( + added_earlier == AddOutcome::Scheduled, + "earlier duty should be Scheduled, got {added_earlier:?}" + ); let first = timeout(Duration::from_secs(5), output_rx.recv()) .await diff --git a/crates/core/src/dutydb/memory.rs b/crates/core/src/dutydb/memory.rs index 865bcf52..ce0a3639 100644 --- a/crates/core/src/dutydb/memory.rs +++ b/crates/core/src/dutydb/memory.rs @@ -247,7 +247,7 @@ impl MemDB { let mut state = self.state.write().await; - if !self.deadliner.add(duty.clone()).await { + if !self.deadliner.add(duty.clone()).await.is_scheduled() { return Err(Error::ExpiredDuty); } @@ -609,6 +609,7 @@ mod tests { use super::*; use crate::{ + deadline::AddOutcome, signeddata::{AttesterDuty, ProposalBlock}, testutils::random_core_pub_key, types::{DutyType, SlotNumber}, @@ -619,8 +620,8 @@ mod tests { #[async_trait] impl Deadliner for NoopDeadliner { - async fn add(&self, _duty: Duty) -> bool { - true + async fn add(&self, _duty: Duty) -> AddOutcome { + AddOutcome::Scheduled } } @@ -663,9 +664,9 @@ mod tests { #[async_trait] impl Deadliner for TestDeadliner { - async fn add(&self, duty: Duty) -> bool { + async fn add(&self, duty: Duty) -> AddOutcome { self.added.lock().unwrap().push(duty); - true + AddOutcome::Scheduled } } diff --git a/crates/core/src/parsigdb/memory_internal_test.rs b/crates/core/src/parsigdb/memory_internal_test.rs index 8eb32cfe..cd122f64 100644 --- a/crates/core/src/parsigdb/memory_internal_test.rs +++ b/crates/core/src/parsigdb/memory_internal_test.rs @@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken; use super::{MemDB, get_threshold_matching, threshold_subscriber}; use crate::{ - deadline::Deadliner, + deadline::{AddOutcome, Deadliner}, signeddata::{BeaconCommitteeSelection, SignedSyncMessage, VersionedAttestation}, testutils::random_core_pub_key, types::{Duty, DutyType, ParSignedData, ParSignedDataSet, SlotNumber}, @@ -198,11 +198,11 @@ impl TestDeadliner { #[async_trait::async_trait] impl Deadliner for TestDeadliner { - async fn add(&self, duty: Duty) -> bool { + async fn add(&self, duty: Duty) -> AddOutcome { self.added .lock() .expect("test deadliner lock poisoned") .push(duty); - true + AddOutcome::Scheduled } } diff --git a/crates/dkg/src/exchanger.rs b/crates/dkg/src/exchanger.rs index 735bbe7d..c2d4e4bb 100644 --- a/crates/dkg/src/exchanger.rs +++ b/crates/dkg/src/exchanger.rs @@ -49,7 +49,7 @@ use tokio_util::sync::CancellationToken; use tracing::warn; use pluto_core::{ - deadline::Deadliner, + deadline::{AddOutcome, Deadliner}, parsigdb::memory::{ InternalSubscriberError, MemDB, MemDBError, internal_subscriber, threshold_subscriber, }, @@ -140,8 +140,8 @@ struct NoopDeadliner; #[async_trait] impl Deadliner for NoopDeadliner { - async fn add(&self, _duty: Duty) -> bool { - true + async fn add(&self, _duty: Duty) -> AddOutcome { + AddOutcome::Scheduled } } From 207827d879327ce200594dbbaf58b702d03cd65c Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Fri, 22 May 2026 14:58:28 +0200 Subject: [PATCH 3/4] Remove the deadliner trait --- crates/core/src/deadline/calculator.rs | 11 +++ crates/core/src/deadline/mod.rs | 35 ++----- crates/core/src/dutydb/memory.rs | 99 ++++++++----------- crates/core/src/parsigdb/memory.rs | 6 +- .../core/src/parsigdb/memory_internal_test.rs | 76 ++++---------- crates/dkg/src/exchanger.rs | 26 ++--- 6 files changed, 92 insertions(+), 161 deletions(-) diff --git a/crates/core/src/deadline/calculator.rs b/crates/core/src/deadline/calculator.rs index 5fc79360..7ce188ab 100644 --- a/crates/core/src/deadline/calculator.rs +++ b/crates/core/src/deadline/calculator.rs @@ -86,6 +86,17 @@ pub trait DeadlineCalculator: Send + Sync + 'static { fn deadline(&self, duty: &Duty) -> Result>>; } +/// Calculator that reports every duty as never expiring. Useful for +/// scenarios that need to plug into the deadliner API but don't actually want +/// any eviction (e.g. DKG, which is one-shot and outside the slot timeline). +pub struct NeverExpiringCalculator; + +impl DeadlineCalculator for NeverExpiringCalculator { + fn deadline(&self, _duty: &Duty) -> Result>> { + Ok(None) + } +} + impl DeadlineCalculator for DutyDeadlineCalculator { fn deadline(&self, duty: &Duty) -> Result>> { if duty.duty_type.never_expires() { diff --git a/crates/core/src/deadline/mod.rs b/crates/core/src/deadline/mod.rs index d6793e50..cc02690e 100644 --- a/crates/core/src/deadline/mod.rs +++ b/crates/core/src/deadline/mod.rs @@ -8,7 +8,7 @@ //! //! ```no_run //! use pluto_core::{ -//! deadline::{AddOutcome, Deadliner, DeadlinerTask, DutyDeadlineCalculator}, +//! deadline::{AddOutcome, DeadlinerTask, DutyDeadlineCalculator}, //! types::{Duty, SlotNumber}, //! }; //! use pluto_eth2api::EthBeaconNodeApiClient; @@ -37,10 +37,9 @@ mod calculator; mod msecs; -pub use calculator::{DeadlineCalculator, DutyDeadlineCalculator}; +pub use calculator::{DeadlineCalculator, DutyDeadlineCalculator, NeverExpiringCalculator}; use crate::types::{Duty, DutyType, SlotNumber}; -use async_trait::async_trait; use chrono::{DateTime, Utc}; use pluto_eth2api::EthBeaconNodeApiClientError; use std::{collections::HashSet, time::Duration}; @@ -109,21 +108,6 @@ impl AddOutcome { } } -/// Deadliner provides duty deadline functionality. -/// -/// Producers submit duties via [`add`](Self::add). Expired duties are -/// delivered on the receiver paired with the handle at -/// [`DeadlinerTask::start`]. -#[async_trait] -pub trait Deadliner: Send + Sync { - /// Adds a duty for deadline scheduling. - /// - /// Idempotent: re-adding a duty already tracked returns - /// [`AddOutcome::Scheduled`] again. See [`AddOutcome`] for the meaning of - /// each variant. - async fn add(&self, duty: Duty) -> AddOutcome; -} - /// Internal message type for adding duties to the deadliner. struct DeadlineInput { duty: Duty, @@ -139,10 +123,13 @@ pub struct DeadlinerHandle { input_tx: mpsc::Sender, } -#[async_trait] -impl Deadliner for DeadlinerHandle { - async fn add(&self, duty: Duty) -> AddOutcome { - // Check if shut down +impl DeadlinerHandle { + /// Adds a duty for deadline scheduling. + /// + /// Idempotent: re-adding a duty already tracked returns + /// [`AddOutcome::Scheduled`] again. See [`AddOutcome`] for the meaning of + /// each variant. + pub async fn add(&self, duty: Duty) -> AddOutcome { if self.cancel_token.is_cancelled() { return AddOutcome::FailedToCompute; } @@ -150,13 +137,11 @@ impl Deadliner for DeadlinerHandle { let (response_tx, response_rx) = oneshot::channel(); let input = DeadlineInput { duty, response_tx }; - // Send the duty to the background task if self.input_tx.send(input).await.is_err() { return AddOutcome::FailedToCompute; } - // Wait for response — `FailedToCompute` if the task dropped the - // sender (shutdown race). + // `FailedToCompute` if the task dropped the sender (shutdown race). response_rx.await.unwrap_or(AddOutcome::FailedToCompute) } } diff --git a/crates/core/src/dutydb/memory.rs b/crates/core/src/dutydb/memory.rs index ce0a3639..d596c0fb 100644 --- a/crates/core/src/dutydb/memory.rs +++ b/crates/core/src/dutydb/memory.rs @@ -2,7 +2,7 @@ //! //! Equivalent to charon/core/dutydb/memory.go. -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use pluto_eth2api::{ spec::{altair, phase0}, @@ -14,7 +14,7 @@ use tracing::{info, warn}; use tree_hash::TreeHash; use crate::{ - deadline::Deadliner, + deadline::DeadlinerHandle, signeddata::{ AttestationData, SyncContribution, VersionedAggregatedAttestation, VersionedProposal, }, @@ -199,14 +199,14 @@ pub struct MemDB { aggregation_notify: Notify, contrib_notify: Notify, cancel: CancellationToken, - deadliner: Arc, + deadliner: DeadlinerHandle, } impl MemDB { /// Creates a new in-memory DutyDB. `deadliner_rx` is the receiver paired /// with `deadliner` (typically from `DeadlinerTask::start`). pub fn new( - deadliner: Arc, + deadliner: DeadlinerHandle, deadliner_rx: mpsc::Receiver, cancel: &CancellationToken, ) -> Self { @@ -601,27 +601,29 @@ impl State { #[cfg(test)] mod tests { - use std::sync::{Arc, Mutex}; + use std::sync::Arc; - use async_trait::async_trait; - use tokio::sync::mpsc::{Receiver, Sender, channel}; + use chrono::{DateTime, Utc}; + use tokio::sync::mpsc::{Receiver, channel}; use tokio_util::sync::CancellationToken; use super::*; use crate::{ - deadline::AddOutcome, + deadline::{self, DeadlineCalculator, DeadlinerTask}, signeddata::{AttesterDuty, ProposalBlock}, testutils::random_core_pub_key, types::{DutyType, SlotNumber}, }; - /// Deadliner that always accepts duties and never expires them. - pub(crate) struct NoopDeadliner; + /// Test calculator whose every duty is `Scheduled` (deadline is `MAX_UTC`). + /// The deadliner never actually fires for this calculator, so the paired + /// output receiver stays silent — eviction in tests is driven manually + /// through a separate channel (see `duty_expiry`). + struct FarFutureCalculator; - #[async_trait] - impl Deadliner for NoopDeadliner { - async fn add(&self, _duty: Duty) -> AddOutcome { - AddOutcome::Scheduled + impl DeadlineCalculator for FarFutureCalculator { + fn deadline(&self, _: &Duty) -> deadline::Result>> { + Ok(Some(DateTime::::MAX_UTC)) } } @@ -631,57 +633,26 @@ mod tests { rx } - /// Deadliner that collects duties and can flush them to a channel on - /// demand. - pub(crate) struct TestDeadliner { - added: Mutex>, - tx: Sender, - } - - impl TestDeadliner { - /// Returns the test deadliner alongside the matching expiry receiver - /// — call `expire()` on the deadliner to drive `rx`. - pub(crate) fn new() -> (Arc, Receiver) { - let (tx, rx) = channel(64); - let deadliner = Arc::new(Self { - added: Mutex::new(Vec::new()), - tx, - }); - (deadliner, rx) - } - - /// Send all collected duties to the expiry channel. - pub(crate) async fn expire(&self) { - let duties: Vec = { - let mut added = self.added.lock().unwrap(); - std::mem::take(&mut *added) - }; - for duty in duties { - let _ = self.tx.send(duty).await; - } - } - } - - #[async_trait] - impl Deadliner for TestDeadliner { - async fn add(&self, duty: Duty) -> AddOutcome { - self.added.lock().unwrap().push(duty); - AddOutcome::Scheduled - } + /// Creates a real deadliner handle backed by [`FarFutureCalculator`] — + /// `add()` always reports `Scheduled` but nothing naturally expires. + fn far_future_handle() -> DeadlinerHandle { + let (handle, _drop_rx) = DeadlinerTask::start( + CancellationToken::new(), + "dutydb-tests", + FarFutureCalculator, + ); + handle } fn make_db() -> MemDB { MemDB::new( - Arc::new(NoopDeadliner), + far_future_handle(), noop_deadliner_rx(), &CancellationToken::new(), ) } - fn make_db_with_deadliner( - deadliner: Arc, - deadliner_rx: Receiver, - ) -> MemDB { + fn make_db_with_deadliner(deadliner: DeadlinerHandle, deadliner_rx: Receiver) -> MemDB { MemDB::new(deadliner, deadliner_rx, &CancellationToken::new()) } @@ -1112,8 +1083,12 @@ mod tests { #[tokio::test] async fn duty_expiry() { - let (deadliner, deadliner_rx) = TestDeadliner::new(); - let db = make_db_with_deadliner(deadliner.clone(), deadliner_rx); + // Real handle so `store()`'s `add(...).is_scheduled()` check passes. + // Eviction is driven manually via `trim_tx` so the test stays + // deterministic instead of racing the deadliner's timer. + let deadliner = far_future_handle(); + let (trim_tx, trim_rx) = channel::(64); + let db = make_db_with_deadliner(deadliner, trim_rx); const SLOT: u64 = 123; @@ -1130,8 +1105,12 @@ mod tests { // Should be findable now. db.pub_key_by_attestation(SLOT, 0, 0).await.unwrap(); - // Expire the duty. - deadliner.expire().await; + // Expire the duty: simulate the deadliner emitting it. + let expired_duty = Duty::new(SlotNumber::new(SLOT), DutyType::Attester); + trim_tx + .send(expired_duty) + .await + .expect("trim_tx should be open"); // Trigger expiry processing by storing another duty. let proposal = phase0_proposal(SLOT.saturating_add(1), 0); diff --git a/crates/core/src/parsigdb/memory.rs b/crates/core/src/parsigdb/memory.rs index efe0dd32..fee252b2 100644 --- a/crates/core/src/parsigdb/memory.rs +++ b/crates/core/src/parsigdb/memory.rs @@ -4,7 +4,7 @@ use tokio_util::sync::CancellationToken; use tracing::debug; use crate::{ - deadline::Deadliner, + deadline::DeadlinerHandle, parsigdb::metrics::PARSIG_DB_METRICS, signeddata::SignedDataError, types::{Duty, DutyType, ParSignedData, ParSignedDataSet, PubKey}, @@ -178,7 +178,7 @@ pub struct MemDBInner { pub struct MemDB { ct: CancellationToken, inner: Arc>, - deadliner: Arc, + deadliner: DeadlinerHandle, threshold: u64, } @@ -189,7 +189,7 @@ impl MemDB { /// * `ct` - Cancellation token for graceful shutdown /// * `threshold` - Number of matching partial signatures required /// * `deadliner` - Deadliner for managing duty expiration - pub fn new(ct: CancellationToken, threshold: u64, deadliner: Arc) -> Self { + pub fn new(ct: CancellationToken, threshold: u64, deadliner: DeadlinerHandle) -> Self { Self { ct, inner: Arc::new(Mutex::new(MemDBInner { diff --git a/crates/core/src/parsigdb/memory_internal_test.rs b/crates/core/src/parsigdb/memory_internal_test.rs index cd122f64..a62aec32 100644 --- a/crates/core/src/parsigdb/memory_internal_test.rs +++ b/crates/core/src/parsigdb/memory_internal_test.rs @@ -1,17 +1,17 @@ -use std::{ - sync::{Arc, Mutex as StdMutex}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; use pluto_eth2api::{spec::altair, v1}; use pluto_testutil as testutil; use test_case::test_case; -use tokio::sync::{Mutex, mpsc}; +use tokio::{ + sync::{Mutex, mpsc}, + time::sleep, +}; use tokio_util::sync::CancellationToken; use super::{MemDB, get_threshold_matching, threshold_subscriber}; use crate::{ - deadline::{AddOutcome, Deadliner}, + deadline::{DeadlinerTask, NeverExpiringCalculator}, signeddata::{BeaconCommitteeSelection, SignedSyncMessage, VersionedAttestation}, testutils::random_core_pub_key, types::{Duty, DutyType, ParSignedData, ParSignedDataSet, SlotNumber}, @@ -103,15 +103,19 @@ async fn memdb_threshold() { const THRESHOLD: u64 = 7; const N: usize = 10; - let (deadliner, deadliner_rx) = TestDeadliner::new(); - let deadliner = Arc::new(deadliner); let cancel = CancellationToken::new(); - let db = Arc::new(MemDB::new(cancel.clone(), THRESHOLD, deadliner.clone())); + // Real deadliner so `MemDB.store_external` has a handle to call `.add` on. + // The calculator never expires anything, so the deadliner's natural output + // is silent — eviction is driven manually through `trim_tx` below. + let (deadliner, _drop_rx) = + DeadlinerTask::start(cancel.clone(), "memdb_threshold", NeverExpiringCalculator); + let (trim_tx, trim_rx) = mpsc::channel::(32); + let db = Arc::new(MemDB::new(cancel.clone(), THRESHOLD, deadliner)); let trim_handle = tokio::spawn({ let db = db.clone(); async move { - db.trim(deadliner_rx).await; + db.trim(trim_rx).await; } }); @@ -152,8 +156,13 @@ async fn memdb_threshold() { enqueue_n().await; assert_eq!(1, *times_called.lock().await); - deadliner.expire().await; - tokio::time::sleep(Duration::from_millis(20)).await; + // Drive eviction manually: simulate the deadliner emitting `duty` as + // expired. Wait a beat so the trim task processes it. + trim_tx + .send(duty.clone()) + .await + .expect("trim_tx should be open"); + sleep(Duration::from_millis(20)).await; enqueue_n().await; assert_eq!(2, *times_called.lock().await); @@ -163,46 +172,3 @@ async fn memdb_threshold() { .await .expect("trim task should shut down cleanly"); } - -struct TestDeadliner { - added: StdMutex>, - tx: mpsc::Sender, -} - -impl TestDeadliner { - /// Returns the test deadliner with the matching expiry receiver. - fn new() -> (Self, mpsc::Receiver) { - let (tx, rx) = mpsc::channel(32); - let deadliner = Self { - added: StdMutex::new(Vec::new()), - tx, - }; - (deadliner, rx) - } - - async fn expire(&self) -> bool { - let duties = { - let mut added = self.added.lock().expect("test deadliner lock poisoned"); - std::mem::take(&mut *added) - }; - - for duty in duties { - if self.tx.send(duty).await.is_err() { - return false; - } - } - - true - } -} - -#[async_trait::async_trait] -impl Deadliner for TestDeadliner { - async fn add(&self, duty: Duty) -> AddOutcome { - self.added - .lock() - .expect("test deadliner lock poisoned") - .push(duty); - AddOutcome::Scheduled - } -} diff --git a/crates/dkg/src/exchanger.rs b/crates/dkg/src/exchanger.rs index c2d4e4bb..4bc8292a 100644 --- a/crates/dkg/src/exchanger.rs +++ b/crates/dkg/src/exchanger.rs @@ -42,14 +42,13 @@ use std::{ sync::Arc, }; -use async_trait::async_trait; use libp2p::PeerId; use tokio::sync::{Mutex, Notify}; use tokio_util::sync::CancellationToken; use tracing::warn; use pluto_core::{ - deadline::{AddOutcome, Deadliner}, + deadline::{DeadlinerTask, NeverExpiringCalculator}, parsigdb::memory::{ InternalSubscriberError, MemDB, MemDBError, internal_subscriber, threshold_subscriber, }, @@ -134,17 +133,6 @@ pub struct Exchanger { pub ct: CancellationToken, } -/// Deadliner that never expires any duty, used during DKG where slot-based -/// expiry does not apply. -struct NoopDeadliner; - -#[async_trait] -impl Deadliner for NoopDeadliner { - async fn add(&self, _duty: Duty) -> AddOutcome { - AddOutcome::Scheduled - } -} - impl Exchanger { /// Creates a new exchanger and wires up the three core subscriptions: /// @@ -181,11 +169,13 @@ impl Exchanger { // threshold is len(peers) to wait until we get all the partial sigs from all // the peers per DV let threshold = u64::try_from(peers.len()).expect("usize fits in u64"); - let sigdb = Arc::new(Mutex::new(MemDB::new( - ct.clone(), - threshold, - Arc::new(NoopDeadliner), - ))); + // DKG is one-shot and outside the slot timeline; we wire a real + // deadliner with a never-expiring calculator just to satisfy the + // `MemDB` API. The paired receiver is dropped — the calculator + // guarantees the background task never tries to publish. + let (deadliner, _expired_rx) = + DeadlinerTask::start(ct.clone(), "dkg-exchanger", NeverExpiringCalculator); + let sigdb = Arc::new(Mutex::new(MemDB::new(ct.clone(), threshold, deadliner))); let sig_data = DataByPubkey::default(); // Wiring core workflow components From 0e1f3b6439f2072d46c3c72abfed790235e5743e Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Fri, 22 May 2026 19:14:35 +0200 Subject: [PATCH 4/4] Fix docs --- crates/core/src/deadline/mod.rs | 8 ++++---- crates/core/src/parsigdb/memory.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/core/src/deadline/mod.rs b/crates/core/src/deadline/mod.rs index cc02690e..37cf546b 100644 --- a/crates/core/src/deadline/mod.rs +++ b/crates/core/src/deadline/mod.rs @@ -1,8 +1,8 @@ //! Duty deadline tracking and notification functionality. //! -//! This module provides the [`Deadliner`] trait for tracking duty deadlines -//! and notifying when duties expire. It implements a background task that -//! manages timers for multiple duties and sends expired duties to a channel. +//! Provides `DeadlinerHandle` for tracking duty deadlines and notifying when +//! duties expire. A background task spawned by `DeadlinerTask::start` manages +//! timers for multiple duties and emits expired ones on a channel. //! //! # Example //! @@ -82,7 +82,7 @@ fn to_chrono_duration(duration: Duration) -> Result { chrono::Duration::from_std(duration).map_err(|_| DeadlineError::DurationConversion) } -/// Outcome of [`Deadliner::add`]. +/// Outcome of [`DeadlinerHandle::add`]. /// /// Spells out the four distinct cases the previous `bool` return value /// conflated, so callers can react specifically (e.g. drop a duty that diff --git a/crates/core/src/parsigdb/memory.rs b/crates/core/src/parsigdb/memory.rs index fee252b2..1eb4abe4 100644 --- a/crates/core/src/parsigdb/memory.rs +++ b/crates/core/src/parsigdb/memory.rs @@ -303,7 +303,7 @@ impl MemDB { /// Runs in a loop, listening on `deadliner_rx` for expired duties and /// removing their associated data. Should be spawned as a background task; /// returns when the cancellation token is triggered or the receiver - /// closes. The receiver is the one paired with the [`Deadliner`] handle at + /// closes. The receiver is the one paired with the [`DeadlinerHandle`] at /// `DeadlinerTask::start`. pub async fn trim(&self, mut deadliner_rx: mpsc::Receiver) { loop {