Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions e2e-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ max_client_to_self_delay = 1024
min_payment_size_msat = 0
max_payment_size_msat = 1000000000
client_trusts_lsp = true

[metrics]
enabled = true
poll_metrics_interval = 1
"#,
storage_dir = storage_dir.display(),
);
Expand Down
92 changes: 92 additions & 0 deletions e2e-tests/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,3 +827,95 @@ async fn test_hodl_invoice_fail() {
events_a.iter().map(|e| &e.event).collect::<Vec<_>>()
);
}

#[tokio::test]
async fn test_metrics_endpoint() {
let bitcoind = TestBitcoind::new();

// Test with metrics enabled
let server_a = LdkServerHandle::start(&bitcoind).await;
let server_b = LdkServerHandle::start(&bitcoind).await;

let client = server_a.client();
let metrics_result = client.get_metrics().await;

assert!(metrics_result.is_ok(), "Expected metrics to succeed when enabled");
let metrics = metrics_result.unwrap();

// Verify initial state
assert!(metrics.contains("ldk_server_total_peers_count 0"));
assert!(metrics.contains("ldk_server_total_payments_count 0"));
assert!(metrics.contains("ldk_server_total_successful_payments_count 0"));
assert!(metrics.contains("ldk_server_total_pending_payments_count 0"));
assert!(metrics.contains("ldk_server_total_failed_payments_count 0"));
assert!(metrics.contains("ldk_server_total_channels_count 0"));
assert!(metrics.contains("ldk_server_total_public_channels_count 0"));
assert!(metrics.contains("ldk_server_total_private_channels_count 0"));
assert!(metrics.contains("ldk_server_total_onchain_balance_sats 0"));
assert!(metrics.contains("ldk_server_spendable_onchain_balance_sats 0"));
assert!(metrics.contains("ldk_server_total_anchor_channels_reserve_sats 0"));
assert!(metrics.contains("ldk_server_total_lightning_balance_sats 0"));

// Set up channel and make a payment to trigger metrics update
setup_funded_channel(&bitcoind, &server_a, &server_b, 100_000).await;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to check that the channel (private/public) count, our LN balance, and the number of peers go up after opening a channel. It also sends funds to the nodes for anchor channels, so we should be able to verify that our on-chain balance numbers change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check the metrics after the channel open but before the receive?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also do a receive and check the metrics after that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check the metrics after the channel open but before the receive

We'll only be able to check the channels count metric as that is the only metric dependent on the ChannelReady event. But we can check other metrics if we poll.

Can we also do a receive and check the metrics after that.

This is what we're doing right now.

// Poll for channel, peer and balance metrics.
let timeout = Duration::from_secs(10);
let start = std::time::Instant::now();
loop {
let metrics = client.get_metrics().await.unwrap();
if metrics.contains("ldk_server_total_peers_count 1")
&& metrics.contains("ldk_server_total_channels_count 1")
&& metrics.contains("ldk_server_total_public_channels_count 1")
&& metrics.contains("ldk_server_total_payments_count 2")
&& !metrics.contains("ldk_server_total_lightning_balance_sats 0")
&& !metrics.contains("ldk_server_total_onchain_balance_sats 0")
&& !metrics.contains("ldk_server_spendable_onchain_balance_sats 0")
&& !metrics.contains("ldk_server_total_anchor_channels_reserve_sats 0")
{
break;
}

if start.elapsed() > timeout {
let current_metrics = client.get_metrics().await.unwrap();
panic!(
"Timed out waiting for channel, peer and balance metrics to update. Current metrics:\n{}",
current_metrics
);
}
tokio::time::sleep(Duration::from_secs(1)).await;
}

let invoice_resp = server_b
.client()
.bolt11_receive(Bolt11ReceiveRequest {
amount_msat: Some(10_000_000),
description: Some(Bolt11InvoiceDescription {
kind: Some(bolt11_invoice_description::Kind::Direct("metrics test".to_string())),
}),
expiry_secs: 3600,
})
.await
.unwrap();

run_cli(&server_a, &["bolt11-send", &invoice_resp.invoice]);

// Wait to receive the PaymentSuccessful event and update metrics
let timeout = Duration::from_secs(30);
let start = std::time::Instant::now();
loop {
let metrics = client.get_metrics().await.unwrap();
if metrics.contains("ldk_server_total_successful_payments_count 1")
&& !metrics.contains("ldk_server_total_lightning_balance_sats 0")
&& !metrics.contains("ldk_server_total_onchain_balance_sats 0")
&& !metrics.contains("ldk_server_spendable_onchain_balance_sats 0")
&& !metrics.contains("ldk_server_total_anchor_channels_reserve_sats 0")
{
break;
}
if start.elapsed() > timeout {
panic!("Timed out waiting for payment metrics to update");
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
80 changes: 56 additions & 24 deletions ldk-server-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ use ldk_server_protos::endpoints::{
BOLT11_RECEIVE_PATH, BOLT11_RECEIVE_VARIABLE_AMOUNT_VIA_JIT_CHANNEL_PATH,
BOLT11_RECEIVE_VIA_JIT_CHANNEL_PATH, BOLT11_SEND_PATH, BOLT12_RECEIVE_PATH, BOLT12_SEND_PATH,
CLOSE_CHANNEL_PATH, CONNECT_PEER_PATH, DISCONNECT_PEER_PATH, EXPORT_PATHFINDING_SCORES_PATH,
FORCE_CLOSE_CHANNEL_PATH, GET_BALANCES_PATH, GET_NODE_INFO_PATH, GET_PAYMENT_DETAILS_PATH,
GRAPH_GET_CHANNEL_PATH, GRAPH_GET_NODE_PATH, GRAPH_LIST_CHANNELS_PATH, GRAPH_LIST_NODES_PATH,
LIST_CHANNELS_PATH, LIST_FORWARDED_PAYMENTS_PATH, LIST_PAYMENTS_PATH, LIST_PEERS_PATH,
ONCHAIN_RECEIVE_PATH, ONCHAIN_SEND_PATH, OPEN_CHANNEL_PATH, SIGN_MESSAGE_PATH, SPLICE_IN_PATH,
SPLICE_OUT_PATH, SPONTANEOUS_SEND_PATH, UNIFIED_SEND_PATH, UPDATE_CHANNEL_CONFIG_PATH,
VERIFY_SIGNATURE_PATH,
FORCE_CLOSE_CHANNEL_PATH, GET_BALANCES_PATH, GET_METRICS_PATH, GET_NODE_INFO_PATH,
GET_PAYMENT_DETAILS_PATH, GRAPH_GET_CHANNEL_PATH, GRAPH_GET_NODE_PATH,
GRAPH_LIST_CHANNELS_PATH, GRAPH_LIST_NODES_PATH, LIST_CHANNELS_PATH,
LIST_FORWARDED_PAYMENTS_PATH, LIST_PAYMENTS_PATH, LIST_PEERS_PATH, ONCHAIN_RECEIVE_PATH,
ONCHAIN_SEND_PATH, OPEN_CHANNEL_PATH, SIGN_MESSAGE_PATH, SPLICE_IN_PATH, SPLICE_OUT_PATH,
SPONTANEOUS_SEND_PATH, UNIFIED_SEND_PATH, UPDATE_CHANNEL_CONFIG_PATH, VERIFY_SIGNATURE_PATH,
};
use ldk_server_protos::error::{ErrorCode, ErrorResponse};
use prost::bytes::Bytes;
use prost::Message;
use reqwest::header::CONTENT_TYPE;
use reqwest::{Certificate, Client};
Expand All @@ -69,6 +70,11 @@ pub struct LdkServerClient {
api_key: String,
}

/// HTTP method selected for an outgoing request built by `make_request`.
///
/// The enum is a plain, fieldless tag, so it derives the cheap standard traits:
/// `Debug` for diagnostics, `Clone`/`Copy` so it can be passed by value freely,
/// and `PartialEq`/`Eq` so callers and tests can compare methods directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestType {
    /// Build the request with the HTTP client's `get` method.
    Get,
    /// Build the request with the HTTP client's `post` method.
    Post,
}

impl LdkServerClient {
/// Constructs a [`LdkServerClient`] using `base_url` as the ldk-server endpoint.
///
Expand Down Expand Up @@ -114,6 +120,18 @@ impl LdkServerClient {
self.post_request(&request, &url).await
}

/// Retrieve the node metrics in Prometheus format.
pub async fn get_metrics(&self) -> Result<String, LdkServerError> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a raw string; it really should be decoded into the Response type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really should be decoded into the Response type

The Response type is protobuf, but Prometheus scrapers need the endpoint to return plain text.

let url = format!("https://{}/{GET_METRICS_PATH}", self.base_url);
let payload = self.make_request(&url, RequestType::Get, None, false).await?;
String::from_utf8(payload.to_vec()).map_err(|e| {
LdkServerError::new(
InternalError,
format!("Failed to decode metrics response as string: {}", e),
)
})
}

/// Retrieves an overview of all known balances.
/// For API contract/usage, refer to docs for [`GetBalancesRequest`] and [`GetBalancesResponse`].
pub async fn get_balances(
Expand Down Expand Up @@ -431,31 +449,45 @@ impl LdkServerClient {
&self, request: &Rq, url: &str,
) -> Result<Rs, LdkServerError> {
let request_body = request.encode_to_vec();
let auth_header = self.compute_auth_header(&request_body);
let response_raw = self
.client
.post(url)
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
.header("X-Auth", auth_header)
.body(request_body)
.send()
.await
.map_err(|e| {
LdkServerError::new(InternalError, format!("HTTP request failed: {}", e))
})?;
let payload = self.make_request(url, RequestType::Post, Some(request_body), true).await?;
Rs::decode(&payload[..]).map_err(|e| {
LdkServerError::new(InternalError, format!("Failed to decode success response: {}", e))
})
}

async fn make_request(
&self, url: &str, request_type: RequestType, body: Option<Vec<u8>>, authenticated: bool,
) -> Result<Bytes, LdkServerError> {
let builder = match request_type {
RequestType::Get => self.client.get(url),
RequestType::Post => self.client.post(url),
};

let builder = if authenticated {
let body_for_auth = body.as_deref().unwrap_or(&[]);
let auth_header = self.compute_auth_header(body_for_auth);
builder.header("X-Auth", auth_header)
} else {
builder
};

let builder = if let Some(body_content) = body {
builder.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM).body(body_content)
} else {
builder
};

let response_raw = builder.send().await.map_err(|e| {
LdkServerError::new(InternalError, format!("HTTP request failed: {}", e))
})?;

let status = response_raw.status();
let payload = response_raw.bytes().await.map_err(|e| {
LdkServerError::new(InternalError, format!("Failed to read response body: {}", e))
})?;

if status.is_success() {
Ok(Rs::decode(&payload[..]).map_err(|e| {
LdkServerError::new(
InternalError,
format!("Failed to decode success response: {}", e),
)
})?)
Ok(payload)
} else {
let error_response = ErrorResponse::decode(&payload[..]).map_err(|e| {
LdkServerError::new(
Expand Down
1 change: 1 addition & 0 deletions ldk-server-protos/src/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ pub const GRAPH_LIST_CHANNELS_PATH: &str = "GraphListChannels";
pub const GRAPH_GET_CHANNEL_PATH: &str = "GraphGetChannel";
pub const GRAPH_LIST_NODES_PATH: &str = "GraphListNodes";
pub const GRAPH_GET_NODE_PATH: &str = "GraphGetNode";
/// Path of the metrics endpoint; the client appends it to the server base URL when fetching metrics.
pub const GET_METRICS_PATH: &str = "metrics";
5 changes: 5 additions & 0 deletions ldk-server/ldk-server-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,8 @@ client_trusts_lsp = false
## A token we may require to be sent by the clients.
## If set, only requests matching this token will be accepted. (uncomment and set if required)
# require_token = ""

# Metrics settings
[metrics]
enabled = false
poll_metrics_interval = 60 # Polling interval for metrics, in seconds. Defaults to 60 if unset while metrics are enabled.
57 changes: 55 additions & 2 deletions ldk-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use clap::Parser;
use hex::DisplayHex;
Expand Down Expand Up @@ -50,6 +50,7 @@ use crate::io::persist::{
use crate::service::NodeService;
use crate::util::config::{load_config, ArgsConfig, ChainSource};
use crate::util::logger::ServerLogger;
use crate::util::metrics::Metrics;
use crate::util::proto_adapter::{forwarded_payment_to_proto, payment_to_proto};
use crate::util::systemd;
use crate::util::tls::get_or_generate_tls_config;
Expand Down Expand Up @@ -273,6 +274,28 @@ fn main() {
}
};
let event_node = Arc::clone(&node);

let metrics: Option<Arc<Metrics>> = if config_file.metrics_enabled {
let poll_metrics_interval = Duration::from_secs(config_file.poll_metrics_interval.unwrap_or(60));
let metrics_node = Arc::clone(&node);
let mut interval = tokio::time::interval(poll_metrics_interval);
let metrics = Arc::new(Metrics::new());
let metrics_bg = Arc::clone(&metrics);

// Initialize metrics that are event-driven to ensure they start with correct values from persistence
metrics.initialize_payment_metrics(&metrics_node);

runtime.spawn(async move {
loop {
interval.tick().await;
metrics_bg.update_all_pollable_metrics(&metrics_node);
}
});
Some(metrics)
} else {
None
};

let rest_svc_listener = TcpListener::bind(config_file.rest_service_addr)
.await
.expect("Failed to bind listening port");
Expand Down Expand Up @@ -313,7 +336,24 @@ fn main() {
if let Err(e) = event_node.event_handled() {
error!("Failed to mark event as handled: {e}");
}

if let Some(metrics) = &metrics {
metrics.update_channels_count(false);
}
},
Event::ChannelClosed { channel_id, counterparty_node_id, .. } => {
info!(
"CHANNEL_CLOSED: {} from counterparty {:?}",
channel_id, counterparty_node_id
);
if let Err(e) = event_node.event_handled() {
error!("Failed to mark event as handled: {e}");
}

if let Some(metrics) = &metrics {
metrics.update_channels_count(true);
}
}
Event::PaymentReceived { payment_id, payment_hash, amount_msat, .. } => {
info!(
"PAYMENT_RECEIVED: with id {:?}, hash {}, amount_msat {}",
Expand All @@ -328,6 +368,10 @@ fn main() {
&event_node,
Arc::clone(&event_publisher),
Arc::clone(&paginated_store)).await;

if let Some(metrics) = &metrics {
metrics.update_all_balances(&event_node);
}
},
Event::PaymentSuccessful {payment_id, ..} => {
let payment_id = payment_id.expect("PaymentId expected for ldk-server >=0.1");
Expand All @@ -339,6 +383,11 @@ fn main() {
&event_node,
Arc::clone(&event_publisher),
Arc::clone(&paginated_store)).await;

if let Some(metrics) = &metrics {
metrics.update_payments_count(true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should be able to update our balance metric here and the payment received one

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also update it with the channel events and their corresponding metrics

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should be able to update our balance metric here and the payment received one

Not sure how this will look, do we want to build/maintain a local balance value based on the payment received event, rather than rely on the node information? Same with the channel events

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how this will look, do we want to build/maintain a local balance value based on the payment received event, rather than rely on the node information?

Should be able to just use your Metrics::update_all_balances here

Same with the channel events

channel counts we can just increase/decrease like we do with the payment counts

metrics.update_all_balances(&event_node);
}
},
Event::PaymentFailed {payment_id, ..} => {
let payment_id = payment_id.expect("PaymentId expected for ldk-server >=0.1");
Expand All @@ -350,6 +399,10 @@ fn main() {
&event_node,
Arc::clone(&event_publisher),
Arc::clone(&paginated_store)).await;

if let Some(metrics) = &metrics {
metrics.update_payments_count(false);
}
},
Event::PaymentClaimable {payment_id, ..} => {
publish_event_and_upsert_payment(&payment_id,
Expand Down Expand Up @@ -435,7 +488,7 @@ fn main() {
res = rest_svc_listener.accept() => {
match res {
Ok((stream, _)) => {
let node_service = NodeService::new(Arc::clone(&node), Arc::clone(&paginated_store), api_key.clone());
let node_service = NodeService::new(Arc::clone(&node), Arc::clone(&paginated_store), api_key.clone(), metrics.clone());
let acceptor = tls_acceptor.clone();
runtime.spawn(async move {
match acceptor.accept(stream).await {
Expand Down
Loading
Loading