Merged
@@ -326,7 +326,9 @@ impl BiddingService for BiddingServiceClientAdapter {
.send(BiddingServiceClientCommand::UpdateFailedReadingNewLandedBlocks);
}

fn observe_relay_bids(&self, bid_with_stats: ScrapedRelayBlockBidWithStats) {
self.scraped_bids_publisher.send(bid_with_stats.clone());
fn observe_relay_bids(&self, bid_with_stats: Vec<ScrapedRelayBlockBidWithStats>) {
for bid in bid_with_stats {
self.scraped_bids_publisher.send(bid);
}
}
Comment on lines +329 to 333
Contributor
The client adapter still sends bids one-by-one over the channel. The new send_many / try_recv batching on the publisher thread partially compensates for this, but if the publisher thread drains faster than bids arrive, each bid still results in a separate IPC publish + notify cycle. This is fine for now since the publisher-side batching via try_recv handles the hot path, but worth noting the optimization boundary.
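One way to move that boundary, sketched here purely as a hypothetical (the `BatchingAdapter`, the channel type, and the `Bid` stand-in are assumptions, not this PR's code), is to send whole `Vec`s over the adapter's channel so the publisher thread receives ready-made batches instead of draining single items with `try_recv`:

```rust
use std::sync::mpsc;

// Stand-in for ScrapedRelayBlockBidWithStats; the real type lives in the
// bidding service crate.
#[derive(Clone, Debug, PartialEq)]
pub struct Bid(pub u64);

// Hypothetical adapter that forwards a whole batch in one channel send,
// so the receiving thread gets one Vec per wakeup and can publish it
// with a single notify, matching send_many on the publisher side.
pub struct BatchingAdapter {
    tx: mpsc::Sender<Vec<Bid>>,
}

impl BatchingAdapter {
    pub fn observe_relay_bids(&self, bids: Vec<Bid>) {
        if !bids.is_empty() {
            // One send per batch; errors (receiver gone) are ignored here
            // for brevity, as in the adapter's fire-and-forget style.
            let _ = self.tx.send(bids);
        }
    }
}

pub fn demo() -> Vec<Bid> {
    let (tx, rx) = mpsc::channel();
    let adapter = BatchingAdapter { tx };
    adapter.observe_relay_bids(vec![Bid(1), Bid(2), Bid(3)]);
    rx.recv().unwrap()
}
```

The trade-off is that the channel then carries allocations (one `Vec` per send) instead of plain items, which is why leaving the batching on the publisher side is a reasonable call for now.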

}
@@ -222,13 +222,40 @@ impl<ItemTypeRPC: std::fmt::Debug + ZeroCopySend + 'static> NotifyingPublisher<I
})
}

pub fn send(&self, item: ItemTypeRPC) -> Result<(), Error> {
fn publish_item(&self, item: ItemTypeRPC) -> Result<(), Error> {
let sample = self.publisher.loan_uninit()?;
let sample = sample.write_payload(item);
sample.send()?;
Ok(())
}

pub fn send(&self, item: ItemTypeRPC) -> Result<(), Error> {
self.publish_item(item)?;
self.notifier.notify()?;
Ok(())
}

pub fn send_many(&self, items: Vec<ItemTypeRPC>) -> Result<(), Error> {
let mut error_count = 0;
let item_count = items.len();
let mut publish_item_last_err = None;
for item in items {
if let Err(err) = self.publish_item(item) {
error_count += 1;
publish_item_last_err = Some(err);
}
}
if error_count != item_count {
self.notifier.notify()?;
}
match publish_item_last_err {
Some(err) => {
error!(error_count, publish_item_last_err = ?err, "send_many failed to publish some items");
Err(err)
}
None => Ok(()),
}
}
}
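The notify-unless-every-publish-failed rule in `send_many` can be exercised in isolation. This sketch mirrors the control flow with stand-in `publish` and `notify` closures (the names and `String` error type are assumptions; the real publisher is iceoryx-backed):

```rust
// Mirrors send_many's control flow: publish each item, count failures,
// and notify only if at least one publish succeeded. Returns the last
// publish error, if any.
pub fn send_many_sketch<T>(
    items: Vec<T>,
    mut publish: impl FnMut(&T) -> Result<(), String>,
    mut notify: impl FnMut(),
) -> Result<(), String> {
    let item_count = items.len();
    let mut error_count = 0;
    let mut last_err = None;
    for item in &items {
        if let Err(err) = publish(item) {
            error_count += 1;
            last_err = Some(err);
        }
    }
    if error_count != item_count {
        // At least one item was published, so subscribers should be woken.
        notify();
    }
    match last_err {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
```

Note that an empty `items` vector skips the notify as well, since `error_count == item_count == 0`.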

/// struct to publish ScrapedRelayBlockBidWithStats to the bidding service.
@@ -262,8 +289,12 @@ impl ScrapedBidsPublisher {
}
};
while let Ok(scraped_bid) = scraped_bids_rx.recv() {
if let Err(err) = notifying_publisher.send(scraped_bid) {
error!(err=?err, "ScrapedBidsPublisher notifying_publisher.send failed. Bid lost.");
let mut bids = vec![scraped_bid];
while let Ok(extra) = scraped_bids_rx.try_recv() {
bids.push(extra);
}
Comment on lines +292 to +295
Contributor
The try_recv drain loop has no upper bound. Under sustained high bid throughput, this could accumulate an unbounded number of bids in memory before any are published, causing latency spikes and memory pressure. Consider capping the batch size:

Suggested change
- let mut bids = vec![scraped_bid];
- while let Ok(extra) = scraped_bids_rx.try_recv() {
- bids.push(extra);
- }
+ let mut bids = vec![scraped_bid];
+ while bids.len() < 1024 {
+ match scraped_bids_rx.try_recv() {
+ Ok(extra) => bids.push(extra),
+ Err(_) => break,
+ }
+ }
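As a rough standalone illustration of the capped drain suggested above (a sketch over `std::sync::mpsc`; the `drain_batch` name and the cap value are assumptions, not this PR's code):

```rust
use std::sync::mpsc;

// Drain up to `cap` items: the first recv'd item plus whatever try_recv
// yields, stopping at the cap so a flooded channel cannot grow the batch
// without bound. Leftover items stay queued for the next iteration.
pub fn drain_batch<T>(first: T, rx: &mpsc::Receiver<T>, cap: usize) -> Vec<T> {
    let mut batch = vec![first];
    while batch.len() < cap {
        match rx.try_recv() {
            Ok(extra) => batch.push(extra),
            Err(_) => break,
        }
    }
    batch
}
```

The cap bounds per-iteration memory and latency; items beyond it are simply picked up by the next `recv` loop iteration rather than dropped.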

if let Err(err) = notifying_publisher.send_many(bids) {
error!(err=?err, "ScrapedBidsPublisher notifying_publisher.send_many failed. Some bids lost.");
}
}
});
@@ -71,7 +71,7 @@ impl<T: std::fmt::Debug + ZeroCopySend + Copy> SubscriberPoller<T> {

/// Polls the subscriber and calls process_sample on each sample.
/// Stops polling on any error.
pub fn poll(&mut self, process_sample: impl Fn(T)) -> Result<(), Error> {
pub fn poll(&mut self, mut process_sample: impl FnMut(T)) -> Result<(), Error> {
while let Some(sample) = self.subscriber.receive()? {
process_sample(*sample);
}
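The `Fn` to `FnMut` change matters because the new call site captures and mutates a `Vec`. A minimal standalone illustration (`poll_sketch` is a stand-in for `SubscriberPoller::poll`, not the real iceoryx-backed method):

```rust
// Stand-in for SubscriberPoller::poll: drives a sample source and hands
// each sample to the callback. `impl FnMut(T)` (rather than `impl Fn(T)`)
// lets the caller mutate captured state, e.g. push samples into a Vec.
pub fn poll_sketch<T>(samples: Vec<T>, mut process_sample: impl FnMut(T)) {
    for sample in samples {
        process_sample(sample);
    }
}

pub fn collect_demo() -> Vec<u32> {
    let mut bids = Vec::new();
    // With `impl Fn(T)` this closure would not compile: pushing into
    // `bids` requires a mutable borrow, which only FnMut permits.
    poll_sketch(vec![1, 2, 3], |s| bids.push(s));
    bids
}
```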
@@ -63,10 +63,10 @@ impl BiddingServiceServerInner {
}

/// Forward to bidding_service
pub fn update_new_bid(&mut self, bid: ScrapedRelayBlockBidRPC) {
pub fn update_new_bids(&mut self, bids: Vec<ScrapedRelayBlockBidRPC>) {
self.service
.bidding_service()
.observe_relay_bids(bid.into());
.observe_relay_bids(bids.into_iter().map(|b| b.into()).collect());
}
}

@@ -238,11 +238,15 @@ fn spawn_scraped_bids_and_blocks_subscriber(
init_done.set(Ok(()));
while !cancellation_token.is_cancelled() {
if let Ok(Some(_event_id)) = listener.timed_wait_one(THREAD_BLOCKING_DURATION) {
let mut bids = Vec::new();
if let Err(err) = scraped_bids_subscriber.poll(|sample| {
inner.lock().update_new_bid(sample);
bids.push(sample);
}) {
error!(err=?err, "scraped_bids_subscriber poll failed.");
}
if !bids.is_empty() {
inner.lock().update_new_bids(bids);
}

Comment on lines +241 to +249
Contributor
When poll errors partway through, bids contains only the items collected before the error β€” these are still forwarded to update_new_bids. This is correct behavior, but without logging the batch size it's hard for operators to tell if bids were lost. Consider:

Suggested change
- let mut bids = Vec::new();
- if let Err(err) = scraped_bids_subscriber.poll(|sample| {
- bids.push(sample);
- }) {
- error!(err=?err, "scraped_bids_subscriber poll failed.");
- }
- if !bids.is_empty() {
- inner.lock().update_new_bids(bids);
- }
+ let mut bids = Vec::new();
+ if let Err(err) = scraped_bids_subscriber.poll(|sample| {
+ bids.push(sample);
+ }) {
+ error!(err=?err, bids_collected = bids.len(), "scraped_bids_subscriber poll failed.");
+ }
+ if !bids.is_empty() {
+ inner.lock().update_new_bids(bids);
+ }

if let Err(err) = blocks_subscriber.poll(|sample| {
inner.lock().new_block(sample);
}) {
@@ -257,7 +257,7 @@ pub trait BiddingService: Send + Sync {
/// Not &[RelaySet] because it caused problems with some Mutex<BiddingService>.
fn relay_sets(&self) -> Vec<RelaySet>;

fn observe_relay_bids(&self, bid: ScrapedRelayBlockBidWithStats);
fn observe_relay_bids(&self, bid: Vec<ScrapedRelayBlockBidWithStats>);

fn update_new_landed_blocks_detected(&self, landed_blocks: &[LandedBlockInfo]);

@@ -278,7 +278,7 @@ impl BidSender for BiddingService2BidSender {
fn send(&self, bid: ScrapedRelayBlockBid) -> Result<(), BidSenderError> {
inc_bids_received(&bid);
self.inner
.observe_relay_bids(ScrapedRelayBlockBidWithStats::new(bid));
.observe_relay_bids(vec![ScrapedRelayBlockBidWithStats::new(bid)]);
Ok(())
}
}
@@ -100,7 +100,7 @@ impl BiddingService for NewTrueBlockValueBiddingService {
self.relay_sets_subsidies.keys().cloned().collect()
}

fn observe_relay_bids(&self, _bid: ScrapedRelayBlockBidWithStats) {}
fn observe_relay_bids(&self, _bid: Vec<ScrapedRelayBlockBidWithStats>) {}

fn update_new_landed_blocks_detected(&self, _landed_blocks: &[LandedBlockInfo]) {}
