diff --git a/CHANGELOG.md b/CHANGELOG.md index df3f5633..6ab0c7fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ possible include a PR number for easier tracking. ## Next +* feat(grpc): add exponential backoff for reconnection attempts (#789) * ROX-34502: reload mTLS certificates on each gRPC connection attempt (#788) * chore: add formatting and linting to integration test code (#783, #784) * feat: add code coverage with cargo-llvm-cov and Codecov upload (#745) diff --git a/Cargo.lock b/Cargo.lock index eb4a6560..bedc4a49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -262,6 +262,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures", + "rand_core 0.10.1", +] + [[package]] name = "clang-sys" version = "1.8.1" @@ -344,6 +355,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -454,6 +474,7 @@ dependencies = [ "prometheus-client", "prost", "prost-types", + "rand 0.10.1", "regex", "serde", "serde_json", @@ -606,6 +627,7 @@ dependencies = [ "cfg-if", "libc", "r-efi", + "rand_core 0.10.1", "wasip2", "wasip3", ] @@ -646,7 +668,7 @@ dependencies = [ "parking_lot", "portable-atomic", "quanta", - "rand", + "rand 0.9.3", "smallvec", "spinning_top", "web-time", @@ -1342,7 +1364,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ec095654a25171c2124e9e3393a930bddbffdc939556c914957a4c3e0a87166" dependencies = [ "rand_chacha", - "rand_core", + "rand_core 0.9.5", +] + +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.1", + "rand_core 0.10.1", ] [[package]] @@ -1352,7 +1385,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.5", ] [[package]] @@ -1364,6 +1397,12 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_core" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" + [[package]] name = "raw-cpuid" version = "11.6.0" @@ -1427,7 +1466,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1605,10 +1644,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.3.4", + "getrandom 0.4.1", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1032319c..59273075 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ openssl = "0.10.75" prometheus-client = { version = "0.24.0", default-features = false } prost = "0.14.0" prost-types = "0.14.0" +rand = { version = "0.10.1", default-features = false, features = ["thread_rng"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.142" shlex = "2.0.1" diff --git a/fact/Cargo.toml b/fact/Cargo.toml index 1b2b63c8..8bba7aff 100644 --- a/fact/Cargo.toml +++ b/fact/Cargo.toml @@ -28,6 +28,7 @@ tokio-stream = { workspace = true } prometheus-client = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } +rand = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } shlex = { workspace = true } diff --git a/fact/src/config/mod.rs b/fact/src/config/mod.rs index b7530176..b286d7ca 100644 --- a/fact/src/config/mod.rs +++ b/fact/src/config/mod.rs @@ -23,7 +23,14 @@ const CONFIG_FILES: [&str; 4] = [ "fact.yaml", ]; -#[derive(Debug, Default, PartialEq, Eq, Clone)] +fn yaml_to_duration_secs(v: &Yaml) -> Option { + v.as_f64() + .or_else(|| v.as_i64().map(|i| i as f64)) + .filter(|s| s.is_finite() && *s >= 0.0) + .map(Duration::from_secs_f64) +} + +#[derive(Debug, Default, PartialEq, Clone)] pub struct FactConfig { paths: Option>, pub grpc: GrpcConfig, @@ -218,20 +225,11 @@ impl TryFrom> for FactConfig { config.hotreload = Some(hotreload); } "scan_interval" => { - // scan_internal == 0 disables the scanner - if let Some(scan_interval) = v.as_f64() { - if scan_interval < 0.0 { - bail!("invalid scan_interval: {scan_interval}"); - } - config.scan_interval = Some(Duration::from_secs_f64(scan_interval)); - } else if let Some(scan_interval) = v.as_i64() { - if scan_interval < 0 { - bail!("invalid scan_interval: {scan_interval}"); - } - config.scan_interval = Some(Duration::from_secs(scan_interval as u64)) - } else { - bail!("scan_interval field has incorrect type: {v:?}"); - } + // scan_interval == 0 disables the scanner + let Some(scan_interval) = yaml_to_duration_secs(v) else { + bail!("invalid scan_interval: {v:?}"); + }; + config.scan_interval = Some(scan_interval); } "rate_limit" => { // rate_limit == 0 means unlimited (no throttling) @@ -328,10 +326,97 @@ impl TryFrom<&yaml::Hash> for EndpointConfig { } } -#[derive(Debug, Default, PartialEq, Eq, Clone)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct BackoffConfig { + initial: Option, + max: Option, + jitter: Option, + multiplier: Option, +} + +impl BackoffConfig { + fn update(&mut self, from: &BackoffConfig) { + if let Some(initial) = from.initial { + self.initial = Some(initial); + } + if let Some(max) = from.max { + self.max = Some(max); + } + if let Some(jitter) = from.jitter { + self.jitter = Some(jitter); + } + if let Some(multiplier) = from.multiplier { + self.multiplier = Some(multiplier); + } + } + + pub fn initial(&self) -> Duration { + self.initial.unwrap_or(Duration::from_secs(1)) + } + + pub fn max(&self) -> Duration { + self.max.unwrap_or(Duration::from_secs(60)) + } + + pub fn jitter(&self) -> bool { + self.jitter.unwrap_or(true) + } + + pub fn multiplier(&self) -> f64 { + self.multiplier.unwrap_or(1.5) + } +} + +impl TryFrom<&yaml::Hash> for BackoffConfig { + type Error = anyhow::Error; + + fn try_from(value: &yaml::Hash) -> Result { + let mut backoff = BackoffConfig::default(); + for (k, v) in value.iter() { + let Some(k) = k.as_str() else { + bail!("key is not string: {k:?}"); + }; + match k { + "initial" => { + let Some(initial) = yaml_to_duration_secs(v).filter(|d| !d.is_zero()) else { + bail!("invalid grpc.backoff.initial: {v:?}"); + }; + backoff.initial = Some(initial); + } + "max" => { + let Some(max) = yaml_to_duration_secs(v).filter(|d| !d.is_zero()) else { + bail!("invalid grpc.backoff.max: {v:?}"); + }; + backoff.max = Some(max); + } + "jitter" => { + let Some(jitter) = v.as_bool() else { + bail!("grpc.backoff.jitter field has incorrect type: {v:?}"); + }; + backoff.jitter = Some(jitter); + } + "multiplier" => { + let Some(multiplier) = v + .as_f64() + .or_else(|| v.as_i64().map(|v| v as f64)) + .filter(|mult| mult.is_finite() && *mult > 1.0) + else { + bail!("invalid grpc.backoff.multiplier: {v:?}"); + }; + backoff.multiplier = Some(multiplier); + } + name => bail!("Invalid field 'grpc.backoff.{name}' with value: {v:?}"), + } + } + Ok(backoff) + } +} + +#[derive(Debug, Default, PartialEq, Clone)] pub struct GrpcConfig { url: Option, certs: Option, + pub backoff: BackoffConfig, } impl GrpcConfig { @@ -343,6 +428,8 @@ impl GrpcConfig { if let Some(certs) = from.certs.as_deref() { self.certs = Some(certs.to_owned()); } + + self.backoff.update(&from.backoff); } pub fn url(&self) -> Option<&str> { @@ -377,6 +464,12 @@ impl TryFrom<&yaml::Hash> for GrpcConfig { }; grpc.certs = Some(PathBuf::from(certs)); } + "backoff" => { + let Some(backoff) = v.as_hash() else { + bail!("grpc.backoff section has incorrect type: {v:?}"); + }; + grpc.backoff = BackoffConfig::try_from(backoff)?; + } name => bail!("Invalid field 'grpc.{name}' with value: {v:?}"), } } @@ -450,6 +543,30 @@ impl TryFrom<&yaml::Hash> for BpfConfig { } } +fn parse_duration_secs(s: &str) -> anyhow::Result { + let f = s.parse::()?; + if !f.is_finite() || f < 0.0 { + bail!("value must be a non-negative finite number, got {f}"); + } + Ok(Duration::from_secs_f64(f)) +} + +fn parse_positive_duration_secs(s: &str) -> anyhow::Result { + let d = parse_duration_secs(s)?; + if d.is_zero() { + bail!("value must be greater than zero"); + } + Ok(d) +} + +fn parse_multiplier(s: &str) -> anyhow::Result { + let mult = s.parse::()?; + if !mult.is_finite() || mult <= 1.0 { + bail!("multiplier must be > 1.0, got {mult}"); + } + Ok(mult) +} + #[derive(Debug, Parser)] #[clap(version = crate::version::FACT_VERSION, about)] pub struct FactCli { @@ -465,6 +582,30 @@ pub struct FactCli { #[arg(short, long, env = "FACT_CERTS")] certs: Option, + /// Initial backoff delay in seconds for gRPC reconnection + /// + /// Default value is 1 second + #[arg(long, env = "FACT_GRPC_BACKOFF_INITIAL", value_parser = parse_positive_duration_secs)] + backoff_initial: Option, + + /// Maximum backoff delay in seconds for gRPC reconnection + /// + /// Default value is 60 seconds + #[arg(long, env = "FACT_GRPC_BACKOFF_MAX", value_parser = parse_positive_duration_secs)] + backoff_max: Option, + + /// Backoff multiplier for gRPC reconnection + /// + /// Must be > 1.0. Default value is 1.5 + #[arg(long, env = "FACT_GRPC_BACKOFF_MULTIPLIER", value_parser = parse_multiplier)] + backoff_multiplier: Option, + + /// Enable jitter for gRPC reconnection backoff + /// + /// Default value is true + #[arg(long, env = "FACT_GRPC_BACKOFF_JITTER")] + backoff_jitter: Option, + /// The port to bind for all exposed endpoints #[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")] address: Option, @@ -535,8 +676,8 @@ pub struct FactCli { /// The seconds can use a decimal point for fractions of seconds. /// /// Default value is 30 seconds - #[arg(long, short, env = "FACT_SCAN_INTERVAL")] - scan_interval: Option, + #[arg(long, short, env = "FACT_SCAN_INTERVAL", value_parser = parse_duration_secs)] + scan_interval: Option, /// Maximum number of file events to allow per second /// @@ -555,6 +696,12 @@ impl FactCli { grpc: GrpcConfig { url: self.url.clone(), certs: self.certs.clone(), + backoff: BackoffConfig { + initial: self.backoff_initial, + max: self.backoff_max, + jitter: self.backoff_jitter, + multiplier: self.backoff_multiplier, + }, }, endpoint: EndpointConfig { address: self.address, @@ -568,7 +715,7 @@ impl FactCli { skip_pre_flight: resolve_bool_arg(self.skip_pre_flight, self.no_skip_pre_flight), json: resolve_bool_arg(self.json, self.no_json), hotreload: resolve_bool_arg(self.hotreload, self.no_hotreload), - scan_interval: self.scan_interval.map(Duration::from_secs_f64), + scan_interval: self.scan_interval, rate_limit: self.rate_limit, } } diff --git a/fact/src/config/tests.rs b/fact/src/config/tests.rs index f25d86c5..462d7370 100644 --- a/fact/src/config/tests.rs +++ b/fact/src/config/tests.rs @@ -255,6 +255,113 @@ fn parsing() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + initial: 2 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(2)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + max: 30 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(30)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + jitter: false + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + jitter: Some(false), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + multiplier: 2 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(2.0), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + multiplier: 3.5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(3.5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + initial: 0.5 + max: 120 + jitter: false + multiplier: 2 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs_f64(0.5)), + max: Some(Duration::from_secs(120)), + jitter: Some(false), + multiplier: Some(2.0), + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( r#" paths: @@ -262,6 +369,11 @@ fn parsing() { grpc: url: 'https://svc.sensor.stackrox:9090' certs: /etc/stackrox/certs + backoff: + initial: 0.5 + max: 120 + jitter: false + multiplier: 2 endpoint: address: 0.0.0.0:8080 expose_metrics: true @@ -279,6 +391,12 @@ fn parsing() { grpc: GrpcConfig { url: Some(String::from("https://svc.sensor.stackrox:9090")), certs: Some(PathBuf::from("/etc/stackrox/certs")), + backoff: BackoffConfig { + initial: Some(Duration::from_secs_f64(0.5)), + max: Some(Duration::from_secs(120)), + jitter: Some(false), + multiplier: Some(2.0), + }, }, endpoint: EndpointConfig { address: Some(SocketAddr::from(([0, 0, 0, 0], 8080))), @@ -346,6 +464,93 @@ paths: "#, "certs field has incorrect type: Boolean(true)", ), + ( + r#" + grpc: + backoff: true + "#, + "grpc.backoff section has incorrect type: Boolean(true)", + ), + ( + r#" + grpc: + backoff: + initial: true + "#, + "invalid grpc.backoff.initial: Boolean(true)", + ), + ( + r#" + grpc: + backoff: + max: true + "#, + "invalid grpc.backoff.max: Boolean(true)", + ), + ( + r#" + grpc: + backoff: + initial: 0 + "#, + "invalid grpc.backoff.initial: Integer(0)", + ), + ( + r#" + grpc: + backoff: + initial: -1 + "#, + "invalid grpc.backoff.initial: Integer(-1)", + ), + ( + r#" + grpc: + backoff: + max: 0 + "#, + "invalid grpc.backoff.max: Integer(0)", + ), + ( + r#" + grpc: + backoff: + max: -5 + "#, + "invalid grpc.backoff.max: Integer(-5)", + ), + ( + r#" + grpc: + backoff: + jitter: 4 + "#, + "grpc.backoff.jitter field has incorrect type: Integer(4)", + ), + ( + r#" + grpc: + backoff: + multiplier: true + "#, + "invalid grpc.backoff.multiplier: Boolean(true)", + ), + ( + r#" + grpc: + backoff: + multiplier: 0.5 + "#, + "invalid grpc.backoff.multiplier: Real(\"0.5\")", + ), + ( + r#" + grpc: + backoff: + unknown: 4 + "#, + "Invalid field 'grpc.backoff.unknown' with value: Integer(4)", + ), ( "endpoint: true", "Invalid field 'endpoint' with value: Boolean(true)", @@ -483,10 +688,16 @@ paths: ), ( "scan_interval: true", - "scan_interval field has incorrect type: Boolean(true)", + "invalid scan_interval: Boolean(true)", + ), + ( + "scan_interval: -128", + "invalid scan_interval: Integer(-128)", + ), + ( + "scan_interval: -128.5", + "invalid scan_interval: Real(\"-128.5\")", ), - ("scan_interval: -128", "invalid scan_interval: -128"), - ("scan_interval: -128.5", "invalid scan_interval: -128.5"), ("unknown:", "Invalid field 'unknown' with value: Null"), ]; for (input, expected) in tests { @@ -658,6 +869,195 @@ fn update() { ..Default::default() }, ), + ( + r#" + grpc: + backoff: + initial: 5 + "#, + FactConfig::default(), + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(5)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + initial: 5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(2)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(5)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + initial: 5 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(5)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(5)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + max: 120 + "#, + FactConfig::default(), + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(120)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + max: 120 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(30)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(120)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + max: 120 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(120)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(120)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + multiplier: 2 + "#, + FactConfig::default(), + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(2.0), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + r#" + grpc: + backoff: + multiplier: 2 + "#, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(1.5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(2.0), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( r#" endpoint: @@ -1020,6 +1420,11 @@ fn update() { grpc: url: 'https://svc.sensor.stackrox:9090' certs: /etc/stackrox/certs + backoff: + initial: 0.5 + max: 120 + jitter: false + multiplier: 3.0 endpoint: address: 127.0.0.1:8080 expose_metrics: true @@ -1037,6 +1442,12 @@ fn update() { grpc: GrpcConfig { url: Some(String::from("http://localhost")), certs: Some(PathBuf::from("/etc/certs")), + backoff: BackoffConfig { + initial: Some(Duration::from_secs(15)), + max: Some(Duration::from_secs(30)), + jitter: Some(true), + multiplier: Some(2.0), + }, }, endpoint: EndpointConfig { address: Some(SocketAddr::from(([0, 0, 0, 0], 9000))), @@ -1058,6 +1469,12 @@ fn update() { grpc: GrpcConfig { url: Some(String::from("https://svc.sensor.stackrox:9090")), certs: Some(PathBuf::from("/etc/stackrox/certs")), + backoff: BackoffConfig { + initial: Some(Duration::from_secs_f64(0.5)), + max: Some(Duration::from_secs(120)), + jitter: Some(false), + multiplier: Some(3.0), + }, }, endpoint: EndpointConfig { address: Some(SocketAddr::from(([127, 0, 0, 1], 8080))), @@ -1104,6 +1521,10 @@ fn defaults() { assert_eq!(config.bpf.ringbuf_size(), 8192); assert_eq!(config.bpf.inodes_max(), 65536); assert!(config.hotreload()); + assert_eq!(config.grpc.backoff.initial(), Duration::from_secs(1)); + assert_eq!(config.grpc.backoff.max(), Duration::from_secs(60)); + assert!(config.grpc.backoff.jitter()); + assert_eq!(config.grpc.backoff.multiplier(), 1.5); } static ENV_MUTEX: Mutex<()> = Mutex::new(()); @@ -1225,6 +1646,16 @@ fn env_vars() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_SCAN_INTERVAL", + value: "0", + }, + FactConfig { + scan_interval: Some(Duration::ZERO), + ..Default::default() + }, + ), ( EnvVar { name: "FACT_RATE_LIMIT", @@ -1261,6 +1692,70 @@ fn env_vars() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_INITIAL", + value: "5", + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(5)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MAX", + value: "120", + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(120)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_JITTER", + value: "false", + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + jitter: Some(false), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MULTIPLIER", + value: "2.5", + }, + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + multiplier: Some(2.5), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( EnvVar { name: "FACT_ENDPOINT_ADDRESS", @@ -1372,6 +1867,40 @@ fn env_vars_override_yaml() { ..Default::default() }, ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_INITIAL", + value: "5", + }, + "grpc:\n backoff:\n initial: 2", + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + initial: Some(Duration::from_secs(5)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MAX", + value: "120", + }, + "grpc:\n backoff:\n max: 30", + FactConfig { + grpc: GrpcConfig { + backoff: BackoffConfig { + max: Some(Duration::from_secs(120)), + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }, + ), ( EnvVar { name: "FACT_PATHS", @@ -1587,6 +2116,76 @@ fn env_vars_invalid_values() { }, "error: invalid value 'not_a_boolean' for '--hotreload'", ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_INITIAL", + value: "not_a_number", + }, + "error: invalid value 'not_a_number' for '--backoff-initial ': invalid float literal", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MAX", + value: "not_a_number", + }, + "error: invalid value 'not_a_number' for '--backoff-max ': invalid float literal", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_INITIAL", + value: "0", + }, + "error: invalid value '0' for '--backoff-initial ': value must be greater than zero", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MAX", + value: "0", + }, + "error: invalid value '0' for '--backoff-max ': value must be greater than zero", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_INITIAL", + value: "-1", + }, + "error: invalid value '-1' for '--backoff-initial ': value must be a non-negative finite number, got -1", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MAX", + value: "-1", + }, + "error: invalid value '-1' for '--backoff-max ': value must be a non-negative finite number, got -1", + ), + ( + EnvVar { + name: "FACT_SCAN_INTERVAL", + value: "-1", + }, + "error: invalid value '-1' for '--scan-interval ': value must be a non-negative finite number, got -1", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_JITTER", + value: "not_a_boolean", + }, + "error: invalid value 'not_a_boolean' for '--backoff-jitter '", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MULTIPLIER", + value: "not_a_number", + }, + "error: invalid value 'not_a_number' for '--backoff-multiplier ': invalid float literal", + ), + ( + EnvVar { + name: "FACT_GRPC_BACKOFF_MULTIPLIER", + value: "0.5", + }, + "error: invalid value '0.5' for '--backoff-multiplier ': multiplier must be > 1.0, got 0.5", + ), ]; for (env, expected) in tests { let Err(err) = with_env_var(env) else { diff --git a/fact/src/output/grpc.rs b/fact/src/output/grpc.rs index 259d6004..2daae5ef 100644 --- a/fact/src/output/grpc.rs +++ b/fact/src/output/grpc.rs @@ -18,7 +18,59 @@ use tokio_stream::{ }; use tonic::transport::Channel; -use crate::{config::GrpcConfig, event::Event, metrics::EventCounter}; +use crate::{ + config::{BackoffConfig, GrpcConfig}, + event::Event, + metrics::EventCounter, +}; + +struct Backoff { + initial: Duration, + current: Duration, + max: Duration, + jitter: bool, + multiplier: f64, +} + +impl Backoff { + fn new(initial: Duration, max: Duration, jitter: bool, multiplier: f64) -> Self { + Self { + initial, + current: initial, + max, + jitter, + multiplier, + } + } + + fn next(&mut self) -> Duration { + let delay = self.current; + self.current = Duration::from_secs_f64( + (self.current.as_secs_f64() * self.multiplier).min(self.max.as_secs_f64()), + ); + if self.jitter { + let nanos = rand::random_range(0..=delay.as_nanos() as u64); + Duration::from_nanos(nanos) + } else { + delay + } + } + + fn reset(&mut self) { + self.current = self.initial; + } +} + +impl From<&BackoffConfig> for Backoff { + fn from(value: &BackoffConfig) -> Self { + Backoff::new( + value.initial(), + value.max(), + value.jitter(), + value.multiplier(), + ) + } +} pub struct Client { rx: broadcast::Receiver>, @@ -121,6 +173,7 @@ impl Client { } async fn run(&mut self) -> anyhow::Result { + let mut backoff = Backoff::from(&self.config.borrow().backoff); loop { // Re-read certs on each connection attempt so rotated certificates // on disk are picked up on the next reconnect. @@ -129,12 +182,14 @@ impl Client { let channel = match self.create_channel(connector).await { Ok(channel) => channel, Err(e) => { - debug!("Failed to connect to server: {e:?}"); - sleep(Duration::from_secs(1)).await; + let delay = backoff.next(); + debug!("Failed to connect to server: {e:?}, retrying in {delay:?}"); + sleep(delay).await; continue; } }; info!("Successfully connected to gRPC server"); + backoff.reset(); let mut client = FileActivityServiceClient::new(channel); @@ -177,3 +232,76 @@ impl Client { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn backoff_exponential_2x() { + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false, 2.0); + assert_eq!(b.next(), Duration::from_secs(1)); + assert_eq!(b.next(), Duration::from_secs(2)); + assert_eq!(b.next(), Duration::from_secs(4)); + assert_eq!(b.next(), Duration::from_secs(8)); + assert_eq!(b.next(), Duration::from_secs(16)); + assert_eq!(b.next(), Duration::from_secs(32)); + } + + #[test] + fn backoff_default_multiplier() { + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false, 1.5); + assert_eq!(b.next(), Duration::from_secs(1)); + assert_eq!(b.next(), Duration::from_millis(1500)); + assert_eq!(b.next(), Duration::from_millis(2250)); + assert_eq!(b.next(), Duration::from_millis(3375)); + } + + #[test] + fn backoff_caps_at_max() { + let mut b = Backoff::new(Duration::from_secs(32), Duration::from_secs(60), false, 2.0); + assert_eq!(b.next(), Duration::from_secs(32)); + assert_eq!(b.next(), Duration::from_secs(60)); + assert_eq!(b.next(), Duration::from_secs(60)); + } + + #[test] + fn backoff_reset() { + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), false, 2.0); + assert_eq!(b.next(), Duration::from_secs(1)); + assert_eq!(b.next(), Duration::from_secs(2)); + assert_eq!(b.next(), Duration::from_secs(4)); + b.reset(); + assert_eq!(b.next(), Duration::from_secs(1)); + assert_eq!(b.next(), Duration::from_secs(2)); + } + + #[test] + fn backoff_jitter_within_range() { + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), true, 1.5); + let mut expected_max = Duration::from_secs(1); + for _ in 0..100 { + let delay = b.next(); + assert!( + delay <= expected_max, + "delay {delay:?} exceeded expected max {expected_max:?}" + ); + let nanos = expected_max.as_nanos() as u64 * 1500 / 1000; + expected_max = Duration::from_nanos(nanos).min(Duration::from_secs(60)); + } + } + + #[test] + fn backoff_jitter_reset() { + let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(60), true, 1.5); + for _ in 0..5 { + b.next(); + } + b.reset(); + let delay = b.next(); + assert!( + delay <= Duration::from_secs(1), + "delay {delay:?} exceeded 1s after reset" + ); + } +}