From 9a82f696865e471a4df4290a5b3cc0986ff2ebdb Mon Sep 17 00:00:00 2001 From: Alex Kasko Date: Fri, 27 Feb 2026 22:55:43 +0000 Subject: [PATCH] Thread confinement for Appender Appender instances are not thread-safe. Only the `close()` method can be safely called from other threads concurrently with other operations. `append()` and `flush()` operations operate on the same native buffer and cannot be called concurrently. This PR implements a variant of a thread confinement for the Appender class intances. Only the thread that has created the Appender can call its methods (`close()` method still can be called from any thread). Method calls with throw `SQLExeption`s when called from other threads. When it is necessary to use Appender from multiple threads, it is required to call `unsafeBreakThreadConfinement()` method first and use a `Lock` instance returned from it to synchronize the access to this Appender instance. Example: ```java try (DuckDBAppender appender = connection.createAppender("tab1")) { Thread th = new Thread(() -> { // appender.flush(); // throws SQLException Lock appenderLock = appender.unsafeBreakThreadConfinement(); appenderLock.lock(); try { appender.flush(); } catch (SQLException e) { e.printStackTrace(); } finally { appenderLock.unlock(); } }); th.start(); th.join(); } ``` Testing: a concurrent test added that, without the patch, was crashing the JVM in about 1 of 10 runs. Fixes: #582 --- src/main/java/org/duckdb/DuckDBAppender.java | 17 +++++ src/test/java/org/duckdb/TestAppender.java | 71 ++++++++++++++++++++ 2 files changed, 88 insertions(+) diff --git a/src/main/java/org/duckdb/DuckDBAppender.java b/src/main/java/org/duckdb/DuckDBAppender.java index 92448c2de..38371ae2b 100644 --- a/src/main/java/org/duckdb/DuckDBAppender.java +++ b/src/main/java/org/duckdb/DuckDBAppender.java @@ -1,5 +1,6 @@ package org.duckdb; +import static java.lang.Thread.currentThread; import static java.nio.charset.StandardCharsets.UTF_8; import static java.time.ZoneOffset.UTC; import static java.time.temporal.ChronoUnit.*; @@ -98,6 +99,8 @@ public class DuckDBAppender implements AutoCloseable { private boolean writeInlinedStrings = true; + private long ownerThreadId = currentThread().getId(); + DuckDBAppender(DuckDBConnection conn, String catalog, String schema, String table) throws SQLException { this.conn = conn; this.catalog = catalog; @@ -967,6 +970,11 @@ public DuckDBAppender setWriteInlinedStrings(boolean writeInlinedStrings) { return this; } + public Lock unsafeBreakThreadConfinement() { + this.ownerThreadId = 0; + return this.appenderRefLock; + } + private String createErrMsg(String error) { return "Appender error" + ", catalog: '" + catalog + "'" @@ -1019,6 +1027,15 @@ private void checkOpen() throws SQLException { if (isClosed()) { throw new SQLException(createErrMsg("appender was closed")); } + if (ownerThreadId != 0 && ownerThreadId != currentThread().getId()) { + throw new SQLException(createErrMsg( + "detected the usage of the same Appender instance from multiple threads," + + " owner thread ID: " + ownerThreadId + ", current thread ID: " + currentThread().getId() + ";" + + " 'append()' and 'flush()' operations cannot be called concurrently;" + + " when it is necessary to use the same Appender instance from multiple threads," + + " call 'appender.unsafeBreakThreadConfinement()' method and use the 'Lock' instance" + + " obtained from there to synchronize the calls to the Appender.")); + } } private void checkCurrentColumnType(CAPIType ctype) throws SQLException { diff --git a/src/test/java/org/duckdb/TestAppender.java b/src/test/java/org/duckdb/TestAppender.java index a67840cb3..01d4b4e9a 100644 --- a/src/test/java/org/duckdb/TestAppender.java +++ b/src/test/java/org/duckdb/TestAppender.java @@ -2,6 +2,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.time.ZoneOffset.UTC; +import static java.util.Collections.nCopies; import static org.duckdb.DuckDBHugeInt.HUGE_INT_MAX; import static org.duckdb.DuckDBHugeInt.HUGE_INT_MIN; import static org.duckdb.TestDuckDBJDBC.JDBC_URL; @@ -13,6 +14,9 @@ import java.sql.*; import java.time.*; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; public class TestAppender { @@ -920,4 +924,71 @@ public static void test_appender_basic_enum() throws Exception { } } } + + public static void test_lots_appender_concurrent_flush() throws Exception { + try (DuckDBConnection conn = DriverManager.getConnection(JDBC_URL).unwrap(DuckDBConnection.class); + Statement stmt = conn.createStatement()) { + + stmt.execute("CREATE TABLE tab1 (col1 INTEGER[], col2 VARCHAR[])"); + AtomicBoolean completed = new AtomicBoolean(false); + AtomicLong concurrentlyFlushed = new AtomicLong(0); + + try (DuckDBAppender appender = conn.createAppender("tab1")) { + AtomicBoolean flushThrown = new AtomicBoolean(false); + Thread thFail = new Thread(() -> { + try { + assertThrows(appender::flush, SQLException.class); + flushThrown.set(true); + } catch (Exception e) { + flushThrown.set(false); + } + }); + thFail.start(); + thFail.join(); + assertTrue(flushThrown.get()); + + Lock appenderLock = appender.unsafeBreakThreadConfinement(); + + Thread th = new Thread(() -> { + long count = 0; + while (!completed.get()) { + appenderLock.lock(); + try { + count += appender.flush(); + } catch (SQLException e) { + // suppress + } finally { + appenderLock.unlock(); + } + try { + Thread.sleep(50); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + concurrentlyFlushed.set(count); + }); + th.start(); + + for (int i = 0; i < 1 << 18; i++) { + int[] arr1 = new int[i % 128]; + List arr2 = new ArrayList<>(); + for (int j = 0; j < arr1.length; j++) { + arr1[j] = j; + arr2.add(String.join("", nCopies(j % 32, String.valueOf(i)))); + } + appenderLock.lock(); + try { + appender.beginRow().append(arr1).append(arr2).endRow(); + } finally { + appenderLock.unlock(); + } + } + + completed.set(true); + th.join(); + assertTrue(concurrentlyFlushed.get() > 0); + } + } + } }