Skip to content

Commit f4cd072

Browse files
committed
feat: Implement parallel filter matching
1 parent cf751cd commit f4cd072

File tree

9 files changed

+381
-6
lines changed

9 files changed

+381
-6
lines changed

dash-spv/src/client/block_processor_test.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ mod tests {
77
use crate::storage::DiskStorageManager;
88
use crate::types::{SpvEvent, SpvStats};
99
use dashcore::{blockdata::constants::genesis_block, Block, Network, Transaction};
10-
10+
use key_wallet_manager::wallet_manager::matching::{FilterMatchInput, FilterMatchOutput};
1111
use std::sync::Arc;
1212
use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
1313

@@ -72,6 +72,10 @@ mod tests {
7272
true
7373
}
7474

75+
async fn check_compact_filters(&self, input: FilterMatchInput) -> FilterMatchOutput {
76+
input.keys().cloned().collect()
77+
}
78+
7579
async fn describe(&self, _network: Network) -> String {
7680
"MockWallet (test implementation)".to_string()
7781
}
@@ -297,6 +301,10 @@ mod tests {
297301
false
298302
}
299303

304+
async fn check_compact_filters(&self, _input: FilterMatchInput) -> FilterMatchOutput {
305+
FilterMatchOutput::new()
306+
}
307+
300308
async fn describe(&self, _network: Network) -> String {
301309
"NonMatchingWallet (test implementation)".to_string()
302310
}

dash-spv/src/sync/filters/matching.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,16 @@
88
//! - Efficient filter matching using BIP158 algorithms
99
//! - Block download coordination for matches
1010
11+
use crate::error::{SyncError, SyncResult};
12+
use crate::network::NetworkManager;
13+
use crate::storage::StorageManager;
1114
use dashcore::{
1215
bip158::{BlockFilterReader, Error as Bip158Error},
1316
network::message::NetworkMessage,
1417
network::message_blockdata::Inventory,
1518
BlockHash, ScriptBuf,
1619
};
17-
18-
use crate::error::{SyncError, SyncResult};
19-
use crate::network::NetworkManager;
20-
use crate::storage::StorageManager;
20+
use key_wallet_manager::wallet_manager::matching::{FilterMatchInput, FilterMatchOutput};
2121

2222
impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync + 'static>
2323
super::manager::FilterSyncManager<S, N>
@@ -44,6 +44,16 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
4444
}
4545
}
4646

