Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/main/java/org/duckdb/DuckDBAppender.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.duckdb;

import static java.lang.Thread.currentThread;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.time.ZoneOffset.UTC;
import static java.time.temporal.ChronoUnit.*;
Expand Down Expand Up @@ -98,6 +99,8 @@ public class DuckDBAppender implements AutoCloseable {

private boolean writeInlinedStrings = true;

private long ownerThreadId = currentThread().getId();

DuckDBAppender(DuckDBConnection conn, String catalog, String schema, String table) throws SQLException {
this.conn = conn;
this.catalog = catalog;
Expand Down Expand Up @@ -967,6 +970,11 @@ public DuckDBAppender setWriteInlinedStrings(boolean writeInlinedStrings) {
return this;
}

public Lock unsafeBreakThreadConfinement() {
this.ownerThreadId = 0;
return this.appenderRefLock;
}

private String createErrMsg(String error) {
return "Appender error"
+ ", catalog: '" + catalog + "'"
Expand Down Expand Up @@ -1019,6 +1027,15 @@ private void checkOpen() throws SQLException {
if (isClosed()) {
throw new SQLException(createErrMsg("appender was closed"));
}
if (ownerThreadId != 0 && ownerThreadId != currentThread().getId()) {
throw new SQLException(createErrMsg(
"detected the usage of the same Appender instance from multiple threads,"
+ " owner thread ID: " + ownerThreadId + ", current thread ID: " + currentThread().getId() + ";"
+ " 'append()' and 'flush()' operations cannot be called concurrently;"
+ " when it is necessary to use the same Appender instance from multiple threads,"
+ " call 'appender.unsafeBreakThreadConfinement()' method and use the 'Lock' instance"
+ " obtained from there to synchronize the calls to the Appender."));
}
}

private void checkCurrentColumnType(CAPIType ctype) throws SQLException {
Expand Down
71 changes: 71 additions & 0 deletions src/test/java/org/duckdb/TestAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.time.ZoneOffset.UTC;
import static java.util.Collections.nCopies;
import static org.duckdb.DuckDBHugeInt.HUGE_INT_MAX;
import static org.duckdb.DuckDBHugeInt.HUGE_INT_MIN;
import static org.duckdb.TestDuckDBJDBC.JDBC_URL;
Expand All @@ -13,6 +14,9 @@
import java.sql.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;

public class TestAppender {

Expand Down Expand Up @@ -920,4 +924,71 @@ public static void test_appender_basic_enum() throws Exception {
}
}
}

public static void test_lots_appender_concurrent_flush() throws Exception {
try (DuckDBConnection conn = DriverManager.getConnection(JDBC_URL).unwrap(DuckDBConnection.class);
Statement stmt = conn.createStatement()) {

stmt.execute("CREATE TABLE tab1 (col1 INTEGER[], col2 VARCHAR[])");
AtomicBoolean completed = new AtomicBoolean(false);
AtomicLong concurrentlyFlushed = new AtomicLong(0);

try (DuckDBAppender appender = conn.createAppender("tab1")) {
AtomicBoolean flushThrown = new AtomicBoolean(false);
Thread thFail = new Thread(() -> {
try {
assertThrows(appender::flush, SQLException.class);
flushThrown.set(true);
} catch (Exception e) {
flushThrown.set(false);
}
});
thFail.start();
thFail.join();
assertTrue(flushThrown.get());

Lock appenderLock = appender.unsafeBreakThreadConfinement();

Thread th = new Thread(() -> {
long count = 0;
while (!completed.get()) {
appenderLock.lock();
try {
count += appender.flush();
} catch (SQLException e) {
// suppress
} finally {
appenderLock.unlock();
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
concurrentlyFlushed.set(count);
});
th.start();

for (int i = 0; i < 1 << 18; i++) {
int[] arr1 = new int[i % 128];
List<String> arr2 = new ArrayList<>();
for (int j = 0; j < arr1.length; j++) {
arr1[j] = j;
arr2.add(String.join("", nCopies(j % 32, String.valueOf(i))));
}
appenderLock.lock();
try {
appender.beginRow().append(arr1).append(arr2).endRow();
} finally {
appenderLock.unlock();
}
}

completed.set(true);
th.join();
assertTrue(concurrentlyFlushed.get() > 0);
}
}
}
}
Loading