Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 187 additions & 0 deletions packages/cipherstash-proxy/src/postgresql/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,27 @@ where
msg = "Passthrough enabled"
);
self.write_with_flush(bytes).await?;

// The frontend starts a session and enqueues an execute for every
// statement (start_session / set_execute), regardless of whether
// the statement is mapped. Those per-connection queues are only
// drained by complete_execution()/finish_session(), which are
// normally called when an execute terminates (below). Because the
// passthrough path returns early, we must drain them here too —
// otherwise the execute and session_metrics queues grow by one
// entry per statement and never shrink, leaking memory until the
// process is OOM-killed. See BUG-300.
match code.into() {
BackendCode::CommandComplete
| BackendCode::EmptyQueryResponse
| BackendCode::PortalSuspended
| BackendCode::ErrorResponse => {
self.context.complete_execution();
self.context.finish_session();
}
_ => {}
}

return Ok(());
}

Expand Down Expand Up @@ -705,3 +726,169 @@ where
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::config::{LogConfig, TandemConfig};
use crate::log;
use crate::postgresql::context::KeysetIdentifier;
use crate::postgresql::messages::Name;
use crate::proxy::{EncryptConfig, EncryptionService};
use eql_mapper::Schema;
use std::io::Cursor;
use std::sync::Arc;
use tokio::sync::mpsc;

/// Stub [`EncryptionService`] for the tests in this module.
///
/// Both operations ignore their inputs and succeed with an empty vector.
struct TestService {}

#[async_trait::async_trait]
impl EncryptionService for TestService {
    /// Ignores every argument and returns an empty ciphertext list.
    async fn encrypt(
        &self,
        _keyset_id: Option<KeysetIdentifier>,
        _plaintexts: Vec<Option<cipherstash_client::encryption::Plaintext>>,
        _columns: &[Option<Column>],
    ) -> Result<Vec<Option<crate::EqlCiphertext>>, Error> {
        Ok(Vec::new())
    }

    /// Ignores every argument and returns an empty plaintext list.
    async fn decrypt(
        &self,
        _keyset_id: Option<KeysetIdentifier>,
        _ciphertexts: Vec<Option<crate::EqlCiphertext>>,
    ) -> Result<Vec<Option<cipherstash_client::encryption::Plaintext>>, Error> {
        Ok(Vec::new())
    }
}

/// Creates a `Context` backed by a default (empty) `EncryptConfig`, i.e. one
/// running in passthrough mode — the configuration that triggers BUG-300.
///
/// The remaining dependencies are test fixtures: the testing `TandemConfig`,
/// a `"public"` schema, the no-op `TestService`, and a reload channel.
fn passthrough_context() -> Context<TestService> {
    // The receiving half is not returned to the caller; it goes out of scope
    // when this function returns.
    let (reload_sender, _reload_receiver) = mpsc::unbounded_channel();

    Context::new(
        1,
        Arc::new(TandemConfig::for_testing()),
        Arc::new(EncryptConfig::default()),
        Arc::new(Schema::new("public")),
        TestService {},
        reload_sender,
    )
}

/// Signature of an encoder that produces one framed backend message as wire
/// bytes. The regression test uses one encoder per execute-terminating
/// message code.
type MessageEncoder = fn() -> BytesMut;

/// Frames a backend message on the wire: a 1-byte message code, a big-endian
/// Int32 length (body length + 4, so the length field counts itself but not
/// the code byte), then the body.
///
/// This is sufficient for the passthrough path, which matches on the code
/// only and does not parse the body.
///
/// # Panics
///
/// Panics if `body.len() + 4` does not fit in an `i32`. A plain `as i32` cast
/// would silently wrap and emit a corrupt length field instead.
fn backend_message(code: u8, body: &[u8]) -> BytesMut {
    /// Size in bytes of the Int32 length field, which the length itself includes.
    const LENGTH_FIELD_SIZE: usize = 4;

    let len = i32::try_from(body.len() + LENGTH_FIELD_SIZE)
        .expect("backend message body too large for an Int32 length field");

    // The final size is known up front: code byte + length field + body.
    let mut bytes = BytesMut::with_capacity(1 + LENGTH_FIELD_SIZE + body.len());
    bytes.extend_from_slice(&[code]);
    bytes.extend_from_slice(&len.to_be_bytes());
    bytes.extend_from_slice(body);
    bytes
}

/// Encodes a CommandComplete (`'C'`) message whose body is the
/// NUL-terminated command tag `SELECT 1`.
fn command_complete_bytes() -> BytesMut {
    let command_tag: &[u8] = b"SELECT 1\0";
    backend_message(b'C', command_tag)
}

/// Encodes an EmptyQueryResponse (`'I'`) message, which carries no body.
fn empty_query_response_bytes() -> BytesMut {
    let no_body: &[u8] = b"";
    backend_message(b'I', no_body)
}

/// Encodes a PortalSuspended (`'s'`) message, which carries no body.
fn portal_suspended_bytes() -> BytesMut {
    let no_body: &[u8] = b"";
    backend_message(b's', no_body)
}

