Skip to content
Merged
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ fuser = { version = "0.16.0", features = ["libfuse"] }
libc = "0.2"
mesa-dev = "1.11.0"
num-traits = "0.2"
http = "1"
reqwest = { version = "0.12", default-features = false }
reqwest-middleware = "0.4"
serde_path_to_error = "0.1"
Expand Down Expand Up @@ -42,15 +43,14 @@ semver = "1.0"
shellexpand = "3.1"
inquire = "0.9.2"
tracing-indicatif = "0.3.14"
opentelemetry = { version = "0.29", optional = true }
opentelemetry_sdk = { version = "0.29", features = ["rt-tokio"], optional = true }
opentelemetry-otlp = { version = "0.29", features = ["http-proto", "trace", "reqwest-client"], optional = true }
tracing-opentelemetry = { version = "0.30", optional = true }
opentelemetry = { version = "0.29" }
opentelemetry_sdk = { version = "0.29", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.29", default-features = false, features = ["http-proto", "trace", "reqwest-blocking-client"] }
tracing-opentelemetry = { version = "0.30" }

[features]
default = []
staging = []
__otlp_export = ["opentelemetry", "opentelemetry_sdk", "opentelemetry-otlp", "tracing-opentelemetry"]

[build-dependencies]
vergen-gitcl = { version = "1", features = [] }
Expand Down
44 changes: 44 additions & 0 deletions src/app_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,35 @@ impl Default for DaemonConfig {
}
}

/// The Mesa telemetry endpoint.
pub const MESA_TELEMETRY_ENDPOINT: &str = "https://telemetry.priv.mesa.dev/v1/traces";

/// Telemetry configuration for exporting OpenTelemetry traces.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case", default)]
pub struct TelemetryConfig {
/// Whether to send telemetry data to Mesa's servers.
pub vendor: bool,

/// Custom collector URL for forwarding telemetry to the user's own servers.
pub collector_url: Option<String>,
}

impl TelemetryConfig {
/// Returns the list of OTLP endpoints to export traces to.
#[must_use]
pub fn endpoints(&self) -> Vec<String> {
let mut endpoints = Vec::new();
if self.vendor {
endpoints.push(MESA_TELEMETRY_ENDPOINT.to_owned());
}
if let Some(ref url) = self.collector_url {
endpoints.push(url.clone());
}
endpoints
}
}

/// Application configuration structure.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
Expand All @@ -350,6 +379,9 @@ pub struct Config {
#[serde(default)]
pub daemon: DaemonConfig,

#[serde(default)]
pub telemetry: TelemetryConfig,

/// The mount point for the filesystem.
#[serde(default = "default_mount_point")]
pub mount_point: ExpandedPathBuf,
Expand All @@ -369,6 +401,7 @@ struct DangerousConfig<'a> {
pub organizations: HashMap<&'a str, DangerousOrganizationConfig<'a>>,
pub cache: &'a CacheConfig,
pub daemon: &'a DaemonConfig,
pub telemetry: &'a TelemetryConfig,
pub mount_point: &'a Path,
pub uid: u32,
pub gid: u32,
Expand All @@ -388,6 +421,7 @@ impl<'a> From<&'a Config> for DangerousConfig<'a> {
.collect(),
cache: &config.cache,
daemon: &config.daemon,
telemetry: &config.telemetry,
mount_point: &config.mount_point,
uid: config.uid,
gid: config.gid,
Expand All @@ -401,6 +435,7 @@ impl Default for Config {
organizations: default_organizations(),
cache: CacheConfig::default(),
daemon: DaemonConfig::default(),
telemetry: TelemetryConfig::default(),
mount_point: default_mount_point(),
uid: current_uid(),
gid: current_gid(),
Expand Down Expand Up @@ -459,6 +494,15 @@ impl Config {
}
}

if let Some(ref url) = self.telemetry.collector_url
&& !url.starts_with("http://")
&& !url.starts_with("https://")
{
errors.push(format!(
"Telemetry collector URL '{url}' must start with http:// or https://."
));
}

if errors.is_empty() {
Ok(())
} else {
Expand Down
56 changes: 52 additions & 4 deletions src/fs/mescloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use std::time::SystemTime;

use bytes::Bytes;
use mesa_dev::MesaClient;
use opentelemetry::propagation::Injector;
use secrecy::ExposeSecret as _;
use tracing::{Instrument as _, instrument, trace, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt as _;

use crate::fs::icache::bridge::HashMapBridge;
use crate::fs::icache::{AsyncICache, FileTable, IcbResolver};
Expand Down Expand Up @@ -37,6 +39,55 @@ use org::OrgFs;
pub mod icache;
pub mod repo;

struct HeaderInjector<'a>(&'a mut reqwest::header::HeaderMap);

impl Injector for HeaderInjector<'_> {
fn set(&mut self, key: &str, value: String) {
if let (Ok(name), Ok(val)) = (
reqwest::header::HeaderName::from_bytes(key.as_bytes()),
reqwest::header::HeaderValue::from_str(&value),
) {
self.0.insert(name, val);
}
}
}

/// Middleware that injects W3C `traceparent`/`tracestate` headers from the
/// current `tracing` span into every outgoing HTTP request.
struct OtelPropagationMiddleware;

#[async_trait::async_trait]
impl reqwest_middleware::Middleware for OtelPropagationMiddleware {
async fn handle(
&self,
mut req: reqwest::Request,
extensions: &mut http::Extensions,
next: reqwest_middleware::Next<'_>,
) -> reqwest_middleware::Result<reqwest::Response> {
let cx = tracing::Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut()));
});
tracing::debug!(
traceparent = req.headers().get("traceparent").and_then(|v| v.to_str().ok()),
url = %req.url(),
"outgoing request"
);
next.run(req, extensions).await
}
}

fn build_mesa_client(api_key: &str) -> MesaClient {
let client = reqwest_middleware::ClientBuilder::new(reqwest::Client::new())
.with(OtelPropagationMiddleware)
.build();
MesaClient::builder()
.with_api_key(api_key)
.with_base_path(MESA_API_BASE_URL)
.with_client(client)
.build()
}

struct MesaResolver {
fs_owner: (u32, u32),
block_size: u32,
Expand Down Expand Up @@ -122,10 +173,7 @@ impl MesaFS {
inode_to_slot: HashMap::new(),
slots: orgs
.map(|org_conf| {
let client = MesaClient::builder()
.with_api_key(org_conf.api_key.expose_secret())
.with_base_path(MESA_API_BASE_URL)
.build();
let client = build_mesa_client(org_conf.api_key.expose_secret());
let org = OrgFs::new(org_conf.name, client, fs_owner);
ChildSlot {
inner: org,
Expand Down
Loading