Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/message_pool/msg_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,18 @@ impl Chains {
}
}

pub(in crate::message_pool) fn bubble_down_after_trim(&mut self, from: usize) {
let mut j = from;
while j < self.key_vec.len().saturating_sub(1) {
#[allow(clippy::indexing_slicing)]
if self[j].compare(&self[j + 1]) == Ordering::Less {
break;
}
self.key_vec.swap(j, j + 1);
j += 1;
}
}

pub(in crate::message_pool) fn invalidate(&mut self, mut key: Option<NodeKey>) {
let mut next_keys = vec![];

Expand Down
37 changes: 26 additions & 11 deletions src/message_pool/msgpool/republish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
//! Tracks which CIDs were already broadcast in the current republish cycle
//! and exposes a trigger to wake the republish task early.

use std::cmp::Ordering;

use crate::message::{MessageRead as _, SignedMessage};
use crate::message_pool::{
Error,
Expand Down Expand Up @@ -174,15 +172,7 @@ where
// we can't fit the current chain but there is gas to spare
// trim it and push it down
chains.trim_msgs_at(i, gas_limit, REPUB_MSG_LIMIT, &base_fee);
let mut j = i;
while j < chains.len() - 1 {
#[allow(clippy::indexing_slicing)]
if chains[j].compare(&chains[j + 1]) == Ordering::Less {
break;
}
chains.key_vec.swap(i, i + 1);
j += 1;
}
chains.bubble_down_after_trim(i);
}

if msgs.len() > REPUB_MSG_LIMIT {
Expand All @@ -195,6 +185,31 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::message_pool::msg_chain::MsgChainNode;
use crate::shim::econ::TokenAmount;

fn chains_from_perfs(perfs: &[f64]) -> Chains {
let mut chains = Chains::new();
let mut key_vec = Vec::with_capacity(perfs.len());
for (i, &p) in perfs.iter().enumerate() {
let node = MsgChainNode {
gas_perf: p,
gas_reward: TokenAmount::from_atto(i as u64 + 1),
..Default::default()
};
chains.push_with(node, &mut key_vec);
}
chains.key_vec = key_vec;
chains
}

#[test]
fn bubble_down_after_trim_restores_compare_order() {
let mut chains = chains_from_perfs(&[1.0, 5.0, 3.0, 4.0]);
chains.bubble_down_after_trim(1);
let perfs: Vec<f64> = (0..chains.len()).map(|i| chains[i].gas_perf).collect();
assert_eq!(perfs, vec![1.0, 3.0, 4.0, 5.0]);
}

#[test]
fn was_republished_reflects_replace_with() {
Expand Down
Loading