diff --git a/src/daemon.rs b/src/daemon.rs index 9b4bb6478..211f7d94d 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -639,7 +639,7 @@ impl Daemon { /// Fetch the given transactions in parallel over multiple threads and RPC connections, /// ignoring any missing ones and returning whatever is available. #[trace] - pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result> { + pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result> { const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5; let params_list: Vec = txids diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 69e46199a..c5788e683 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -516,99 +516,112 @@ impl Mempool { daemon: &Daemon, tip: &BlockHash, ) -> Result { - let _timer = mempool - .read() - .unwrap() - .latency - .with_label_values(&["update"]) - .start_timer(); + let (_timer, count) = { + let mempool = mempool.read().unwrap(); + let timer = mempool.latency.with_label_values(&["update"]).start_timer(); + (timer, mempool.count.clone()) + }; - // Continuously attempt to fetch mempool transactions until we're able to get them in full - let mut fetched_txs = HashMap::::new(); - let mut indexed_txids = mempool.read().unwrap().txids_set(); - loop { - // Get bitcoind's current list of mempool txids - let all_txids = daemon - .getmempooltxids() - .chain_err(|| "failed to update mempool from daemon")?; - - // Remove evicted mempool transactions - mempool - .write() - .unwrap() - .remove(indexed_txids.difference(&all_txids).collect()); - - indexed_txids.retain(|txid| all_txids.contains(txid)); - fetched_txs.retain(|txid, _| all_txids.contains(txid)); - - // Fetch missing transactions from bitcoind - let new_txids = all_txids - .iter() - .filter(|&txid| !fetched_txs.contains_key(txid) && !indexed_txids.contains(txid)) - .collect::>(); - if new_txids.is_empty() { - break; - } - debug!( - "mempool with total {} txs, {} fetched, {} missing", - all_txids.len(), - indexed_txids.len() + fetched_txs.len(), - new_txids.len() - ); + // Get bitcoind's current list of mempool txids + let bitcoind_txids = daemon + .getmempooltxids() + .chain_err(|| "failed to update mempool from daemon")?; + + // Get the list of mempool txids in the local mempool view + let indexed_txids = mempool.read().unwrap().txids_set(); + + // Remove evicted mempool transactions from the local mempool view + let evicted_txids = indexed_txids + .difference(&bitcoind_txids) + .collect::>(); + if !evicted_txids.is_empty() { + mempool.write().unwrap().remove(evicted_txids); + } // avoids acquiring a lock when there are no evictions + + // Find transactions available in bitcoind's mempool but not indexed locally + let new_txids = bitcoind_txids + .difference(&indexed_txids) + .collect::>(); + + debug!( + "mempool with total {} txs, {} indexed locally, {} to fetch", + bitcoind_txids.len(), + indexed_txids.len(), + new_txids.len() + ); + count + .with_label_values(&["all_txs"]) + .set(bitcoind_txids.len() as f64); + count + .with_label_values(&["indexed_txs"]) + .set(indexed_txids.len() as f64); + count + .with_label_values(&["missing_txs"]) + .set(new_txids.len() as f64); + + if new_txids.is_empty() { + return Ok(true); + } - { - let mempool = mempool.read().unwrap(); - - mempool - .count - .with_label_values(&["all_txs"]) - .set(all_txids.len() as f64); - mempool - .count - .with_label_values(&["fetched_txs"]) - .set((indexed_txids.len() + fetched_txs.len()) as f64); - mempool - .count - .with_label_values(&["missing_txs"]) - .set(new_txids.len() as f64); - } + // Fetch missing transactions from bitcoind + let mut fetched_txs = daemon.gettransactions_available(&new_txids)?; - let new_txs = daemon.gettransactions_available(&new_txids)?; + // Abort if the chain tip moved while fetching transactions + if daemon.getbestblockhash()? != *tip { + warn!("chain tip moved while updating mempool"); + return Ok(false); + } - // Abort if the chain tip moved while fetching transactions - if daemon.getbestblockhash()? != *tip { - warn!("chain tip moved while updating mempool"); - return Ok(false); - } + // Find which transactions were requested but are no longer available in bitcoind's mempool, + // typically due to Replace-By-Fee (or mempool eviction for some other reason) taking place + // between querying for the mempool txids and querying for the transactions themselves. + let mut replaced_txids: HashSet<_> = new_txids + .into_iter() + .filter(|txid| !fetched_txs.contains_key(*txid)) + .cloned() + .collect(); - let fetched_count = new_txs.len(); - fetched_txs.extend(new_txs); + if replaced_txids.is_empty() { + trace!("fetched complete mempool snapshot"); + } else { + warn!( + "could not to fetch {} replaced/evicted mempool transactions: {:?}", + replaced_txids.len(), + replaced_txids.iter().take(10).collect::>() + ); + } - // Retry if any transactions were evicted form the mempool before we managed to get them - if fetched_count != new_txids.len() { - warn!( - "failed to fetch {} mempool txs, retrying...", - new_txids.len() - fetched_count - ); - let missing_txids: Vec<_> = new_txids + // If we were unable to get a complete consistent snapshot of the bitcoind mempool, + // detect and remove any transactions that spend from the missing (replaced) transactions + // or any of their descendants. This is necessary because it could be possible to fetch the + // child tx successfully before the parent is replaced, but miss the replaced parent tx. + while !replaced_txids.is_empty() { + let mut descendants_txids = HashSet::new(); + fetched_txs.retain(|txid, tx| { + let parent_was_replaced = tx + .input .iter() - .filter(|txid| !fetched_txs.contains_key(**txid)) - .take(10) - .collect(); - warn!("missing mempool txids: {:?} (capped at 10)", missing_txids); - } else { - break; - } + .any(|txin| replaced_txids.contains(&txin.previous_output.txid)); + if parent_was_replaced { + descendants_txids.insert(*txid); + } + !parent_was_replaced + }); + trace!( + "detected {} replaced mempool descendants", + descendants_txids.len() + ); + replaced_txids = descendants_txids; } // Add fetched transactions to our view of the mempool - { + trace!("indexing {} new mempool transactions", fetched_txs.len()); + if !fetched_txs.is_empty() { let mut mempool = mempool.write().unwrap(); mempool.add(fetched_txs)?; - mempool - .count + count .with_label_values(&["txs"]) .set(mempool.txstore.len() as f64);