diff --git a/MIGRATING-0.3.md b/MIGRATING-0.3.md index 0dab583..75265c2 100644 --- a/MIGRATING-0.3.md +++ b/MIGRATING-0.3.md @@ -177,6 +177,72 @@ if let Error::Server { sqlstate: Some(code), detail, hint, .. } = &err { --- +## #69 — Transaction API consolidation + +The raw transaction methods on `Connection` and `AsyncConnection` are now deprecated and hidden from rustdoc. The RAII guard at `Connection::transaction()` / `AsyncConnection::transaction()` is the recommended (and only documented) way to drive transactions. + +### What's deprecated + +```rust +Connection::begin_transaction(&self) // -> #[doc(hidden)] #[deprecated] +Connection::commit(&self) // -> #[doc(hidden)] #[deprecated] +Connection::rollback(&self) // -> #[doc(hidden)] #[deprecated] +AsyncConnection::begin_transaction(&self) // -> #[doc(hidden)] #[deprecated] +AsyncConnection::commit(&self) // -> #[doc(hidden)] #[deprecated] +AsyncConnection::rollback(&self) // -> #[doc(hidden)] #[deprecated] +``` + +These methods still exist and still work — your build will see compiler warnings rather than errors. They will be deleted in a future release; new code must use the RAII guard. + +### Migration recipe + +```rust +// Before +conn.begin_transaction()?; +conn.execute_command("INSERT INTO t VALUES (1)")?; +conn.commit()?; + +// After (sync) +let txn = conn.transaction()?; // requires &mut conn +txn.execute_command("INSERT INTO t VALUES (1)")?; +txn.commit()?; +``` + +For the async equivalent, the body of the function holding `conn` will need to take `&mut AsyncConnection` instead of `&AsyncConnection`. Where you previously had: + +```rust +pub async fn ingest(conn: &AsyncConnection, ...) -> Result<(), McpError> { + conn.begin_transaction().await?; + ... + conn.commit().await?; +} +``` + +write: + +```rust +pub async fn ingest(conn: &mut AsyncConnection, ...) -> Result<(), McpError> { + let txn = conn.transaction().await?; + txn.execute_command("...").await?; + txn.commit().await?; +} +``` + +Callers that hold a pooled connection (`deadpool::managed::Object`) need `let mut conn = pool.get().await?;` and `&mut conn` at the call site. + +### What didn't change + +- `Connection::transaction(&mut self) -> Result>` — kept as the canonical entry point. +- `Transaction::commit(self)` and `Transaction::rollback(self)` — kept; consume `self` to prevent double-commit. +- The `Drop for Transaction` auto-rollback safety net — kept. +- `AsyncTransaction` semantics, including the warning-only `Drop` (Rust has no async `Drop`) — kept. + +### MCP follow-up + +The MCP server's `Engine::execute_in_transaction` helper takes `&self` and so cannot use the RAII guard. It retains the deprecated raw methods with a function-level `#[allow(deprecated, reason = "...")]` annotation. Migrating it requires reshaping `Engine`'s locking model. Two structural paths and an acceptance-criteria checklist are written up in [issue #72](https://github.com/tableau/hyper-api-rust/issues/72). + +--- + ## #70 (continued) — Ergonomic constructors across all workspace error types The same ergonomic-constructor pattern was applied to every error type in the workspace that user code might construct, so call sites no longer need `.to_string()` ceremony for string-literal arguments. diff --git a/docs/TRANSACTIONS.md b/docs/TRANSACTIONS.md index 67ad3e8..13352c2 100644 --- a/docs/TRANSACTIONS.md +++ b/docs/TRANSACTIONS.md @@ -1,31 +1,17 @@ # Transaction Support -This document describes the transaction API in the Hyper Rust API, covering ACID semantics (A, C, I guaranteed; D not provided by this API), raw `Connection` methods, the RAII `Transaction` / `AsyncTransaction` guards, behavioral notes, and the test inventory. +This document describes the transaction API in the Hyper Rust API, covering ACID semantics (A, C, I guaranteed; D not provided by this API), the RAII `Transaction` / `AsyncTransaction` guards, behavioral notes, and the test inventory. ## Overview Hyper transactions in the Rust API guarantee **A**tomicity, **C**onsistency, and **I**solation. **Durability is not provided by this API.** Committed data is held in the server's memory; the database becomes durable only when it is closed, unloaded, detached, or released — at which point its data is flushed to disk. An unexpected process termination (crash, SIGKILL) before that flush can lose committed transactions. -The API provides two levels of transaction control: +The recommended way to drive transactions is the **RAII guard** (`Transaction<'conn>` / `AsyncTransaction<'conn>`), which auto-rolls back on drop and uses Rust's borrow checker to make several classes of misuse compile errors. -1. **Raw methods** on `Connection` / `AsyncConnection` — thin wrappers around SQL commands -2. **RAII guards** (`Transaction<'conn>` / `AsyncTransaction<'conn>`) — auto-rollback on drop - -All transaction APIs are always available with no feature flags required. +> Older raw `Connection::begin_transaction` / `commit` / `rollback` methods exist but are **deprecated** as of v0.3.0 and hidden from generated rustdoc. They will be removed in a future release; new code must use the RAII guard. See "Deprecated raw methods" at the bottom of this doc for the migration recipe. ## API Reference -### Raw Connection Methods - -Available on both `Connection` (sync) and `AsyncConnection` (async, with `.await`): - -```rust -// Transaction control -conn.begin_transaction()?; -conn.commit()?; -conn.rollback()?; -``` - ### RAII Transaction Guard (Sync) ```rust @@ -117,33 +103,53 @@ txn.commit().await?; ### Transactions -- **Nested BEGIN:** Calling `begin_transaction()` inside an active transaction produces a Hyper WARNING notice, not an error. The second BEGIN is ignored. -- **ROLLBACK outside transaction:** Calling `rollback()` with no active transaction produces a WARNING, not an error. -- **Error in transaction:** After a SQL error inside a transaction, the entire transaction enters an aborted state (SQLSTATE `25P02`). You must issue `ROLLBACK` before using the connection for anything else. -- **DDL after DML:** Executing DDL (e.g., `CREATE TABLE`) after DML (e.g., `INSERT`) in the same transaction produces error `0A000`. DDL-only transactions work fine. +- **Error in transaction:** After a SQL error inside a transaction, the entire transaction enters an aborted state (SQLSTATE `25P02`). You must drop or rollback the guard before using the connection for anything else; the next `txn.execute_command(...)` would error. +- **DDL after DML:** Executing DDL (e.g. `CREATE TABLE`) after DML (e.g. `INSERT`) in the same transaction produces error `0A000`. DDL-only transactions work fine. +- **Nested transactions:** Hyper does not support nested transactions. Issuing `BEGIN` while a transaction is open produces a WARNING; the second BEGIN is ignored. The RAII guard's `&mut self` borrow already prevents this in safe Rust code. ## What Works -- BEGIN / COMMIT / ROLLBACK via raw methods and via SQL strings - RAII `Transaction` guard with auto-rollback on drop (sync) - RAII `AsyncTransaction` guard (async, with warning-only drop) +- DDL inside transactions (subject to the DDL-after-DML restriction) +- Multi-table atomic rollback ## What Doesn't Work / Limitations -- **Async Drop rollback:** `AsyncTransaction` cannot issue ROLLBACK in Drop due to Rust's sync-only Drop trait. It only prints a warning. -- **Error recovery within transactions:** After a SQL error inside a transaction, the transaction is fully aborted (SQLSTATE `25P02`). You must ROLLBACK — you cannot continue executing statements. +- **Async Drop rollback:** `AsyncTransaction` cannot issue ROLLBACK in Drop due to Rust's sync-only Drop trait. It only prints a warning. Always explicitly commit or rollback async transactions before drop. +- **Error recovery within transactions:** After a SQL error inside a transaction, the transaction is fully aborted (SQLSTATE `25P02`). You must rollback — you cannot continue executing statements. - **`information_schema.tables`:** Does not exist in Hyper. Cannot be used to check table existence. +## Deprecated raw methods + +The methods `Connection::begin_transaction` / `commit` / `rollback` (and the matching `AsyncConnection` versions) are **deprecated** as of v0.3.0. They are hidden from generated rustdoc, marked `#[deprecated]` so any caller receives a compiler warning, and slated for removal in a future release. + +Migration recipe: + +```rust +// Before — deprecated +conn.begin_transaction()?; +conn.execute_command("INSERT INTO t VALUES (1, 'hello')")?; +conn.commit()?; + +// After — RAII guard +let txn = conn.transaction()?; // requires &mut conn +txn.execute_command("INSERT INTO t VALUES (1, 'hello')")?; +txn.commit()?; +``` + +The `&mut conn` requirement is intentional — it's the borrow-checker mechanism that makes the safety story compile-enforced. If your code currently holds the connection through a non-mutable reference (e.g. inside an `&self` method on a wrapper struct), you may need to reshape the wrapper's locking model. The MCP server's `engine.rs::execute_in_transaction` is one such caller; it retains the deprecated raw methods until [issue #72](https://github.com/tableau/hyper-api-rust/issues/72) restructures `Engine`'s lock model. + ## Test Inventory ### transaction_tests.rs -Basic transaction behavior. +Basic transaction behavior. The `test_raw_*` tests pin behavior of the deprecated raw methods until they are removed. | Test | Description | |------|-------------| -| `test_raw_begin_commit_methods` | Raw `begin_transaction()` / `commit()` methods | -| `test_raw_begin_rollback_methods` | Raw `begin_transaction()` / `rollback()` methods | +| `test_raw_begin_commit_methods` | **(deprecated API)** Raw `begin_transaction()` / `commit()` methods | +| `test_raw_begin_rollback_methods` | **(deprecated API)** Raw `begin_transaction()` / `rollback()` methods | | `test_begin_commit` | BEGIN + INSERT + COMMIT via SQL strings | | `test_begin_rollback` | BEGIN + INSERT + ROLLBACK via SQL strings | | `test_transaction_guard_commit` | RAII guard: `txn.execute_command()` + `txn.commit()` | diff --git a/hyperdb-api/examples/additional_examples/transactions.rs b/hyperdb-api/examples/additional_examples/transactions.rs index 7a49fac..02f830f 100644 --- a/hyperdb-api/examples/additional_examples/transactions.rs +++ b/hyperdb-api/examples/additional_examples/transactions.rs @@ -4,17 +4,28 @@ //! Example: Transactions //! //! Demonstrates the transaction API: -//! - Raw transaction methods (`begin_transaction`, `commit`, `rollback`) -//! - RAII `Transaction` guard with explicit commit and rollback +//! - **Recommended:** RAII `Transaction` guard with explicit commit and rollback //! - Querying within transactions to see uncommitted data //! - Multiple operations (INSERT, UPDATE, DELETE) in a single transaction //! - Multi-table atomic rollback (referential integrity across tables) //! - Multi-table reconnect semantics: only committed cross-table data is visible after reconnect //! - Auto-rollback safety net when the guard is dropped without commit //! - DDL inside transactions and known restrictions +//! - Legacy raw transaction methods (`begin_transaction`, `commit`, +//! `rollback`) — included for completeness only; deprecated and +//! slated for removal. New code should use the RAII guard. //! //! cargo run -p hyperdb-api --example transactions +// The `example_raw_transaction` and `example_raw_error_recovery` +// helpers below intentionally exercise the deprecated raw transaction +// API for documentation purposes. New code should use the RAII guard +// shown in `example_transaction_guard` instead. +#![allow( + deprecated, + reason = "example intentionally demonstrates the deprecated raw transaction API alongside the RAII guard" +)] + use hyperdb_api::{ Catalog, Connection, CreateMode, HyperProcess, Parameters, Result, SqlType, TableDefinition, }; diff --git a/hyperdb-api/src/async_connection.rs b/hyperdb-api/src/async_connection.rs index 7227894..9843c5f 100644 --- a/hyperdb-api/src/async_connection.rs +++ b/hyperdb-api/src/async_connection.rs @@ -1014,37 +1014,90 @@ impl AsyncConnection { // Transaction Control // ========================================================================= + // ------------------------------------------------------------------- + // Raw transaction control (internal) + // ------------------------------------------------------------------- + // + // The `*_raw` methods below are `pub(crate)` and form the canonical + // implementation of session-level transaction control. The RAII + // guard at `crate::AsyncTransaction` and any internal helper that + // genuinely needs `&self` (rather than the guard's `&mut self`) + // delegate to these. + // + // The matching `pub` methods (`begin_transaction`, `commit`, + // `rollback`) are thin `#[doc(hidden)] #[deprecated]` wrappers + // retained only so any pre-existing downstream caller sees a + // compiler warning rather than a hard break. They will be deleted + // in a future release; the `_raw` methods stay. + + /// Issues `BEGIN TRANSACTION`. Crate-internal use only. + pub(crate) async fn begin_transaction_raw(&self) -> Result<()> { + self.execute_command("BEGIN TRANSACTION").await?; + Ok(()) + } + + /// Issues `COMMIT`. Crate-internal use only. + pub(crate) async fn commit_raw(&self) -> Result<()> { + self.execute_command("COMMIT").await?; + Ok(()) + } + + /// Issues `ROLLBACK`. Crate-internal use only. + pub(crate) async fn rollback_raw(&self) -> Result<()> { + self.execute_command("ROLLBACK").await?; + Ok(()) + } + /// Begins an explicit transaction (async). /// + /// **Prefer [`transaction()`](Self::transaction)** — the RAII guard + /// auto-rolls back on drop and cannot leak a half-open transaction + /// across error paths. Hidden from generated rustdoc and + /// deprecated; slated for removal in a future release. + /// /// # Errors /// /// Returns [`Error::Server`] if the server rejects `BEGIN TRANSACTION` /// (e.g. a transaction is already open on this session). + #[doc(hidden)] + #[deprecated( + note = "Use `AsyncConnection::transaction()` for an RAII guard. This method will be \ + removed in a future release." + )] pub async fn begin_transaction(&self) -> Result<()> { - self.execute_command("BEGIN TRANSACTION").await?; - Ok(()) + self.begin_transaction_raw().await } /// Commits the current transaction (async). /// + /// **Prefer [`AsyncTransaction::commit`](crate::AsyncTransaction::commit)** + /// on the RAII guard returned by [`transaction()`](Self::transaction). + /// Hidden from generated rustdoc and deprecated; slated for removal. + /// /// # Errors /// - /// Returns [`Error::Server`] if the server rejects `COMMIT` (e.g. no - /// transaction is currently open). + /// Returns [`Error::Server`] if the server rejects `COMMIT`. + #[doc(hidden)] + #[deprecated(note = "Use `AsyncTransaction::commit()` on the RAII guard from \ + `AsyncConnection::transaction()`. This method will be removed in a future release.")] pub async fn commit(&self) -> Result<()> { - self.execute_command("COMMIT").await?; - Ok(()) + self.commit_raw().await } /// Rolls back the current transaction (async). /// + /// **Prefer [`AsyncTransaction::rollback`](crate::AsyncTransaction::rollback)** + /// on the RAII guard returned by [`transaction()`](Self::transaction). + /// Hidden from generated rustdoc and deprecated; slated for removal. + /// /// # Errors /// - /// Returns [`Error::Server`] if the server rejects `ROLLBACK` (e.g. no - /// transaction is currently open). + /// Returns [`Error::Server`] if the server rejects `ROLLBACK`. + #[doc(hidden)] + #[deprecated(note = "Use `AsyncTransaction::rollback()` on the RAII guard from \ + `AsyncConnection::transaction()`. This method will be removed in a future release.")] pub async fn rollback(&self) -> Result<()> { - self.execute_command("ROLLBACK").await?; - Ok(()) + self.rollback_raw().await } /// Starts a transaction with an async RAII guard (async). diff --git a/hyperdb-api/src/async_transaction.rs b/hyperdb-api/src/async_transaction.rs index 259a84d..feb9270 100644 --- a/hyperdb-api/src/async_transaction.rs +++ b/hyperdb-api/src/async_transaction.rs @@ -37,7 +37,11 @@ pub struct AsyncTransaction<'conn> { impl<'conn> AsyncTransaction<'conn> { /// Creates a new async transaction by issuing `BEGIN TRANSACTION`. pub(crate) async fn new(connection: &'conn mut AsyncConnection) -> Result { - connection.begin_transaction().await?; + // Use the crate-internal `_raw` family. The matching `pub` + // methods on `AsyncConnection` are `#[deprecated]` for + // downstream consumers; this guard is the recommended + // replacement. + connection.begin_transaction_raw().await?; Ok(Self { connection, completed: false, @@ -48,22 +52,22 @@ impl<'conn> AsyncTransaction<'conn> { /// /// # Errors /// - /// Forwards the error from [`AsyncConnection::commit`]. The transaction + /// Forwards the error from the server's `COMMIT`. The transaction /// is marked completed regardless, so the drop guard will not warn. pub async fn commit(mut self) -> Result<()> { self.completed = true; - self.connection.commit().await + self.connection.commit_raw().await } /// Rolls back the transaction explicitly. /// /// # Errors /// - /// Forwards the error from [`AsyncConnection::rollback`]. The + /// Forwards the error from the server's `ROLLBACK`. The /// transaction is marked completed regardless. pub async fn rollback(mut self) -> Result<()> { self.completed = true; - self.connection.rollback().await + self.connection.rollback_raw().await } /// Returns a reference to the underlying async connection. diff --git a/hyperdb-api/src/connection.rs b/hyperdb-api/src/connection.rs index c0c112f..ecfbc12 100644 --- a/hyperdb-api/src/connection.rs +++ b/hyperdb-api/src/connection.rs @@ -1879,52 +1879,95 @@ impl Connection { // Transaction Control // ========================================================================= + // ------------------------------------------------------------------- + // Raw transaction control (internal) + // ------------------------------------------------------------------- + // + // The `*_raw` methods below are `pub(crate)` and form the canonical + // implementation of session-level transaction control. The RAII + // guard at `crate::Transaction` and any internal helper that + // genuinely needs `&self` (rather than the guard's `&mut self`) + // delegate to these. + // + // The matching `pub` methods (`begin_transaction`, `commit`, + // `rollback`) are thin `#[doc(hidden)] #[deprecated]` wrappers + // retained only so any pre-existing downstream caller sees a + // compiler warning rather than a hard break. They will be deleted + // in a future release; the `_raw` methods stay. + + /// Issues `BEGIN TRANSACTION`. Crate-internal use only. + pub(crate) fn begin_transaction_raw(&self) -> Result<()> { + self.execute_command("BEGIN TRANSACTION")?; + Ok(()) + } + + /// Issues `COMMIT`. Crate-internal use only. + pub(crate) fn commit_raw(&self) -> Result<()> { + self.execute_command("COMMIT")?; + Ok(()) + } + + /// Issues `ROLLBACK`. Crate-internal use only. + pub(crate) fn rollback_raw(&self) -> Result<()> { + self.execute_command("ROLLBACK")?; + Ok(()) + } + /// Begins an explicit transaction. /// - /// For an RAII guard that auto-rolls back on drop, use [`transaction()`](Self::transaction) instead. - /// - /// # Example - /// - /// ```no_run - /// # use hyperdb_api::{Connection, CreateMode, Result}; - /// # fn main() -> Result<()> { - /// # let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?; - /// conn.begin_transaction()?; - /// conn.execute_command("INSERT INTO users VALUES (1, 'Alice')")?; - /// conn.commit()?; - /// # Ok(()) - /// # } - /// ``` + /// **Prefer [`transaction()`](Self::transaction)** — the RAII guard + /// auto-rolls back on drop and cannot leak a half-open transaction + /// across error paths. This method is hidden from generated + /// rustdoc and marked deprecated; it will be removed in a future + /// release. /// /// # Errors /// /// Returns [`Error::Server`] if the server rejects `BEGIN TRANSACTION` /// (e.g. a transaction is already open on this session). + #[doc(hidden)] + #[deprecated( + note = "Use `Connection::transaction()` for an RAII guard. This method will be removed \ + in a future release." + )] pub fn begin_transaction(&self) -> Result<()> { - self.execute_command("BEGIN TRANSACTION")?; - Ok(()) + self.begin_transaction_raw() } /// Commits the current transaction. /// + /// **Prefer [`Transaction::commit`](crate::Transaction::commit)** on + /// the RAII guard returned by [`transaction()`](Self::transaction). + /// Hidden from generated rustdoc and deprecated; slated for removal. + /// /// # Errors /// - /// Returns [`Error::Server`] if the server rejects `COMMIT` — most - /// commonly because no transaction is currently open. + /// Returns [`Error::Server`] if the server rejects `COMMIT`. + #[doc(hidden)] + #[deprecated( + note = "Use `Transaction::commit()` on the RAII guard from `Connection::transaction()`. \ + This method will be removed in a future release." + )] pub fn commit(&self) -> Result<()> { - self.execute_command("COMMIT")?; - Ok(()) + self.commit_raw() } /// Rolls back the current transaction. /// + /// **Prefer [`Transaction::rollback`](crate::Transaction::rollback)** + /// on the RAII guard returned by [`transaction()`](Self::transaction). + /// Hidden from generated rustdoc and deprecated; slated for removal. + /// /// # Errors /// - /// Returns [`Error::Server`] if the server rejects `ROLLBACK` — most - /// commonly because no transaction is currently open. + /// Returns [`Error::Server`] if the server rejects `ROLLBACK`. + #[doc(hidden)] + #[deprecated( + note = "Use `Transaction::rollback()` on the RAII guard from `Connection::transaction()`. \ + This method will be removed in a future release." + )] pub fn rollback(&self) -> Result<()> { - self.execute_command("ROLLBACK")?; - Ok(()) + self.rollback_raw() } /// Starts a transaction and returns an RAII guard that auto-rolls back on drop. diff --git a/hyperdb-api/src/transaction.rs b/hyperdb-api/src/transaction.rs index 281373d..2c27d5f 100644 --- a/hyperdb-api/src/transaction.rs +++ b/hyperdb-api/src/transaction.rs @@ -36,7 +36,10 @@ pub struct Transaction<'conn> { impl<'conn> Transaction<'conn> { /// Creates a new transaction by issuing `BEGIN TRANSACTION`. pub(crate) fn new(connection: &'conn mut Connection) -> Result { - connection.begin_transaction()?; + // Use the crate-internal `_raw` family. The matching `pub` + // methods on `Connection` are `#[deprecated]` for downstream + // consumers; this guard is the recommended replacement. + connection.begin_transaction_raw()?; Ok(Self { connection, completed: false, @@ -47,23 +50,23 @@ impl<'conn> Transaction<'conn> { /// /// # Errors /// - /// Returns the error from [`Connection::commit`]. The transaction is + /// Returns the error from the server's `COMMIT`. The transaction is /// marked completed regardless, so the drop guard will not re-issue a /// rollback. pub fn commit(mut self) -> Result<()> { self.completed = true; - self.connection.commit() + self.connection.commit_raw() } /// Rolls back the transaction explicitly. /// /// # Errors /// - /// Returns the error from [`Connection::rollback`]. The transaction is - /// marked completed regardless. + /// Returns the error from the server's `ROLLBACK`. The transaction + /// is marked completed regardless. pub fn rollback(mut self) -> Result<()> { self.completed = true; - self.connection.rollback() + self.connection.rollback_raw() } /// Returns a reference to the underlying connection. @@ -162,7 +165,7 @@ impl Drop for Transaction<'_> { if !self.completed { // Best-effort rollback; ignore errors during drop. // Hyper produces a WARNING (not error) if no active transaction. - let _ = self.connection.rollback(); + let _ = self.connection.rollback_raw(); } } } diff --git a/hyperdb-api/tests/transaction_tests.rs b/hyperdb-api/tests/transaction_tests.rs index f2324a3..0c88a0c 100644 --- a/hyperdb-api/tests/transaction_tests.rs +++ b/hyperdb-api/tests/transaction_tests.rs @@ -3,7 +3,14 @@ //! Tests for transaction support (BEGIN/COMMIT/ROLLBACK). //! -//! These tests verify basic transaction behavior. +//! These tests verify basic transaction behavior. Several tests +//! intentionally exercise the deprecated raw `begin_transaction` / +//! `commit` / `rollback` methods on `Connection` to lock in their +//! behavior; the deprecation warning is suppressed file-wide. +#![allow( + deprecated, + reason = "tests intentionally exercise the deprecated raw transaction API" +)] mod common; use common::TestConnection; diff --git a/hyperdb-api/tests/wire_desync_tests.rs b/hyperdb-api/tests/wire_desync_tests.rs index bbc5563..fa42fbb 100644 --- a/hyperdb-api/tests/wire_desync_tests.rs +++ b/hyperdb-api/tests/wire_desync_tests.rs @@ -12,6 +12,14 @@ //! These tests provoke a server-side error and then immediately run a //! variety of follow-up operations (simple query, command, transaction, //! prepared statement, COPY) to verify the connection stays in sync. +//! +//! Several tests exercise the deprecated raw `begin_transaction` / +//! `commit` / `rollback` methods on `Connection` to lock in their wire +//! behavior; the deprecation warning is suppressed file-wide. +#![allow( + deprecated, + reason = "regression tests intentionally exercise the deprecated raw transaction API" +)] mod common; diff --git a/hyperdb-mcp/src/engine.rs b/hyperdb-mcp/src/engine.rs index dc7b9a3..b89001d 100644 --- a/hyperdb-mcp/src/engine.rs +++ b/hyperdb-mcp/src/engine.rs @@ -719,6 +719,17 @@ impl Engine { /// Does not introduce new panic sites. If `f` panics, the transaction /// is rolled back (best-effort) and the original panic is re-raised /// via [`std::panic::resume_unwind`], preserving the panic payload. + // The deprecated `begin_transaction`/`commit`/`rollback` raw + // methods on `Connection` are required here because this helper + // takes `&self` (and so cannot use the RAII guard, which needs + // `&mut self`). Migrating requires reshaping `Engine`'s locking + // model — see issue #72 for two implementation paths (wrap + // connection in a `Mutex` vs. introduce an `EngineTransaction` + // guard) and the 8 closure call sites that need updating. + #[allow( + deprecated, + reason = "Engine borrows &self; the RAII guard requires &mut. Migration tracked in issue #72." + )] pub fn execute_in_transaction(&self, f: F) -> Result where F: FnOnce(&Engine) -> Result, diff --git a/hyperdb-mcp/src/ingest.rs b/hyperdb-mcp/src/ingest.rs index b42b23f..9d02bc4 100644 --- a/hyperdb-mcp/src/ingest.rs +++ b/hyperdb-mcp/src/ingest.rs @@ -922,7 +922,7 @@ pub fn ingest_csv_file( /// from the async connection. A rollback failure after an inner /// error is logged but does not override the original error. pub async fn ingest_csv_file_async( - conn: &AsyncConnection, + conn: &mut AsyncConnection, path: &str, opts: &IngestOptions, ) -> Result { @@ -969,28 +969,26 @@ pub async fn ingest_csv_file_async( let is_replace = opts.mode != "append"; - conn.begin_transaction().await.map_err(McpError::from)?; + let txn = conn.transaction().await.map_err(McpError::from)?; let inner: Result = async { create_table_async( - conn, + txn.connection(), &opts.table, &columns, is_replace, opts.target_db.as_deref(), ) .await?; - conn.execute_command(©_sql) - .await - .map_err(McpError::from) + txn.execute_command(©_sql).await.map_err(McpError::from) } .await; let row_count = match inner { Ok(n) => { - conn.commit().await.map_err(McpError::from)?; + txn.commit().await.map_err(McpError::from)?; n } Err(e) => { - if let Err(rb) = conn.rollback().await { + if let Err(rb) = txn.rollback().await { tracing::warn!("rollback after error failed: {}", rb); } return Err(e); @@ -1095,7 +1093,7 @@ pub fn ingest_json_file( /// connection. Rollback failures after an inner error are logged /// but do not shadow the original error. pub async fn ingest_json_async( - conn: &AsyncConnection, + conn: &mut AsyncConnection, json_str: &str, opts: &IngestOptions, ) -> Result { @@ -1119,10 +1117,10 @@ pub async fn ingest_json_async( let is_replace = opts.mode != "append"; let qualified = qualified_table(opts); - conn.begin_transaction().await.map_err(McpError::from)?; + let txn = conn.transaction().await.map_err(McpError::from)?; let inner: Result = async { create_table_async( - conn, + txn.connection(), &opts.table, &columns, is_replace, @@ -1149,7 +1147,7 @@ pub async fn ingest_json_async( col_names.join(", "), values.join(", ") ); - conn.execute_command(&sql).await.map_err(McpError::from)?; + txn.execute_command(&sql).await.map_err(McpError::from)?; row_count += 1; } Ok(row_count) @@ -1158,11 +1156,11 @@ pub async fn ingest_json_async( let row_count = match inner { Ok(n) => { - conn.commit().await.map_err(McpError::from)?; + txn.commit().await.map_err(McpError::from)?; n } Err(e) => { - if let Err(rb) = conn.rollback().await { + if let Err(rb) = txn.rollback().await { tracing::warn!("rollback after error failed: {}", rb); } return Err(e); @@ -1203,7 +1201,7 @@ pub async fn ingest_json_async( /// - Propagates errors from [`normalize_json_or_jsonl`] and from /// [`ingest_json_async`]. pub async fn ingest_json_file_async( - conn: &AsyncConnection, + conn: &mut AsyncConnection, path: &str, opts: &IngestOptions, ) -> Result { @@ -1238,7 +1236,10 @@ pub async fn ingest_json_file_async( /// Shared helper: `CREATE TABLE` (optionally dropping first) on an async /// connection. Mirrors [`Engine::create_table`] exactly so the async /// ingest paths produce identical tables to the sync ones. Callers that -/// need atomicity should wrap this in `begin_transaction` / `commit`. +/// need atomicity should call this from inside an +/// [`AsyncConnection::transaction`](hyperdb_api::AsyncConnection::transaction) +/// guard and pass `txn.connection()` here, then commit the guard on +/// success. pub(crate) async fn create_table_async( conn: &AsyncConnection, table_name: &str, diff --git a/hyperdb-mcp/src/ingest_arrow.rs b/hyperdb-mcp/src/ingest_arrow.rs index 5d64815..0785dd0 100644 --- a/hyperdb-mcp/src/ingest_arrow.rs +++ b/hyperdb-mcp/src/ingest_arrow.rs @@ -371,7 +371,7 @@ pub fn ingest_parquet_file( /// path resolution, schema inference, transaction/ingest, and row /// counting failures. pub async fn ingest_parquet_file_async( - conn: &AsyncConnection, + conn: &mut AsyncConnection, path: &str, opts: &IngestOptions, ) -> Result { @@ -402,33 +402,36 @@ pub async fn ingest_parquet_file_async( opts.target_db.as_deref(), ); - conn.begin_transaction().await.map_err(McpError::from)?; + let txn = conn.transaction().await.map_err(McpError::from)?; let result: Result = async { if is_replace { let qualified = crate::ingest::qualified_table(opts); - conn.execute_command(&format!("DROP TABLE IF EXISTS {qualified}")) + txn.execute_command(&format!("DROP TABLE IF EXISTS {qualified}")) .await .map_err(McpError::from)?; } - conn.execute_command(&sql).await.map_err(McpError::from) + txn.execute_command(&sql).await.map_err(McpError::from) } .await; let affected = match result { Ok(n) => { - conn.commit().await.map_err(McpError::from)?; + txn.commit().await.map_err(McpError::from)?; n } Err(e) => { - if let Err(rb) = conn.rollback().await { + if let Err(rb) = txn.rollback().await { tracing::warn!("rollback after error failed: {}", rb); } return Err(e); } }; - // See sync path: COUNT(*) must run outside the transaction to avoid a - // post-CTAS wire-state quirk that returns a truncated count. + // The transaction was committed above (or the function returned on + // error). This COUNT(*) runs on the bare connection — outside any + // transaction — to mirror the sync path's source-of-truth behavior + // and avoid any post-CTAS wire-state quirks that the sync path + // documents. let row_count = if is_replace { count_rows_async(conn, &opts.table).await? } else { @@ -662,7 +665,7 @@ pub fn ingest_arrow_ipc_file( /// - Propagates transaction errors from the async `CREATE TABLE` and /// from [`hyperdb_api::AsyncArrowInserter`] operations. pub async fn ingest_arrow_ipc_file_async( - conn: &AsyncConnection, + conn: &mut AsyncConnection, path: &str, opts: &IngestOptions, ) -> Result { @@ -691,10 +694,10 @@ pub async fn ingest_arrow_ipc_file_async( let is_replace = opts.mode != "append"; let table_def = crate::schema::build_table_def(&opts.table, &columns)?; - conn.begin_transaction().await.map_err(McpError::from)?; + let txn = conn.transaction().await.map_err(McpError::from)?; let result: Result = async { crate::ingest::create_table_async( - conn, + txn.connection(), &opts.table, &columns, is_replace, @@ -706,8 +709,8 @@ pub async fn ingest_arrow_ipc_file_async( // search path. When targeting a non-primary DB, qualify the // TableDefinition with that database. (Same trick the sync // Arrow IPC path uses via scoped_search_path.) - let mut inserter = - hyperdb_api::AsyncArrowInserter::new(conn, &table_def).map_err(McpError::from)?; + let mut inserter = hyperdb_api::AsyncArrowInserter::new(txn.connection(), &table_def) + .map_err(McpError::from)?; inserter .insert_data(&ipc_stream) .await @@ -718,11 +721,11 @@ pub async fn ingest_arrow_ipc_file_async( let row_count = match result { Ok(n) => { - conn.commit().await.map_err(McpError::from)?; + txn.commit().await.map_err(McpError::from)?; n } Err(e) => { - if let Err(rb) = conn.rollback().await { + if let Err(rb) = txn.rollback().await { tracing::warn!("rollback after error failed: {}", rb); } return Err(e); diff --git a/hyperdb-mcp/src/server.rs b/hyperdb-mcp/src/server.rs index 8e7a0f7..f8c7dd4 100644 --- a/hyperdb-mcp/src/server.rs +++ b/hyperdb-mcp/src/server.rs @@ -1872,7 +1872,7 @@ impl HyperMcpServer { // Check out a connection from the pool. Held only // for the duration of this one ingest, then released. - let conn = match pool.get().await { + let mut conn = match pool.get().await { Ok(c) => c, Err(e) => { out.err = Some(( @@ -1914,7 +1914,7 @@ impl HyperMcpServer { return (idx, out); } }; - crate::ingest::ingest_json_async(&conn, &array_text, &opts) + crate::ingest::ingest_json_async(&mut conn, &array_text, &opts) .await .map(|mut r| { r.stats.operation = "load_file".into(); @@ -1926,16 +1926,16 @@ impl HyperMcpServer { } else { match detect_file_format(std::path::Path::new(&entry.path)) { InferredFileFormat::Parquet => { - ingest_parquet_file_async(&conn, &entry.path, &opts).await + ingest_parquet_file_async(&mut conn, &entry.path, &opts).await } InferredFileFormat::ArrowIpc => { - ingest_arrow_ipc_file_async(&conn, &entry.path, &opts).await + ingest_arrow_ipc_file_async(&mut conn, &entry.path, &opts).await } InferredFileFormat::Json => { - ingest_json_file_async(&conn, &entry.path, &opts).await + ingest_json_file_async(&mut conn, &entry.path, &opts).await } InferredFileFormat::Csv => { - ingest_csv_file_async(&conn, &entry.path, &opts).await + ingest_csv_file_async(&mut conn, &entry.path, &opts).await } } }; diff --git a/hyperdb-mcp/src/watcher.rs b/hyperdb-mcp/src/watcher.rs index 1963f30..adc81eb 100644 --- a/hyperdb-mcp/src/watcher.rs +++ b/hyperdb-mcp/src/watcher.rs @@ -737,7 +737,7 @@ async fn ingest_one_ready_file( ready_path: &Path, data_path: &Path, ) -> Result { - let conn = pool.get().await.map_err(|e| { + let mut conn = pool.get().await.map_err(|e| { McpError::new( ErrorCode::InternalError, format!("Failed to check out connection: {e}"), @@ -759,10 +759,12 @@ async fn ingest_one_ready_file( .to_str() .ok_or_else(|| McpError::new(ErrorCode::InternalError, "Non-UTF-8 path"))?; let res = match detect_file_format(data_path) { - InferredFileFormat::Parquet => ingest_parquet_file_async(&conn, data_str, &opts).await, - InferredFileFormat::ArrowIpc => ingest_arrow_ipc_file_async(&conn, data_str, &opts).await, - InferredFileFormat::Json => ingest_json_file_async(&conn, data_str, &opts).await, - InferredFileFormat::Csv => ingest_csv_file_async(&conn, data_str, &opts).await, + InferredFileFormat::Parquet => ingest_parquet_file_async(&mut conn, data_str, &opts).await, + InferredFileFormat::ArrowIpc => { + ingest_arrow_ipc_file_async(&mut conn, data_str, &opts).await + } + InferredFileFormat::Json => ingest_json_file_async(&mut conn, data_str, &opts).await, + InferredFileFormat::Csv => ingest_csv_file_async(&mut conn, data_str, &opts).await, }?; let _ = ready_path; // silence the unused-variable lint; the path is used by the caller Ok(res.rows) diff --git a/hyperdb-mcp/tests/ingest_arrow_tests.rs b/hyperdb-mcp/tests/ingest_arrow_tests.rs index 2e9e16f..3a54e8c 100644 --- a/hyperdb-mcp/tests/ingest_arrow_tests.rs +++ b/hyperdb-mcp/tests/ingest_arrow_tests.rs @@ -548,7 +548,7 @@ async fn load_files_runs_parallel_parquet_ingests() { let pool = Arc::clone(&pool); let table = table.to_string(); async move { - let conn = pool.get().await.unwrap(); + let mut conn = pool.get().await.unwrap(); let opts = IngestOptions { table, mode: "replace".into(), @@ -556,7 +556,7 @@ async fn load_files_runs_parallel_parquet_ingests() { merge_key: None, target_db: None, }; - ingest_parquet_file_async(&conn, &path, &opts) + ingest_parquet_file_async(&mut conn, &path, &opts) .await .unwrap() }