Skip to content

Commit aa3303e

Browse files
committed
refactor(payjoin): implement polling-based monitoring with timeout
Replace single sync-and-check with periodic polling loop. This allows multiple sync operations since sync_wallet now accepts a reference to BlockchainClient, enabling proper long-running monitoring instead of a one-time check.
1 parent f5985b7 commit aa3303e

File tree

1 file changed

+68
-50
lines changed

1 file changed

+68
-50
lines changed

src/payjoin/mod.rs

Lines changed: 68 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -607,72 +607,90 @@ impl<'a> PayjoinManager<'a> {
607607
.await;
608608
}
609609

610-
/// Syncs the blockchain once and then checks whether the Payjoin was broadcasted by the
610+
/// Polls the blockchain periodically and checks whether the Payjoin was broadcasted by the
611611
/// sender.
612612
///
613-
/// The currenty implementation does not support checking for the Payjoin broadcast in a loop
614-
/// and returning only when it is detected or if a timeout is reached because the [`sync_wallet`]
615-
/// function consumes the BlockchainClient. BDK CLI supports multiple blockchain clients, and
616-
/// at the time of writing, Kyoto consumes the client since BDK CLI is not designed for long-running
617-
/// tasks.
613+
/// This function syncs the wallet at regular intervals and checks for the Payjoin transaction
614+
/// in a loop until it is detected or a timeout is reached. Since [`sync_wallet`] now accepts
615+
/// a reference to the BlockchainClient, we can call it multiple times in a loop.
618616
async fn monitor_payjoin_proposal(
619617
&mut self,
620618
receiver: Receiver<Monitor>,
621619
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
622620
blockchain_client: &BlockchainClient,
623621
) -> Result<(), Error> {
624-
let wait_time_for_sync = 3;
625-
let poll_internal = tokio::time::Duration::from_secs(wait_time_for_sync);
622+
let poll_interval = tokio::time::Duration::from_millis(200);
623+
let sync_interval = tokio::time::Duration::from_secs(3);
624+
let timeout_duration = tokio::time::Duration::from_secs(15);
626625

627626
println!(
628-
"Waiting for {wait_time_for_sync} seconds before syncing the blockchain and checking if the transaction has been broadcast..."
627+
"Polling for Payjoin transaction broadcast. This may take up to {} seconds...",
628+
timeout_duration.as_secs()
629629
);
630-
tokio::time::sleep(poll_internal).await;
631-
sync_wallet(blockchain_client, self.wallet).await?;
632-
633-
let check_result = receiver
634-
.check_payment(
635-
|txid| {
636-
let Some(tx_details) = self.wallet.tx_details(txid) else {
637-
return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
638-
};
639-
640-
let is_seen = match tx_details.chain_position {
641-
bdk_wallet::chain::ChainPosition::Confirmed { .. } => true,
642-
bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. } => true,
643-
_ => false
644-
};
645-
646-
if is_seen {
647-
return Ok(Some(tx_details.tx.as_ref().clone()));
630+
let result = tokio::time::timeout(timeout_duration, async {
631+
let mut poll_timer = tokio::time::interval(poll_interval);
632+
let mut sync_timer = tokio::time::interval(sync_interval);
633+
poll_timer.tick().await;
634+
sync_timer.tick().await;
635+
sync_wallet(blockchain_client, self.wallet).await?;
636+
637+
loop {
638+
tokio::select! {
639+
_ = poll_timer.tick() => {
640+
// Time to check payment
641+
let check_result = receiver
642+
.check_payment(
643+
|txid| {
644+
let Some(tx_details) = self.wallet.tx_details(txid) else {
645+
return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
646+
};
647+
648+
let is_seen = match tx_details.chain_position {
649+
bdk_wallet::chain::ChainPosition::Confirmed { .. } => true,
650+
bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. } => true,
651+
_ => false
652+
};
653+
654+
if is_seen {
655+
return Ok(Some(tx_details.tx.as_ref().clone()));
656+
}
657+
return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
658+
},
659+
|outpoint| {
660+
let utxo = self.wallet.get_utxo(outpoint);
661+
match utxo {
662+
Some(_) => Ok(false),
663+
None => Ok(true),
664+
}
665+
}
666+
)
667+
.save(persister)
668+
.map_err(|e| {
669+
Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}"))
670+
});
671+
672+
if let Ok(OptionalTransitionOutcome::Progress(_)) = check_result {
673+
println!("Payjoin transaction detected in the mempool!");
674+
return Ok(());
675+
}
676+
// For Stasis or Err, continue polling (implicit - falls through to next loop iteration)
648677
}
649-
return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
650-
},
651-
|outpoint| {
652-
let utxo = self.wallet.get_utxo(outpoint);
653-
match utxo {
654-
Some(_) => Ok(false),
655-
None => Ok(true),
678+
_ = sync_timer.tick() => {
679+
// Time to sync wallet
680+
sync_wallet(blockchain_client, self.wallet).await?;
656681
}
657682
}
658-
)
659-
.save(persister)
660-
.map_err(|e| {
661-
Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}"))
662-
});
663-
664-
match check_result {
665-
Ok(_) => {
666-
println!("Payjoin transaction detected in the mempool!");
667-
}
668-
Err(_) => {
669-
println!(
670-
"Transaction was not found in the mempool after {wait_time_for_sync}. Check the state of the transaction manually after running the sync command."
671-
);
672683
}
684+
})
685+
.await;
686+
687+
match result {
688+
Ok(ok) => ok,
689+
Err(_) => Err(Error::Generic(format!(
690+
"Timeout waiting for Payjoin transaction broadcast after {:?}. Check the state of the transaction manually after running the sync command.",
691+
timeout_duration
692+
))),
673693
}
674-
675-
Ok(())
676694
}
677695

678696
async fn handle_error(

0 commit comments

Comments
 (0)