From cfb12117992295527c60fd1d7a6cb2d531e56f61 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Wed, 10 Jun 2026 10:31:53 +0200 Subject: [PATCH 1/7] feat(grpc): add exponential backoff for reconnection attempts Assisted-by: claude-opus-4-6@default --- fact/src/output/grpc.rs | 71 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/fact/src/output/grpc.rs b/fact/src/output/grpc.rs index 259d6004..c3120d06 100644 --- a/fact/src/output/grpc.rs +++ b/fact/src/output/grpc.rs @@ -20,6 +20,32 @@ use tonic::transport::Channel; use crate::{config::GrpcConfig, event::Event, metrics::EventCounter}; +struct Backoff { + initial: Duration, + current: Duration, + max: Duration, +} + +impl Backoff { + fn new(initial: Duration, max: Duration) -> Self { + Self { + initial, + current: initial, + max, + } + } + + fn next(&mut self) -> Duration { + let delay = self.current; + self.current = (self.current * 2).min(self.max); + delay + } + + fn reset(&mut self) { + self.current = self.initial; + } +} + pub struct Client { rx: broadcast::Receiver>, running: watch::Receiver, @@ -121,6 +147,7 @@ impl Client { } async fn run(&mut self) -> anyhow::Result { + let mut backoff = Backoff::new(Duration::from_secs(1), Duration::from_secs(60)); loop { // Re-read certs on each connection attempt so rotated certificates // on disk are picked up on the next reconnect. @@ -129,12 +156,14 @@ impl Client { let channel = match self.create_channel(connector).await { Ok(channel) => channel, Err(e) => { - debug!("Failed to connect to server: {e:?}"); - sleep(Duration::from_secs(1)).await; + let delay = backoff.next(); + debug!("Failed to connect to server: {e:?}, retrying in {delay:?}"); + sleep(delay).await; continue; } }; info!("Successfully connected to gRPC server"); + backoff.reset(); let mut client = FileActivityServiceClient::new(channel); @@ -159,6 +188,9 @@ impl Client { Ok(_) => info!("gRPC stream ended"), Err(e) => warn!("gRPC stream error: {e:?}"), } + let delay = backoff.next(); + warn!("Reconnecting in {delay:?}..."); + sleep(delay).await; } _ = self.config.changed() => return Ok(true), _ = self.running.changed() => return Ok(*self.running.borrow()), @@ -177,3 +209,38 @@ impl Client { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn backoff_exponential() { + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60)); + assert_eq!(b.next(), Duration::from_secs(1)); + assert_eq!(b.next(), Duration::from_secs(2)); + assert_eq!(b.next(), Duration::from_secs(4)); + assert_eq!(b.next(), Duration::from_secs(8)); + assert_eq!(b.next(), Duration::from_secs(16)); + assert_eq!(b.next(), Duration::from_secs(32)); + } + + #[test] + fn backoff_caps_at_max() { + let mut b = Backoff::new(Duration::from_secs(32), Duration::from_secs(60)); + assert_eq!(b.next(), Duration::from_secs(32)); + assert_eq!(b.next(), Duration::from_secs(60)); + assert_eq!(b.next(), Duration::from_secs(60)); + } + + #[test] + fn backoff_reset() { + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60)); + b.next(); + b.next(); + b.next(); + b.reset(); + assert_eq!(b.next(), Duration::from_secs(1)); + assert_eq!(b.next(), Duration::from_secs(2)); + } +} From a9a49b408146cfeb23405e4348ed909b8cd27eb0 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Wed, 10 Jun 2026 11:06:19 +0200 Subject: [PATCH 2/7] feat(config): make backoff initial/max configurable via YAML, CLI, and env vars Add BackoffConfig to GrpcConfig with YAML (grpc.backoff.initial, grpc.backoff.max), CLI (--backoff-initial, --backoff-max), and env var (FACT_GRPC_BACKOFF_INITIAL, FACT_GRPC_BACKOFF_MAX) support. Extract yaml_to_duration_secs and parse_duration_secs helpers, reusing them for scan_interval parsing as well. Assisted-by: claude-opus-4-6@default --- fact/src/config/mod.rs | 129 ++++++++++-- fact/src/config/tests.rs | 416 ++++++++++++++++++++++++++++++++++++++- fact/src/output/grpc.rs | 3 +- 3 files changed, 527 insertions(+), 21 deletions(-) diff --git a/fact/src/config/mod.rs b/fact/src/config/mod.rs index b7530176..c5305102 100644 --- a/fact/src/config/mod.rs +++ b/fact/src/config/mod.rs @@ -16,6 +16,29 @@ pub mod reloader; #[cfg(test)] mod tests; +fn yaml_to_duration_secs(v: &Yaml) -> Option { + v.as_f64() + .or_else(|| v.as_i64().map(|i| i as f64)) + .filter(|s| s.is_finite() && *s >= 0.0) + .map(Duration::from_secs_f64) +} + +fn parse_duration_secs(s: &str) -> anyhow::Result { + let f: f64 = s.parse()?; + if !f.is_finite() || f < 0.0 { + bail!("value must be a non-negative finite number, got {f}"); + } + Ok(Duration::from_secs_f64(f)) +} + +fn parse_positive_duration_secs(s: &str) -> anyhow::Result { + let d = parse_duration_secs(s)?; + if d.is_zero() { + bail!("value must be greater than zero"); + } + Ok(d) +} + const CONFIG_FILES: [&str; 4] = [ "/etc/stackrox/fact.yml", "/etc/stackrox/fact.yaml", @@ -218,20 +241,11 @@ impl TryFrom> for FactConfig { config.hotreload = Some(hotreload); } "scan_interval" => { - // scan_internal == 0 disables the scanner - if let Some(scan_interval) = v.as_f64() { - if scan_interval < 0.0 { - bail!("invalid scan_interval: {scan_interval}"); - } - config.scan_interval = Some(Duration::from_secs_f64(scan_interval)); - } else if let Some(scan_interval) = v.as_i64() { - if scan_interval < 0 { - bail!("invalid scan_interval: {scan_interval}"); - } - config.scan_interval = Some(Duration::from_secs(scan_interval as u64)) - } else { - bail!("scan_interval field has incorrect type: {v:?}"); - } + // scan_interval == 0 disables the scanner + config.scan_interval = Some( + yaml_to_duration_secs(v) + .with_context(|| format!("invalid scan_interval: {v:?}"))?, + ); } "rate_limit" => { // rate_limit == 0 means unlimited (no throttling) @@ -328,10 +342,67 @@ impl TryFrom<&yaml::Hash> for EndpointConfig { } } +#[derive(Debug, Default, PartialEq, Eq, Clone)] +pub struct BackoffConfig { + initial: Option, + max: Option, +} + +impl BackoffConfig { + fn update(&mut self, from: &BackoffConfig) { + if let Some(initial) = from.initial { + self.initial = Some(initial); + } + if let Some(max) = from.max { + self.max = Some(max); + } + } + + pub fn initial(&self) -> Duration { + self.initial.unwrap_or(Duration::from_secs(1)) + } + + pub fn max(&self) -> Duration { + self.max.unwrap_or(Duration::from_secs(60)) + } +} + +impl TryFrom<&yaml::Hash> for BackoffConfig { + type Error = anyhow::Error; + + fn try_from(value: &yaml::Hash) -> Result { + let mut backoff = BackoffConfig::default(); + for (k, v) in value.iter() { + let Some(k) = k.as_str() else { + bail!("key is not string: {k:?}"); + }; + match k { + "initial" => { + backoff.initial = Some( + yaml_to_duration_secs(v) + .filter(|d| !d.is_zero()) + .with_context(|| format!("invalid grpc.backoff.initial: {v:?}"))?, + ); + } + "max" => { + backoff.max = Some( + yaml_to_duration_secs(v) + .filter(|d| !d.is_zero()) + .with_context(|| format!("invalid grpc.backoff.max: {v:?}"))?, + ); + } + name => bail!("Invalid field 'grpc.backoff.{name}' with value: {v:?}"), + } + } + Ok(backoff) + } +} + #[derive(Debug, Default, PartialEq, Eq, Clone)] pub struct GrpcConfig { url: Option, certs: Option, + pub backoff: BackoffConfig, } impl GrpcConfig { @@ -343,6 +414,8 @@ impl GrpcConfig { if let Some(certs) = from.certs.as_deref() { self.certs = Some(certs.to_owned()); } + + self.backoff.update(&from.backoff); } pub fn url(&self) -> Option<&str> { @@ -377,6 +450,12 @@ impl TryFrom<&yaml::Hash> for GrpcConfig { }; grpc.certs = Some(PathBuf::from(certs)); } + "backoff" => { + let Some(backoff) = v.as_hash() else { + bail!("grpc.backoff section has incorrect type: {v:?}"); + }; + grpc.backoff = BackoffConfig::try_from(backoff)?; + } name => bail!("Invalid field 'grpc.{name}' with value: {v:?}"), } } @@ -465,6 +544,18 @@ pub struct FactCli { #[arg(short, long, env = "FACT_CERTS")] certs: Option, + /// Initial backoff delay in seconds for gRPC reconnection + /// + /// Default value is 1 second + #[arg(long, env = "FACT_GRPC_BACKOFF_INITIAL", value_parser = parse_positive_duration_secs)] + backoff_initial: Option, + + /// Maximum backoff delay in seconds for gRPC reconnection + /// + /// Default value is 60 seconds + #[arg(long, env = "FACT_GRPC_BACKOFF_MAX", value_parser = parse_positive_duration_secs)] + backoff_max: Option, + /// The port to bind for all exposed endpoints #[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")] address: Option, @@ -535,8 +626,8 @@ pub struct FactCli { /// The seconds can use a decimal point for fractions of seconds. /// /// Default value is 30 seconds - #[arg(long, short, env = "FACT_SCAN_INTERVAL")] - scan_interval: Option, + #[arg(long, short, env = "FACT_SCAN_INTERVAL", value_parser = parse_duration_secs)] + scan_interval: Option, /// Maximum number of file events to allow per second /// @@ -555,6 +646,10 @@ impl FactCli { grpc: GrpcConfig { url: self.url.clone(), certs: self.certs.clone(), + backoff: BackoffConfig { + initial: self.backoff_initial, + max: self.backoff_max, + }, }, endpoint: EndpointConfig { address: self.address, @@ -568,7 +663,7 @@ impl FactCli { skip_pre_flight: resolve_bool_arg(self.skip_pre_flight, self.no_skip_pre_flight), json: resolve_bool_arg(self.json, self.no_json), hotreload: resolve_bool_arg(self.hotreload, self.no_hotreload), - scan_interval: self.scan_interval.map(Duration::from_secs_f64), + scan_interval: self.scan_interval, rate_limit: self.rate_limit, } } diff --git a/fact/src/config/tests.rs b/fact/src/config/tests.rs index f25d86c5..65557539 100644 --- a/fact/src/config/tests.rs +++ b/fact/src/config/tests.rs @@ -255,6 +255,58 @@ fn parsing() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + initial: 2 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(2)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + max: 30 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(30)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + initial: 0.5 + max: 120 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs_f64(0.5)), + max: Some(Duration::from_secs(120)), + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( r#" paths: @@ -262,6 +314,9 @@ fn parsing() { grpc: url: 'https://svc.sensor.stackrox:9090' certs: /etc/stackrox/certs + backoff: + initial: 0.5 + max: 120 endpoint: address: 0.0.0.0:8080 expose_metrics: true @@ -279,6 +334,10 @@ fn parsing() { grpc: GrpcConfig { url: Some(String::from("https://svc.sensor.stackrox:9090")), certs: Some(PathBuf::from("/etc/stackrox/certs")), + backoff: BackoffConfig { + initial: Some(Duration::from_secs_f64(0.5)), + max: Some(Duration::from_secs(120)), + }, }, endpoint: EndpointConfig { address: Some(SocketAddr::from(([0, 0, 0, 0], 8080))), @@ -346,6 +405,69 @@ paths: "#, "certs field has incorrect type: Boolean(true)", ), + ( + r#" + grpc: + backoff: true + "#, + "grpc.backoff section has incorrect type: Boolean(true)", + ), + ( + r#" + grpc: + backoff: + initial: true + "#, + "invalid grpc.backoff.initial: Boolean(true)", + ), + ( + r#" + grpc: + backoff: + max: true + "#, + "invalid grpc.backoff.max: Boolean(true)", + ), + ( + r#" + grpc: + backoff: + initial: 0 + "#, + "invalid grpc.backoff.initial: Integer(0)", + ), + ( + r#" + grpc: + backoff: + initial: -1 + "#, + "invalid grpc.backoff.initial: Integer(-1)", + ), + ( + r#" + grpc: + backoff: + max: 0 + "#, + "invalid grpc.backoff.max: Integer(0)", + ), + ( + r#" + grpc: + backoff: + max: -5 + "#, + "invalid grpc.backoff.max: Integer(-5)", + ), + ( + r#" + grpc: + backoff: + unknown: 4 + "#, + "Invalid field 'grpc.backoff.unknown' with value: Integer(4)", + ), ( "endpoint: true", "Invalid field 'endpoint' with value: Boolean(true)", @@ -483,10 +605,16 @@ paths: ), ( "scan_interval: true", - "scan_interval field has incorrect type: Boolean(true)", + "invalid scan_interval: Boolean(true)", + ), + ( + "scan_interval: -128", + "invalid scan_interval: Integer(-128)", + ), + ( + "scan_interval: -128.5", + "invalid scan_interval: Real(\"-128.5\")", ), - ("scan_interval: -128", "invalid scan_interval: -128"), - ("scan_interval: -128.5", "invalid scan_interval: -128.5"), ("unknown:", "Invalid field 'unknown' with value: Null"), ]; for (input, expected) in tests { @@ -658,6 +786,150 @@ fn update() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + initial: 5 + "#, + FactConfig::default(), + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(5)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + initial: 5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(2)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(5)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + initial: 5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(5)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(5)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + max: 120 + "#, + FactConfig::default(), + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(120)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + max: 120 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(30)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(120)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + max: 120 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(120)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(120)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( r#" endpoint: @@ -1020,6 +1292,9 @@ fn update() { grpc: url: 'https://svc.sensor.stackrox:9090' certs: /etc/stackrox/certs + backoff: + initial: 0.5 + max: 120 endpoint: address: 127.0.0.1:8080 expose_metrics: true @@ -1037,6 +1312,10 @@ fn update() { grpc: GrpcConfig { url: Some(String::from("http://localhost")), certs: Some(PathBuf::from("/etc/certs")), + backoff: BackoffConfig { + initial: Some(Duration::from_secs(15)), + max: Some(Duration::from_secs(30)), + }, }, endpoint: EndpointConfig { address: Some(SocketAddr::from(([0, 0, 0, 0], 9000))), @@ -1058,6 +1337,10 @@ fn update() { grpc: GrpcConfig { url: Some(String::from("https://svc.sensor.stackrox:9090")), certs: Some(PathBuf::from("/etc/stackrox/certs")), + backoff: BackoffConfig { + initial: Some(Duration::from_secs_f64(0.5)), + max: Some(Duration::from_secs(120)), + }, }, endpoint: EndpointConfig { address: Some(SocketAddr::from(([127, 0, 0, 1], 8080))), @@ -1104,6 +1387,8 @@ fn defaults() { assert_eq!(config.bpf.ringbuf_size(), 8192); assert_eq!(config.bpf.inodes_max(), 65536); assert!(config.hotreload()); + assert_eq!(config.grpc.backoff.initial(), Duration::from_secs(1)); + assert_eq!(config.grpc.backoff.max(), Duration::from_secs(60)); } static ENV_MUTEX: Mutex<()> = Mutex::new(()); @@ -1225,6 +1510,16 @@ fn env_vars() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_SCAN_INTERVAL", + value: "0", + }, + FactConfig { + scan_interval: Some(Duration::ZERO), + ..Default::default() + }, + ), ( EnvVar { name: "FACT_RATE_LIMIT", @@ -1261,6 +1556,38 @@ fn env_vars() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_INITIAL", + value: "5", + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(5)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MAX", + value: "120", + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(120)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( EnvVar { name: "FACT_ENDPOINT_ADDRESS", @@ -1372,6 +1699,40 @@ fn env_vars_override_yaml() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_INITIAL", + value: "5", + }, + "grpc:\n backoff:\n initial: 2", + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(5)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MAX", + value: "120", + }, + "grpc:\n backoff:\n max: 30", + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(120)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( EnvVar { name: "FACT_PATHS", @@ -1587,6 +1948,55 @@ fn env_vars_invalid_values() { }, "error: invalid value 'not_a_boolean' for '--hotreload'", ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_INITIAL", + value: "not_a_number", + }, + "error: invalid value 'not_a_number' for '--backoff-initial ': invalid float literal", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MAX", + value: "not_a_number", + }, + "error: invalid value 'not_a_number' for '--backoff-max ': invalid float literal", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_INITIAL", + value: "0", + }, + "error: invalid value '0' for '--backoff-initial ': value must be greater than zero", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MAX", + value: "0", + }, + "error: invalid value '0' for '--backoff-max ': value must be greater than zero", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_INITIAL", + value: "-1", + }, + "error: invalid value '-1' for '--backoff-initial ': value must be a non-negative finite number, got -1", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MAX", + value: "-1", + }, + "error: invalid value '-1' for '--backoff-max ': value must be a non-negative finite number, got -1", + ), + ( + EnvVar { + name: "FACT_SCAN_INTERVAL", + value: "-1", + }, + "error: invalid value '-1' for '--scan-interval ': value must be a non-negative finite number, got -1", + ), ]; for (env, expected) in tests { let Err(err) = with_env_var(env) else { diff --git a/fact/src/output/grpc.rs b/fact/src/output/grpc.rs index c3120d06..8c47ac5c 100644 --- a/fact/src/output/grpc.rs +++ b/fact/src/output/grpc.rs @@ -147,7 +147,8 @@ impl Client { } async fn run(&mut self) -> anyhow::Result { - let mut backoff = Backoff::new(Duration::from_secs(1), Duration::from_secs(60)); + let backoff_config = self.config.borrow().backoff.clone(); + let mut backoff = Backoff::new(backoff_config.initial(), backoff_config.max()); loop { // Re-read certs on each connection attempt so rotated certificates // on disk are picked up on the next reconnect. From 9054e092c0c25d0029bed2a39f2a7b8d45da108c Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Wed, 10 Jun 2026 13:31:19 +0200 Subject: [PATCH 3/7] cleanup(grpc): simplify Backoff creation --- fact/src/output/grpc.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/fact/src/output/grpc.rs b/fact/src/output/grpc.rs index 8c47ac5c..7975b2bc 100644 --- a/fact/src/output/grpc.rs +++ b/fact/src/output/grpc.rs @@ -18,7 +18,11 @@ use tokio_stream::{ }; use tonic::transport::Channel; -use crate::{config::GrpcConfig, event::Event, metrics::EventCounter}; +use crate::{ + config::{BackoffConfig, GrpcConfig}, + event::Event, + metrics::EventCounter, +}; struct Backoff { initial: Duration, @@ -46,6 +50,12 @@ impl Backoff { } } +impl From<&BackoffConfig> for Backoff { + fn from(value: &BackoffConfig) -> Self { + Backoff::new(value.initial(), value.max()) + } +} + pub struct Client { rx: broadcast::Receiver>, running: watch::Receiver, @@ -147,8 +157,7 @@ impl Client { } async fn run(&mut self) -> anyhow::Result { - let backoff_config = self.config.borrow().backoff.clone(); - let mut backoff = Backoff::new(backoff_config.initial(), backoff_config.max()); + let mut backoff = Backoff::from(&self.config.borrow().backoff); loop { // Re-read certs on each connection attempt so rotated certificates // on disk are picked up on the next reconnect. @@ -189,9 +198,6 @@ impl Client { Ok(_) => info!("gRPC stream ended"), Err(e) => warn!("gRPC stream error: {e:?}"), } - let delay = backoff.next(); - warn!("Reconnecting in {delay:?}..."); - sleep(delay).await; } _ = self.config.changed() => return Ok(true), _ = self.running.changed() => return Ok(*self.running.borrow()), @@ -237,9 +243,9 @@ mod tests { #[test] fn backoff_reset() { let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60)); - b.next(); - b.next(); - b.next(); + assert_eq!(b.next(), Duration::from_secs(1)); + assert_eq!(b.next(), Duration::from_secs(2)); + assert_eq!(b.next(), Duration::from_secs(4)); b.reset(); assert_eq!(b.next(), Duration::from_secs(1)); assert_eq!(b.next(), Duration::from_secs(2)); From fc69ffee7e9651ce52433aa0ee23ab351bf80a52 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Wed, 10 Jun 2026 13:42:49 +0200 Subject: [PATCH 4/7] chore: add changelog line --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index df3f5633..6ab0c7fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ possible include a PR number for easier tracking. ## Next +* feat(grpc): add exponential backoff for reconnection attempts (#789) * ROX-34502: reload mTLS certificates on each gRPC connection attempt (#788) * chore: add formatting and linting to integration test code (#783, #784) * feat: add code coverage with cargo-llvm-cov and Codecov upload (#745) From 9f129fea92e4bb9051f9406f647feea5cfaa2bf0 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Thu, 11 Jun 2026 11:50:41 +0200 Subject: [PATCH 5/7] feat(grpc): add full jitter to backoff reconnection Add configurable jitter to the exponential backoff using rand crate for uniform distribution over [0, delay]. Jitter is enabled by default and can be toggled via YAML (grpc.backoff.jitter), CLI (--backoff-jitter / --no-backoff-jitter), or env var (FACT_GRPC_NO_BACKOFF_JITTER). Assisted-by: claude-opus-4-6@default --- Cargo.lock | 51 +++++++++++++++++++++++++++++++----- Cargo.toml | 1 + fact/Cargo.toml | 1 + fact/src/config/mod.rs | 27 +++++++++++++++++++ fact/src/config/tests.rs | 56 ++++++++++++++++++++++++++++++++++++++++ fact/src/output/grpc.rs | 47 ++++++++++++++++++++++++++++----- 6 files changed, 171 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb4a6560..bedc4a49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -262,6 +262,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures", + "rand_core 0.10.1", +] + [[package]] name = "clang-sys" version = "1.8.1" @@ -344,6 +355,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -454,6 +474,7 @@ dependencies = [ "prometheus-client", "prost", "prost-types", + "rand 0.10.1", "regex", "serde", "serde_json", @@ -606,6 +627,7 @@ dependencies = [ "cfg-if", "libc", "r-efi", + "rand_core 0.10.1", "wasip2", "wasip3", ] @@ -646,7 +668,7 @@ dependencies = [ "parking_lot", "portable-atomic", "quanta", - "rand", + "rand 0.9.3", "smallvec", "spinning_top", "web-time", @@ -1342,7 +1364,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ec095654a25171c2124e9e3393a930bddbffdc939556c914957a4c3e0a87166" dependencies = [ "rand_chacha", - "rand_core", + "rand_core 0.9.5", +] + +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.1", + "rand_core 0.10.1", ] [[package]] @@ -1352,7 +1385,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.5", ] [[package]] @@ -1364,6 +1397,12 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_core" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" + [[package]] name = "raw-cpuid" version = "11.6.0" @@ -1427,7 +1466,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1605,10 +1644,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.3.4", + "getrandom 0.4.1", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1032319c..59273075 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ openssl = "0.10.75" prometheus-client = { version = "0.24.0", default-features = false } prost = "0.14.0" prost-types = "0.14.0" +rand = { version = "0.10.1", default-features = false, features = ["thread_rng"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.142" shlex = "2.0.1" diff --git a/fact/Cargo.toml b/fact/Cargo.toml index 1b2b63c8..8bba7aff 100644 --- a/fact/Cargo.toml +++ b/fact/Cargo.toml @@ -28,6 +28,7 @@ tokio-stream = { workspace = true } prometheus-client = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } +rand = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } shlex = { workspace = true } diff --git a/fact/src/config/mod.rs b/fact/src/config/mod.rs index c5305102..5e7af10d 100644 --- a/fact/src/config/mod.rs +++ b/fact/src/config/mod.rs @@ -346,6 +346,7 @@ impl TryFrom<&yaml::Hash> for EndpointConfig { pub struct BackoffConfig { initial: Option, max: Option, + jitter: Option, } impl BackoffConfig { @@ -356,6 +357,9 @@ impl BackoffConfig { if let Some(max) = from.max { self.max = Some(max); } + if let Some(jitter) = from.jitter { + self.jitter = Some(jitter); + } } pub fn initial(&self) -> Duration { @@ -365,6 +369,10 @@ impl BackoffConfig { pub fn max(&self) -> Duration { self.max.unwrap_or(Duration::from_secs(60)) } + + pub fn jitter(&self) -> bool { + self.jitter.unwrap_or(true) + } } impl TryFrom<&yaml::Hash> for BackoffConfig { @@ -391,6 +399,12 @@ impl TryFrom<&yaml::Hash> for BackoffConfig { .with_context(|| format!("invalid grpc.backoff.max: {v:?}"))?, ); } + "jitter" => { + let Some(jitter) = v.as_bool() else { + bail!("grpc.backoff.jitter field has incorrect type: {v:?}"); + }; + backoff.jitter = Some(jitter); + } name => bail!("Invalid field 'grpc.backoff.{name}' with value: {v:?}"), } } @@ -556,6 +570,18 @@ pub struct FactCli { #[arg(long, env = "FACT_GRPC_BACKOFF_MAX", value_parser = parse_positive_duration_secs)] backoff_max: Option, + /// Enable jitter for gRPC reconnection backoff + /// + /// Default value is true + #[arg(long, overrides_with = "no_backoff_jitter", hide = true)] + backoff_jitter: bool, + #[arg( + long, + overrides_with = "backoff_jitter", + env = "FACT_GRPC_NO_BACKOFF_JITTER" + )] + no_backoff_jitter: bool, + /// The port to bind for all exposed endpoints #[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")] address: Option, @@ -649,6 +675,7 @@ impl FactCli { backoff: BackoffConfig { initial: self.backoff_initial, max: self.backoff_max, + jitter: resolve_bool_arg(self.backoff_jitter, self.no_backoff_jitter), }, }, endpoint: EndpointConfig { diff --git a/fact/src/config/tests.rs b/fact/src/config/tests.rs index 65557539..a2e1a6ee 100644 --- a/fact/src/config/tests.rs +++ b/fact/src/config/tests.rs @@ -289,18 +289,37 @@ fn parsing() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + jitter: false + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + jitter: Some(false), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( r#" grpc: backoff: initial: 0.5 max: 120 + jitter: false "#, FactConfig { grpc: GrpcConfig { backoff: BackoffConfig { initial: Some(Duration::from_secs_f64(0.5)), max: Some(Duration::from_secs(120)), + jitter: Some(false), }, ..Default::default() }, @@ -317,6 +336,7 @@ fn parsing() { backoff: initial: 0.5 max: 120 + jitter: false endpoint: address: 0.0.0.0:8080 expose_metrics: true @@ -337,6 +357,7 @@ fn parsing() { backoff: BackoffConfig { initial: Some(Duration::from_secs_f64(0.5)), max: Some(Duration::from_secs(120)), + jitter: Some(false), }, }, endpoint: EndpointConfig { @@ -460,6 +481,14 @@ paths: "#, "invalid grpc.backoff.max: Integer(-5)", ), + ( + r#" + grpc: + backoff: + jitter: 4 + "#, + "grpc.backoff.jitter field has incorrect type: Integer(4)", + ), ( r#" grpc: @@ -1295,6 +1324,7 @@ fn update() { backoff: initial: 0.5 max: 120 + jitter: false endpoint: address: 127.0.0.1:8080 expose_metrics: true @@ -1315,6 +1345,7 @@ fn update() { backoff: BackoffConfig { initial: Some(Duration::from_secs(15)), max: Some(Duration::from_secs(30)), + jitter: Some(true), }, }, endpoint: EndpointConfig { @@ -1340,6 +1371,7 @@ fn update() { backoff: BackoffConfig { initial: Some(Duration::from_secs_f64(0.5)), max: Some(Duration::from_secs(120)), + jitter: Some(false), }, }, endpoint: EndpointConfig { @@ -1389,6 +1421,7 @@ fn defaults() { assert!(config.hotreload()); assert_eq!(config.grpc.backoff.initial(), Duration::from_secs(1)); assert_eq!(config.grpc.backoff.max(), Duration::from_secs(60)); + assert!(config.grpc.backoff.jitter()); } static ENV_MUTEX: Mutex<()> = Mutex::new(()); @@ -1588,6 +1621,22 @@ fn env_vars() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_GRPC_NO_BACKOFF_JITTER", + value: "true", + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + jitter: Some(false), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( EnvVar { name: "FACT_ENDPOINT_ADDRESS", @@ -1997,6 +2046,13 @@ fn env_vars_invalid_values() { }, "error: invalid value '-1' for '--scan-interval ': value must be a non-negative finite number, got -1", ), + ( + EnvVar { + name: "FACT_GRPC_NO_BACKOFF_JITTER", + value: "not_a_boolean", + }, + "error: invalid value 'not_a_boolean' for '--no-backoff-jitter'", + ), ]; for (env, expected) in tests { let Err(err) = with_env_var(env) else { diff --git a/fact/src/output/grpc.rs b/fact/src/output/grpc.rs index 7975b2bc..8fd418b1 100644 --- a/fact/src/output/grpc.rs +++ b/fact/src/output/grpc.rs @@ -28,21 +28,28 @@ struct Backoff { initial: Duration, current: Duration, max: Duration, + jitter: bool, } impl Backoff { - fn new(initial: Duration, max: Duration) -> Self { + fn new(initial: Duration, max: Duration, jitter: bool) -> Self { Self { initial, current: initial, max, + jitter, } } fn next(&mut self) -> Duration { let delay = self.current; self.current = (self.current * 2).min(self.max); - delay + if self.jitter { + let nanos = rand::random_range(0..=delay.as_nanos() as u64); + Duration::from_nanos(nanos) + } else { + delay + } } fn reset(&mut self) { @@ -52,7 +59,7 @@ impl Backoff { impl From<&BackoffConfig> for Backoff { fn from(value: &BackoffConfig) -> Self { - Backoff::new(value.initial(), value.max()) + Backoff::new(value.initial(), value.max(), value.jitter()) } } @@ -223,7 +230,7 @@ mod tests { #[test] fn backoff_exponential() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60)); + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false); assert_eq!(b.next(), Duration::from_secs(1)); assert_eq!(b.next(), Duration::from_secs(2)); assert_eq!(b.next(), Duration::from_secs(4)); @@ -234,7 +241,7 @@ mod tests { #[test] fn backoff_caps_at_max() { - let mut b = Backoff::new(Duration::from_secs(32), Duration::from_secs(60)); + let mut b = Backoff::new(Duration::from_secs(32), Duration::from_secs(60), false); assert_eq!(b.next(), Duration::from_secs(32)); assert_eq!(b.next(), Duration::from_secs(60)); assert_eq!(b.next(), Duration::from_secs(60)); @@ -242,7 +249,7 @@ mod tests { #[test] fn backoff_reset() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60)); + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false); assert_eq!(b.next(), Duration::from_secs(1)); assert_eq!(b.next(), Duration::from_secs(2)); assert_eq!(b.next(), Duration::from_secs(4)); @@ -250,4 +257,32 @@ mod tests { assert_eq!(b.next(), Duration::from_secs(1)); assert_eq!(b.next(), Duration::from_secs(2)); } + + #[test] + fn backoff_jitter_within_range() { + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), true); + let mut expected_max = Duration::from_secs(1); + for _ in 0..100 { + let delay = b.next(); + assert!( + delay <= expected_max, + "delay {delay:?} exceeded expected max {expected_max:?}" + ); + expected_max = (expected_max * 2).min(Duration::from_secs(60)); + } + } + + #[test] + fn backoff_jitter_reset() { + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), true); + for _ in 0..5 { + b.next(); + } + b.reset(); + let delay = b.next(); + assert!( + delay <= Duration::from_secs(1), + "delay {delay:?} exceeded 1s after reset" + ); + } } From 37a1ae1fbc692385786ace42dff65bc59bca7001 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Thu, 11 Jun 2026 13:38:44 +0200 Subject: [PATCH 6/7] feat(config): make backoff multiplier configurable Add multiplier field to BackoffConfig (default 1.5x). Configurable via YAML (grpc.backoff.multiplier), CLI (--backoff-multiplier), and env var (FACT_GRPC_BACKOFF_MULTIPLIER). Must be > 1.0. Drops Eq derive from config types in favor of PartialEq to support storing multiplier as f64 directly. Assisted-by: claude-opus-4-6@default --- fact/src/config/mod.rs | 41 +++++++++++- fact/src/config/tests.rs | 133 +++++++++++++++++++++++++++++++++++++++ fact/src/output/grpc.rs | 39 +++++++++--- 3 files changed, 200 insertions(+), 13 deletions(-) diff --git a/fact/src/config/mod.rs b/fact/src/config/mod.rs index 5e7af10d..e9520173 100644 --- a/fact/src/config/mod.rs +++ b/fact/src/config/mod.rs @@ -39,6 +39,14 @@ fn parse_positive_duration_secs(s: &str) -> anyhow::Result { Ok(d) } +fn parse_multiplier(s: &str) -> anyhow::Result { + let mult = s.parse::()?; + if !mult.is_finite() || mult <= 1.0 { + bail!("multiplier must be > 1.0, got {mult}"); + } + Ok(mult) +} + const CONFIG_FILES: [&str; 4] = [ "/etc/stackrox/fact.yml", "/etc/stackrox/fact.yaml", @@ -46,7 +54,7 @@ const CONFIG_FILES: [&str; 4] = [ "fact.yaml", ]; -#[derive(Debug, Default, PartialEq, Eq, Clone)] +#[derive(Debug, Default, PartialEq, Clone)] pub struct FactConfig { paths: Option>, pub grpc: GrpcConfig, @@ -342,11 +350,12 @@ impl TryFrom<&yaml::Hash> for EndpointConfig { } } -#[derive(Debug, Default, PartialEq, Eq, Clone)] +#[derive(Debug, Default, PartialEq, Clone)] pub struct BackoffConfig { initial: Option, max: Option, jitter: Option, + multiplier: Option, } impl BackoffConfig { @@ -360,6 +369,9 @@ impl BackoffConfig { if let Some(jitter) = from.jitter { self.jitter = Some(jitter); } + if let Some(multiplier) = from.multiplier { + self.multiplier = Some(multiplier); + } } pub fn initial(&self) -> Duration { @@ -373,6 +385,10 @@ impl BackoffConfig { pub fn jitter(&self) -> bool { self.jitter.unwrap_or(true) } + + pub fn multiplier(&self) -> f64 { + self.multiplier.unwrap_or(1.5) + } } impl TryFrom<&yaml::Hash> for BackoffConfig { @@ -405,6 +421,18 @@ impl TryFrom<&yaml::Hash> for BackoffConfig { }; backoff.jitter = Some(jitter); } + "multiplier" => { + let multiplier = match v.as_f64().or_else(|| v.as_i64().map(|v| v as f64)) { + Some(m) if !m.is_finite() || m <= 1.0 => { + bail!("invalid grpc.backoff.multiplier: {v:?}") + } + None => { + bail!("invalid grpc.backoff.multiplier: {v:?}") + } + Some(m) => m, + }; + backoff.multiplier = Some(multiplier); + } name => bail!("Invalid field 'grpc.backoff.{name}' with value: {v:?}"), } } @@ -412,7 +440,7 @@ impl TryFrom<&yaml::Hash> for BackoffConfig { } } -#[derive(Debug, Default, PartialEq, Eq, Clone)] +#[derive(Debug, Default, PartialEq, Clone)] pub struct GrpcConfig { url: Option, certs: Option, @@ -570,6 +598,12 @@ pub struct FactCli { #[arg(long, env = "FACT_GRPC_BACKOFF_MAX", value_parser = parse_positive_duration_secs)] backoff_max: Option, + /// Backoff multiplier for gRPC reconnection + /// + /// Must be > 1.0. Default value is 1.5 + #[arg(long, env = "FACT_GRPC_BACKOFF_MULTIPLIER", value_parser = parse_multiplier)] + backoff_multiplier: Option, + /// Enable jitter for gRPC reconnection backoff /// /// Default value is true @@ -676,6 +710,7 @@ impl FactCli { initial: self.backoff_initial, max: self.backoff_max, jitter: resolve_bool_arg(self.backoff_jitter, self.no_backoff_jitter), + multiplier: self.backoff_multiplier, }, }, endpoint: EndpointConfig { diff --git a/fact/src/config/tests.rs b/fact/src/config/tests.rs index a2e1a6ee..b4764807 100644 --- a/fact/src/config/tests.rs +++ b/fact/src/config/tests.rs @@ -306,6 +306,40 @@ fn parsing() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + multiplier: 2 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(2.0), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + multiplier: 3.5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(3.5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( r#" grpc: @@ -313,6 +347,7 @@ fn parsing() { initial: 0.5 max: 120 jitter: false + multiplier: 2 "#, FactConfig { grpc: GrpcConfig { @@ -320,6 +355,7 @@ fn parsing() { initial: Some(Duration::from_secs_f64(0.5)), max: Some(Duration::from_secs(120)), jitter: Some(false), + multiplier: Some(2.0), }, ..Default::default() }, @@ -337,6 +373,7 @@ fn parsing() { initial: 0.5 max: 120 jitter: false + multiplier: 2 endpoint: address: 0.0.0.0:8080 expose_metrics: true @@ -358,6 +395,7 @@ fn parsing() { initial: Some(Duration::from_secs_f64(0.5)), max: Some(Duration::from_secs(120)), jitter: Some(false), + multiplier: Some(2.0), }, }, endpoint: EndpointConfig { @@ -489,6 +527,22 @@ paths: "#, "grpc.backoff.jitter field has incorrect type: Integer(4)", ), + ( + r#" + grpc: + backoff: + multiplier: true + "#, + "invalid grpc.backoff.multiplier: Boolean(true)", + ), + ( + r#" + grpc: + backoff: + multiplier: 0.5 + "#, + "invalid grpc.backoff.multiplier: Real(\"0.5\")", + ), ( r#" grpc: @@ -959,6 +1013,51 @@ fn update() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + multiplier: 2 + "#, + FactConfig::default(), + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(2.0), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + multiplier: 2 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(1.5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(2.0), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( r#" endpoint: @@ -1325,6 +1424,7 @@ fn update() { initial: 0.5 max: 120 jitter: false + multiplier: 3.0 endpoint: address: 127.0.0.1:8080 expose_metrics: true @@ -1346,6 +1446,7 @@ fn update() { initial: Some(Duration::from_secs(15)), max: Some(Duration::from_secs(30)), jitter: Some(true), + multiplier: Some(2.0), }, }, endpoint: EndpointConfig { @@ -1372,6 +1473,7 @@ fn update() { initial: Some(Duration::from_secs_f64(0.5)), max: Some(Duration::from_secs(120)), jitter: Some(false), + multiplier: Some(3.0), }, }, endpoint: EndpointConfig { @@ -1422,6 +1524,7 @@ fn defaults() { assert_eq!(config.grpc.backoff.initial(), Duration::from_secs(1)); assert_eq!(config.grpc.backoff.max(), Duration::from_secs(60)); assert!(config.grpc.backoff.jitter()); + assert_eq!(config.grpc.backoff.multiplier(), 1.5); } static ENV_MUTEX: Mutex<()> = Mutex::new(()); @@ -1637,6 +1740,22 @@ fn env_vars() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MULTIPLIER", + value: "2.5", + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(2.5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( EnvVar { name: "FACT_ENDPOINT_ADDRESS", @@ -2053,6 +2172,20 @@ fn env_vars_invalid_values() { }, "error: invalid value 'not_a_boolean' for '--no-backoff-jitter'", ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MULTIPLIER", + value: "not_a_number", + }, + "error: invalid value 'not_a_number' for '--backoff-multiplier ': invalid float literal", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MULTIPLIER", + value: "0.5", + }, + "error: invalid value '0.5' for '--backoff-multiplier ': multiplier must be > 1.0, got 0.5", + ), ]; for (env, expected) in tests { let Err(err) = with_env_var(env) else { diff --git a/fact/src/output/grpc.rs b/fact/src/output/grpc.rs index 8fd418b1..2daae5ef 100644 --- a/fact/src/output/grpc.rs +++ b/fact/src/output/grpc.rs @@ -29,21 +29,25 @@ struct Backoff { current: Duration, max: Duration, jitter: bool, + multiplier: f64, } impl Backoff { - fn new(initial: Duration, max: Duration, jitter: bool) -> Self { + fn new(initial: Duration, max: Duration, jitter: bool, multiplier: f64) -> Self { Self { initial, current: initial, max, jitter, + multiplier, } } fn next(&mut self) -> Duration { let delay = self.current; - self.current = (self.current * 2).min(self.max); + self.current = Duration::from_secs_f64( + (self.current.as_secs_f64() * self.multiplier).min(self.max.as_secs_f64()), + ); if self.jitter { let nanos = rand::random_range(0..=delay.as_nanos() as u64); Duration::from_nanos(nanos) @@ -59,7 +63,12 @@ impl Backoff { impl From<&BackoffConfig> for Backoff { fn from(value: &BackoffConfig) -> Self { - Backoff::new(value.initial(), value.max(), value.jitter()) + Backoff::new( + value.initial(), + value.max(), + value.jitter(), + value.multiplier(), + ) } } @@ -229,8 +238,8 @@ mod tests { use super::*; #[test] - fn backoff_exponential() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false); + fn backoff_exponential_2x() { + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false, 2.0); assert_eq!(b.next(), Duration::from_secs(1)); assert_eq!(b.next(), Duration::from_secs(2)); assert_eq!(b.next(), Duration::from_secs(4)); @@ -239,9 +248,18 @@ mod tests { assert_eq!(b.next(), Duration::from_secs(32)); } + #[test] + fn backoff_default_multiplier() { + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false, 1.5); + assert_eq!(b.next(), Duration::from_secs(1)); + assert_eq!(b.next(), Duration::from_millis(1500)); + assert_eq!(b.next(), Duration::from_millis(2250)); + assert_eq!(b.next(), Duration::from_millis(3375)); + } + #[test] fn backoff_caps_at_max() { - let mut b = Backoff::new(Duration::from_secs(32), Duration::from_secs(60), false); + let mut b = Backoff::new(Duration::from_secs(32), Duration::from_secs(60), false, 2.0); assert_eq!(b.next(), Duration::from_secs(32)); assert_eq!(b.next(), Duration::from_secs(60)); assert_eq!(b.next(), Duration::from_secs(60)); @@ -249,7 +267,7 @@ mod tests { #[test] fn backoff_reset() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false); + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false, 2.0); assert_eq!(b.next(), Duration::from_secs(1)); assert_eq!(b.next(), Duration::from_secs(2)); assert_eq!(b.next(), Duration::from_secs(4)); @@ -260,7 +278,7 @@ mod tests { #[test] fn backoff_jitter_within_range() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), true); + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), true, 1.5); let mut expected_max = Duration::from_secs(1); for _ in 0..100 { let delay = b.next(); @@ -268,13 +286,14 @@ mod tests { delay <= expected_max, "delay {delay:?} exceeded expected max {expected_max:?}" ); - expected_max = (expected_max * 2).min(Duration::from_secs(60)); + let nanos = expected_max.as_nanos() as u64 * 1500 / 1000; + expected_max = Duration::from_nanos(nanos).min(Duration::from_secs(60)); } } #[test] fn backoff_jitter_reset() { - let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), true); + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), true, 1.5); for _ in 0..5 { b.next(); } From 1acb8094cc87cc86c44bcd239cd5dcf5b21d09a3 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Thu, 11 Jun 2026 16:28:41 +0200 Subject: [PATCH 7/7] cleanup: refactor some code for consistency and simplicity --- fact/src/config/mod.rs | 114 ++++++++++++++++++--------------------- fact/src/config/tests.rs | 8 +-- 2 files changed, 56 insertions(+), 66 deletions(-) diff --git a/fact/src/config/mod.rs b/fact/src/config/mod.rs index e9520173..b286d7ca 100644 --- a/fact/src/config/mod.rs +++ b/fact/src/config/mod.rs @@ -16,37 +16,6 @@ pub mod reloader; #[cfg(test)] mod tests; -fn yaml_to_duration_secs(v: &Yaml) -> Option { - v.as_f64() - .or_else(|| v.as_i64().map(|i| i as f64)) - .filter(|s| s.is_finite() && *s >= 0.0) - .map(Duration::from_secs_f64) -} - -fn parse_duration_secs(s: &str) -> anyhow::Result { - let f: f64 = s.parse()?; - if !f.is_finite() || f < 0.0 { - bail!("value must be a non-negative finite number, got {f}"); - } - Ok(Duration::from_secs_f64(f)) -} - -fn parse_positive_duration_secs(s: &str) -> anyhow::Result { - let d = parse_duration_secs(s)?; - if d.is_zero() { - bail!("value must be greater than zero"); - } - Ok(d) -} - -fn parse_multiplier(s: &str) -> anyhow::Result { - let mult = s.parse::()?; - if !mult.is_finite() || mult <= 1.0 { - bail!("multiplier must be > 1.0, got {mult}"); - } - Ok(mult) -} - const CONFIG_FILES: [&str; 4] = [ "/etc/stackrox/fact.yml", "/etc/stackrox/fact.yaml", @@ -54,6 +23,13 @@ const CONFIG_FILES: [&str; 4] = [ "fact.yaml", ]; +fn yaml_to_duration_secs(v: &Yaml) -> Option { + v.as_f64() + .or_else(|| v.as_i64().map(|i| i as f64)) + .filter(|s| s.is_finite() && *s >= 0.0) + .map(Duration::from_secs_f64) +} + #[derive(Debug, Default, PartialEq, Clone)] pub struct FactConfig { paths: Option>, @@ -250,10 +226,10 @@ impl TryFrom> for FactConfig { } "scan_interval" => { // scan_interval == 0 disables the scanner - config.scan_interval = Some( - yaml_to_duration_secs(v) - .with_context(|| format!("invalid scan_interval: {v:?}"))?, - ); + let Some(scan_interval) = yaml_to_duration_secs(v) else { + bail!("invalid scan_interval: {v:?}"); + }; + config.scan_interval = Some(scan_interval); } "rate_limit" => { // rate_limit == 0 means unlimited (no throttling) @@ -402,18 +378,16 @@ impl TryFrom<&yaml::Hash> for BackoffConfig { }; match k { "initial" => { - backoff.initial = Some( - yaml_to_duration_secs(v) - .filter(|d| !d.is_zero()) - .with_context(|| format!("invalid grpc.backoff.initial: {v:?}"))?, - ); + let Some(initial) = yaml_to_duration_secs(v).filter(|d| !d.is_zero()) else { + bail!("invalid grpc.backoff.initial: {v:?}"); + }; + backoff.initial = Some(initial); } "max" => { - backoff.max = Some( - yaml_to_duration_secs(v) - .filter(|d| !d.is_zero()) - .with_context(|| format!("invalid grpc.backoff.max: {v:?}"))?, - ); + let Some(max) = yaml_to_duration_secs(v).filter(|d| !d.is_zero()) else { + bail!("invalid grpc.backoff.max: {v:?}"); + }; + backoff.max = Some(max); } "jitter" => { let Some(jitter) = v.as_bool() else { @@ -422,14 +396,12 @@ impl TryFrom<&yaml::Hash> for BackoffConfig { backoff.jitter = Some(jitter); } "multiplier" => { - let multiplier = match v.as_f64().or_else(|| v.as_i64().map(|v| v as f64)) { - Some(m) if !m.is_finite() || m <= 1.0 => { - bail!("invalid grpc.backoff.multiplier: {v:?}") - } - None => { - bail!("invalid grpc.backoff.multiplier: {v:?}") - } - Some(m) => m, + let Some(multiplier) = v + .as_f64() + .or_else(|| v.as_i64().map(|v| v as f64)) + .filter(|mult| mult.is_finite() && *mult > 1.0) + else { + bail!("invalid grpc.backoff.multiplier: {v:?}"); }; backoff.multiplier = Some(multiplier); } @@ -571,6 +543,30 @@ impl TryFrom<&yaml::Hash> for BpfConfig { } } +fn parse_duration_secs(s: &str) -> anyhow::Result { + let f = s.parse::()?; + if !f.is_finite() || f < 0.0 { + bail!("value must be a non-negative finite number, got {f}"); + } + Ok(Duration::from_secs_f64(f)) +} + +fn parse_positive_duration_secs(s: &str) -> anyhow::Result { + let d = parse_duration_secs(s)?; + if d.is_zero() { + bail!("value must be greater than zero"); + } + Ok(d) +} + +fn parse_multiplier(s: &str) -> anyhow::Result { + let mult = s.parse::()?; + if !mult.is_finite() || mult <= 1.0 { + bail!("multiplier must be > 1.0, got {mult}"); + } + Ok(mult) +} + #[derive(Debug, Parser)] #[clap(version = crate::version::FACT_VERSION, about)] pub struct FactCli { @@ -607,14 +603,8 @@ pub struct FactCli { /// Enable jitter for gRPC reconnection backoff /// /// Default value is true - #[arg(long, overrides_with = "no_backoff_jitter", hide = true)] - backoff_jitter: bool, - #[arg( - long, - overrides_with = "backoff_jitter", - env = "FACT_GRPC_NO_BACKOFF_JITTER" - )] - no_backoff_jitter: bool, + #[arg(long, env = "FACT_GRPC_BACKOFF_JITTER")] + backoff_jitter: Option, /// The port to bind for all exposed endpoints #[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")] @@ -709,7 +699,7 @@ impl FactCli { backoff: BackoffConfig { initial: self.backoff_initial, max: self.backoff_max, - jitter: resolve_bool_arg(self.backoff_jitter, self.no_backoff_jitter), + jitter: self.backoff_jitter, multiplier: self.backoff_multiplier, }, }, diff --git a/fact/src/config/tests.rs b/fact/src/config/tests.rs index b4764807..462d7370 100644 --- a/fact/src/config/tests.rs +++ b/fact/src/config/tests.rs @@ -1726,8 +1726,8 @@ fn env_vars() { ), ( EnvVar { - name: "FACT_GRPC_NO_BACKOFF_JITTER", - value: "true", + name: "FACT_GRPC_BACKOFF_JITTER", + value: "false", }, FactConfig { grpc: GrpcConfig { @@ -2167,10 +2167,10 @@ fn env_vars_invalid_values() { ), ( EnvVar { - name: "FACT_GRPC_NO_BACKOFF_JITTER", + name: "FACT_GRPC_BACKOFF_JITTER", value: "not_a_boolean", }, - "error: invalid value 'not_a_boolean' for '--no-backoff-jitter'", + "error: invalid value 'not_a_boolean' for '--backoff-jitter '", ), ( EnvVar {