-
Notifications
You must be signed in to change notification settings - Fork 195
Bulk bid processing #892
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bulk bid processing #892
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -222,13 +222,40 @@ impl<ItemTypeRPC: std::fmt::Debug + ZeroCopySend + 'static> NotifyingPublisher<I | |||||||||||||||||||||||
| }) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| pub fn send(&self, item: ItemTypeRPC) -> Result<(), Error> { | ||||||||||||||||||||||||
| fn publish_item(&self, item: ItemTypeRPC) -> Result<(), Error> { | ||||||||||||||||||||||||
| let sample = self.publisher.loan_uninit()?; | ||||||||||||||||||||||||
| let sample = sample.write_payload(item); | ||||||||||||||||||||||||
| sample.send()?; | ||||||||||||||||||||||||
| Ok(()) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| pub fn send(&self, item: ItemTypeRPC) -> Result<(), Error> { | ||||||||||||||||||||||||
| self.publish_item(item)?; | ||||||||||||||||||||||||
| self.notifier.notify()?; | ||||||||||||||||||||||||
| Ok(()) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| pub fn send_many(&self, items: Vec<ItemTypeRPC>) -> Result<(), Error> { | ||||||||||||||||||||||||
| let mut error_count = 0; | ||||||||||||||||||||||||
| let item_count = items.len(); | ||||||||||||||||||||||||
| let mut publish_item_last_err = None; | ||||||||||||||||||||||||
| for item in items { | ||||||||||||||||||||||||
| if let Err(err) = self.publish_item(item) { | ||||||||||||||||||||||||
| error_count += 1; | ||||||||||||||||||||||||
| publish_item_last_err = Some(err); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| if error_count != item_count { | ||||||||||||||||||||||||
| self.notifier.notify()?; | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| match publish_item_last_err { | ||||||||||||||||||||||||
| Some(err) => { | ||||||||||||||||||||||||
| error!(error_count,publish_item_last_err = ?err, "send_many failed to publish some items",); | ||||||||||||||||||||||||
| Err(err) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| None => Ok(()), | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| /// struct to publish ScrapedRelayBlockBidWithStats to the bidding service. | ||||||||||||||||||||||||
|
|
@@ -262,8 +289,12 @@ impl ScrapedBidsPublisher { | |||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||
| while let Ok(scraped_bid) = scraped_bids_rx.recv() { | ||||||||||||||||||||||||
| if let Err(err) = notifying_publisher.send(scraped_bid) { | ||||||||||||||||||||||||
| error!(err=?err, "ScrapedBidsPublisher notifying_publisher.send failed. Bid lost."); | ||||||||||||||||||||||||
| let mut bids = vec![scraped_bid]; | ||||||||||||||||||||||||
| while let Ok(extra) = scraped_bids_rx.try_recv() { | ||||||||||||||||||||||||
| bids.push(extra); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
Comment on lines
+292
to
+295
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||||||||||
| if let Err(err) = notifying_publisher.send_many(bids) { | ||||||||||||||||||||||||
| error!(err=?err, "ScrapedBidsPublisher notifying_publisher.send_many failed. Some bids lost."); | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -63,10 +63,10 @@ impl BiddingServiceServerInner { | |||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| /// Forward to bidding_service | ||||||||||||||||||||||||||||||||||||||||
| pub fn update_new_bid(&mut self, bid: ScrapedRelayBlockBidRPC) { | ||||||||||||||||||||||||||||||||||||||||
| pub fn update_new_bids(&mut self, bids: Vec<ScrapedRelayBlockBidRPC>) { | ||||||||||||||||||||||||||||||||||||||||
| self.service | ||||||||||||||||||||||||||||||||||||||||
| .bidding_service() | ||||||||||||||||||||||||||||||||||||||||
| .observe_relay_bids(bid.into()); | ||||||||||||||||||||||||||||||||||||||||
| .observe_relay_bids(bids.into_iter().map(|b| b.into()).collect()); | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
@@ -238,11 +238,15 @@ fn spawn_scraped_bids_and_blocks_subscriber( | |||||||||||||||||||||||||||||||||||||||
| init_done.set(Ok(())); | ||||||||||||||||||||||||||||||||||||||||
| while !cancellation_token.is_cancelled() { | ||||||||||||||||||||||||||||||||||||||||
| if let Ok(Some(_event_id)) = listener.timed_wait_one(THREAD_BLOCKING_DURATION) { | ||||||||||||||||||||||||||||||||||||||||
| let mut bids = Vec::new(); | ||||||||||||||||||||||||||||||||||||||||
| if let Err(err) = scraped_bids_subscriber.poll(|sample| { | ||||||||||||||||||||||||||||||||||||||||
| inner.lock().update_new_bid(sample); | ||||||||||||||||||||||||||||||||||||||||
| bids.push(sample); | ||||||||||||||||||||||||||||||||||||||||
| }) { | ||||||||||||||||||||||||||||||||||||||||
| error!(err=?err, "scraped_bids_subscriber poll failed."); | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
| if !bids.is_empty() { | ||||||||||||||||||||||||||||||||||||||||
| inner.lock().update_new_bids(bids); | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+241
to
+249
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When
Comment on lines
+241
to
+249
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When
Suggested change
|
||||||||||||||||||||||||||||||||||||||||
| if let Err(err) = blocks_subscriber.poll(|sample| { | ||||||||||||||||||||||||||||||||||||||||
| inner.lock().new_block(sample); | ||||||||||||||||||||||||||||||||||||||||
| }) { | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The client adapter still sends bids one-by-one over the channel. The new
send_many/try_recvbatching on the publisher thread partially compensates for this, but if the publisher thread drains faster than bids arrive, each bid still results in a separate IPC publish + notify cycle. This is fine for now since the publisher-side batching viatry_recvhandles the hot path, but worth noting the optimization boundary.