47+
pub async fn check_filters_for_matches<
48+
W: key_wallet_manager::wallet_interface::WalletInterface,
49+
>(
50+
&self,
51+
input_map: FilterMatchInput,
52+
wallet: &W,
53+
) -> FilterMatchOutput {
54+
wallet.check_compact_filters(input_map).await
55+
}
56+
4757
/// Check if filter matches any of the provided scripts using BIP158 GCS filter.
4858
#[allow(dead_code)]
4959
fn filter_matches_scripts(

key-wallet-manager/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ serde = { version = "1.0", default-features = false, features = ["derive"], opti
2424
async-trait = "0.1"
2525
bincode = { version = "=2.0.0-rc.3", optional = true }
2626
zeroize = { version = "1.8", features = ["derive"] }
27+
rayon = "1.11"
2728

2829
[dev-dependencies]
2930
hex = "0.4"
3031
serde_json = "1.0"
32+
dashcore-test-utils = { path = "../test-utils" }
3133
tokio = { version = "1.32", features = ["full"] }
3234

3335
[lints.rust]

key-wallet-manager/src/wallet_interface.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,30 @@
22
//!
33
//! This module defines the trait that SPV clients use to interact with wallets.
44
5+
use crate::wallet_manager::matching::{FilterMatchInput, FilterMatchOutput};
56
use alloc::string::String;
7+
use alloc::vec::Vec;
68
use async_trait::async_trait;
79
use dashcore::bip158::BlockFilter;
810
use dashcore::prelude::CoreBlockHeight;
9-
use dashcore::{Block, Transaction, Txid};
11+
use dashcore::{Block, BlockHash, Transaction, Txid};
1012
use key_wallet::Network;
1113

14+
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
15+
pub struct FilterMatchKey {
16+
pub block_height: CoreBlockHeight,
17+
pub block_hash: BlockHash,
18+
}
19+
20+
impl FilterMatchKey {
21+
pub fn new(height: CoreBlockHeight, hash: BlockHash) -> Self {
22+
Self {
23+
block_height: height,
24+
block_hash: hash,
25+
}
26+
}
27+
}
28+
1229
/// Trait for wallet implementations to receive SPV events
1330
#[async_trait]
1431
pub trait WalletInterface: Send + Sync {
@@ -41,6 +58,10 @@ pub trait WalletInterface: Send + Sync {
4158
network: Network,
4259
) -> bool;
4360

61+
/// Check compact filters against watched addresses in batch
62+
/// Returns map of filter keys to match results
63+
async fn check_compact_filters(&self, input: FilterMatchInput) -> FilterMatchOutput;
64+
4465
/// Return the wallet's per-transaction net change and involved addresses if known.
4566
/// Returns (net_amount, addresses) where net_amount is received - sent in satoshis.
4667
/// If the wallet has no record for the transaction, returns None.
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use crate::wallet_interface::FilterMatchKey;
2+
use alloc::vec::Vec;
3+
use dashcore::bip158::BlockFilter;
4+
use dashcore::Address;
5+
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
6+
use std::collections::{BTreeSet, HashMap};
7+
8+
pub type FilterMatchInput = HashMap<FilterMatchKey, BlockFilter>;
9+
pub type FilterMatchOutput = BTreeSet<FilterMatchKey>;
10+
11+
/// Check compact filters for addresses and return the keys that matched.
12+
pub fn check_compact_filters_for_addresses(
13+
input: FilterMatchInput,
14+
addresses: Vec<Address>,
15+
) -> FilterMatchOutput {
16+
let script_pubkey_bytes: Vec<Vec<u8>> =
17+
addresses.iter().map(|address| address.script_pubkey().to_bytes()).collect();
18+
19+
input
20+
.into_par_iter()
21+
.filter_map(|(key, filter)| {
22+
filter
23+
.match_any(&key.block_hash, script_pubkey_bytes.iter().map(|v| v.as_slice()))
24+
.unwrap_or(false)
25+
.then_some(key)
26+
})
27+
.collect()
28+
}
29+
30+
#[cfg(test)]
31+
mod tests {
32+
use super::*;
33+
use dashcore::blockdata::script::ScriptBuf;
34+
use dashcore_test_utils::{
35+
create_filter_for_block, create_test_block, create_test_transaction_to_script, test_address,
36+
};
37+
38+
#[test]
39+
fn test_empty_input_returns_empty() {
40+
let result = check_compact_filters_for_addresses(FilterMatchInput::new(), vec![]);
41+
assert!(result.is_empty());
42+
}
43+
44+
#[test]
45+
fn test_empty_addresses_returns_empty() {
46+
let tx = create_test_transaction_to_script(ScriptBuf::new());
47+
let block = create_test_block(100, vec![tx]);
48+
let filter = create_filter_for_block(&block);
49+
let key = FilterMatchKey::new(100, block.block_hash());
50+
51+
let mut input = FilterMatchInput::new();
52+
input.insert(key.clone(), filter);
53+
54+
let output = check_compact_filters_for_addresses(input, vec![]);
55+
assert!(!output.contains(&key));
56+
}
57+
58+
#[test]
59+
fn test_matching_filter() {
60+
let address = test_address(0);
61+
62+
let tx = create_test_transaction_to_script(address.script_pubkey());
63+
let block = create_test_block(100, vec![tx]);
64+
let filter = create_filter_for_block(&block);
65+
let key = FilterMatchKey::new(100, block.block_hash());
66+
67+
let mut input = FilterMatchInput::new();
68+
input.insert(key.clone(), filter);
69+
70+
let output = check_compact_filters_for_addresses(input, vec![address]);
71+
assert!(output.contains(&key));
72+
}
73+
74+
#[test]
75+
fn test_non_matching_filter() {
76+
let address = test_address(0);
77+
let other_address = test_address(1);
78+
79+
let tx = create_test_transaction_to_script(other_address.script_pubkey());
80+
let block = create_test_block(100, vec![tx]);
81+
let filter = create_filter_for_block(&block);
82+
let key = FilterMatchKey::new(100, block.block_hash());
83+
84+
let mut input = FilterMatchInput::new();
85+
input.insert(key.clone(), filter);
86+
87+
let output = check_compact_filters_for_addresses(input, vec![address]);
88+
assert!(!output.contains(&key));
89+
}
90+
91+
#[test]
92+
fn test_batch_mixed_results() {
93+
let address1 = test_address(0);
94+
let address2 = test_address(1);
95+
let unrelated_address = test_address(2);
96+
97+
let tx1 = create_test_transaction_to_script(address1.script_pubkey());
98+
let block1 = create_test_block(100, vec![tx1]);
99+
let filter1 = create_filter_for_block(&block1);
100+
let key1 = FilterMatchKey::new(100, block1.block_hash());
101+
102+
let tx2 = create_test_transaction_to_script(address2.script_pubkey());
103+
let block2 = create_test_block(200, vec![tx2]);
104+
let filter2 = create_filter_for_block(&block2);
105+
let key2 = FilterMatchKey::new(200, block2.block_hash());
106+
107+
let tx3 = create_test_transaction_to_script(unrelated_address.script_pubkey());
108+
let block3 = create_test_block(300, vec![tx3]);
109+
let filter3 = create_filter_for_block(&block3);
110+
let key3 = FilterMatchKey::new(300, block3.block_hash());
111+
112+
let mut input = FilterMatchInput::new();
113+
input.insert(key1.clone(), filter1);
114+
input.insert(key2.clone(), filter2);
115+
input.insert(key3.clone(), filter3);
116+
117+
let output = check_compact_filters_for_addresses(input, vec![address1, address2]);
118+
assert_eq!(output.len(), 2);
119+
assert!(output.contains(&key1));
120+
assert!(output.contains(&key2));
121+
assert!(!output.contains(&key3));
122+
}
123+
}

key-wallet-manager/src/wallet_manager/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//! each of which can have multiple accounts. This follows the architecture
55
//! pattern where a manager oversees multiple distinct wallets.
66
7+
pub mod matching;
78
mod process_block;
89
mod transaction_building;
910

key-wallet-manager/src/wallet_manager/process_block.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
use crate::wallet_interface::WalletInterface;
2+
use crate::wallet_manager::matching::{
3+
check_compact_filters_for_addresses, FilterMatchInput, FilterMatchOutput,
4+
};
25
use crate::{Network, WalletManager};
36
use alloc::string::String;
47
use alloc::vec::Vec;
@@ -127,6 +130,10 @@ impl<T: WalletInfoInterface + Send + Sync + 'static> WalletInterface for WalletM
127130
hit
128131
}
129132

133+
async fn check_compact_filters(&self, input: FilterMatchInput) -> FilterMatchOutput {
134+
check_compact_filters_for_addresses(input, self.monitored_addresses())
135+
}
136+
130137
async fn transaction_effect(
131138
&self,
132139
tx: &Transaction,

0 commit comments

Comments
 (0)