Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
124 commits
Select commit Hold shift + click to select a range
f5afbf6
feat(ilp): QWiP store-and-forward client buffer
bluestreak01 Apr 26, 2026
efde7bc
fix(ilp): harden SF disk-full retry, reconnect, and CRC paths
bluestreak01 Apr 26, 2026
efbd8e1
Rebuild CXX libraries
Apr 26, 2026
cf3152d
fix(ilp): fail fast on fatal SF errors instead of looping
bluestreak01 Apr 26, 2026
86b6e6f
perf(ilp): replace O(N²) SF segment sort at open
bluestreak01 Apr 26, 2026
acb32b9
fix(ilp): plug SF rotate-OOM double-free and on-disk leak
bluestreak01 Apr 26, 2026
519f5e4
fix(ilp): trim acked frames from active SF segment
bluestreak01 Apr 26, 2026
91669d2
fix(ilp): pre-compute CRC32C table to remove lazy-init memory race
bluestreak01 Apr 26, 2026
e38b4d5
test(ilp): regression guards for SF disk-cap recovery
bluestreak01 Apr 26, 2026
03afa61
feat(ilp): opt-in sf_fsync_on_flush, fix sf_fsync default doc lie
bluestreak01 Apr 26, 2026
0304df8
fix(ilp): close schema-reset race window with connection-generation tag
bluestreak01 Apr 26, 2026
0ad83b9
Rebuild CXX libraries
Apr 26, 2026
07c20e0
fix(ilp): five SF correctness fixes from PR-17 review
bluestreak01 Apr 26, 2026
af58d7d
test(ilp): SegmentLog.append latency benchmark
bluestreak01 Apr 26, 2026
83bb368
perf(ilp): slice-by-8 CRC32C cuts SF append p50 ~4x
bluestreak01 Apr 26, 2026
87320e5
fix(ilp): three SF recovery correctness fixes from PR-17 review
bluestreak01 Apr 26, 2026
f58f766
test(ilp): JMH ingress latency benchmark for QWP Sender
bluestreak01 Apr 26, 2026
49f1683
feat(ilp): cursor SF engine primitives (mmap segments, ring, manager)
bluestreak01 Apr 26, 2026
cc4a68f
feat(ilp): cursor SF -- on-disk recovery + maxTotalBytes cap
bluestreak01 Apr 26, 2026
889c46c
feat(ilp): cursor SF -- happy-path WebSocket send loop
bluestreak01 Apr 26, 2026
e17c12d
feat(ilp): wire cursor SF as the only async path; refactor connect st…
bluestreak01 Apr 26, 2026
9781771
refactor(ilp): strip QwpWebSocketSender of legacy SF / sync paths
bluestreak01 Apr 26, 2026
36263c4
fix(ilp): cursor recovery — derive next FSN from on-disk segments
bluestreak01 Apr 26, 2026
4f4e1e5
docs: cursor SF — durability & reconnect spec
bluestreak01 Apr 27, 2026
3caa2d3
feat(ilp): close() drain — bounded ACK wait via close_flush_timeout_m…
bluestreak01 Apr 27, 2026
71afa21
feat(ilp): connectionGeneration foundation + encode-mid-reconnect retry
bluestreak01 Apr 27, 2026
0ec66f3
feat(ilp): cursor I/O loop reconnect + replay
bluestreak01 Apr 27, 2026
8828038
feat(ilp): cursor reconnect policy — backoff cap + auth-terminal
bluestreak01 Apr 27, 2026
f152583
feat(ilp): slot directory model — sender_id + advisory exclusive .lock
bluestreak01 Apr 27, 2026
40f9742
feat(ilp): initial-connect retry opt-in + replay/attempt counters
bluestreak01 Apr 27, 2026
b9b6e2f
feat(ilp): orphan-slot scanner + .failed sentinel + drain_orphans knob
bluestreak01 Apr 27, 2026
520231c
feat(ilp): cursor frames are self-sufficient — full schemas, full dict
bluestreak01 Apr 27, 2026
fa5c838
fix(ilp): recovery replays sealed segments from baseSeq, not active
bluestreak01 Apr 27, 2026
c25773f
feat(ilp): background drainer pool — adopt orphan slots and replay them
bluestreak01 Apr 27, 2026
267b380
docs(ilp): TODO for cursor SF — multi-host failover, deferred items
bluestreak01 Apr 27, 2026
923dcb4
fix(ilp): cursor SF review fixes — perf, cap, torn-tail, races
bluestreak01 Apr 27, 2026
07b930a
fix(ilp): cursor SF correctness — concurrency, lifecycle, findFirst
bluestreak01 Apr 28, 2026
05c3829
test(ilp): wrap SF cursor tests in assertMemoryLeak; PR-17 regression…
bluestreak01 Apr 28, 2026
36b0839
error handling
bluestreak01 Apr 28, 2026
41ae975
fix(ilp): cursor SF — apply PR-17 review critical and moderate findings
bluestreak01 Apr 28, 2026
052f6ee
Make close() rethrow latched terminal errors
bluestreak01 Apr 29, 2026
fc8d8b3
Rebuild CXX libraries
Apr 29, 2026
12049d8
Add async initial connect for cursor SF sender
bluestreak01 Apr 29, 2026
41b9ec0
ci: auto-detect matching questdb branch
bluestreak01 Apr 29, 2026
ce92148
test(ilp): align cursor SF tests with close() rethrow + drainer changes
bluestreak01 Apr 29, 2026
13ea8a2
Harden close() error preservation and SegmentManager lifecycle
bluestreak01 Apr 29, 2026
16a3eb6
Merge branch 'vi_sf' of https://github.com/questdb/java-questdb-clien…
bluestreak01 Apr 29, 2026
9e298e7
test fix
bluestreak01 Apr 29, 2026
9be35cb
bugfix
bluestreak01 Apr 30, 2026
a6b45c3
fix(ilp): cross-platform drainer/slot-lock and quieter PARSE_ERROR logs
bluestreak01 Apr 30, 2026
21d885b
Defer cursor SF trim to durable-ack when opted in
bluestreak01 May 3, 2026
5ae7149
Switch durable-ack fuzz to QuestDB Rnd + TestUtils.generateRandom
bluestreak01 May 4, 2026
de8ba19
Add async progress callback, fast-close opt-out, and test coverage
bluestreak01 May 4, 2026
45f4ae5
Send keepalive PINGs when waiting on durable-ack confirmations
bluestreak01 May 4, 2026
12fc150
Make durable-ack keepalive PING cadence configurable
bluestreak01 May 4, 2026
1125b0b
Make close() rethrow gated on whether the user already saw the error
bluestreak01 May 5, 2026
1d61de7
documentation
bluestreak01 May 5, 2026
387fe91
review
bluestreak01 May 5, 2026
396d20b
1. ingress support multi hosts 2. egress failover optimise
kafka1991 May 6, 2026
9463d6a
1. ingress support multi hosts 2. egress failover optimise
kafka1991 May 6, 2026
6671888
code review
kafka1991 May 6, 2026
fad90cc
use 421 instead of 503
kafka1991 May 6, 2026
cab4429
fix tests
kafka1991 May 6, 2026
33b4420
code review
kafka1991 May 7, 2026
055d0b2
fix serval bugs
kafka1991 May 7, 2026
ec23894
final code review
kafka1991 May 7, 2026
5299364
QWiP failover
bluestreak01 May 7, 2026
40b6ddd
Allow fd=0 in FilesTest.testWriteReadRoundtrip
bluestreak01 May 8, 2026
6896173
Fast-fail durable-ack opt-in on 421 role-reject
bluestreak01 May 8, 2026
d9fbf1f
Accept comma-separated addr list; rename auth_timeout
bluestreak01 May 8, 2026
079f32e
Merge remote-tracking branch 'origin/vi_sf' into ingress_role
kafka1991 May 8, 2026
28e0632
Merge pull request #20 from questdb/ingress_role
kafka1991 May 8, 2026
86c25a4
fix tests
kafka1991 May 8, 2026
0a26e3d
remove obsolete config IN_FLIGHT_WINDOW_SIZE
kafka1991 May 8, 2026
a2c8b04
ingress spec alignment
bluestreak01 May 8, 2026
20c8c5a
fix close and drain race condition
kafka1991 May 8, 2026
9b4756d
Merge remote-tracking branch 'origin/vi_sf' into vi_sf
bluestreak01 May 8, 2026
f3f3ff2
Restore test accessors after vi_sf merge
bluestreak01 May 8, 2026
0998e0a
Merge remote-tracking branch 'origin/vi_sf' into vi_sf
bluestreak01 May 8, 2026
00d3f63
QWP egress: parse optional SERVER_INFO zone trailer
bluestreak01 May 8, 2026
4b82c1f
QWP failover: align ingress impl with spec
bluestreak01 May 8, 2026
ebe91fe
QwpQueryClient: surface auth failures during failover reconnect
bluestreak01 May 8, 2026
3d5e480
QWP egress: add zone= connect-string support
bluestreak01 May 8, 2026
a8a2da4
QWP ingress: fast-fail durable-ack opt-in against role-rejected upgrades
bluestreak01 May 8, 2026
4d610d6
review-pr skill: sync with parent questdb submodule
bluestreak01 May 8, 2026
13e2f6b
fix ent tests
bluestreak01 May 8, 2026
0d867bb
tests
bluestreak01 May 9, 2026
3d36dba
fix CI
bluestreak01 May 9, 2026
d39d113
review
bluestreak01 May 10, 2026
f699ef0
review 2
bluestreak01 May 10, 2026
19537ed
make trim-deregister race test deterministic
bluestreak01 May 11, 2026
22241e6
ack watermark
bluestreak01 May 12, 2026
3988001
update spec and add test
bluestreak01 May 12, 2026
7b583d3
Reject unknown keys in connect-string parser
mtopolnik May 12, 2026
a16c343
Add cursor SF observability counters
mtopolnik May 12, 2026
3f1450a
Defer durable-ack keepalive PING after sent frames
mtopolnik May 12, 2026
83d06e7
Align reconnect classification with sf-client spec
mtopolnik May 12, 2026
f980a8b
Accept in_flight_window as a no-op key
mtopolnik May 12, 2026
2c5606f
Parse max_buf_size / init_buf_size as size
mtopolnik May 12, 2026
2281ec1
Guard drainer backoff jitter against zero initial backoff
mtopolnik May 12, 2026
4e98ba2
Allocate test ports via ServerSocket(0) to avoid collisions
mtopolnik May 12, 2026
2b8cd7e
Reserve disk blocks when creating SF segments
mtopolnik May 12, 2026
c66376b
Make race tests deterministic
mtopolnik May 12, 2026
0b90996
test fixes
bluestreak01 May 12, 2026
49f35fe
Make SegmentManagerTotalBytesRaceTest non-racy
bluestreak01 May 12, 2026
17aa5e7
Suppress double-throw of latched terminal error in close()
bluestreak01 May 13, 2026
5945452
Make dispatcherThread volatile in lazy-start dispatchers
mtopolnik May 12, 2026
a4683b5
Formatting
mtopolnik May 12, 2026
8a6d5a3
Gate durable trim counter on watermark advance
mtopolnik May 13, 2026
cfecbd3
Flag dropped-batch caveat in progress handler doc
mtopolnik May 13, 2026
0ad29bd
Fix broken @see links on Sender.fromConfig
mtopolnik May 13, 2026
67b0cfa
Fix Javadoc
mtopolnik May 13, 2026
ed41cb7
Remove unused consecutiveSendErrors counter
mtopolnik May 13, 2026
d270a11
Rename recoveredFromDisk to wasRecoveredFromDisk
mtopolnik May 13, 2026
1398bb5
Reserve real disk blocks in Windows Files_allocate
mtopolnik May 13, 2026
3a82f3e
Rebuild CXX libraries
May 13, 2026
bde0b56
Drop oldest on dispatcher inbox overflow
mtopolnik May 13, 2026
c35237c
Stop masking F_PREALLOCATE failures on macOS
mtopolnik May 13, 2026
265ad20
Merge branch 'main' into vi_sf
mtopolnik May 13, 2026
b5acb4f
Accumulate repeated addr= keys in egress config
mtopolnik May 13, 2026
dfede71
Bracket close() in role-reject race test
mtopolnik May 13, 2026
cef1c9b
Enforce spec minimum of 16 for error_inbox_capacity
mtopolnik May 13, 2026
93dc73c
Rebuild CXX libraries
May 13, 2026
0a236e9
Re-arm backup wakeup on rotation; fix rename leak
mtopolnik May 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions core/src/main/c/share/files.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,16 @@ JNIEXPORT jboolean JNICALL Java_io_questdb_client_std_Files_allocate
fst.fst_length = (off_t) size;
fst.fst_bytesalloc = 0;
if (fcntl((int) fd, F_PREALLOCATE, &fst) == -1) {
/* Contiguous allocation failed (e.g. fragmented filesystem); retry
* non-contiguous all-or-nothing. Only fall through to ftruncate when
* the filesystem doesn't support F_PREALLOCATE at all; real failures
* (notably ENOSPC) must surface so the caller doesn't end up with a
* sparse file that SIGBUSes on later mmap store (sf-client.md §6). */
fst.fst_flags = F_ALLOCATEALL;
(void) fcntl((int) fd, F_PREALLOCATE, &fst);
/* if F_PREALLOCATE fails we still try ftruncate to set logical size */
if (fcntl((int) fd, F_PREALLOCATE, &fst) == -1
&& errno != ENOTSUP && errno != EOPNOTSUPP) {
return JNI_FALSE;
}
}
#endif
int res2;
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/java/io/questdb/client/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,10 @@ final class LineSenderBuilder {
private static final long DEFAULT_WS_AUTO_FLUSH_INTERVAL_NANOS = 100_000_000L; // 100ms
private static final int DEFAULT_WS_AUTO_FLUSH_ROWS = 1_000;
private static final int MIN_BUFFER_SIZE = AuthUtils.CHALLENGE_LEN + 1; // challenge size + 1;
// sf-client.md section 4.4: the inbox capacity must accommodate the
// distinct error categories in a bursty error stream so that drop-oldest
// does not erase the trailing distribution of categories.
private static final int MIN_ERROR_INBOX_CAPACITY = 16;
// The PARAMETER_NOT_SET_EXPLICITLY constant is used to detect if a parameter was set explicitly in configuration parameters
// where it matters. This is needed to detect invalid combinations of parameters. Why?
// We want to fail-fast even when an explicitly configured options happens to be same value as the default value,
Expand Down Expand Up @@ -1864,15 +1868,20 @@ public LineSenderBuilder errorHandler(io.questdb.client.SenderErrorHandler handl
*
* <p>WebSocket transport only; setting on other transports throws.
*
* @param capacity must be {@code >= 1}
* @param capacity must be {@code >= 16} (sf-client.md section 4.4).
* The floor exists because overflow drops the oldest
* entry and watermarks are monotonic, so the inbox
* must be wide enough to keep a useful trailing
* window of categories under bursty errors.
* @return this instance for method chaining
*/
public LineSenderBuilder errorInboxCapacity(int capacity) {
if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("error_inbox_capacity is only supported for WebSocket transport");
}
if (capacity < 1) {
throw new LineSenderException("error_inbox_capacity must be >= 1, was " + capacity);
if (capacity < MIN_ERROR_INBOX_CAPACITY) {
throw new LineSenderException("error_inbox_capacity must be >= "
+ MIN_ERROR_INBOX_CAPACITY + ", was " + capacity);
}
this.errorInboxCapacity = capacity;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@ private QwpQueryClient(String host, int port) {
* <ul>
* <li>{@code addr=host[:port][,host[:port]...]} -- required. Comma-separated list of
* WebSocket endpoints; {@link #connect()} walks them in order and stops at the
* first matching {@code target=}. Default port on each entry is {@value #DEFAULT_WS_PORT}.</li>
* first matching {@code target=}. Default port on each entry is {@value #DEFAULT_WS_PORT}.
* Per {@code failover.md} section 1, the comma form and repeated {@code addr=} keys
* both accumulate into a single ordered list; empty entries are rejected.</li>
* <li>{@code target=any|primary|replica} -- endpoint filter applied against the role
* byte from the v2 {@code SERVER_INFO} frame. Default {@code any}. {@code primary}
* accepts {@code PRIMARY}, {@code PRIMARY_CATCHUP} and {@code STANDALONE}.</li>
Expand Down Expand Up @@ -359,7 +361,7 @@ public static QwpQueryClient fromConfig(CharSequence configurationString) {
"unsupported schema [schema=" + sink + ", supported-schemas=[ws, wss]]");
}

List<Endpoint> parsedEndpoints = null;
List<Endpoint> parsedEndpoints = new ArrayList<>();
String path = DEFAULT_ENDPOINT_PATH;
String target = TARGET_ANY;
Boolean failover = null;
Expand Down Expand Up @@ -398,7 +400,9 @@ public static QwpQueryClient fromConfig(CharSequence configurationString) {
String value = sink.toString();
switch (key) {
case "addr":
parsedEndpoints = parseEndpointList(value);
// failover.md §1: comma syntax and repeated addr= keys must
// accumulate. parseEndpointList rejects empty entries.
parsedEndpoints.addAll(parseEndpointList(value));
break;
case "target":
if (!TARGET_ANY.equals(value) && !TARGET_PRIMARY.equals(value) && !TARGET_REPLICA.equals(value)) {
Expand Down Expand Up @@ -551,7 +555,7 @@ public static QwpQueryClient fromConfig(CharSequence configurationString) {
throw new IllegalArgumentException("unknown configuration key: " + key);
}
}
if (parsedEndpoints == null || parsedEndpoints.isEmpty()) {
if (parsedEndpoints.isEmpty()) {
throw new IllegalArgumentException("missing required key: addr");
}
boolean hasBasic = username != null || password != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ public class QwpWebSocketSender implements Sender {
private static final int DEFAULT_MICROBATCH_BUFFER_SIZE = 1024 * 1024; // 1MB
private static final Logger LOG = LoggerFactory.getLogger(QwpWebSocketSender.class);
private static final int MAX_TABLE_NAME_LENGTH = 127;
// sf-client.md section 4.4 floor: drop-oldest under bursts needs a wide
// enough window to preserve the trailing category distribution.
private static final int MIN_ERROR_INBOX_CAPACITY = 16;
private static final String WRITE_PATH = "/write/v4";
private final String authorizationHeader;
private final int autoFlushBytes;
Expand Down Expand Up @@ -1725,10 +1728,13 @@ public void setErrorHandler(SenderErrorHandler handler) {
/**
* Configure the bounded inbox capacity used by the dispatcher. Must be
* called before {@code connect()}; later changes have no effect.
* The minimum follows sf-client.md section 4.4: drop-oldest under bursts
* needs a wide enough window to preserve the trailing category distribution.
*/
public void setErrorInboxCapacity(int capacity) {
if (capacity < 1) {
throw new IllegalArgumentException("errorInboxCapacity must be >= 1, was " + capacity);
if (capacity < MIN_ERROR_INBOX_CAPACITY) {
throw new IllegalArgumentException("errorInboxCapacity must be >= "
+ MIN_ERROR_INBOX_CAPACITY + ", was " + capacity);
}
this.errorInboxCapacity = capacity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,11 @@ public long appendOrFsn(long payloadAddr, int payloadLen) {
hotSpare = null;
// Fresh active just consumed the spare → ask the manager to start
// making the next one immediately, before this segment fills.
// The flag is per-active and tracks whether the backup-signal
// branch has fired for the *current* active. Rotation installs a
// new active, so the flag resets here to re-arm the backup branch.
// Plain field reset is safe (producer-only state).
wakeupRequestedForActive = true;
wakeupRequestedForActive = false;
Runnable wakeup = managerWakeup;
if (wakeup != null) {
wakeup.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -61,10 +60,10 @@ public final class SenderConnectionDispatcher implements QuietCloseable {
SenderConnectionEvent.NO_ROUND_NUMBER,
null, 0L);
private final AtomicLong dropped = new AtomicLong();
// volatile so the user can swap the listener post-connect, mirroring
// SenderErrorDispatcher's setHandler contract.
private volatile SenderConnectionListener listener;
private final BlockingQueue<SenderConnectionEvent> inbox;
// Deque (not plain queue) so offer() can drop the head when the inbox is
// full, per the drop-oldest contract inherited from SenderErrorDispatcher
// (sf-client.md section 14.6).
private final LinkedBlockingDeque<SenderConnectionEvent> inbox;
// First offer() that observes a null thread wins the race to spawn it.
private final Object lock = new Object();
private final String threadName;
Expand All @@ -75,6 +74,9 @@ public final class SenderConnectionDispatcher implements QuietCloseable {
// double-checked first-null guard is a JMM race -- benign in practice
// (the synchronized re-check covers double-start) but spec-incorrect.
private volatile Thread dispatcherThread;
// volatile so the user can swap the listener post-connect, mirroring
// SenderErrorDispatcher's setHandler contract.
private volatile SenderConnectionListener listener;

public SenderConnectionDispatcher(SenderConnectionListener listener) {
this(listener, DEFAULT_CAPACITY, "qdb-sf-connection-dispatcher");
Expand All @@ -92,7 +94,7 @@ public SenderConnectionDispatcher(SenderConnectionListener listener, int capacit
throw new IllegalArgumentException("capacity must be >= 1, was " + capacity);
}
this.listener = listener;
this.inbox = new ArrayBlockingQueue<>(capacity);
this.inbox = new LinkedBlockingDeque<>(capacity);
this.threadName = threadName;
}

Expand All @@ -103,7 +105,6 @@ public void close() {
return;
}
closed = true;
//noinspection ResultOfMethodCallIgnored
inbox.offer(POISON);
Thread t = dispatcherThread;
if (t != null) {
Expand All @@ -128,9 +129,10 @@ public void close() {
}

/**
* Total events dropped via inbox-overflow since startup. Non-zero means the
* listener is slower than the event rate -- typically a symptom of a
* misbehaving listener. Reported by the sender for ops dashboards.
* Total events discarded by the drop-oldest overflow policy since startup.
* Non-zero means the listener is slower than the event rate -- typically
* a symptom of a misbehaving listener. Reported by the sender for ops
* dashboards.
*/
public long getDroppedNotifications() {
return dropped.get();
Expand All @@ -145,36 +147,48 @@ public long getTotalDelivered() {
}

/**
* Replace the user-supplied listener. Effective for the next delivery.
* Null reverts to the loud-not-silent default.
*/
public void setListener(SenderConnectionListener listener) {
this.listener = listener != null ? listener : DefaultSenderConnectionListener.INSTANCE;
}

/**
* Non-blocking enqueue. Returns {@code true} if the event will be
* delivered to the listener (eventually, on the dispatcher thread).
* Returns {@code false} if the inbox was full or the dispatcher was
* closed -- caller's only obligation is to not block.
* Non-blocking enqueue. Always admits the new event unless the dispatcher
* is closed or {@code event} is null. Returns {@code true} when the new
* event is enqueued for delivery, {@code false} when rejected outright.
*
* <p>When the inbox is full, drops the oldest pending entry to make room
* (sf-client.md section 14.6, inherited contract) and bumps
* {@link #getDroppedNotifications()}. The newest entry is always retained
* because later events carry the most recent connection state.
*
* <p>Lazy-starts the dispatcher thread on the first successful offer.
*/
public boolean offer(SenderConnectionEvent event) {
if (closed || event == null) {
return false;
}
boolean accepted = inbox.offer(event);
if (!accepted) {
dropped.incrementAndGet();
return false;
// Drop-oldest overflow policy. The pollFirst()/offerLast() pair is
// not atomic with the consumer, but the consumer can only remove
// entries (never add). The retry loop converges in at most one extra
// iteration under SPSC; close()'s POISON enqueue widens the race
// briefly but the `closed` re-check exits cleanly.
while (!inbox.offerLast(event)) {
if (inbox.pollFirst() != null) {
dropped.incrementAndGet();
}
if (closed) {
return false;
}
}
if (dispatcherThread == null) {
startDispatcherIfNeeded();
}
return true;
}

/**
* Replace the user-supplied listener. Effective for the next delivery.
* Null reverts to the loud-not-silent default.
*/
public void setListener(SenderConnectionListener listener) {
this.listener = listener != null ? listener : DefaultSenderConnectionListener.INSTANCE;
}

private void dispatchLoop() {
while (!closed || !inbox.isEmpty()) {
SenderConnectionEvent ev;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -48,11 +47,14 @@
* the daemon dispatcher takes from the queue and invokes the handler.
*
* <h2>Backpressure</h2>
* The queue is bounded ({@code capacity}, default 256). When full,
* {@link #offer} returns {@code false} immediately and bumps
* The inbox is bounded ({@code capacity}, default 256). When full, {@link #offer}
* drops the oldest entry to admit the new one and bumps
* {@link #getDroppedNotifications()}. The I/O thread does NOT spin or block.
* A non-zero dropped count means the handler is too slow to keep up — visible
* to operators via the sender's accessor.
* to operators via the sender's accessor. Drop-oldest is mandated by
* sf-client.md section 14.6: watermarks are monotonic, so the latest entry is
* always the most informative and drops compress information rather than
* lose it.
*
* <h2>Lifecycle</h2>
* The dispatcher thread is started lazily on the first successful
Expand Down Expand Up @@ -88,7 +90,11 @@ public final class SenderErrorDispatcher implements QuietCloseable {
// and reconfigurable apps install a new handler at any time without
// tearing down the dispatcher thread.
private volatile SenderErrorHandler handler;
private final BlockingQueue<SenderError> inbox;
// Deque (not plain queue) so offer() can drop the head when the inbox is
// full, per spec section 14.6. SPSC in steady state: the I/O thread is
// the sole producer, the dispatcher is the sole consumer; close() also
// enqueues POISON, but only once and under `lock`.
private final LinkedBlockingDeque<SenderError> inbox;
// Threads are started lazily under this monitor; takes the same role as
// SegmentManager.start() — first offer() that observes a null thread
// wins the race to spawn it.
Expand Down Expand Up @@ -118,7 +124,7 @@ public SenderErrorDispatcher(SenderErrorHandler handler, int capacity, String th
throw new IllegalArgumentException("capacity must be >= 1, was " + capacity);
}
this.handler = handler;
this.inbox = new ArrayBlockingQueue<>(capacity);
this.inbox = new LinkedBlockingDeque<>(capacity);
this.threadName = threadName;
}

Expand Down Expand Up @@ -170,10 +176,10 @@ public void close() {
}

/**
* Total errors delivered via inbox-overflow drop since startup. Non-zero
* means the user's handler is slower than the error rate — typically a
* symptom of a misbehaving handler or a misconfigured server. Reported by
* the sender for ops dashboards.
* Total errors discarded by the drop-oldest overflow policy since startup.
* Non-zero means the user's handler is slower than the error rate —
* typically a symptom of a misbehaving handler or a misconfigured server.
* Reported by the sender for ops dashboards.
*/
public long getDroppedNotifications() {
return dropped.get();
Expand Down Expand Up @@ -209,21 +215,33 @@ public void setHandler(SenderErrorHandler handler) {
}

/**
* Non-blocking enqueue. Returns {@code true} if the error will be
* delivered to the handler (eventually, on the dispatcher thread). Returns
* {@code false} if the inbox was full or the dispatcher was closed —
* caller's only obligation is to not block.
* Non-blocking enqueue. Always admits the new error unless the dispatcher
* is closed or {@code error} is null. Returns {@code true} when the new
* error is enqueued for delivery, {@code false} when rejected outright.
*
* <p>When the inbox is full, drops the oldest pending entry to make room
* (sf-client.md section 14.6) and bumps {@link #getDroppedNotifications()}.
* The newest entry is always retained because it carries the most recent
* watermark information.
*
* <p>Lazy-starts the dispatcher thread on the first successful offer.
*/
public boolean offer(SenderError error) {
if (closed || error == null) {
return false;
}
boolean accepted = inbox.offer(error);
if (!accepted) {
dropped.incrementAndGet();
return false;
// Drop-oldest overflow policy. The pollFirst()/offerLast() pair is
// not atomic with the consumer, but the consumer can only remove
// entries (never add). The retry loop converges in at most one extra
// iteration under SPSC; close()'s POISON enqueue widens the race
// briefly but the `closed` re-check exits cleanly.
while (!inbox.offerLast(error)) {
if (inbox.pollFirst() != null) {
dropped.incrementAndGet();
}
if (closed) {
return false;
}
}
// Common case after the first offer: thread already running, hot
// path is one volatile read. Lazy start happens once per dispatcher
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/java/io/questdb/client/std/Files.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,15 @@ public static void freeNativePath(long pathPtr) {
*/
public static int rename(String oldPath, String newPath) {
long o = pathPtr(oldPath);
long n = pathPtr(newPath);
try {
return rename0(o, n);
long n = pathPtr(newPath);
try {
return rename0(o, n);
} finally {
freePathPtr(n);
}
} finally {
freePathPtr(o);
freePathPtr(n);
}
}

Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading
Loading