/// Encodes an ErrorResponse (`'E'`) message.
///
/// The body is a list of (field-type byte, C-string) pairs closed by a lone
/// zero byte; here `S` = `ERROR`, `C` = `XX000`, `M` = `boom`. The exact
/// fields do not matter: the passthrough path forwards the bytes and matches
/// on the message code without parsing the body.
fn error_response_bytes() -> BytesMut {
    let fields: &[u8] = b"SERROR\0CXX000\0Mboom\0\0";
    backend_message(b'E', fields)
}

/// Regression test for BUG-300 (memory leak on the passthrough path).
///
/// For every statement the frontend enqueues a session and an execute. The
/// per-connection `execute` / `session_metrics` queues are drained only by
/// `complete_execution()` / `finish_session()`. The passthrough branch of
/// `rewrite()` used to return early without calling them, so each statement
/// left one entry behind in each queue.
///
/// The test pushes `Backend::rewrite()` through the passthrough branch and
/// checks after every statement that both queues are empty. It repeats this
/// for each execute-terminating message code, so removing any single arm of
/// the drain `match` makes the test fail.
#[tokio::test]
async fn passthrough_drains_queues_on_execute_terminating_message() {
    const STATEMENTS: usize = 256;

    log::init(LogConfig::default());

    // One encoder per backend code that ends the execute phase.
    let cases: [(&str, MessageEncoder); 4] = [
        ("CommandComplete", command_complete_bytes),
        ("EmptyQueryResponse", empty_query_response_bytes),
        ("PortalSuspended", portal_suspended_bytes),
        ("ErrorResponse", error_response_bytes),
    ];

    for (label, encode) in cases {
        let context = passthrough_context();
        assert!(
            context.is_passthrough(),
            "test context must be in passthrough mode"
        );

        // The fake "server" stream: the same terminating message repeated
        // once per statement.
        let server_stream: Vec<u8> = encode().repeat(STATEMENTS);
        let reader = Cursor::new(server_stream);

        // Binding the receiver keeps the channel open so write_with_flush
        // succeeds.
        let (client_sender, _client_receiver) = mpsc::unbounded_channel();
        let mut backend = Backend::new(client_sender, reader, context);

        for i in 0..STATEMENTS {
            // Frontend side: enqueue a session and an execute.
            let session_id = backend.context.start_session();
            backend
                .context
                .set_execute(Name::unnamed(), Some(session_id));

            // Backend side: consume one terminating message through the
            // passthrough path, which is expected to drain both queues.
            backend.rewrite().await.unwrap();

            // Check on every iteration so a leak of one entry per statement
            // is caught at the first statement that exhibits it.
            let execute_len = backend.context.execute_queue_len();
            assert_eq!(
                execute_len, 0,
                "{label}: execute queue not drained at statement {i}"
            );

            let session_metrics_len = backend.context.session_metrics_queue_len();
            assert_eq!(
                session_metrics_len, 0,
                "{label}: session_metrics queue not drained at statement {i}"
            );
        }
    }
}
}
51 changes: 51 additions & 0 deletions packages/cipherstash-proxy/src/postgresql/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,24 @@ where
self.set_execute(name, session_id);
}

/// Returns how many entries the `execute` queue currently holds.
///
/// Compiled only for tests. The BUG-300 regression tests use it to check
/// that the per-connection queue is drained and does not keep growing.
#[cfg(test)]
pub(crate) fn execute_queue_len(&self) -> usize {
    let execute = self.execute.read().unwrap();
    execute.queue.len()
}

/// Returns how many entries the `session_metrics` queue currently holds.
///
/// Compiled only for tests. The BUG-300 regression tests use it to check
/// that the per-connection queue is drained and does not keep growing.
#[cfg(test)]
pub(crate) fn session_metrics_queue_len(&self) -> usize {
    let session_metrics = self.session_metrics.read().unwrap();
    session_metrics.queue.len()
}

/// Marks the current Execution as Complete.
///
/// Transfers accumulated timing data from ExecuteContext to SessionMetricsContext.phase_timing:
Expand Down Expand Up @@ -1176,6 +1194,39 @@ mod tests {
assert!(portal.is_some());
}

/// Unit test for the two queue-draining primitives.
///
/// `complete_execution()` and `finish_session()` are the only operations
/// that drain the per-connection `execute` / `session_metrics` queues. The
/// test enqueues a session and an execute, calls both primitives, repeats
/// that many times, and then checks that both queues are empty.
///
/// Note: the primitives are called directly here. The backend passthrough
/// path that caused BUG-300 (it returned early without calling them) is
/// *not* exercised; that regression test lives in `backend.rs`
/// (`passthrough_drains_queues_on_execute_terminating_message`).
#[test]
pub fn complete_execution_and_finish_session_drain_queues() {
    const ITERATIONS: usize = 1000;

    log::init(LogConfig::default());

    let mut context = create_context();

    for _ in 0..ITERATIONS {
        // What the frontend does for every statement: enqueue a session
        // and an execute.
        let session_id = context.start_session();
        context.set_execute(Name::unnamed(), Some(session_id));

        // What the backend normally does on an execute-terminating message
        // (CommandComplete / ErrorResponse / …): drain both queues.
        context.complete_execution();
        context.finish_session();
    }

    let execute_len = context.execute_queue_len();
    assert_eq!(execute_len, 0);

    let session_metrics_len = context.session_metrics_queue_len();
    assert_eq!(session_metrics_len, 0);
}

#[test]
pub fn add_and_close_portals() {
log::init(LogConfig::default());
Expand Down
Loading