From 8107fcd94099b89efe49e8cc3918fb5c885da299 Mon Sep 17 00:00:00 2001 From: Dan Draper Date: Fri, 29 May 2026 23:39:36 +1000 Subject: [PATCH 1/3] fix(passthrough): drain per-statement queues to stop memory leak In passthrough the backend returned before calling complete_execution() / finish_session() -- the only drains for the per-connection execute and session_metrics queues. The frontend enqueues a session and an execute for every statement (mapped or not), so in passthrough those queues grew by one entry per statement and were never reclaimed, leaking ~1KB/statement until the process was OOM-killed. This only manifested with an empty encrypt config, since is_passthrough() = encrypt_config.is_empty() || mapping_disabled(); with a config present even plaintext statements take the mapped path and are drained. Drain the execute and session_metrics queues on execute-terminating messages in the passthrough branch too, mirroring the mapped path. Adds a regression test asserting a full statement lifecycle leaves both queues empty. Fixes BUG-300 --- .../src/postgresql/backend.rs | 21 +++++++++++++ .../src/postgresql/context/mod.rs | 31 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/packages/cipherstash-proxy/src/postgresql/backend.rs b/packages/cipherstash-proxy/src/postgresql/backend.rs index 9e832e6e..a29dd187 100644 --- a/packages/cipherstash-proxy/src/postgresql/backend.rs +++ b/packages/cipherstash-proxy/src/postgresql/backend.rs @@ -178,6 +178,27 @@ where msg = "Passthrough enabled" ); self.write_with_flush(bytes).await?; + + // The frontend starts a session and enqueues an execute for every + // statement (start_session / set_execute), regardless of whether + // the statement is mapped. Those per-connection queues are only + // drained by complete_execution()/finish_session(), which are + // normally called when an execute terminates (below). Because the + // passthrough path returns early, we must drain them here too — + // otherwise the execute and session_metrics queues grow by one + // entry per statement and never shrink, leaking memory until the + // process is OOM-killed. See BUG-300. + match code.into() { + BackendCode::CommandComplete + | BackendCode::EmptyQueryResponse + | BackendCode::PortalSuspended + | BackendCode::ErrorResponse => { + self.context.complete_execution(); + self.context.finish_session(); + } + _ => {} + } + return Ok(()); } diff --git a/packages/cipherstash-proxy/src/postgresql/context/mod.rs b/packages/cipherstash-proxy/src/postgresql/context/mod.rs index 602b3a9e..8cbbde76 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/mod.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/mod.rs @@ -1176,6 +1176,37 @@ mod tests { assert!(portal.is_some()); } + /// Regression test for BUG-300 (passthrough memory leak). + /// + /// The frontend starts a session and enqueues an execute for every + /// statement — including passthrough statements. The `execute` and + /// `session_metrics` queues are only drained by `complete_execution()` / + /// `finish_session()`. Before the fix, the passthrough path in the backend + /// returned without calling these, so the queues grew by one entry per + /// statement and leaked memory until OOM. This asserts that a full + /// statement lifecycle leaves both queues empty, regardless of how many + /// statements are processed. + #[test] + pub fn statement_lifecycle_does_not_grow_queues() { + log::init(LogConfig::default()); + + let mut context = create_context(); + + for _ in 0..1000 { + // Frontend: a session + execute are enqueued for every statement. + let session_id = context.start_session(); + context.set_execute(Name::unnamed(), Some(session_id)); + + // Backend: drain performed on an execute-terminating message + // (CommandComplete / ErrorResponse / ...), including in passthrough. + context.complete_execution(); + context.finish_session(); + } + + assert_eq!(context.execute.read().unwrap().queue.len(), 0); + assert_eq!(context.session_metrics.read().unwrap().queue.len(), 0); + } + #[test] pub fn add_and_close_portals() { log::init(LogConfig::default()); From 15a4684fdbec7d253aeac5ecc0fe49e2927882ad Mon Sep 17 00:00:00 2001 From: Dan Draper Date: Mon, 1 Jun 2026 11:02:08 +1000 Subject: [PATCH 2/3] test(passthrough): add backend-level regression test for BUG-300 The existing test called complete_execution()/finish_session() directly, so it passed even against the pre-fix backend (which early-returned in the passthrough branch without draining). It exercised the drain primitives, not the regression. Add passthrough_drains_queues_on_execute_terminating_message in backend.rs, which drives Backend::rewrite() through the passthrough branch with an execute-terminating CommandComplete and asserts the per-connection execute and session_metrics queues stay empty across many statements. This fails against the pre-fix backend, so it actually guards the bug. Rename the original test to complete_execution_and_finish_session_drain_queues and reframe its docs as a primitives-level unit test that points at the new backend test. Add #[cfg(test)] execute_queue_len()/session_metrics_queue_len() accessors so tests assert queue state without reaching into private fields. --- .../src/postgresql/backend.rs | 130 ++++++++++++++++++ .../src/postgresql/context/mod.rs | 48 +++++-- 2 files changed, 164 insertions(+), 14 deletions(-) diff --git a/packages/cipherstash-proxy/src/postgresql/backend.rs b/packages/cipherstash-proxy/src/postgresql/backend.rs index a29dd187..23441914 100644 --- a/packages/cipherstash-proxy/src/postgresql/backend.rs +++ b/packages/cipherstash-proxy/src/postgresql/backend.rs @@ -726,3 +726,133 @@ where Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::{LogConfig, TandemConfig}; + use crate::log; + use crate::postgresql::context::KeysetIdentifier; + use crate::postgresql::messages::Name; + use crate::proxy::{EncryptConfig, EncryptionService}; + use eql_mapper::Schema; + use std::io::Cursor; + use std::sync::Arc; + use tokio::sync::mpsc; + + struct TestService {} + + #[async_trait::async_trait] + impl EncryptionService for TestService { + async fn encrypt( + &self, + _keyset_id: Option, + _plaintexts: Vec>, + _columns: &[Option], + ) -> Result>, Error> { + Ok(vec![]) + } + + async fn decrypt( + &self, + _keyset_id: Option, + _ciphertexts: Vec>, + ) -> Result>, Error> { + Ok(vec![]) + } + } + + /// Builds a Context in passthrough mode (empty encrypt config), which is the + /// configuration that triggers BUG-300. + fn passthrough_context() -> Context { + let config = Arc::new(TandemConfig::for_testing()); + let encrypt_config = Arc::new(EncryptConfig::default()); + let schema = Arc::new(Schema::new("public")); + let (reload_sender, _reload_receiver) = mpsc::unbounded_channel(); + + Context::new( + 1, + config, + encrypt_config, + schema, + TestService {}, + reload_sender, + ) + } + + /// Encodes a `CommandComplete` backend message on the wire: + /// `'C'` + Int32 length (body + 4) + null-terminated command tag. + fn command_complete_bytes() -> BytesMut { + let tag = b"SELECT 1\0"; + let len = (tag.len() + 4) as i32; + + let mut bytes = BytesMut::new(); + bytes.extend_from_slice(b"C"); + bytes.extend_from_slice(&len.to_be_bytes()); + bytes.extend_from_slice(tag); + bytes + } + + /// Regression test for BUG-300 (passthrough memory leak). + /// + /// The frontend enqueues a session + execute for *every* statement. Those + /// per-connection `execute` / `session_metrics` queues are only drained by + /// `complete_execution()` / `finish_session()`. Before the fix, the + /// passthrough branch in `rewrite()` returned early without calling these, + /// so the queues grew by one entry per statement and leaked until OOM. + /// + /// This drives `Backend::rewrite()` through the passthrough branch with an + /// execute-terminating `CommandComplete` message and asserts both queues + /// stay empty across many statements. It fails against the pre-fix backend + /// (which never drained in passthrough) — i.e. it actually guards the bug. + #[tokio::test] + async fn passthrough_drains_queues_on_execute_terminating_message() { + log::init(LogConfig::default()); + + const STATEMENTS: usize = 1000; + + let context = passthrough_context(); + assert!( + context.is_passthrough(), + "test context must be in passthrough mode" + ); + + // A stream of CommandComplete messages — one per statement — that the + // backend reads from the "server". + let message = command_complete_bytes(); + let mut server_bytes = BytesMut::new(); + for _ in 0..STATEMENTS { + server_bytes.extend_from_slice(&message); + } + + // Keep the client receiver alive so write_with_flush succeeds. + let (client_sender, _client_receiver) = mpsc::unbounded_channel(); + let reader = Cursor::new(server_bytes.to_vec()); + let mut backend = Backend::new(client_sender, reader, context); + + for i in 0..STATEMENTS { + // Frontend: enqueue a session + execute for the statement. + let session_id = backend.context.start_session(); + backend + .context + .set_execute(Name::unnamed(), Some(session_id)); + + // Backend: process the server's CommandComplete via the passthrough + // path, which must drain the queues. + backend.rewrite().await.unwrap(); + + // The queues must be drained every iteration — not grow by one per + // statement (the BUG-300 leak). + assert_eq!( + backend.context.execute_queue_len(), + 0, + "execute queue not drained at statement {i}" + ); + assert_eq!( + backend.context.session_metrics_queue_len(), + 0, + "session_metrics queue not drained at statement {i}" + ); + } + } +} diff --git a/packages/cipherstash-proxy/src/postgresql/context/mod.rs b/packages/cipherstash-proxy/src/postgresql/context/mod.rs index 8cbbde76..ab3052b5 100644 --- a/packages/cipherstash-proxy/src/postgresql/context/mod.rs +++ b/packages/cipherstash-proxy/src/postgresql/context/mod.rs @@ -308,6 +308,24 @@ where self.set_execute(name, session_id); } + /// Number of entries currently queued in the `execute` queue. + /// + /// Test-only accessor used by the BUG-300 regression tests to assert the + /// per-connection queues are drained (and do not grow unbounded). + #[cfg(test)] + pub(crate) fn execute_queue_len(&self) -> usize { + self.execute.read().unwrap().queue.len() + } + + /// Number of entries currently queued in the `session_metrics` queue. + /// + /// Test-only accessor used by the BUG-300 regression tests to assert the + /// per-connection queues are drained (and do not grow unbounded). + #[cfg(test)] + pub(crate) fn session_metrics_queue_len(&self) -> usize { + self.session_metrics.read().unwrap().queue.len() + } + /// Marks the current Execution as Complete. /// /// Transfers accumulated timing data from ExecuteContext to SessionMetricsContext.phase_timing: @@ -1176,18 +1194,20 @@ mod tests { assert!(portal.is_some()); } - /// Regression test for BUG-300 (passthrough memory leak). + /// Unit test for the queue-draining primitives. + /// + /// `complete_execution()` / `finish_session()` are the only drains for the + /// per-connection `execute` / `session_metrics` queues. This asserts that + /// calling them after enqueuing a session + execute leaves both queues + /// empty, across many iterations. /// - /// The frontend starts a session and enqueues an execute for every - /// statement — including passthrough statements. The `execute` and - /// `session_metrics` queues are only drained by `complete_execution()` / - /// `finish_session()`. Before the fix, the passthrough path in the backend - /// returned without calling these, so the queues grew by one entry per - /// statement and leaked memory until OOM. This asserts that a full - /// statement lifecycle leaves both queues empty, regardless of how many - /// statements are processed. + /// Note: this exercises the primitives directly — it does *not* drive the + /// backend passthrough path that actually caused BUG-300 (that early + /// returned without calling these). The backend-level regression test for + /// BUG-300 lives in `backend.rs` + /// (`passthrough_drains_queues_on_execute_terminating_message`). #[test] - pub fn statement_lifecycle_does_not_grow_queues() { + pub fn complete_execution_and_finish_session_drain_queues() { log::init(LogConfig::default()); let mut context = create_context(); @@ -1197,14 +1217,14 @@ mod tests { let session_id = context.start_session(); context.set_execute(Name::unnamed(), Some(session_id)); - // Backend: drain performed on an execute-terminating message - // (CommandComplete / ErrorResponse / ...), including in passthrough. + // Drain primitives, normally called by the backend on an + // execute-terminating message (CommandComplete / ErrorResponse / …). context.complete_execution(); context.finish_session(); } - assert_eq!(context.execute.read().unwrap().queue.len(), 0); - assert_eq!(context.session_metrics.read().unwrap().queue.len(), 0); + assert_eq!(context.execute_queue_len(), 0); + assert_eq!(context.session_metrics_queue_len(), 0); } #[test] From 8f610ab3e682b969059d856574ef56932033e0c8 Mon Sep 17 00:00:00 2001 From: Dan Draper Date: Mon, 1 Jun 2026 12:32:10 +1000 Subject: [PATCH 3/3] test(passthrough): cover all execute-terminating codes (BUG-300) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review feedback: the regression test only fed CommandComplete, but the passthrough drain matches CommandComplete | EmptyQueryResponse | PortalSuspended | ErrorResponse. Three arms were uncovered — dropping any of them would have left the test green while the leak returned for that code. Make the test table-driven over all four terminating codes. Factor a backend_message() wire-framing helper and one encoder per code. Verified the test fails when any single arm is removed from the drain match. --- .../src/postgresql/backend.rs | 142 +++++++++++------- 1 file changed, 89 insertions(+), 53 deletions(-) diff --git a/packages/cipherstash-proxy/src/postgresql/backend.rs b/packages/cipherstash-proxy/src/postgresql/backend.rs index 23441914..b1a5c4b1 100644 --- a/packages/cipherstash-proxy/src/postgresql/backend.rs +++ b/packages/cipherstash-proxy/src/postgresql/backend.rs @@ -780,19 +780,44 @@ mod tests { ) } - /// Encodes a `CommandComplete` backend message on the wire: - /// `'C'` + Int32 length (body + 4) + null-terminated command tag. - fn command_complete_bytes() -> BytesMut { - let tag = b"SELECT 1\0"; - let len = (tag.len() + 4) as i32; + /// Encodes a backend message as wire bytes (one per execute-terminating code). + type MessageEncoder = fn() -> BytesMut; + + /// Frame a backend message on the wire: 1-byte code + Int32 length + /// (body length + 4) + body. Sufficient for the passthrough path, which + /// matches on the code only and does not parse the body. + fn backend_message(code: u8, body: &[u8]) -> BytesMut { + let len = (body.len() + 4) as i32; let mut bytes = BytesMut::new(); - bytes.extend_from_slice(b"C"); + bytes.extend_from_slice(&[code]); bytes.extend_from_slice(&len.to_be_bytes()); - bytes.extend_from_slice(tag); + bytes.extend_from_slice(body); bytes } + /// `'C'` CommandComplete, carrying a command tag. + fn command_complete_bytes() -> BytesMut { + backend_message(b'C', b"SELECT 1\0") + } + + /// `'I'` EmptyQueryResponse — no body. + fn empty_query_response_bytes() -> BytesMut { + backend_message(b'I', b"") + } + + /// `'s'` PortalSuspended — no body. + fn portal_suspended_bytes() -> BytesMut { + backend_message(b's', b"") + } + + /// `'E'` ErrorResponse — a sequence of (field-type, C-string) pairs + /// terminated by a zero byte. Content is irrelevant here: the passthrough + /// path forwards the bytes and matches on the code without parsing them. + fn error_response_bytes() -> BytesMut { + backend_message(b'E', b"SERROR\0CXX000\0Mboom\0\0") + } + /// Regression test for BUG-300 (passthrough memory leak). /// /// The frontend enqueues a session + execute for *every* statement. Those @@ -801,58 +826,69 @@ mod tests { /// passthrough branch in `rewrite()` returned early without calling these, /// so the queues grew by one entry per statement and leaked until OOM. /// - /// This drives `Backend::rewrite()` through the passthrough branch with an - /// execute-terminating `CommandComplete` message and asserts both queues - /// stay empty across many statements. It fails against the pre-fix backend - /// (which never drained in passthrough) — i.e. it actually guards the bug. + /// This drives `Backend::rewrite()` through the passthrough branch and + /// asserts both queues stay empty across many statements — once for *each* + /// execute-terminating message code the fix drains on, so dropping any arm + /// of that match is caught. It fails against the pre-fix backend (which + /// never drained in passthrough) — i.e. it actually guards the bug. #[tokio::test] async fn passthrough_drains_queues_on_execute_terminating_message() { log::init(LogConfig::default()); - const STATEMENTS: usize = 1000; - - let context = passthrough_context(); - assert!( - context.is_passthrough(), - "test context must be in passthrough mode" - ); + const STATEMENTS: usize = 256; + + // Every code that terminates the execute phase must drain the queues. + let cases: [(&str, MessageEncoder); 4] = [ + ("CommandComplete", command_complete_bytes), + ("EmptyQueryResponse", empty_query_response_bytes), + ("PortalSuspended", portal_suspended_bytes), + ("ErrorResponse", error_response_bytes), + ]; + + for (label, encode) in cases { + let context = passthrough_context(); + assert!( + context.is_passthrough(), + "test context must be in passthrough mode" + ); - // A stream of CommandComplete messages — one per statement — that the - // backend reads from the "server". - let message = command_complete_bytes(); - let mut server_bytes = BytesMut::new(); - for _ in 0..STATEMENTS { - server_bytes.extend_from_slice(&message); - } + // A stream of identical terminating messages — one per statement — + // that the backend reads from the "server". + let message = encode(); + let mut server_bytes = BytesMut::new(); + for _ in 0..STATEMENTS { + server_bytes.extend_from_slice(&message); + } - // Keep the client receiver alive so write_with_flush succeeds. - let (client_sender, _client_receiver) = mpsc::unbounded_channel(); - let reader = Cursor::new(server_bytes.to_vec()); - let mut backend = Backend::new(client_sender, reader, context); - - for i in 0..STATEMENTS { - // Frontend: enqueue a session + execute for the statement. - let session_id = backend.context.start_session(); - backend - .context - .set_execute(Name::unnamed(), Some(session_id)); - - // Backend: process the server's CommandComplete via the passthrough - // path, which must drain the queues. - backend.rewrite().await.unwrap(); - - // The queues must be drained every iteration — not grow by one per - // statement (the BUG-300 leak). - assert_eq!( - backend.context.execute_queue_len(), - 0, - "execute queue not drained at statement {i}" - ); - assert_eq!( - backend.context.session_metrics_queue_len(), - 0, - "session_metrics queue not drained at statement {i}" - ); + // Keep the client receiver alive so write_with_flush succeeds. + let (client_sender, _client_receiver) = mpsc::unbounded_channel(); + let reader = Cursor::new(server_bytes.to_vec()); + let mut backend = Backend::new(client_sender, reader, context); + + for i in 0..STATEMENTS { + // Frontend: enqueue a session + execute for the statement. + let session_id = backend.context.start_session(); + backend + .context + .set_execute(Name::unnamed(), Some(session_id)); + + // Backend: process the terminating message via the passthrough + // path, which must drain the queues. + backend.rewrite().await.unwrap(); + + // The queues must be drained every iteration — not grow by one + // per statement (the BUG-300 leak). + assert_eq!( + backend.context.execute_queue_len(), + 0, + "{label}: execute queue not drained at statement {i}" + ); + assert_eq!( + backend.context.session_metrics_queue_len(), + 0, + "{label}: session_metrics queue not drained at statement {i}" + ); + } } } }