From 808014b75f9e6eafe2c4b83010bfafde21e674c9 Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Wed, 11 Mar 2026 11:53:37 -0300 Subject: [PATCH 1/3] bids bulk --- .../client/bidding_service_client_adapter.rs | 6 ++++-- .../fast_streams/subscriber_poller.rs | 2 +- .../server/bidding_service_server_adapter.rs | 10 +++++++--- .../block_output/bidding_service_interface.rs | 4 ++-- .../block_output/true_value_bidding_service.rs | 2 +- 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/crates/rbuilder-operator/src/bidding_service_wrapper/client/bidding_service_client_adapter.rs b/crates/rbuilder-operator/src/bidding_service_wrapper/client/bidding_service_client_adapter.rs index 5b0a6cc76..4616ccff7 100644 --- a/crates/rbuilder-operator/src/bidding_service_wrapper/client/bidding_service_client_adapter.rs +++ b/crates/rbuilder-operator/src/bidding_service_wrapper/client/bidding_service_client_adapter.rs @@ -326,7 +326,9 @@ impl BiddingService for BiddingServiceClientAdapter { .send(BiddingServiceClientCommand::UpdateFailedReadingNewLandedBlocks); } - fn observe_relay_bids(&self, bid_with_stats: ScrapedRelayBlockBidWithStats) { - self.scraped_bids_publisher.send(bid_with_stats.clone()); + fn observe_relay_bids(&self, bid_with_stats: Vec) { + for bid in bid_with_stats { + self.scraped_bids_publisher.send(bid); + } } } diff --git a/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/subscriber_poller.rs b/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/subscriber_poller.rs index 8ffc9dd8d..0ab53aafe 100644 --- a/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/subscriber_poller.rs +++ b/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/subscriber_poller.rs @@ -71,7 +71,7 @@ impl SubscriberPoller { /// Poll the subscriber and calls process_sample on each sample. /// Stops polling on any error. - pub fn poll(&mut self, process_sample: impl Fn(T)) -> Result<(), Error> { + pub fn poll(&mut self, mut process_sample: impl FnMut(T)) -> Result<(), Error> { while let Some(sample) = self.subscriber.receive()? { process_sample(*sample); } diff --git a/crates/rbuilder-operator/src/bidding_service_wrapper/server/bidding_service_server_adapter.rs b/crates/rbuilder-operator/src/bidding_service_wrapper/server/bidding_service_server_adapter.rs index 494f75159..719e3ab39 100644 --- a/crates/rbuilder-operator/src/bidding_service_wrapper/server/bidding_service_server_adapter.rs +++ b/crates/rbuilder-operator/src/bidding_service_wrapper/server/bidding_service_server_adapter.rs @@ -63,10 +63,10 @@ impl BiddingServiceServerInner { } /// Forward to bidding_service - pub fn update_new_bid(&mut self, bid: ScrapedRelayBlockBidRPC) { + pub fn update_new_bids(&mut self, bids: Vec) { self.service .bidding_service() - .observe_relay_bids(bid.into()); + .observe_relay_bids(bids.into_iter().map(|b| b.into()).collect()); } } @@ -238,11 +238,15 @@ fn spawn_scraped_bids_and_blocks_subscriber( init_done.set(Ok(())); while !cancellation_token.is_cancelled() { if let Ok(Some(_event_id)) = listener.timed_wait_one(THREAD_BLOCKING_DURATION) { + let mut bids = Vec::new(); if let Err(err) = scraped_bids_subscriber.poll(|sample| { - inner.lock().update_new_bid(sample); + bids.push(sample); }) { error!(err=?err, "scraped_bids_subscriber poll failed."); } + if !bids.is_empty() { + inner.lock().update_new_bids(bids); + } if let Err(err) = blocks_subscriber.poll(|sample| { inner.lock().new_block(sample); }) { diff --git a/crates/rbuilder/src/live_builder/block_output/bidding_service_interface.rs b/crates/rbuilder/src/live_builder/block_output/bidding_service_interface.rs index 8a0486067..b1b6d250f 100644 --- a/crates/rbuilder/src/live_builder/block_output/bidding_service_interface.rs +++ b/crates/rbuilder/src/live_builder/block_output/bidding_service_interface.rs @@ -257,7 +257,7 @@ pub trait BiddingService: Send + Sync { /// Not &[RelaySet] because it caused problems with some Mutex. fn relay_sets(&self) -> Vec; - fn observe_relay_bids(&self, bid: ScrapedRelayBlockBidWithStats); + fn observe_relay_bids(&self, bid: Vec); fn update_new_landed_blocks_detected(&self, landed_blocks: &[LandedBlockInfo]); @@ -278,7 +278,7 @@ impl BidSender for BiddingService2BidSender { fn send(&self, bid: ScrapedRelayBlockBid) -> Result<(), BidSenderError> { inc_bids_received(&bid); self.inner - .observe_relay_bids(ScrapedRelayBlockBidWithStats::new(bid)); + .observe_relay_bids(vec![ScrapedRelayBlockBidWithStats::new(bid)]); Ok(()) } } diff --git a/crates/rbuilder/src/live_builder/block_output/true_value_bidding_service.rs b/crates/rbuilder/src/live_builder/block_output/true_value_bidding_service.rs index 0c3e45a73..f861c69ed 100644 --- a/crates/rbuilder/src/live_builder/block_output/true_value_bidding_service.rs +++ b/crates/rbuilder/src/live_builder/block_output/true_value_bidding_service.rs @@ -100,7 +100,7 @@ impl BiddingService for NewTrueBlockValueBiddingService { self.relay_sets_subsidies.keys().cloned().collect() } - fn observe_relay_bids(&self, _bid: ScrapedRelayBlockBidWithStats) {} + fn observe_relay_bids(&self, _bid: Vec) {} fn update_new_landed_blocks_detected(&self, _landed_blocks: &[LandedBlockInfo]) {} From 9dc8b2150e5b35637bcf3402b052fe67e38784db Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Fri, 13 Mar 2026 09:12:27 -0300 Subject: [PATCH 2/3] batch scraped bids publishing to reduce notification overhead --- .../fast_streams/helpers.rs | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rs b/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rs index 4c7883cb6..77a94540e 100644 --- a/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rs +++ b/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rs @@ -222,13 +222,36 @@ impl NotifyingPublisher Result<(), Error> { + fn publish_item(&self, item: ItemTypeRPC) -> Result<(), Error> { let sample = self.publisher.loan_uninit()?; let sample = sample.write_payload(item); sample.send()?; + Ok(()) + } + + pub fn send(&self, item: ItemTypeRPC) -> Result<(), Error> { + self.publish_item(item)?; self.notifier.notify()?; Ok(()) } + + pub fn send_many(&self, items: Vec) -> Result<(), Error> { + let mut some_sent = false; + let mut publish_item_err = None; + for item in items { + match self.publish_item(item) { + Ok(_) => some_sent = true, + Err(err) => publish_item_err = Some(err), + } + } + if some_sent { + self.notifier.notify()?; + } + match publish_item_err { + Some(err) => Err(err), + None => Ok(()), + } + } } /// struct to publish ScrapedRelayBlockBidWithStats to the bidding service. @@ -262,8 +285,12 @@ impl ScrapedBidsPublisher { } }; while let Ok(scraped_bid) = scraped_bids_rx.recv() { - if let Err(err) = notifying_publisher.send(scraped_bid) { - error!(err=?err, "ScrapedBidsPublisher notifying_publisher.send failed. Bid lost."); + let mut bids = vec![scraped_bid]; + while let Ok(extra) = scraped_bids_rx.try_recv() { + bids.push(extra); + } + if let Err(err) = notifying_publisher.send_many(bids) { + error!(err=?err, "ScrapedBidsPublisher notifying_publisher.send_many failed. Some bids lost."); } } }); From 219c81ed1305d6dbc3a3fff3e1204fc715d2bbbd Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Fri, 13 Mar 2026 09:24:42 -0300 Subject: [PATCH 3/3] better error trace --- .../fast_streams/helpers.rs | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rs b/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rs index 77a94540e..e48581265 100644 --- a/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rs +++ b/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rs @@ -236,19 +236,23 @@ impl NotifyingPublisher) -> Result<(), Error> { - let mut some_sent = false; - let mut publish_item_err = None; + let mut error_count = 0; + let item_count = items.len(); + let mut publish_item_last_err = None; for item in items { - match self.publish_item(item) { - Ok(_) => some_sent = true, - Err(err) => publish_item_err = Some(err), + if let Err(err) = self.publish_item(item) { + error_count += 1; + publish_item_last_err = Some(err); } } - if some_sent { + if error_count != item_count { self.notifier.notify()?; } - match publish_item_err { - Some(err) => Err(err), + match publish_item_last_err { + Some(err) => { + error!(error_count,publish_item_last_err = ?err, "send_many failed to publish some items",); + Err(err) + } None => Ok(()), } }