diff --git a/crates/core/src/deadline/calculator.rs b/crates/core/src/deadline/calculator.rs index 5fc79360..7ce188ab 100644 --- a/crates/core/src/deadline/calculator.rs +++ b/crates/core/src/deadline/calculator.rs @@ -86,6 +86,17 @@ pub trait DeadlineCalculator: Send + Sync + 'static { fn deadline(&self, duty: &Duty) -> Result>>; } +/// Calculator that reports every duty as never expiring. Useful for +/// scenarios that need to plug into the deadliner API but don't actually want +/// any eviction (e.g. DKG, which is one-shot and outside the slot timeline). +pub struct NeverExpiringCalculator; + +impl DeadlineCalculator for NeverExpiringCalculator { + fn deadline(&self, _duty: &Duty) -> Result>> { + Ok(None) + } +} + impl DeadlineCalculator for DutyDeadlineCalculator { fn deadline(&self, duty: &Duty) -> Result>> { if duty.duty_type.never_expires() { diff --git a/crates/core/src/deadline/mod.rs b/crates/core/src/deadline/mod.rs index 45be4055..37cf546b 100644 --- a/crates/core/src/deadline/mod.rs +++ b/crates/core/src/deadline/mod.rs @@ -1,32 +1,34 @@ //! Duty deadline tracking and notification functionality. //! -//! This module provides the [`Deadliner`] trait for tracking duty deadlines -//! and notifying when duties expire. It implements a background task that -//! manages timers for multiple duties and sends expired duties to a channel. +//! Provides `DeadlinerHandle` for tracking duty deadlines and notifying when +//! duties expire. A background task spawned by `DeadlinerTask::start` manages +//! timers for multiple duties and emits expired ones on a channel. //! //! # Example //! //! ```no_run //! use pluto_core::{ -//! deadline::{DeadlinerTask, DutyDeadlineCalculator}, +//! deadline::{AddOutcome, DeadlinerTask, DutyDeadlineCalculator}, //! types::{Duty, SlotNumber}, //! }; //! use pluto_eth2api::EthBeaconNodeApiClient; -//! use std::sync::Arc; //! use tokio_util::sync::CancellationToken; //! //! # async fn example(client: &EthBeaconNodeApiClient) -> anyhow::Result<()> { //! let cancel_token = CancellationToken::new(); //! let calculator = DutyDeadlineCalculator::from_client(client).await?; -//! let deadliner = DeadlinerTask::start(cancel_token, "example", calculator); +//! let (deadliner, mut rx) = DeadlinerTask::start(cancel_token, "example", calculator); //! //! let duty = Duty::new_attester_duty(SlotNumber::new(1)); -//! let added = deadliner.add(duty).await; +//! match deadliner.add(duty).await { +//! AddOutcome::Scheduled => {} +//! AddOutcome::AlreadyExpired => eprintln!("duty already expired — skipped"), +//! AddOutcome::NoDeadline => {} +//! AddOutcome::FailedToCompute => eprintln!("deadline calculation failed"), +//! } //! -//! if let Some(mut rx) = deadliner.c() { -//! while let Some(expired_duty) = rx.recv().await { -//! println!("Duty expired: {}", expired_duty); -//! } +//! while let Some(expired_duty) = rx.recv().await { +//! println!("Duty expired: {}", expired_duty); //! } //! # Ok(()) //! # } @@ -35,17 +37,12 @@ mod calculator; mod msecs; -pub use calculator::{DeadlineCalculator, DutyDeadlineCalculator}; +pub use calculator::{DeadlineCalculator, DutyDeadlineCalculator, NeverExpiringCalculator}; use crate::types::{Duty, DutyType, SlotNumber}; -use async_trait::async_trait; use chrono::{DateTime, Utc}; use pluto_eth2api::EthBeaconNodeApiClientError; -use std::{ - collections::HashSet, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{collections::HashSet, time::Duration}; use tokio::{ sync::{mpsc, oneshot}, time::sleep, @@ -85,79 +82,73 @@ fn to_chrono_duration(duration: Duration) -> Result { chrono::Duration::from_std(duration).map_err(|_| DeadlineError::DurationConversion) } -/// Deadliner provides duty deadline functionality. +/// Outcome of [`DeadlinerHandle::add`]. /// -/// The `c()` method returns a channel for receiving expired duties. -/// It may only be called once and the returned channel should be used -/// by a single task. Multiple instances are required for different -/// components and use cases. -#[async_trait] -pub trait Deadliner: Send + Sync { - /// Adds a duty for deadline scheduling. - /// - /// Returns `true` if the duty was added for future deadline scheduling. - /// This method is idempotent and returns `true` if the duty was previously - /// added and still awaits deadline scheduling. - /// - /// Returns `false` if: - /// - The duty has already expired and cannot be scheduled - /// - The calculator reports the duty has no deadline (`Ok(None)`) - /// - The calculator failed to compute the deadline (`Err(_)`) - async fn add(&self, duty: Duty) -> bool; +/// Spells out the four distinct cases the previous `bool` return value +/// conflated, so callers can react specifically (e.g. drop a duty that +/// already expired vs. log a calculator error). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AddOutcome { + /// The duty was accepted and a timer is now armed for its deadline. + Scheduled, + /// The duty's deadline is already in the past — nothing scheduled. + AlreadyExpired, + /// The calculator reports this duty type has no deadline (e.g. Exit, + /// BuilderRegistration). Not an error — just not tracked. + NoDeadline, + /// The calculator returned an error while computing the deadline. + FailedToCompute, +} - /// Returns the channel for receiving deadlined duties. - /// - /// This method may only be called once and returns `None` on subsequent - /// calls. The returned channel should only be used by a single task. - fn c(&self) -> Option>; +impl AddOutcome { + /// `true` only for [`AddOutcome::Scheduled`] — convenient for callers + /// that still treat "added" as a yes/no question. + pub fn is_scheduled(self) -> bool { + matches!(self, AddOutcome::Scheduled) + } } /// Internal message type for adding duties to the deadliner. struct DeadlineInput { duty: Duty, - response_tx: oneshot::Sender, + response_tx: oneshot::Sender, } -/// Public-facing handle: the `Arc` returned by -/// [`DeadlinerTask::start`] wraps this. Holds the input channel, the -/// take-once output receiver, and the cancellation token. -struct DeadlinerHandle { +/// Public-facing handle returned (paired with the expired-duty receiver) by +/// [`DeadlinerTask::start`]. Cloning is cheap and shares the same background +/// task — share it freely across producers inside one service. +#[derive(Clone)] +pub struct DeadlinerHandle { cancel_token: CancellationToken, input_tx: mpsc::Sender, - output_rx: Mutex>>, } -#[async_trait] -impl Deadliner for DeadlinerHandle { - async fn add(&self, duty: Duty) -> bool { - // Check if shut down +impl DeadlinerHandle { + /// Adds a duty for deadline scheduling. + /// + /// Idempotent: re-adding a duty already tracked returns + /// [`AddOutcome::Scheduled`] again. See [`AddOutcome`] for the meaning of + /// each variant. + pub async fn add(&self, duty: Duty) -> AddOutcome { if self.cancel_token.is_cancelled() { - return false; + return AddOutcome::FailedToCompute; } let (response_tx, response_rx) = oneshot::channel(); let input = DeadlineInput { duty, response_tx }; - // Send the duty to the background task if self.input_tx.send(input).await.is_err() { - return false; + return AddOutcome::FailedToCompute; } - // Wait for response - response_rx.await.unwrap_or(false) - } - - fn c(&self) -> Option> { - self.output_rx - .lock() - .ok() - .and_then(|mut guard| guard.take()) + // `FailedToCompute` if the task dropped the sender (shutdown race). + response_rx.await.unwrap_or(AddOutcome::FailedToCompute) } } /// Owned state of the background task that drives a [`DeadlinerHandle`]'s /// duty timers. Held exclusively by the spawned task — that's why it lives -/// outside the `Arc` and `run_task` can take `mut self`. +/// outside the public handle and `run_task` can take `mut self`. /// Constructed and spawned via [`DeadlinerTask::start`]. pub struct DeadlinerTask { cancel_token: CancellationToken, @@ -172,14 +163,15 @@ pub struct DeadlinerTask { } impl DeadlinerTask { - /// Builds the public-facing [`Deadliner`] handle and spawns the background - /// task that drives it. The background loop exits when `cancel_token` is - /// cancelled. + /// Spawns the background task and returns a `(handle, expired_rx)` pair. + /// The cloneable `handle` is for adding duties from any number of + /// producers; `expired_rx` is the single consumer's receiver of expired + /// duties. The background loop exits when `cancel_token` is cancelled. pub fn start( cancel_token: CancellationToken, label: impl Into, calculator: C, - ) -> Arc { + ) -> (DeadlinerHandle, mpsc::Receiver) { // Matches Charon's `outputBuffer = 10` — big enough for all duty // types expiring simultaneously while the consumer drains synchronously. const OUTPUT_BUFFER: usize = 10; @@ -204,13 +196,12 @@ impl DeadlinerTask { }; tokio::spawn(task.run_task()); - let link = DeadlinerHandle { + let handle = DeadlinerHandle { cancel_token, input_tx, - output_rx: Mutex::new(Some(output_rx)), }; - Arc::new(link) + (handle, output_rx) } /// Background task that manages duty deadlines. @@ -295,11 +286,11 @@ impl DeadlinerTask { let duty = input.duty; match self.calculator.deadline(&duty) { Ok(Some(deadline)) => { - let expired = deadline < Utc::now(); - let _ = input.response_tx.send(!expired); - if expired { + if deadline < Utc::now() { + let _ = input.response_tx.send(AddOutcome::AlreadyExpired); return None; } + let _ = input.response_tx.send(AddOutcome::Scheduled); self.duties.insert(duty); if deadline < self.curr_deadline { self.recompute_curr(); @@ -315,12 +306,13 @@ impl DeadlinerTask { error = %err, "Failed to compute deadline for duty" ); - let _ = input.response_tx.send(false); + let _ = input.response_tx.send(AddOutcome::FailedToCompute); None } Ok(None) => { - // Drop duties that never expire - let _ = input.response_tx.send(false); + // Duty type has no deadline (Exit, BuilderRegistration) — + // not tracked. + let _ = input.response_tx.send(AddOutcome::NoDeadline); None } } @@ -361,7 +353,7 @@ impl DeadlinerTask { mod tests { use super::{msecs::Msecs, *}; use crate::types::SlotNumber; - use anyhow::{Context, Result, bail}; + use anyhow::{Context, Result, bail, ensure}; use pluto_testutil::BeaconMock; use tokio::time::timeout; @@ -407,12 +399,12 @@ mod tests { /// channel. async fn add_duties( duties: Vec, - deadliner: Arc, - result_tx: mpsc::Sender, + deadliner: DeadlinerHandle, + result_tx: mpsc::Sender, ) { for duty in duties { - let added = deadliner.add(duty).await; - let _ = result_tx.send(added).await; + let outcome = deadliner.add(duty).await; + let _ = result_tx.send(outcome).await; } } @@ -461,9 +453,8 @@ mod tests { }; let cancel_token = CancellationToken::new(); - let deadliner = DeadlinerTask::start(cancel_token.clone(), "test", calculator); - - let mut output_rx = deadliner.c().context("output receiver already taken")?; + let (deadliner, mut output_rx) = + DeadlinerTask::start(cancel_token.clone(), "test", calculator); let (expired_tx, mut expired_rx) = mpsc::channel(100); let (non_expired_tx, mut non_expired_rx) = mpsc::channel(100); @@ -472,21 +463,15 @@ mod tests { let non_expired_len = non_expired_duties.len(); let future_duties_len = future_duties.len(); - let handler_expired = tokio::spawn(add_duties( - expired_duties, - Arc::clone(&deadliner), - expired_tx, - )); + let handler_expired = + tokio::spawn(add_duties(expired_duties, deadliner.clone(), expired_tx)); let handler_non_expired = tokio::spawn(add_duties( non_expired_duties.clone(), - Arc::clone(&deadliner), + deadliner.clone(), non_expired_tx.clone(), )); - let handler_future_duties = tokio::spawn(add_duties( - future_duties, - Arc::clone(&deadliner), - non_expired_tx, - )); + let handler_future_duties = + tokio::spawn(add_duties(future_duties, deadliner.clone(), non_expired_tx)); let (result_expired, result_non_expired, result_future_duties) = tokio::join!(handler_expired, handler_non_expired, handler_future_duties); @@ -495,19 +480,25 @@ mod tests { result_future_duties?; for _ in 0..expired_len { - let result = expired_rx.recv().await.context("expected expired ack")?; - assert!(!result, "expired duties should return false"); + let outcome = expired_rx.recv().await.context("expected expired ack")?; + ensure!( + outcome == AddOutcome::AlreadyExpired, + "expired duties should report AlreadyExpired, got {outcome:?}" + ); } let added_count = non_expired_len .checked_add(future_duties_len) .context("added_count overflow")?; for _ in 0..added_count { - let result = non_expired_rx + let outcome = non_expired_rx .recv() .await .context("expected non-expired ack")?; - assert!(result, "non-expired duties should return true"); + ensure!( + outcome == AddOutcome::Scheduled, + "non-expired duties should be Scheduled, got {outcome:?}" + ); } // Collect expired duties from output channel. @@ -536,8 +527,6 @@ mod tests { /// deadliner. #[tokio::test] async fn expired_duties_arrive_in_deadline_order() -> Result<()> { - use anyhow::ensure; - let start_time = Utc::now(); let calculator = TestCalculator { start_time, @@ -545,9 +534,8 @@ mod tests { }; let cancel_token = CancellationToken::new(); - let deadliner = DeadlinerTask::start(cancel_token.clone(), "order-test", calculator); - - let mut output_rx = deadliner.c().context("output receiver already taken")?; + let (deadliner, mut output_rx) = + DeadlinerTask::start(cancel_token.clone(), "order-test", calculator); // TestCalculator: deadline = start_time + slot * 500ms. // Insert the later one first to make sure ordering is by deadline, @@ -556,9 +544,15 @@ mod tests { let earlier = Duty::new_attester_duty(SlotNumber::new(1)); let added_later = deadliner.add(later.clone()).await; - ensure!(added_later, "later duty should be added"); + ensure!( + added_later == AddOutcome::Scheduled, + "later duty should be Scheduled, got {added_later:?}" + ); let added_earlier = deadliner.add(earlier.clone()).await; - ensure!(added_earlier, "earlier duty should be added"); + ensure!( + added_earlier == AddOutcome::Scheduled, + "earlier duty should be Scheduled, got {added_earlier:?}" + ); let first = timeout(Duration::from_secs(5), output_rx.recv()) .await diff --git a/crates/core/src/dutydb/memory.rs b/crates/core/src/dutydb/memory.rs index 94dbc98d..d596c0fb 100644 --- a/crates/core/src/dutydb/memory.rs +++ b/crates/core/src/dutydb/memory.rs @@ -2,19 +2,19 @@ //! //! Equivalent to charon/core/dutydb/memory.go. -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use pluto_eth2api::{ spec::{altair, phase0}, versioned, }; -use tokio::sync::{Notify, RwLock}; +use tokio::sync::{Notify, RwLock, mpsc}; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use tree_hash::TreeHash; use crate::{ - deadline::Deadliner, + deadline::DeadlinerHandle, signeddata::{ AttestationData, SyncContribution, VersionedAggregatedAttestation, VersionedProposal, }, @@ -199,15 +199,17 @@ pub struct MemDB { aggregation_notify: Notify, contrib_notify: Notify, cancel: CancellationToken, - deadliner: Arc, + deadliner: DeadlinerHandle, } impl MemDB { - /// Creates a new in-memory DutyDB. - pub fn new(deadliner: Arc, cancel: &CancellationToken) -> Self { - let deadliner_rx = deadliner.c().expect( - "Deadliner::c() returned None — the receiver was already consumed. Each MemDB must use a fresh Deadliner.", - ); + /// Creates a new in-memory DutyDB. `deadliner_rx` is the receiver paired + /// with `deadliner` (typically from `DeadlinerTask::start`). + pub fn new( + deadliner: DeadlinerHandle, + deadliner_rx: mpsc::Receiver, + cancel: &CancellationToken, + ) -> Self { Self { state: RwLock::new(State { attestation_duties: HashMap::new(), @@ -245,7 +247,7 @@ impl MemDB { let mut state = self.state.write().await; - if !self.deadliner.add(duty.clone()).await { + if !self.deadliner.add(duty.clone()).await.is_scheduled() { return Err(Error::ExpiredDuty); } @@ -599,82 +601,59 @@ impl State { #[cfg(test)] mod tests { - use std::sync::{Arc, Mutex}; + use std::sync::Arc; - use async_trait::async_trait; - use tokio::sync::mpsc::{Receiver, Sender, channel}; + use chrono::{DateTime, Utc}; + use tokio::sync::mpsc::{Receiver, channel}; use tokio_util::sync::CancellationToken; use super::*; use crate::{ + deadline::{self, DeadlineCalculator, DeadlinerTask}, signeddata::{AttesterDuty, ProposalBlock}, testutils::random_core_pub_key, types::{DutyType, SlotNumber}, }; - /// Deadliner that always accepts duties and never expires them. - pub(crate) struct NoopDeadliner; - - #[async_trait] - impl Deadliner for NoopDeadliner { - async fn add(&self, _duty: Duty) -> bool { - true - } + /// Test calculator whose every duty is `Scheduled` (deadline is `MAX_UTC`). + /// The deadliner never actually fires for this calculator, so the paired + /// output receiver stays silent — eviction in tests is driven manually + /// through a separate channel (see `duty_expiry`). + struct FarFutureCalculator; - fn c(&self) -> Option> { - let (_, rx) = channel(1); - Some(rx) + impl DeadlineCalculator for FarFutureCalculator { + fn deadline(&self, _: &Duty) -> deadline::Result>> { + Ok(Some(DateTime::::MAX_UTC)) } } - /// Deadliner that collects duties and can flush them to a channel on - /// demand. - pub(crate) struct TestDeadliner { - added: Mutex>, - tx: Sender, - rx: Mutex>>, - } - - impl TestDeadliner { - pub(crate) fn new() -> Arc { - let (tx, rx) = channel(64); - Arc::new(Self { - added: Mutex::new(Vec::new()), - tx, - rx: Mutex::new(Some(rx)), - }) - } - - /// Send all collected duties to the expiry channel. - pub(crate) async fn expire(&self) { - let duties: Vec = { - let mut added = self.added.lock().unwrap(); - std::mem::take(&mut *added) - }; - for duty in duties { - let _ = self.tx.send(duty).await; - } - } + /// Builds a never-firing receiver for tests that don't exercise eviction. + pub(crate) fn noop_deadliner_rx() -> Receiver { + let (_, rx) = channel(1); + rx } - #[async_trait] - impl Deadliner for TestDeadliner { - async fn add(&self, duty: Duty) -> bool { - self.added.lock().unwrap().push(duty); - true - } - - fn c(&self) -> Option> { - self.rx.lock().unwrap().take() - } + /// Creates a real deadliner handle backed by [`FarFutureCalculator`] — + /// `add()` always reports `Scheduled` but nothing naturally expires. + fn far_future_handle() -> DeadlinerHandle { + let (handle, _drop_rx) = DeadlinerTask::start( + CancellationToken::new(), + "dutydb-tests", + FarFutureCalculator, + ); + handle } fn make_db() -> MemDB { - MemDB::new(Arc::new(NoopDeadliner), &CancellationToken::new()) + MemDB::new( + far_future_handle(), + noop_deadliner_rx(), + &CancellationToken::new(), + ) } - fn make_db_with_deadliner(deadliner: Arc) -> MemDB { - MemDB::new(deadliner, &CancellationToken::new()) + fn make_db_with_deadliner(deadliner: DeadlinerHandle, deadliner_rx: Receiver) -> MemDB { + MemDB::new(deadliner, deadliner_rx, &CancellationToken::new()) } fn att_data(slot: u64, committee_index: u64, validator_index: u64) -> AttestationData { @@ -1104,8 +1083,12 @@ mod tests { #[tokio::test] async fn duty_expiry() { - let deadliner = TestDeadliner::new(); - let db = make_db_with_deadliner(Arc::clone(&deadliner) as Arc); + // Real handle so `store()`'s `add(...).is_scheduled()` check passes. + // Eviction is driven manually via `trim_tx` so the test stays + // deterministic instead of racing the deadliner's timer. + let deadliner = far_future_handle(); + let (trim_tx, trim_rx) = channel::(64); + let db = make_db_with_deadliner(deadliner, trim_rx); const SLOT: u64 = 123; @@ -1122,8 +1105,12 @@ mod tests { // Should be findable now. db.pub_key_by_attestation(SLOT, 0, 0).await.unwrap(); - // Expire the duty. - deadliner.expire().await; + // Expire the duty: simulate the deadliner emitting it. + let expired_duty = Duty::new(SlotNumber::new(SLOT), DutyType::Attester); + trim_tx + .send(expired_duty) + .await + .expect("trim_tx should be open"); // Trigger expiry processing by storing another duty. let proposal = phase0_proposal(SLOT.saturating_add(1), 0); diff --git a/crates/core/src/parsigdb/memory.rs b/crates/core/src/parsigdb/memory.rs index 44ffa9f8..1eb4abe4 100644 --- a/crates/core/src/parsigdb/memory.rs +++ b/crates/core/src/parsigdb/memory.rs @@ -1,10 +1,10 @@ use std::{collections::HashMap, error::Error as StdError, future::Future, pin::Pin, sync::Arc}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, mpsc}; use tokio_util::sync::CancellationToken; -use tracing::{debug, warn}; +use tracing::debug; use crate::{ - deadline::Deadliner, + deadline::DeadlinerHandle, parsigdb::metrics::PARSIG_DB_METRICS, signeddata::SignedDataError, types::{Duty, DutyType, ParSignedData, ParSignedDataSet, PubKey}, @@ -178,7 +178,7 @@ pub struct MemDBInner { pub struct MemDB { ct: CancellationToken, inner: Arc>, - deadliner: Arc, + deadliner: DeadlinerHandle, threshold: u64, } @@ -189,7 +189,7 @@ impl MemDB { /// * `ct` - Cancellation token for graceful shutdown /// * `threshold` - Number of matching partial signatures required /// * `deadliner` - Deadliner for managing duty expiration - pub fn new(ct: CancellationToken, threshold: u64, deadliner: Arc) -> Self { + pub fn new(ct: CancellationToken, threshold: u64, deadliner: DeadlinerHandle) -> Self { Self { ct, inner: Arc::new(Mutex::new(MemDBInner { @@ -300,16 +300,12 @@ impl MemDB { /// Trims expired duties from the database. /// - /// This method runs in a loop, listening for expired duties from the - /// deadliner and removing their associated data from the database. It - /// should be spawned as a background task and will run until the - /// cancellation token is triggered. - pub async fn trim(&self) { - let Some(mut deadliner_rx) = self.deadliner.c() else { - warn!("Deadliner channel is not available"); - return; - }; - + /// Runs in a loop, listening on `deadliner_rx` for expired duties and + /// removing their associated data. Should be spawned as a background task; + /// returns when the cancellation token is triggered or the receiver + /// closes. The receiver is the one paired with the [`DeadlinerHandle`] at + /// `DeadlinerTask::start`. + pub async fn trim(&self, mut deadliner_rx: mpsc::Receiver) { loop { tokio::select! { biased; diff --git a/crates/core/src/parsigdb/memory_internal_test.rs b/crates/core/src/parsigdb/memory_internal_test.rs index d5a932af..a62aec32 100644 --- a/crates/core/src/parsigdb/memory_internal_test.rs +++ b/crates/core/src/parsigdb/memory_internal_test.rs @@ -1,17 +1,17 @@ -use std::{ - sync::{Arc, Mutex as StdMutex}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; use pluto_eth2api::{spec::altair, v1}; use pluto_testutil as testutil; use test_case::test_case; -use tokio::sync::{Mutex, mpsc}; +use tokio::{ + sync::{Mutex, mpsc}, + time::sleep, +}; use tokio_util::sync::CancellationToken; use super::{MemDB, get_threshold_matching, threshold_subscriber}; use crate::{ - deadline::Deadliner, + deadline::{DeadlinerTask, NeverExpiringCalculator}, signeddata::{BeaconCommitteeSelection, SignedSyncMessage, VersionedAttestation}, testutils::random_core_pub_key, types::{Duty, DutyType, ParSignedData, ParSignedDataSet, SlotNumber}, @@ -103,14 +103,19 @@ async fn memdb_threshold() { const THRESHOLD: u64 = 7; const N: usize = 10; - let deadliner = Arc::new(TestDeadliner::new()); let cancel = CancellationToken::new(); - let db = Arc::new(MemDB::new(cancel.clone(), THRESHOLD, deadliner.clone())); + // Real deadliner so `MemDB.store_external` has a handle to call `.add` on. + // The calculator never expires anything, so the deadliner's natural output + // is silent — eviction is driven manually through `trim_tx` below. + let (deadliner, _drop_rx) = + DeadlinerTask::start(cancel.clone(), "memdb_threshold", NeverExpiringCalculator); + let (trim_tx, trim_rx) = mpsc::channel::(32); + let db = Arc::new(MemDB::new(cancel.clone(), THRESHOLD, deadliner)); let trim_handle = tokio::spawn({ let db = db.clone(); async move { - db.trim().await; + db.trim(trim_rx).await; } }); @@ -151,8 +156,13 @@ async fn memdb_threshold() { enqueue_n().await; assert_eq!(1, *times_called.lock().await); - deadliner.expire().await; - tokio::time::sleep(Duration::from_millis(20)).await; + // Drive eviction manually: simulate the deadliner emitting `duty` as + // expired. Wait a beat so the trim task processes it. + trim_tx + .send(duty.clone()) + .await + .expect("trim_tx should be open"); + sleep(Duration::from_millis(20)).await; enqueue_n().await; assert_eq!(2, *times_called.lock().await); @@ -162,50 +172,3 @@ async fn memdb_threshold() { .await .expect("trim task should shut down cleanly"); } - -struct TestDeadliner { - added: StdMutex>, - tx: mpsc::Sender, - rx: StdMutex>>, -} - -impl TestDeadliner { - fn new() -> Self { - let (tx, rx) = mpsc::channel(32); - Self { - added: StdMutex::new(Vec::new()), - tx, - rx: StdMutex::new(Some(rx)), - } - } - - async fn expire(&self) -> bool { - let duties = { - let mut added = self.added.lock().expect("test deadliner lock poisoned"); - std::mem::take(&mut *added) - }; - - for duty in duties { - if self.tx.send(duty).await.is_err() { - return false; - } - } - - true - } -} - -#[async_trait::async_trait] -impl Deadliner for TestDeadliner { - async fn add(&self, duty: Duty) -> bool { - self.added - .lock() - .expect("test deadliner lock poisoned") - .push(duty); - true - } - - fn c(&self) -> Option> { - self.rx.lock().expect("test deadliner lock poisoned").take() - } -} diff --git a/crates/dkg/src/exchanger.rs b/crates/dkg/src/exchanger.rs index 26a33beb..4bc8292a 100644 --- a/crates/dkg/src/exchanger.rs +++ b/crates/dkg/src/exchanger.rs @@ -42,14 +42,13 @@ use std::{ sync::Arc, }; -use async_trait::async_trait; use libp2p::PeerId; use tokio::sync::{Mutex, Notify}; use tokio_util::sync::CancellationToken; use tracing::warn; use pluto_core::{ - deadline::Deadliner, + deadline::{DeadlinerTask, NeverExpiringCalculator}, parsigdb::memory::{ InternalSubscriberError, MemDB, MemDBError, internal_subscriber, threshold_subscriber, }, @@ -134,21 +133,6 @@ pub struct Exchanger { pub ct: CancellationToken, } -/// Deadliner that never expires any duty, used during DKG where slot-based -/// expiry does not apply. -struct NoopDeadliner; - -#[async_trait] -impl Deadliner for NoopDeadliner { - async fn add(&self, _duty: Duty) -> bool { - true - } - - fn c(&self) -> Option> { - None - } -} - impl Exchanger { /// Creates a new exchanger and wires up the three core subscriptions: /// @@ -185,11 +169,13 @@ impl Exchanger { // threshold is len(peers) to wait until we get all the partial sigs from all // the peers per DV let threshold = u64::try_from(peers.len()).expect("usize fits in u64"); - let sigdb = Arc::new(Mutex::new(MemDB::new( - ct.clone(), - threshold, - Arc::new(NoopDeadliner), - ))); + // DKG is one-shot and outside the slot timeline; we wire a real + // deadliner with a never-expiring calculator just to satisfy the + // `MemDB` API. The paired receiver is dropped — the calculator + // guarantees the background task never tries to publish. + let (deadliner, _expired_rx) = + DeadlinerTask::start(ct.clone(), "dkg-exchanger", NeverExpiringCalculator); + let sigdb = Arc::new(Mutex::new(MemDB::new(ct.clone(), threshold, deadliner))); let sig_data = DataByPubkey::default(); // Wiring core workflow components