diff --git a/core/src/main/c/share/files.c b/core/src/main/c/share/files.c index 629eacb2..02fb93a1 100644 --- a/core/src/main/c/share/files.c +++ b/core/src/main/c/share/files.c @@ -164,9 +164,16 @@ JNIEXPORT jboolean JNICALL Java_io_questdb_client_std_Files_allocate fst.fst_length = (off_t) size; fst.fst_bytesalloc = 0; if (fcntl((int) fd, F_PREALLOCATE, &fst) == -1) { + /* Contiguous allocation failed (e.g. fragmented filesystem); retry + * non-contiguous all-or-nothing. Only fall through to ftruncate when + * the filesystem doesn't support F_PREALLOCATE at all; real failures + * (notably ENOSPC) must surface so the caller doesn't end up with a + * sparse file that SIGBUSes on later mmap store (sf-client.md §6). */ fst.fst_flags = F_ALLOCATEALL; - (void) fcntl((int) fd, F_PREALLOCATE, &fst); - /* if F_PREALLOCATE fails we still try ftruncate to set logical size */ + if (fcntl((int) fd, F_PREALLOCATE, &fst) == -1 + && errno != ENOTSUP && errno != EOPNOTSUPP) { + return JNI_FALSE; + } } #endif int res2; diff --git a/core/src/main/java/io/questdb/client/Sender.java b/core/src/main/java/io/questdb/client/Sender.java index 790a2210..27a79962 100644 --- a/core/src/main/java/io/questdb/client/Sender.java +++ b/core/src/main/java/io/questdb/client/Sender.java @@ -620,6 +620,10 @@ final class LineSenderBuilder { private static final long DEFAULT_WS_AUTO_FLUSH_INTERVAL_NANOS = 100_000_000L; // 100ms private static final int DEFAULT_WS_AUTO_FLUSH_ROWS = 1_000; private static final int MIN_BUFFER_SIZE = AuthUtils.CHALLENGE_LEN + 1; // challenge size + 1; + // sf-client.md section 4.4: the inbox capacity must accommodate the + // distinct error categories in a bursty error stream so that drop-oldest + // does not erase the trailing distribution of categories. + private static final int MIN_ERROR_INBOX_CAPACITY = 16; // The PARAMETER_NOT_SET_EXPLICITLY constant is used to detect if a parameter was set explicitly in configuration parameters // where it matters. This is needed to detect invalid combinations of parameters. Why? // We want to fail-fast even when an explicitly configured options happens to be same value as the default value, @@ -1864,15 +1868,20 @@ public LineSenderBuilder errorHandler(io.questdb.client.SenderErrorHandler handl * *
WebSocket transport only; setting on other transports throws. * - * @param capacity must be {@code >= 1} + * @param capacity must be {@code >= 16} (sf-client.md section 4.4). + * The floor exists because overflow drops the oldest + * entry and watermarks are monotonic, so the inbox + * must be wide enough to keep a useful trailing + * window of categories under bursty errors. * @return this instance for method chaining */ public LineSenderBuilder errorInboxCapacity(int capacity) { if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) { throw new LineSenderException("error_inbox_capacity is only supported for WebSocket transport"); } - if (capacity < 1) { - throw new LineSenderException("error_inbox_capacity must be >= 1, was " + capacity); + if (capacity < MIN_ERROR_INBOX_CAPACITY) { + throw new LineSenderException("error_inbox_capacity must be >= " + + MIN_ERROR_INBOX_CAPACITY + ", was " + capacity); } this.errorInboxCapacity = capacity; return this; diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java index deb9a75f..178d35d0 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java @@ -293,7 +293,9 @@ private QwpQueryClient(String host, int port) { *
When the inbox is full, drops the oldest pending entry to make room + * (sf-client.md section 14.6, inherited contract) and bumps + * {@link #getDroppedNotifications()}. The newest entry is always retained + * because later events carry the most recent connection state. * *
Lazy-starts the dispatcher thread on the first successful offer. */ @@ -164,10 +162,18 @@ public boolean offer(SenderConnectionEvent event) { if (closed || event == null) { return false; } - boolean accepted = inbox.offer(event); - if (!accepted) { - dropped.incrementAndGet(); - return false; + // Drop-oldest overflow policy. The pollFirst()/offerLast() pair is + // not atomic with the consumer, but the consumer can only remove + // entries (never add). The retry loop converges in at most one extra + // iteration under SPSC; close()'s POISON enqueue widens the race + // briefly but the `closed` re-check exits cleanly. + while (!inbox.offerLast(event)) { + if (inbox.pollFirst() != null) { + dropped.incrementAndGet(); + } + if (closed) { + return false; + } } if (dispatcherThread == null) { startDispatcherIfNeeded(); @@ -175,6 +181,14 @@ public boolean offer(SenderConnectionEvent event) { return true; } + /** + * Replace the user-supplied listener. Effective for the next delivery. + * Null reverts to the loud-not-silent default. + */ + public void setListener(SenderConnectionListener listener) { + this.listener = listener != null ? listener : DefaultSenderConnectionListener.INSTANCE; + } + private void dispatchLoop() { while (!closed || !inbox.isEmpty()) { SenderConnectionEvent ev; diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SenderErrorDispatcher.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SenderErrorDispatcher.java index 240341e6..aef01dde 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SenderErrorDispatcher.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SenderErrorDispatcher.java @@ -30,8 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -48,11 +47,14 @@ * the daemon dispatcher takes from the queue and invokes the handler. * *
When the inbox is full, drops the oldest pending entry to make room + * (sf-client.md section 14.6) and bumps {@link #getDroppedNotifications()}. + * The newest entry is always retained because it carries the most recent + * watermark information. * *
Lazy-starts the dispatcher thread on the first successful offer.
*/
@@ -220,10 +230,18 @@ public boolean offer(SenderError error) {
if (closed || error == null) {
return false;
}
- boolean accepted = inbox.offer(error);
- if (!accepted) {
- dropped.incrementAndGet();
- return false;
+ // Drop-oldest overflow policy. The pollFirst()/offerLast() pair is
+ // not atomic with the consumer, but the consumer can only remove
+ // entries (never add). The retry loop converges in at most one extra
+ // iteration under SPSC; close()'s POISON enqueue widens the race
+ // briefly but the `closed` re-check exits cleanly.
+ while (!inbox.offerLast(error)) {
+ if (inbox.pollFirst() != null) {
+ dropped.incrementAndGet();
+ }
+ if (closed) {
+ return false;
+ }
}
// Common case after the first offer: thread already running, hot
// path is one volatile read. Lazy start happens once per dispatcher
diff --git a/core/src/main/java/io/questdb/client/std/Files.java b/core/src/main/java/io/questdb/client/std/Files.java
index 1b03a40d..7735fb40 100644
--- a/core/src/main/java/io/questdb/client/std/Files.java
+++ b/core/src/main/java/io/questdb/client/std/Files.java
@@ -258,12 +258,15 @@ public static void freeNativePath(long pathPtr) {
*/
public static int rename(String oldPath, String newPath) {
long o = pathPtr(oldPath);
- long n = pathPtr(newPath);
try {
- return rename0(o, n);
+ long n = pathPtr(newPath);
+ try {
+ return rename0(o, n);
+ } finally {
+ freePathPtr(n);
+ }
} finally {
freePathPtr(o);
- freePathPtr(n);
}
}
diff --git a/core/src/main/resources/io/questdb/client/bin/darwin-aarch64/libquestdb.dylib b/core/src/main/resources/io/questdb/client/bin/darwin-aarch64/libquestdb.dylib
index dd757017..4b53fe79 100644
Binary files a/core/src/main/resources/io/questdb/client/bin/darwin-aarch64/libquestdb.dylib and b/core/src/main/resources/io/questdb/client/bin/darwin-aarch64/libquestdb.dylib differ
diff --git a/core/src/main/resources/io/questdb/client/bin/darwin-x86-64/libquestdb.dylib b/core/src/main/resources/io/questdb/client/bin/darwin-x86-64/libquestdb.dylib
index b0eef508..bea672c7 100644
Binary files a/core/src/main/resources/io/questdb/client/bin/darwin-x86-64/libquestdb.dylib and b/core/src/main/resources/io/questdb/client/bin/darwin-x86-64/libquestdb.dylib differ
diff --git a/core/src/main/resources/io/questdb/client/bin/windows-x86-64/libquestdb.dll b/core/src/main/resources/io/questdb/client/bin/windows-x86-64/libquestdb.dll
index 1688d230..7b9c7d95 100755
Binary files a/core/src/main/resources/io/questdb/client/bin/windows-x86-64/libquestdb.dll and b/core/src/main/resources/io/questdb/client/bin/windows-x86-64/libquestdb.dll differ
diff --git a/core/src/test/java/io/questdb/client/test/SenderBuilderErrorApiTest.java b/core/src/test/java/io/questdb/client/test/SenderBuilderErrorApiTest.java
index f02cb914..ed3c35c6 100644
--- a/core/src/test/java/io/questdb/client/test/SenderBuilderErrorApiTest.java
+++ b/core/src/test/java/io/questdb/client/test/SenderBuilderErrorApiTest.java
@@ -94,19 +94,38 @@ public void testErrorHandlerRejectedOnNonWebSocketProtocol() {
}
@Test
- public void testErrorInboxCapacityRejectsZeroAndNegative() {
- try {
- Sender.builder(Sender.Transport.WEBSOCKET).errorInboxCapacity(0);
- Assert.fail("zero capacity must be rejected");
- } catch (LineSenderException expected) {
- Assert.assertTrue(expected.getMessage().contains("error_inbox_capacity"));
- Assert.assertTrue(expected.getMessage().contains(">="));
+ public void testErrorInboxCapacityRejectsBelowSpecFloor() {
+ // sf-client.md section 4.4: minimum is 16. Values below the floor
+ // (including 0 and negative) must surface the floor in the message
+ // so users can fix their configuration.
+ int[] rejected = {-5, 0, 1, 2, 15};
+ for (int v : rejected) {
+ try {
+ Sender.builder(Sender.Transport.WEBSOCKET).errorInboxCapacity(v);
+ Assert.fail("capacity " + v + " must be rejected; spec floor is 16");
+ } catch (LineSenderException expected) {
+ Assert.assertTrue("missing key in message: " + expected.getMessage(),
+ expected.getMessage().contains("error_inbox_capacity"));
+ Assert.assertTrue("missing floor (16) in message: " + expected.getMessage(),
+ expected.getMessage().contains(">= 16"));
+ }
}
+ // The floor itself is accepted.
+ Sender.builder(Sender.Transport.WEBSOCKET).errorInboxCapacity(16);
+ }
+
+ @Test
+ public void testConnectStringRejectsErrorInboxCapacityBelowFloor() {
+ // The connect-string path delegates to the builder, so the floor
+ // applies there too. A value below 16 must surface the floor.
try {
- Sender.builder(Sender.Transport.WEBSOCKET).errorInboxCapacity(-5);
- Assert.fail("negative capacity must be rejected");
+ Sender.builder("ws::addr=127.0.0.1:1;error_inbox_capacity=8;").build().close();
+ Assert.fail("capacity 8 must be rejected; spec floor is 16");
} catch (LineSenderException expected) {
- // ok
+ Assert.assertTrue("missing key in message: " + expected.getMessage(),
+ expected.getMessage().contains("error_inbox_capacity"));
+ Assert.assertTrue("missing floor (16) in message: " + expected.getMessage(),
+ expected.getMessage().contains(">= 16"));
}
}
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpQueryClientFromConfigTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpQueryClientFromConfigTest.java
index 0d7dc626..4b30259d 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpQueryClientFromConfigTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpQueryClientFromConfigTest.java
@@ -160,6 +160,48 @@ public void testAddrPortZeroRejected() {
assertReject("ws::addr=db:0;", "port out of range in addr: db:0 (must be 1-65535)");
}
+ @Test
+ public void testAddrRepeatedKeysAccumulate() {
+ // failover.md section 1: repeated addr= keys must accumulate into a
+ // single ordered list, mirroring the ingress Sender behavior.
+ try (QwpQueryClient c = QwpQueryClient.fromConfig(
+ "ws::addr=alpha:9000;addr=bravo:9001;addr=charlie:9002;")) {
+ Assert.assertEquals(3, c.getEndpointCountForTest());
+ Assert.assertEquals("alpha", c.getEndpointHostForTest(0));
+ Assert.assertEquals(9000, c.getEndpointPortForTest(0));
+ Assert.assertEquals("bravo", c.getEndpointHostForTest(1));
+ Assert.assertEquals(9001, c.getEndpointPortForTest(1));
+ Assert.assertEquals("charlie", c.getEndpointHostForTest(2));
+ Assert.assertEquals(9002, c.getEndpointPortForTest(2));
+ }
+ }
+
+ @Test
+ public void testAddrRepeatedKeysAndCommasMixed() {
+ // The two accumulation forms must compose: comma-list followed by a
+ // second addr= key extends the list, not replaces it.
+ try (QwpQueryClient c = QwpQueryClient.fromConfig(
+ "ws::addr=alpha:9000,bravo:9001;addr=charlie:9002,delta:9003;")) {
+ Assert.assertEquals(4, c.getEndpointCountForTest());
+ Assert.assertEquals("alpha", c.getEndpointHostForTest(0));
+ Assert.assertEquals(9000, c.getEndpointPortForTest(0));
+ Assert.assertEquals("bravo", c.getEndpointHostForTest(1));
+ Assert.assertEquals(9001, c.getEndpointPortForTest(1));
+ Assert.assertEquals("charlie", c.getEndpointHostForTest(2));
+ Assert.assertEquals(9002, c.getEndpointPortForTest(2));
+ Assert.assertEquals("delta", c.getEndpointHostForTest(3));
+ Assert.assertEquals(9003, c.getEndpointPortForTest(3));
+ }
+ }
+
+ @Test
+ public void testAddrRepeatedKeysEmptyEntryStillRejected() {
+ // The empty-entry rejection must fire on every addr= occurrence, not
+ // just the first one, so a second key cannot smuggle in a malformed
+ // value.
+ assertReject("ws::addr=a:9000;addr=b:9000,;", "empty addr entry");
+ }
+
@Test
public void testAddrSingleWhitespaceTrimmedAroundHostPort() {
// The parser splits on commas and trims; a single leading space on a
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpRoleRejectCloseRaceTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpRoleRejectCloseRaceTest.java
index f3a030d2..66c5e630 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpRoleRejectCloseRaceTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpRoleRejectCloseRaceTest.java
@@ -51,7 +51,9 @@ public void testCloseDuringRoleRejectBackoffReturnsPromptly() throws Exception {
+ ";close_flush_timeout_millis=0"
+ ";initial_connect_retry=async;";
- try (Sender sender = Sender.fromConfig(cfg)) {
+ Sender sender = Sender.fromConfig(cfg);
+ long elapsed;
+ try {
// Push a row so the I/O thread starts attempting connect; the
// first attempt will hit the role reject and enter the parkNanos
// backoff branch.
@@ -59,12 +61,15 @@ public void testCloseDuringRoleRejectBackoffReturnsPromptly() throws Exception {
waitFor(() -> server.upgrades.get() >= 1, 5_000);
Thread.sleep(100);
} finally {
+ // Bracket the close() so the timing assertion is meaningful;
+ // the race this test is named for lives entirely inside close().
long start = System.currentTimeMillis();
- long elapsed = System.currentTimeMillis() - start;
- Assert.assertTrue(
- "close() during role-reject backoff must return promptly (got " + elapsed + "ms)",
- elapsed < 2_000);
+ sender.close();
+ elapsed = System.currentTimeMillis() - start;
}
+ Assert.assertTrue(
+ "close() during role-reject backoff must return promptly (got " + elapsed + "ms)",
+ elapsed < 2_000);
}
}
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentRingTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentRingTest.java
index d50d6550..cec666a2 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentRingTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentRingTest.java
@@ -166,6 +166,67 @@ public void testRotationRebasesSpareToCorrectFsnRegardlessOfManagerGuess() throw
});
}
+ @Test
+ public void testRotationRearmsHighWaterBackupWakeup() throws Exception {
+ // The producer-thread flag that gates the high-water backup wakeup is
+ // per-active. Rotation must reset it so the new active can fire the
+ // backup again if the rotation-time wakeup didn't get a fresh spare
+ // installed in time. Pre-fix the flag stayed sticky-true after the
+ // first set, so on every active past the first the backup branch was
+ // a dead path.
+ TestUtils.assertMemoryLeak(() -> {
+ // 4 100-byte frames per segment. signalAtBytes is 75% of segSize,
+ // and HEADER (24) + 3 frames (3*108 = 324) lands publishedOffset
+ // at 348, just past the 342-byte threshold.
+ long segSize = MmapSegment.HEADER_SIZE
+ + 4 * (MmapSegment.FRAME_HEADER_SIZE + 100);
+ long buf = Unsafe.malloc(100, MemoryTag.NATIVE_DEFAULT);
+ try {
+ MmapSegment seg0 = MmapSegment.create(tmpDir + "/wseg0.sfa", 0, segSize);
+ try (SegmentRing ring = new SegmentRing(seg0, segSize)) {
+ int[] wakeups = {0};
+ ring.setManagerWakeup(() -> wakeups[0]++);
+ fillPattern(buf, 100, 0);
+
+ // First active: two frames stay below high-water.
+ assertEquals(0, ring.appendOrFsn(buf, 100));
+ assertEquals(1, ring.appendOrFsn(buf, 100));
+ assertEquals("no wakeup before high-water", 0, wakeups[0]);
+ // Third frame crosses 75%: backup branch fires once.
+ assertEquals(2, ring.appendOrFsn(buf, 100));
+ assertEquals("backup signal fires on high-water crossing", 1, wakeups[0]);
+ // Fourth frame fills the active. Still same active, so the
+ // backup branch must coalesce and not fire again.
+ assertEquals(3, ring.appendOrFsn(buf, 100));
+ assertEquals("backup signal coalesces within an active", 1, wakeups[0]);
+ // Active is full and there is still no spare.
+ assertEquals(SegmentRing.BACKPRESSURE_NO_SPARE,
+ ring.appendOrFsn(buf, 100));
+ assertEquals("backpressure does not fire wakeup", 1, wakeups[0]);
+
+ // Install spare, then retry. The retry triggers rotation,
+ // which fires the wakeup unconditionally.
+ ring.installHotSpare(MmapSegment.create(
+ tmpDir + "/wseg1.sfa", ring.nextSeqHint(), segSize));
+ assertEquals(4, ring.appendOrFsn(buf, 100));
+ assertEquals("rotation fires wakeup", 2, wakeups[0]);
+
+ // New active. Two frames keep publishedOffset below the
+ // high-water mark (the rotated frame counts as the first).
+ assertEquals(5, ring.appendOrFsn(buf, 100));
+ assertEquals(2, wakeups[0]);
+ // Third frame on the new active crosses 75% again. Without
+ // rotation re-arming the per-active flag, this assertion
+ // catches the regression: pre-fix wakeups stayed at 2.
+ assertEquals(6, ring.appendOrFsn(buf, 100));
+ assertEquals("backup signal re-arms on the new active", 3, wakeups[0]);
+ }
+ } finally {
+ Unsafe.free(buf, 100, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
@Test
public void testAcknowledgeAndDrainTrimsOldestFirstUntilUnackedFound() throws Exception {
TestUtils.assertMemoryLeak(() -> {
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SenderConnectionDispatcherTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SenderConnectionDispatcherTest.java
index f3aed2fe..e7c0d7c1 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SenderConnectionDispatcherTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SenderConnectionDispatcherTest.java
@@ -95,19 +95,24 @@ public void testConstructorRejectsNullListener() {
}
@Test
- public void testFullInboxDropsAndCounts() throws Exception {
- // Slow listener releases only when the test allows. Until then,
- // every offer beyond capacity must be dropped (returning false) and
- // counted via getDroppedNotifications.
+ public void testFullInboxDropsOldestAndCounts() throws Exception {
+ // Inherits sf-client.md section 14.6: on overflow, drop the OLDEST
+ // entry and admit the new one. Later connection events carry the
+ // freshest state, so dropping the head loses the least information.
CountDownLatch unblock = new CountDownLatch(1);
- AtomicInteger delivered = new AtomicInteger();
+ List