feat(ilp): QWiP store-and-forward client buffer #17
Opt-in durable buffer for the QWP WebSocket ingest client. Outgoing batches
are persisted to disk before they go out on the wire; the server's cumulative
ACK trims sealed segments; on restart or transient failure, the I/O thread
silently reconnects and replays whatever is still on disk.
On-disk format is the QWP wire frame captured verbatim, wrapped in an
8-byte SF envelope (CRC32C + length) so torn tails and silent bit-rot are
caught on recovery. Filenames encode (baseSeq, lastSeq) so trim and
recovery don't have to scan sealed segments.
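A hypothetical sketch of that envelope (field order and little-endian layout are assumptions, and the JDK's CRC32C stands in for the client's native implementation):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.CRC32C;

// Sketch of the 8-byte SF envelope: [u32 crc32c(payload)][u32 len][payload].
final class SfEnvelopeSketch {
    static ByteBuffer envelope(byte[] qwpWireFrame) {
        CRC32C crc = new CRC32C();
        crc.update(qwpWireFrame, 0, qwpWireFrame.length);
        ByteBuffer out = ByteBuffer.allocate(8 + qwpWireFrame.length).order(ByteOrder.LITTLE_ENDIAN);
        out.putInt((int) crc.getValue());   // 4-byte CRC32C over the payload only
        out.putInt(qwpWireFrame.length);    // 4-byte payload length; a torn tail fails either check
        out.put(qwpWireFrame);              // the QWP wire frame, captured verbatim
        out.flip();
        return out;
    }
}
```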
Auto-reconnect absorbs all transient connection failures with exponential
backoff (capped at 30s); only fatal SF storage errors (corruption, frame
larger than segment cap) propagate to the user. flush() under SF returns
once data is on disk, not on server ACK — natural backpressure when SF
total disk cap is reached makes flush() block until ACKs free space.
All file I/O goes through a new native Files layer ported from the
upstream QuestDB server repo: open/read/write/fsync/truncate/allocate/
length/lock/mkdir/exists/remove/rename/dir-iteration. Software CRC32C
implementation (Castagnoli, polynomial 0x1EDC6F41) added alongside.
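For reference, the 256-entry table for that polynomial can be regenerated with the conventional reflected (LSB-first) recurrence; whether the native code actually uses the reflected convention is an assumption:

```java
// 0x82F63B78 is the bit-reversed form of the Castagnoli polynomial 0x1EDC6F41.
static int[] buildCrc32cTable() {
    int[] table = new int[256];
    for (int n = 0; n < 256; n++) {
        int c = n;
        for (int k = 0; k < 8; k++) {
            c = (c & 1) != 0 ? (c >>> 1) ^ 0x82F63B78 : c >>> 1;
        }
        table[n] = c;
    }
    return table;
}
```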
Configured via connect string:
ws::addr=...
;store_and_forward=on
;sf_dir=/var/lib/qdb/sf
;sf_max_bytes=67108864 (per-segment rotation; default 64 MiB)
;sf_max_total_bytes=4G (hard cap → backpressure; default unlimited)
;sf_fsync=on (fsync after every append; default off)
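A usage sketch against these keys (Sender.fromConfig is the existing client entry point; the ws:: scheme and sf_* keys are the ones introduced here, and the table/column names are made up):

```java
import io.questdb.client.Sender;

public final class SfConfigExample {
    public static void main(String[] args) throws Exception {
        try (Sender sender = Sender.fromConfig(
                "ws::addr=localhost:9000;"
                + "store_and_forward=on;"
                + "sf_dir=/var/lib/qdb/sf;"
                + "sf_max_bytes=67108864;"
                + "sf_max_total_bytes=4G;"
                + "sf_fsync=on;")) {
            sender.table("trades").symbol("pair", "BTC-USD").doubleColumn("price", 64000.5).atNow();
            sender.flush(); // under SF this returns once the batch is on the local segment, not on server ACK
        }
    }
}
```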
49 new tests across 6 files: native Files (9), CRC32C (7 incl. property-
fuzz + bit-flip), SegmentLog (14), SegmentLog torture (7 incl. randomized
op-sequence fuzzer + multi-crash), SF integration (10 incl. multi-
reconnect + replay-during-replay + stress), connect-string from-config
(11). 1956 tests pass total.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Multiple correctness fixes to the QWiP store-and-forward client that were silently losing data under realistic outage scenarios.
Critical:
- markSending() moved after segmentLog.append; nextBatchSequence only advances on append success. Disk-full retry no longer crashes with IllegalStateException + drift exception, recycling the buffer without persistence (C1, C2).
- doReconnectCycle no longer drops pendingBuffer on every reconnect attempt. Buffer survives across attempts and is persisted by the post-reconnect ACTIVE state (C3).
- createActive closes fd in try/catch on writeHeader/fsync failure; no more fd leak on every failed rotation under disk pressure (C4).
- scanActive/replaySegment reject Files.length(fd) == -1 instead of treating it as "empty segment" (C5).
Moderate:
- scanActive distinguishes torn tail from mid-stream CRC mismatch; bit-rot followed by trailing bytes throws instead of silently truncating (M1).
- Files.close accepts any fd >= 0 (was refusing 0/1/2, leaking lock fd in containers where stdin/stdout/stderr were pre-closed) (M2).
- Connect-string sf_max_bytes / sf_max_total_bytes parsed as long; was capped at ~2 GB by parseIntValue (M3).
- WebSocketSendQueue.client made volatile so close-during-reconnect reads the live ref, not a stale one (M4).
- SegmentLog uses ObjList instead of java.util.ArrayList; bytesOnDisk is cached and updated incrementally so append() is O(1) zero-alloc on the I/O hot path (M6, N3).
- Each Segment caches a native UTF-8 path pointer; remove(String) is no longer called per-trim, eliminating the byte[] alloc on the I/O thread per ACK (M7).
- retryStalled always re-flags interrupt status (M8).
Cleanup:
- Dead WebSocketSendQueue.safeSendBatch removed (N1).
- @FunctionalInterface on Reconnector (N2).
- Inline FQNs in QwpWebSocketSender / Sender replaced with imports (N6).
- setSegmentLog overload pair co-located with cleaner doc (N8).
- Javadoc added to Files.java public surface and Crc32c.update (N9).
- Single-arg failConnection overload removed; every call site is now explicit about fatal vs non-fatal (N10).
Infrastructure:
- New FilesFacade interface + DefaultFilesFacade impl in io.questdb.client.std. SegmentLog refactored to use the facade so tests can inject OS-level failures (short writes, fstat -1, fsync EIO) without filesystem-level tricks.
Tests:
- 12 new red regression tests for the bug fixes above (now green).
- 5 coverage-gap tests for previously-untested error paths (M9): unsupported version header, baseSeq mismatch, multi-active rejection, oldestSeq edge cases, short-write recovery via fault-injection.
- Full SF + Files suite: 70 tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
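A hypothetical sketch of the fault-injection seam the FilesFacade refactor above enables; the real interface in io.questdb.client.std almost certainly has a different surface:

```java
interface FilesFacade {
    long write(long fd, long address, long len, long offset);
    long length(long fd);
    boolean fsync(long fd);
}

// Test double that truncates the next write, simulating the OS-level short write
// (e.g. ENOSPC mid-frame) that the SegmentLog recovery tests need to provoke.
final class ShortWriteFacade implements FilesFacade {
    private final FilesFacade delegate;
    private boolean shortWriteArmed;

    ShortWriteFacade(FilesFacade delegate) {
        this.delegate = delegate;
    }

    void failNextWrite() {
        shortWriteArmed = true;
    }

    @Override
    public long write(long fd, long address, long len, long offset) {
        if (shortWriteArmed) {
            shortWriteArmed = false;
            return delegate.write(fd, address, len / 2, offset); // short write: only half the frame lands
        }
        return delegate.write(fd, address, len, offset);
    }

    @Override
    public long length(long fd) {
        return delegate.length(fd);
    }

    @Override
    public boolean fsync(long fd) {
        return delegate.fsync(fd);
    }
}
```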
Three correctness fixes from a follow-up review of the QWiP store-and-forward client. Each is paired with a regression test that fails on the unfixed code and passes after the change.
Critical:
- WebSocketSendQueue.retryStalled split its catch ladder so a fatal SfException during stall-retry (corruption, oversized frame, fsync EIO) is classified the same as the main-loop sendBatch catch: failConnection(_, true) terminal, not (_, false) reconnect. The old behaviour silently reconnected and recycled the buffer as if sent, hiding storage failures and risking infinite loops on persistent errors. (C1)
- SegmentLog.createActive registers the freshly-opened fd into the Segment before calling allocNativePath, and the try block now wraps the path-allocation call. The catch closes the fd and best-effort removes the orphan .sfa file. The previous order leaked one fd per failed rotation under OOM pressure. (C2)
- ResponseHandler.onBinaryMessage error branch now fails the connection fatally. A server-side per-batch error (parse, schema mismatch, write, security, internal) is a protocol-level rejection of specific bytes; reconnecting and re-sending the same payload produces the same error. Under SF the rejected frame sits on disk and replay-on-reconnect shipped it again, so the previous transient classification turned any poisoned frame into an unbounded reconnect loop. (C4)
Infrastructure:
- FilesFacade gains allocNativePath / freeNativePath. SegmentLog now routes all path-pointer alloc/free through the facade so tests can inject OOM at the exact moment between openCleanRW and the try block in createActive. Required for the C2 regression test.
Tests:
- testCreateActiveDoesNotLeakFdOnAllocNativePathOom (SegmentLogTest)
- testRetryStalledTreatsSfStorageErrorAsTerminal (SfIntegrationTest)
- testPoisonedFrameInSfDoesNotLoopForever (SfIntegrationTest)
- Full suite: 1971 tests pass (was 1968), zero regressions.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
SegmentLog.scanDirectory used insertion sort over the segments list. At the documented sf_max_total_bytes / sf_max_bytes ceiling (1 TiB / 64 MiB ≈ 16K segments) that is ~268M comparisons + array shifts → multi-second wall time before the I/O thread can start.
Replaced with an in-place quicksort with median-of-three pivot. O(N log N) average, no allocation (matching the surrounding code's discipline), recursion depth bounded by ~2 log₂(N) by always recursing into the smaller partition and looping on the larger.
Median-of-three is required because the insertion sort's only saving grace was O(N) on already-sorted input, which is the common case from readdir on filesystems that return entries in lexicographic (and therefore baseSeq-hex) order. A naive first-element-pivot quicksort would degrade back to O(N²) in exactly that scenario.
Tests:
- testLargeSegmentCountReopensInOrder (SegmentLogTortureTest): generates 1024 sealed segments, reopens, asserts the replay returns every appended seq exactly once in order, and that reopen+replay completes within a generous wall-clock bound that catches a regression back to O(N²) at the production ceiling.
- Full SF + adjacent suite: 117/117 pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
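A minimal sketch of the sort strategy described above, over a plain long[] of baseSeq values (the real code sorts the segment objects in place with the same pivot and recursion discipline):

```java
static void quickSort(long[] a, int lo, int hi) {
    while (lo < hi) {
        // median-of-three pivot guards against already-sorted readdir output
        int mid = (lo + hi) >>> 1;
        if (a[mid] < a[lo]) swap(a, lo, mid);
        if (a[hi] < a[lo]) swap(a, lo, hi);
        if (a[hi] < a[mid]) swap(a, mid, hi);
        long pivot = a[mid];

        int i = lo, j = hi;
        while (i <= j) {
            while (a[i] < pivot) i++;
            while (a[j] > pivot) j--;
            if (i <= j) swap(a, i++, j--);
        }
        // recurse into the smaller partition, loop on the larger: depth stays ~2*log2(N)
        if (j - lo < hi - i) {
            quickSort(a, lo, j);
            lo = i;
        } else {
            quickSort(a, i, hi);
            hi = j;
        }
    }
}

static void swap(long[] a, int i, int j) {
    long t = a[i];
    a[i] = a[j];
    a[j] = t;
}
```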
SegmentLog.rotate freed old.pathPtrNative and assigned the new sealed-path pointer in non-atomic order: free → assign-path → alloc → set sealed. If allocNativePath OOMed mid-sequence the segment was left in two simultaneously broken states:
- pathPtrNative still held the freed pointer (the assignment never ran). On close() the segments-cleanup loop called freeNativePath on it again — a native-heap double-free that crashed the JVM with malloc free-list corruption (verified via the new red test on the unfixed code: surefire reported "The forked VM terminated without properly saying goodbye").
- sealed/lastSeqOnDisk were never set, so trim()'s !s.sealed guard silently skipped the segment. The .sfs file on disk was never reclaimed within the lifetime of the process.
Fix:
- Set old.pathPtrNative=0 immediately after the free so a subsequent OOM cannot leave a stale freed pointer in the field.
- Mark sealed=true / lastSeqOnDisk=lastSeq BEFORE allocating the new pointer. After OOM the segment is still classified as sealed so trim can reclaim it.
- trim() now handles the recovery case where pathPtrNative is 0 by falling back to ff.remove(path) (one-time per trim, acceptable — these recovery branches only fire after an OOM, not on the hot path).
Test: testRotateOomLeavesSegmentInRecoverableSealedState (SegmentLogTest). Forces rotation under an injected allocNativePath OOM, then asserts (a) close() does not double-free, (b) trim() reaps the orphan .sfs file. On the unfixed code the JVM dies; on the fix 118 SF + adjacent tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pre-fix behaviour: SegmentLog.trim only deleted sealed segments. Frames that the server had acknowledged but lived in the still-open active segment stayed on disk until the next natural rotation. On restart the new sender replayed those frames and the public Sender.storeAndForward contract — "trimmed when the server acknowledges it" — was load-bearing on server-side seqTxn dedup to avoid duplicate rows. Worst case at the default 64 MiB segment size: ~640 acked batches re-shipped per restart.
Fix: when every frame in the active segment has been acked, force-rotate the active (sealing the file) and immediately remove the just-sealed segment. nextSeq is preserved across the auto-rotate so subsequent appends keep monotonic FSNs. The only safeguard is the rotate-OOM recovery state from the M2 fix: when active.sealed is already true, the sealed pass above has already trimmed the file and force-rotate is skipped.
Tests:
- testTrimRotatesAndDropsFullyAckedActiveSegment (SegmentLogTest): unit-level proof that trim with full coverage drops the active contents to a fresh empty segment, with nextSeq preserved.
- testTrimPartialAckOfActiveLeavesItIntact (SegmentLogTest, replaces testTrimNeverDeletesActive): proves partial ACKs do not seal a segment that still contains unacked data.
- testRestartAfterAckedBatchesReplaysNothing (SfIntegrationTest): end-to-end. Send 5 batches, wait for trim, close, reopen with a fresh sender, send one more, assert server saw exactly 6 frames (5 originals + 1 new, no replays).
- testCapturedBytesMatchWireBytes (SfIntegrationTest): updated to use a non-acking handler so the test thread's log.replay() doesn't race the I/O thread's trim.
- testAutoReconnectAndReplay (SfIntegrationTest): expected frame count drops from 5 to 4 (msg1 trimmed before reconnect, no replay).
- testMultiTableSurvivesReconnect (SfIntegrationTest): expected frame count drops from 6 to 5 (alpha-1 trimmed before reconnect).
Public API:
- Sender.storeAndForwardDir Javadoc rewritten to honestly describe the new contract: acked batches are reclaimed in real time; only batches whose ACK had not been received before sender shutdown are replayed on the next sender against the same directory.
Full suite: 1975 tests pass, zero regressions.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The CRC32C table was lazily computed by the first thread to call Java_io_questdb_client_std_Crc32c_update, with a `volatile int crc32c_table_ready` flag for the once-guard. C's `volatile` does not provide acquire/release semantics — it only suppresses compiler reordering. On weakly-ordered platforms (aarch64, the QuestDB ARM Mac/Linux builds) a second thread could observe `ready == 1` while still reading partial / zero entries from `crc32c_table`, producing a silently wrong CRC. The downstream effect would be SegmentLog.scanActive mis-classifying a valid frame as a torn tail and silently truncating good frames after restart.
In practice the JNI transition's implicit barriers and x86's TSO made this benign on the platforms we test on. But the C standard does not guarantee it, and the bug class is the kind that surfaces only under load or after a JVM upgrade tightens its barrier semantics.
Fix: drop the lazy init entirely. The table is a deterministic function of the Castagnoli polynomial — pre-compute it once and embed the values as a `static const uint32_t[256]` initializer. Zero runtime cost, zero races, perfectly portable. The polynomial is documented in a comment so the table can be regenerated if needed.
Tests:
- Existing Crc32cTest (7 tests): empty input, known vector, chaining, zeros stable, property-fuzz over 200 random inputs × 5 splits, bit-flip-changes-CRC over 256 positions, empty-chaining-idempotent. All pass — table values verified correct against the lazy-init algorithm by SegmentLog round-trip tests as well.
- Full suite: 1975/1975 pass, zero regressions.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two tests targeting the disk-cap deadlock scenario the reviewer flagged as a separate "high" severity finding. Both pass under the per-frame trim that landed in the previous commit; both would deadlock on the pre-fix code where trim left the active segment alone.
testSingleActiveSegmentDoesNotDeadlockOnFullCap (new): Sets sf_max_bytes == sf_max_total_bytes, so no natural rotation can ever fire — the append-time projection check raises disk-full before rotate() is reached. Pre per-frame trim this state was permanent: the active was the only segment on disk, trim couldn't touch it, ACKs freed nothing. Force-rotate-on-fully-acked makes the active itself reclaimable, so an ACK covering every appended frame now restores capacity. The test stresses recovery further by refilling to disk-full a second time.
testMaxTotalBytesTriggersDiskFullThenRecoversOnAck (renamed from testMaxTotalBytesTriggersDiskFull): The "Acceptable: only the active was on disk and active doesn't trim" branch in the catch block — which the reviewer specifically cited as evidence the deadlock was tolerated by the test suite — is gone. The recovery append after trim now must succeed; we assert it directly instead of swallowing a second disk-full.
Full suite: 1976 tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sender.storeAndForwardFsync's Javadoc claimed the default sf_fsync=off "runs fsync on rotation and on explicit flush()". In practice flush() never called segmentLog.fsync() — the only production fsync paths were per-append (gated on fsync_each_append, the sf_fsync=on path), rotation (rare), and new-segment header creation (rare). With default config a sender that flushes coarsely between rotations was leaving all bytes in the OS page cache; an OS crash would lose them despite the docs implying durability.
Two-part fix:
1. Doc honesty (Sender.java): storeAndForwardFsync rewritten to spell out exactly what sf_fsync=off and sf_fsync=on mean. The default leaves bytes between rotations in the page cache — process crashes survive, OS crashes don't.
2. Opt-in fsync-on-flush: New knob storeAndForwardFsyncOnFlush(boolean) on the builder, parsed as sf_fsync_on_flush=on/off in the connect string. When enabled, every flush() (and the implicit flush in close()) routes a fsync request to the I/O thread before returning. Off by default — small-batch + frequent-flush senders pay one disk fsync per call, which is unacceptable for high-rate workloads.
The fsync runs on the I/O thread because SegmentLog is single-threaded (the I/O thread owns every read/write/trim/rotate). Calling segmentLog.fsync() from the user thread would race against an in-flight trim() (which may force-rotate the active under per-frame trim) or append() from a concurrent send. The signal pattern is the same one used by ping/pong: user sets fsyncRequested + waits on fsyncComplete; I/O thread observes the flag at the top of its iteration, performs the fsync, publishes outcome via fsyncError + fsyncComplete. Concurrent callers are serialised by fsyncLock so each gets its own round-trip.
Tests:
- testFlushDoesNotFsyncByDefault (SfIntegrationTest): with fsyncOnFlush=false the FsyncCountingFacade observes ZERO fsyncs during flush() — proves we did not regress the small-batch hot path.
- testFlushFsyncsWhenOptedIn (SfIntegrationTest): with fsyncOnFlush=true the same counter observes >= 1 fsync per flush() — proves the wiring is end-to-end.
- testSfFsyncOnFlushParses (SfFromConfigTest): connect-string round-trip.
- testInvalidSfFsyncOnFlushValueRejected (SfFromConfigTest): bad value rejected with a useful message.
- testSfFsyncOnFlushOnTcpRejected (SfFromConfigTest): TCP transport rejects the WebSocket-only knob.
Full suite: 1981 tests pass, zero regressions.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
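A single-caller sketch of that request/response handshake, using the field names from the message above; serialisation of concurrent flush() callers via fsyncLock is omitted for brevity, and segmentLog is a stand-in for the real field:

```java
// Illustrative shape only, not the real WebSocketSendQueue fields.
final Object fsyncLock = new Object();
volatile boolean fsyncRequested;
boolean fsyncDone;        // guarded by fsyncLock
Throwable fsyncError;     // guarded by fsyncLock

// user thread, inside flush() when sf_fsync_on_flush=on
void requestFsyncAndWait() throws InterruptedException {
    synchronized (fsyncLock) {
        fsyncDone = false;
        fsyncError = null;
        fsyncRequested = true;              // observed at the top of the I/O loop
        while (!fsyncDone) {
            fsyncLock.wait();
        }
        if (fsyncError != null) {
            throw new IllegalStateException("fsync-on-flush failed", fsyncError);
        }
    }
}

// I/O thread, at the top of each loop iteration
void serviceFsyncRequest() {
    if (!fsyncRequested) {
        return;
    }
    fsyncRequested = false;
    Throwable outcome = null;
    try {
        segmentLog.fsync();                 // SegmentLog is owned by this thread
    } catch (Throwable t) {
        outcome = t;
    }
    synchronized (fsyncLock) {
        fsyncError = outcome;
        fsyncDone = true;
        fsyncLock.notifyAll();              // wake the waiting flush()
    }
}
```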
Pre-fix: flushPendingRows checked schemaResetNeeded once, at the top of
the encode pass. If the I/O thread completed a reconnect AFTER that
check but BEFORE encoder.finishMessage, the encoded bytes carried
stale schema-id refs into the previous connection's id space. Those
bytes then went through segmentLog.append (persisted to SF) and out to
the new server, which rejected them. Pre-C4: silent unbounded
reconnect-replay loop. Post-C4: terminal failure with no self-heal —
the user has to manually clear the SF dir to recover.
Fix (option C from the previous review): connection-generation tag on
each encoded batch.
- New volatile long QwpWebSocketSender.connectionGeneration, bumped
by performReconnect AFTER schemaResetNeeded is flipped. Order is
load-bearing: a reader that observes the new generation also sees
the new schemaResetNeeded (volatile happens-before within the
writer thread).
- flushPendingRows now wraps the encode in a retry loop:
long genBefore = connectionGeneration; // read FIRST
if (schemaResetNeeded) reset;
encode...
if (connectionGeneration != genBefore) discard + retry;
Re-encoding is cheap because the source rows in QwpTableBuffer are
not reset until AFTER sealAndSwapBuffer (line 1830 in this commit).
encoder.beginMessage internally calls buffer.reset(), so the
discard step is implicit.
- Bounded at MAX_SCHEMA_RACE_RETRIES = 10. Reconnects firing faster
than a single encode is pathological and surfaces as
LineSenderException to the user rather than a silent infinite
loop. countNonEmptyTables extracted as a small helper so the
retry loop reads cleanly.
Together with C4 (server-error responses are now terminal) the
schema-reset race goes from "silent data corruption + infinite loop"
to "no poisoned batch ever reaches SF in the first place".
Tests:
- testGenerationBumpBetweenBatchesTriggersSchemaReset (SfIntegrationTest):
reflectively bumps connectionGeneration + sets schemaResetNeeded
between batches; asserts the next batch carries a fresh schema
definition (frame size >= the first batch).
- testSchemaResetRaceUnderConcurrentBumps (SfIntegrationTest, 30s
timeout): spawns a bumper thread that flips schemaResetNeeded +
bumps generation on a 50us cadence while the main thread flushes
200 batches in a tight loop. Asserts every batch either ships
successfully OR the bounded MAX_SCHEMA_RACE_RETRIES fail-fast
trips — never a silent escape, never an unexpected exception.
Mid-encode injection without instrumentation is timing-sensitive;
the stress test is a smoke check that the retry loop does not crash
under load. End-to-end "poisoned bytes never reach the server"
verification would need a strict QWP-wire-format-validating server
test handler — left as a future test.
Full suite: 1983 tests pass, zero regressions.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each fix is paired with a red-then-green regression test.
C1 — SegmentLog.rotate() partial failure deadlock
When rotate's allocNativePath OOMs or createActive fails after the
rename succeeds, `active` is left pointing at a sealed segment with
fd=-1. A subsequent small append that fits under the cap bypasses
the rotate trigger and falls through to ff.write(fd=-1, ...) which
returns -1 and is wrapped as the recoverable SfDiskFullException.
The I/O thread retries forever (disk-full backpressure path) and
the user thread blocks in flush() — silent deadlock. Guard added at
the top of append() that throws a fatal SfException when active is
in the post-rotate sealed/fd=-1 state.
C2 — Symbol-delta watermark not reset on reconnect
resetSchemaStateForNewConnection cleared maxSentSchemaId and
per-table schema ids but left maxSentSymbolId and
currentBatchMaxSymbolId untouched. The encoder's first
post-reconnect batch then shipped a delta dictionary that excluded
every symbol id <= the old server's high-water mark; column refs
into the new server's empty dictionary decoded as garbage (or were
rejected). Both watermarks are now reset alongside the schema
state.
C3 — trim() forgets sealed segments when remove() fails
trimSealedSegments discarded the boolean return of ff.remove(). On
Windows sharing-violation under antivirus, transient NFS errors,
ESTALE, etc., the file stayed on disk while bytesOnDiskCache was
decremented and the segment was dropped from the in-memory list.
Failure modes: (a) bytesOnDisk underreports reality, so
sf_max_total_bytes stops being an enforceable cap; (b) on next
process start scanDirectory rediscovers the orphan .sfs and
re-ships its already-acked frames to the new server. Failed
removes now keep the segment in the list with a removePending
flag — bytesOnDiskCache stays honest, replay() skips removePending
segments (so already-acked frames don't re-ship), the next trim()
retries naturally, and close() does a last-chance retry too.
C4 — Future server ACK can delete unsent SF data
InFlightWindow.acknowledgeUpTo clamps incoming server sequence at
highestSent; ResponseHandler.onBinaryMessage was passing the raw
uncapped sequence into segmentLog.trim(fsnAtZero + sequence) with
no symmetric clamp. A buggy/replayed/malformed server ACK with a
sequence beyond what the client had sent drove SegmentLog.trim
past every real lastSeq, force-rotating the active segment and
unlinking every sealed segment whose lastSeq <= the bogus value —
including frames mid-replay the new server had never seen.
Permanent silent data loss. The trim path now clamps the sequence
at nextBatchSequence-1 (mirroring the InFlightWindow cap) and
emits a WARN when the cap fires.
C5 — Replay window-wait spin hangs after mid-replay socket drop
replayPersistedFrames's window-wait spin only called
tryReceiveAcks while client.isConnected() returned true. When the
server dropped mid-replay, isConnected went false, no ACKs could
arrive, hasWindowSpace stayed false, and the spin ran forever —
preventing the outer state machine from running another
doReconnectCycle and blocking flush()/close() until the user
signalled shutdown. Compounding: replayPersistedFrames swallowed
internal failures via failConnection(non-fatal) and returned
normally, so doReconnectCycle returned true and ioLoop cleared
reconnectRequested — losing the failure. The spin now exits on
!isConnected or reconnectRequested; doReconnectCycle clears the
stale reconnectRequested before replay (so the freshly-reconnected
spin doesn't bail) and re-checks it after replay (so internal
failures propagate to the outer loop's backoff/retry).
Tests:
SegmentLogTest:
+ testRotateOomThenSmallAppendThrowsFatalNotDiskFull (C1)
+ testTrimRemoveFailureMustNotForgetSealedSegment (C3)
SfIntegrationTest:
+ testReconnectResetsSymbolWatermark (C2)
+ testFutureAckMustNotTrimUnsentSfData (C4)
+ testReplayMustNotHangWhenConnectionDropsMidReplay (C5)
All 1988 client tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Plain main()-style benchmark (same idiom as StacBenchmarkClient — no JMH
dependency required). Measures the per-frame latency of the SF persist
path: CRC32C over the payload, frame-envelope construction, two pwrite
syscalls (header + payload), bookkeeping, and an optional fsync when
--fsync=each. Reports min / p50 / p90 / p99 / p99.9 / max in nanoseconds
plus throughput in frames/sec and MB/sec.
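A minimal sketch of that harness shape (knobs and the appendFrame stand-in are hypothetical; the real benchmark drives the SF persist path):

```java
import java.util.Arrays;

public final class PersistLatencySketch {
    public static void main(String[] args) {
        int warmup = 10_000, measure = 20_000;
        long[] samples = new long[measure];
        byte[] payload = new byte[512];

        for (int i = 0; i < warmup; i++) appendFrame(payload);   // warm the code paths
        for (int i = 0; i < measure; i++) {
            long t0 = System.nanoTime();
            appendFrame(payload);                                 // CRC + envelope + pwrite x2
            samples[i] = System.nanoTime() - t0;
        }

        Arrays.sort(samples);
        System.out.printf("min=%d p50=%d p90=%d p99=%d p99.9=%d max=%d (ns)%n",
                samples[0], pct(samples, 0.50), pct(samples, 0.90),
                pct(samples, 0.99), pct(samples, 0.999), samples[measure - 1]);
    }

    static long pct(long[] sorted, double q) {
        return sorted[(int) Math.min(sorted.length - 1, Math.round(q * (sorted.length - 1)))];
    }

    static void appendFrame(byte[] payload) {
        // stand-in for the SF persist path under test (SegmentLog.append)
    }
}
```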
Smoke run on darwin-aarch64 (APFS):
--payload-bytes=512 --measure=20000 --fsync=off
p50 ≈ 4 µs, p99 ≈ 14 µs, ~150K frames/sec, ~74 MB/sec
--payload-bytes=512 --measure=5000 --fsync=each
p50 ≈ 28 µs, p99 ≈ 900 µs, ~16K frames/sec, ~8 MB/sec
Run via Maven exec or directly from the IDE; the class lives under
core/src/test so it has free access to the SF code path without adding
to the production classpath.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the byte-at-a-time CRC32C inner loop in the JNI implementation with a slice-by-8 variant that consumes 8 input bytes per iteration via eight parallel 256-entry table lookups whose results are XORed.
The seven additional tables (~7 KB of static read-only data) are derived from the existing crc32c_table at build time using the standard `table[k][i] = (table[k-1][i] >> 8) ^ table[0][table[k-1][i] & 0xFF]` recurrence, which corresponds to "advance the input by one more zero byte". They are emitted as static const initialisers — same rationale as the original table: hard-coding sidesteps the C-memory-model pitfalls of lazy initialisation on weakly-ordered platforms.
Measured on darwin-aarch64 with SegmentLogLatencyBenchmark at the typical 512-byte SF frame payload (warmup=50_000, measure=500_000, fsync=off):
  before: min ~12_000 ns, p50 ~14_000 ns
  after:  min 2_625 ns, p50 3_625 ns
That collapses the per-append CRC cost from the dominant term (~85% of p50) to a small fraction, which is what the SF store-and-forward layer needs at high frame rates. The tail (p99/p99.9) is dominated by pwrite syscalls and OS scheduling, not CRC, and is unchanged.
Correctness is covered by the existing Crc32cTest suite — in particular testChainingPropertyOverManyRandomInputs (200 random buffers up to 2048 bytes, 5 random split points each) which exercises the slice-8 main loop plus byte-at-a-time tail across every alignment offset and length class the SF path can produce.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
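The derivation above, written out in Java for illustration (the real tables are emitted as static const C initialisers; table0 is the existing byte-at-a-time crc32c_table):

```java
static int[][] buildSliceBy8Tables(int[] table0) {
    int[][] t = new int[8][256];
    t[0] = table0;
    for (int k = 1; k < 8; k++) {
        for (int i = 0; i < 256; i++) {
            int prev = t[k - 1][i];
            // "advance the input by one more zero byte"
            t[k][i] = (prev >>> 8) ^ t[0][prev & 0xFF];
        }
    }
    return t;
}
```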
Three independent recovery-time bugs in SegmentLog that all let a
durability layer silently produce or operate on a wrong view of the
on-disk log. Each fix has a red regression test that fails on the
unfixed code and passes after the fix.
1. Mid-rotate crash recovery resets FSN sequence to 0.
rotate() has a window between ff.rename(.sfa → .sfs) and the
subsequent createActive(lastSeq + 1) where the process can die or
createActive can throw (allocNativePath OOM, openCleanRW failure,
etc.) leaving on disk: one or more sealed .sfs files, no .sfa.
openInternal saw active==null after scanDirectory and unconditionally
called createActive(FIRST_SEQ=0), restarting FSN assignment at 0
even though sealed segments on disk already covered 0..N. The new
active produced frames whose FSNs collided with sealed FSNs already
on disk, breaking ACK translation, trim, and replay against data
the recovery never saw. Fix derives the new active's baseSeq from
the highest sealed lastSeqOnDisk + 1 (segments is sorted by baseSeq
and sealed ranges are non-overlapping, so the last entry holds the
largest lastSeqOnDisk).
Tests:
- testMidRotateCrashRecoveryPreservesFsnMonotonicity (fault
injection: failNextActiveAllocNativePath inside rotate()).
- testRestartWithOnlySealedSegmentsRecoversCorrectly (independent
coverage via pure on-disk filesystem manipulation — write frames,
manually rename .sfa to .sfs — to exercise the open/recovery code
in isolation from rotate's failure handling, then verify the full
contract: nextSeq, oldestSeq, replay order, and post-restart
append).
2. oldestSeq() returned a removePending segment's baseSeq even though
replay() skips it.
trim() keeps an undeletable sealed segment in the in-memory list as
removePending; replay() correctly skips such segments so already-
acked frames are not re-shipped on reconnect. oldestSeq() returned
segments.getQuick(0).baseSeq unconditionally — including when the
first segment was removePending. WebSocketSendQueue pins
fsnAtZero = oldestSeq() in both the constructor (line 247-248) and
doReconnectCycle (line 925-926), then asserts fsn == fsnAtZero +
wireSeq inside the replay visitor (line 974). The mismatch threw
"SF replay FSN drift" on the first replayed frame; the catch
triggered failConnection(non-fatal); reconnectRequested fired; the
I/O loop re-entered doReconnectCycle, called oldestSeq() again with
the same stale return, and drift fired identically. Permanent
reconnect loop until either the FS issue cleared AND a non-reconnect
trim ran (it can't — the I/O thread is stuck reconnecting), or the
user closed the sender. Fix skips removePending in oldestSeq() the
same way replay() does.
Tests:
- testOldestSeqMustSkipRemovePendingToMatchReplay (unit-level: cross-
check oldestSeq() against the first FSN replay() actually visits).
- testReplaySucceedsWithRemovePendingSegmentAtHeadOfList (end-to-end
integration: real TestWebSocketServer + sender + RemoveFailingSf
Facade; verified pre-fix to reproduce the reconnect loop with "SF
replay FSN drift: fsn=2 expected=0", post-fix the 2 unacked frames
replay successfully and a fresh send reaches the server).
3. Directory scan errors silently treated as EOF / empty log.
Files.findNext()'s contract is 1=success, 0=EOF, -1=read error.
scanDirectory's while (rc > 0) loop exited identically on both 0 and
-1, conflating a real readdir failure (EIO/ESTALE on NFS, etc.) with
normal end-of-directory. Files.findFirst()==0 means either opendir
failed (errno set — transient EACCES/EMFILE/ESTALE/ENOMEM) or the
directory is empty; scanDirectory unconditionally treated it as
"nothing to scan." By the time scanDirectory runs, openInternal has
created the directory if missing and successfully opened+locked the
lock file inside it, so an empty listing is impossible — find==0
here can only mean opendir failed. The silent fallthrough let
openInternal proceed to createActive(...) on top of any unscanned
on-disk segments, aliasing or overwriting still-existing data — the
exact failure mode a durability layer must guard against. Fix
throws SfException in both branches; recovery refuses to proceed
from a partial / unknown view of its own log.
Tests:
- testScanDirectoryFailsWhenFindFirstReturnsZero (FilesFacade
forces findFirst to return 0; pre-fix open silently succeeded
with empty segments and nextSeq=0 over real on-disk data).
- testScanDirectoryFailsWhenFindNextReturnsError (FilesFacade
forces findNext to return -1; same shape, mid-scan readdir
failure is now fatal).
Full module suite: 1994/1994 green (1988 baseline + 6 new tests).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the user-facing counterpart to QwpEgressLatencyBenchmark in the OSS repo. Measures end-to-end wall time of a single row .at()+flush() against a locally running QuestDB. Default mode is SF on, which measures user-handover latency: flush() returns when the row is durable on the local SF segment. -Dsf=false switches to the no-SF path that blocks for the full server-ACK round-trip (apples-to-apples vs egress).
Pulls in JMH 1.37 as a test-scope dependency, wires the annotation processor into maven-compiler-plugin, requires jmh.core + ch.qos.logback.classic in the test module-info.
The benchmark's static initializer downgrades the logback root level to WARN before any other class loads -- DEBUG-level WS / SF logging would otherwise emit one log line per flush and inflate measured latency by ~70us.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Lays the foundation for a lock-free, mmap-backed alternative to the
current SegmentLog + WebSocketSendQueue + processingLock design. Today
~85% of the user thread's flush() time is spent parked in
__psynch_cvwait waiting for the I/O thread to signal completion (see
QwpIngressLatencyBenchmark async-profiler flamegraph). The cursor design
moves SF.append onto the user thread, making the cross-thread wait
unnecessary -- the user-thread append microbench now reports p50=42ns
vs ~38us in the legacy SF path on the same hardware.
What lands:
* mmap/munmap/msync ported from QuestDB OSS into client/std/Files
(both POSIX and Win32). Native rebuild required per-platform; the
darwin-aarch64 dev lib is the only one rebuilt locally.
* MmapSegment: one mmap'd file, format-compatible with the legacy
SegmentLog (same SF01 magic, 24-byte header, [crc | u32 len |
payload] frame layout). Single-producer cursor (appendCursor plain
field, publishedCursor volatile). tryAppend is pure memory + CRC.
openExisting + scanFrames recover from torn tails.
* SegmentRing: chain of MmapSegments with hot-spare swap and
ACK-driven trim. Four cursors, all single-writer (no CAS). Rotation
rebases the spare's baseSeq at promotion time to avoid the
precompute race.
* SegmentManager: JVM-wide background thread that pre-creates spares
and trims fully-acked segments. Moves the open + truncate + fsync +
rename + unlink quartet (45k samples / 100k I/O thread samples in
the legacy flamegraph) off the hot path.
* CursorSendEngine: facade bundling ring + manager with the API a
future WebSocketSendQueue rewrite will consume.
* sf_engine=legacy|cursor config option in LineSenderBuilder. Default
legacy. Selecting cursor at build time fails fast with a clear "not
yet wired" message -- the WebSocketSendQueue integration that
actually consumes CursorSendEngine is the next PR.
* CursorEngineAppendLatencyBenchmark: standalone microbench for the
user-thread append path (the floor a wired cursor engine would
inherit).
20 new tests across the cursor/ package, all green. FilesTest gains a
mmap roundtrip test.
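A condensed sketch of the MmapSegment publish discipline described above (field names follow the commit text; the real class writes through the mmap'd region rather than a ByteBuffer, and frames carry the SF envelope):

```java
import java.nio.ByteBuffer;

final class SingleProducerCursorSketch {
    private long appendCursor;             // plain field: only the producer thread touches it
    private volatile long publishedCursor; // volatile store is the publish barrier for the I/O thread

    boolean tryAppend(ByteBuffer segment, byte[] frame, int capacity) {
        if (appendCursor + frame.length > capacity) {
            return false;                  // caller rotates or backpressures
        }
        segment.position((int) appendCursor);
        segment.put(frame);                // write the frame body first...
        appendCursor += frame.length;
        publishedCursor = appendCursor;    // ...then publish; a reader never sees a torn frame
        return true;
    }
}
```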
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two pre-wiring additions on the cursor side that the upcoming WebSocketSendQueue replacement will need.
1. SegmentRing.openExisting(sfDir, maxBytesPerSegment)
   Walks *.sfa files in the directory, opens each via MmapSegment.openExisting (which already validates header + scans torn tails), arranges by baseSeq, and returns a ring with the newest as active and the rest as sealed. Validates that the recovered segments form a contiguous FSN range -- a gap signals manual deletion or partial-write damage and aborts recovery rather than silently producing duplicate / missing FSNs after restart. Stray .sfa files with bad headers are skipped (logged-then-ignored), not fatal.
2. SegmentManager maxTotalBytes cap
   Manager tracks total bytes it has provisioned across all rings it serves. When provisioning a hot spare would exceed the cap, the manager skips the install and the requesting ring stays in BACKPRESSURE_NO_SPARE until ACK-driven trim frees space. Default is UNLIMITED_TOTAL_BYTES (no behavioural change for existing callers). Disk-full state is logged at WARN, throttled to once per 30s so a sustained-full state doesn't drown the log. Cap is approximate -- it counts only manager-provisioned segments, not the engine's initial active per ring (so the effective on-disk cap is maxTotalBytes + (rings * segmentSizeBytes)). Acceptable for a runaway-growth guard; documented in the constructor.
Also makes SegmentRing.sealedSegments mutation thread-safe via a synchronized snapshot path that the I/O loop will use, and marks SegmentRing.active volatile so cross-thread rotation publication is correct without a lock.
10 new tests across SegmentRingTest + SegmentManagerTest covering: recovery happy path, FSN-gap detection, bad-magic skip, cap blocks provisioning, cap is released by ACK-driven trim. All 2020 tests in the suite pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CursorWebSocketSendLoop is the cursor-engine equivalent of
WebSocketSendQueue's I/O loop. Owns one I/O thread that:
* Polls CursorSendEngine.publishedFsn() and walks newly-published
frames from the engine's segments (active + sealed). Sends each
frame's payload as one WS binary frame via WebSocketClient.sendBinary
-- exactly the bytes the legacy WebSocketSendQueue would send,
minus the 8-byte SF envelope which is engine-internal.
* Polls the WebSocket for server ACKs via tryReceiveFrame. On each
successful ACK with cumulative wire seq N, calls
engine.acknowledge(fsnAtZero + N), which advances ackedFsn so the
SegmentManager can trim fully-acked sealed segments.
No locks. The producer thread (user) writes into the engine; this
thread reads. publishedFsn is the volatile publish barrier. Sealed-
segment iteration uses the synchronized snapshot accessor added in
the previous commit, so the producer's rotation can't tear the
ObjList underneath us.
PR1 scope is deliberately the happy path. Deferred (TODO PR2):
* Ping/pong heartbeat
* fsync-on-flush request channel
* Per-table seqTxn tracking
* Reconnect / replay-on-reconnect (walk segments from ackedFsn+1)
* Disk-full retry (the cap from the previous commit handles the
upstream signal; PR2 wires the producer-side recovery)
* Multi-connection failover
Errors are reported via getLastError(); the I/O thread sets it and
exits, producers polling checkError() surface the failure.
Same wireSeq-clamp safety check the legacy path uses (clamp ACK
sequence to nextWireSeq-1 so a malformed/replayed server ACK can't
force trim of segments the new server has never seen).
Companion change: CursorSendEngine.sealedSegmentsSnapshot pass-through
to SegmentRing's thread-safe snapshot accessor.
No new tests in this commit -- the integration test for the wired
end-to-end path lands with the QwpWebSocketSender wiring (next slice).
The class compiles and the 2020-test suite continues to pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ring
Wire CursorSendEngine into the public Sender API and collapse the connect-
string surface around it.
* Sender.build(): cursor is the only async ingest path. sfDir present →
store-and-forward (mmap'd, recoverable); sfDir absent → memory-only ring
(same lock-free architecture, no disk).
* Connect-string keys reshaped:
- drop store_and_forward (sf_dir is the on-switch)
- drop sf_fsync / sf_fsync_on_flush (replaced by sf_durability)
- drop sf_engine (cursor is unconditional now)
- sf_durability=memory|flush|append (today only memory works; flush/append
throw "not yet supported" until cursor learns fsync)
- size suffixes accepted on sf_max_bytes / sf_max_total_bytes (64m, 4g)
- default sf_max_bytes 4 MiB; default sf_max_total_bytes 128 MiB
(memory mode) / 10 GiB (SF mode) — bounded by default rather than the
previous unlimited foot-gun
* MmapSegment.createInMemory() — memory-backed (Unsafe.malloc) variant for
the non-SF async path; same on-the-wire layout.
* SegmentManager — when the registered ring's dir is null, provisions
memory-backed spares and skips file unlink on trim. Producer-thread
unpark of the worker (eager wakeup) cuts the post-rotation tail by
preempting the polling tick.
* CursorSendEngine.appendBlocking — bounded backpressure: deadline (default
30 s) throws LineSenderException; cumulative getTotalBackpressureStalls()
counter; throttled WARN log every 5 s of sustained backpressure. No more
silent unbounded waits.
* CursorWebSocketSendLoop.advanceSegment — replaced fixed-size sealed-list
snapshot with SegmentRing.nextSealedAfter() / firstSealed() lookups.
Fixes "sealed snapshot grew unexpectedly large" crash when the producer
outpaces the wire.
Legacy SF and async-queue paths are dead code at the test layer; their
tests are removed and the remaining src files (WebSocketSendQueue,
SegmentLog, InFlightWindow, Reconnector, SfDiskFullException, SfException)
will be deleted in a follow-up that strips QwpWebSocketSender's legacy
fields and connect overloads.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
QwpWebSocketSender now has a single send pipeline — the cursor SF engine.
The legacy WebSocketSendQueue + SegmentLog stack and the sync (window=1)
mode have no remaining callers, so the sender drops:
* connect() overloads with SegmentLog / fsyncOnFlush parameters
* setSegmentLog*, setSegmentLogFsyncOnFlush, setRequestDurableAck,
getTotalSfDiskFullStalls, getHighestAckedSeqTxn,
getHighestDurableSeqTxn, getMaxSentSymbolId, ping()
* sync-mode flushSync, syncPing, waitForAck, AckFrameHandler,
nextBatchSequence, sync*SeqTxns, sawPong/sawBinaryAck
* SF reconnect machinery (performReconnect, schemaResetNeeded,
connectionGeneration, MAX_SCHEMA_RACE_RETRIES retry loop)
* sendQueue, segmentLog, ownsSegmentLog, fsyncOnFlush fields and
inFlightWindow member
The remaining flow: ensureConnected wires up CursorWebSocketSendLoop;
flush()/close() drain through the cursor I/O thread; sealAndSwapBuffer
hands sealed buffers to engine.appendBlocking on the user thread.
The orphaned legacy source files (WebSocketSendQueue, InFlightWindow,
Reconnector, SegmentLog, SfDiskFullException, SfException) and the
sync-mode QwpWebSocketAckIntegrationTest are deleted in the same commit
since they no longer have any callers.
Net diff: ~4500 lines removed, ~125 added.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CursorSendEngine's constructor unconditionally created a fresh sf-initial.sfa at baseSeq=0 even when the SF directory contained sealed segments from a prior session, restarting the FSN sequence at 0 and overlapping with FSNs already on disk. ACK translation, trim, and replay would then operate on overlapping ranges.
The recovery primitive — SegmentRing.openExisting — already exists and does the right thing (scans *.sfa, sorts by baseSeq, validates contiguity, picks the highest-baseSeq segment as the new active). The constructor just never called it. Now it does, in disk mode, before falling back to the fresh-create path on an empty dir.
Adds a regression test that writes 5 frames to one engine, closes, reopens against the same dir, and asserts the next append's FSN continues at 5 instead of restarting at 0.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Captures the design for closing the WS sender's reliability gap that landed when we collapsed onto the cursor engine: flush()/close() no longer wait for ACKs (in either mode), and memory mode can drop data on close-then-exit.
Spec covers:
- flush()/close() contracts (close gets a 5s drain timeout with fast-close opt-out)
- Reconnect with bounded per-outage retry budget (default 5 min) and schema-reset machinery (volatile connectionGeneration counter to close the encode-mid-reconnect race)
- Slot directory model: sf_dir is the parent, sender_id picks the slot, foreground sender + opt-in background drainers for orphan recovery
- Server-side dedup contract (assumed)
13 decisions locked, no open items.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…illis
QwpWebSocketSender.close() previously stopped the cursor I/O loop the moment it was called: any frames already published into the engine but not yet sent (or sent but not yet ACK'd) were silently dropped on JVM exit. In memory mode that means data loss; in SF mode the next sender recovers from disk, but the durability claim of close() was weaker than the spec promised.
Closes the gap with one knob from the durability spec (design/qwp-cursor-durability.md, decision #3):
close_flush_timeout_millis (default 5000)
  > 0: close() blocks until ackedFsn >= publishedFsn or timeout
  0/-1: fast close — no drain, opt-in to legacy fast-exit behavior
On timeout, log WARN and proceed with shutdown. SF-mode pending data is recoverable; memory-mode pending data is not.
Wired through:
- LineSenderBuilder.closeFlushTimeoutMillis(long)
- connect-string key close_flush_timeout_millis
- new QwpWebSocketSender.connect overload that takes the timeout
Tests cover all three regimes:
- delayed-ACK server: close blocks ~ack delay
- timeout=0: close returns immediately
- silent server: close times out at the configured cap, logs WARN
This is decision #3 of the spec; subsequent commits add the connectionGeneration foundation, reconnect/replay, slot dirs, and background drainers.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Re-adds the volatile generation counter (and its companion retry loop in flushPendingRows) that the cursor strip had removed. This is the foundation the reconnect work (#20/#21) builds on — the producer needs a way to detect that the wire-side actor has rotated state mid-encode so it can discard now-poisoned schema-ID refs and re-encode with full schema definitions.
What lands here:
* QwpWebSocketSender: volatile connectionGeneration + lastSeenGeneration pair. Bumped on initial recovery from disk (the recovered FSNs were never seen by *this* server connection, so the first batch must re-publish full schemas). Reconnect path will bump in subsequent work.
* flushPendingRows: encode-mid-reconnect retry loop. Sample gen before encode + after finishMessage; if it changed, discard the encoded bytes (table buffers haven't been reset yet — source rows are intact) and retry with reset schema state. Bounded at MAX_SCHEMA_RACE_RETRIES = 10 so reconnect-faster-than-encode surfaces a hard error instead of spinning.
* CursorSendEngine.wasRecoveredFromDisk(): single-bit accessor the sender reads during ensureConnected to decide whether to bump.
* SegmentRing.openExisting: filter out empty hot-spare leftovers (frameCount=0) from prior sessions. Those carry the provisional baseSeq=0 and would otherwise collide with the real baseSeq=0 segment and trip the contiguity check. Surfaced by the new recovery test — caught a real bug in the recovery scan.
* Test hooks bumpConnectionGenerationForTest / accessors for gen and maxSent*Id so reconnect-effect tests can run without spinning up the (still-not-implemented) reconnect path.
Tests cover: gen=0 for fresh connect, gen=1 after disk recovery, gen bump triggers schema-state reset on the next encode and is sticky (further flushes without bump don't re-reset).
Spec decisions #4 and #5 land here.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The cursor I/O loop previously treated any wire failure as terminal —
first disconnect = sender broken, every subsequent batch threw. Now,
when the sender wires a ReconnectFactory + ReconnectListener, a wire
failure triggers:
1. WARN log
2. Build a fresh WebSocketClient via the factory (same auth/TLS/host)
3. Reset wire state: nextWireSeq=0, fsnAtZero = engine.ackedFsn() + 1
4. Reposition the cursor at the first unacked FSN (replay)
5. Notify the listener → producer's connectionGeneration bumps so
the next encode emits full schema definitions, not refs the new
server has never seen
6. Outer ioLoop continues — nextWireSeq=0 starts on the new wire,
trySendOne picks up at the repositioned cursor and replays every
unacked frame, then continues with whatever the producer publishes
next
Added in main:
* CursorWebSocketSendLoop.ReconnectFactory + .ReconnectListener
interfaces (both functional, both null-able for legacy "fail-fast"
semantics)
* positionCursorAt(fsn) — walks frames inside the segment containing
fsn to find the byte offset
* SegmentRing.findSegmentContaining(fsn) + CursorSendEngine
pass-through — used by the cursor reposition
* QwpWebSocketSender extracts buildAndConnect() to use both for the
initial connect and as the reconnect factory; onWireReconnect()
is the listener that bumps connectionGeneration
This commit covers the *mechanics* (one attempt, succeed-or-fail).
The follow-up commit adds policy: exponential backoff with jitter,
per-outage time cap (reconnect_max_duration_millis, default 300s
per spec decision #2), and auth-failure detection (401/403/non-101
treated as terminal so the retry budget isn't wasted on errors that
won't fix themselves).
Two integration tests:
* testReconnectAfterServerInducedDisconnect — server ACKs then
closes; sender reconnects, second batch lands on the new wire
* testReplayResendsUnackedFramesAcrossReconnect — server receives
the first frame WITHOUT ACKing then closes; sender reconnects
and replays the unacked frame on the new connection
Spec decisions #5 (encode-mid-reconnect race) and the core of
#1/#2 (reconnect mechanics) land here.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the single-attempt reconnect with a per-outage retry budget: exponential backoff with jitter, capped duration, terminal classification for HTTP upgrade failures (401/403/426/...) so misconfig surfaces fast instead of grinding through the cap.
Knobs (WebSocket only, all bypassable from connect string + builder):
  reconnect_max_duration_millis    default 300_000 (5 min)
  reconnect_initial_backoff_millis default 100
  reconnect_max_backoff_millis     default 5_000
Auth-terminal detection walks the cause chain — the WebSocketClient's "WebSocket upgrade failed:" sentinel is wrapped at least once by the connect path, so a top-level message-only check missed it.
Tests: testReconnectGivesUpAfterCap exercises the budget exhaustion via server.close() (TCP refused on every retry); testTerminalUpgradeErrorAbortsReconnect uses a raw-socket fixture that 101s the first connection then 401s every subsequent one.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
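A sketch of the retry-budget shape under the defaults above; auth-terminal classification, throttled logging, and the real connect call are omitted (attemptOnce is a hypothetical stand-in for one connect attempt):

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;

final class ReconnectBudgetSketch {
    static boolean reconnectWithBudget(
            long maxDurationMillis,          // default 300_000
            long initialBackoffMillis,       // default 100
            long maxBackoffMillis,           // default 5_000
            BooleanSupplier attemptOnce) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxDurationMillis;
        long backoff = initialBackoffMillis;
        while (System.currentTimeMillis() < deadline) {
            if (attemptOnce.getAsBoolean()) {
                return true;                                       // connected
            }
            // jitter: sleep somewhere in [backoff/2, backoff], never past the deadline
            long jittered = backoff / 2 + ThreadLocalRandom.current().nextLong(backoff / 2 + 1);
            long remaining = deadline - System.currentTimeMillis();
            Thread.sleep(Math.max(0, Math.min(jittered, remaining)));
            backoff = Math.min(backoff * 2, maxBackoffMillis);     // exponential growth, capped
        }
        return false;                                              // budget exhausted, caller goes terminal
    }
}
```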
sf_dir is now the *group root*, not a slot. Each sender owns <sf_dir>/<sender_id>/, taking an exclusive flock on <slot>/.lock for its lifetime. Two senders pointing at the same slot is the multi-writer footgun the model exists to prevent — their FSN sequences would interleave on disk and corrupt recovery. Detected at acquisition time; second sender fails fast with the holder's PID in the diagnostic.
Knobs:
  sender_id (default "default") — slot identity inside the group root
  Allowed sender_id chars: letters, digits, _ - (verbatim dir name).
SlotLock writes the holder's PID into the lock file at acquisition; a contended acquire reads it back so the error message names the offending process. flock is released by the kernel on hard process exit, so a crashed sender doesn't leave the slot wedged.
Tests:
- SlotLockTest: acquire creates dir + .lock, second acquire contends, close releases, distinct slots coexist.
- SfFromConfigTest: sender_id creates named slot; two senders with same id collide on lock; two senders with distinct ids coexist; invalid char in sender_id rejected at parse time.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
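A java.nio sketch of the slot-lock contract above; the real SlotLock goes through the client's native Files flock layer, so treat this as shape only:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class SlotLockSketch implements AutoCloseable {
    private final FileChannel channel;
    private final FileLock lock;

    SlotLockSketch(Path groupRoot, String senderId) throws IOException {
        Path slot = groupRoot.resolve(senderId);                  // <sf_dir>/<sender_id>/
        Files.createDirectories(slot);
        Path lockFile = slot.resolve(".lock");
        channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        FileLock acquired;
        try {
            acquired = channel.tryLock();                         // null if another process holds it
        } catch (OverlappingFileLockException e) {
            acquired = null;                                      // same-JVM holder counts as contention too
        }
        if (acquired == null) {
            String holderPid = new String(Files.readAllBytes(lockFile), StandardCharsets.UTF_8).trim();
            channel.close();
            throw new IllegalStateException("SF slot already locked [slot=" + slot + ", pid=" + holderPid + "]");
        }
        lock = acquired;
        channel.truncate(0);                                      // record our PID for the next contender's diagnostic
        channel.write(ByteBuffer.wrap(
                Long.toString(ProcessHandle.current().pid()).getBytes(StandardCharsets.UTF_8)));
        channel.force(true);
    }

    @Override
    public void close() throws IOException {
        lock.release();
        channel.close();
    }
}
```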
Three small things:
1. initial_connect_retry (default false). When true, the startup connect
goes through the same backoff/cap/auth-terminal loop as in-flight
reconnect. Default off because a misconfigured host shouldn't sit
retrying for the cap on startup. Auth failures stay terminal in
either mode.
2. sf_append_deadline_millis. Was a hardcoded 30s constant; expose so
tight-SLA users can lower and offline-tolerant pipelines can raise.
3. Two new counters on the cursor I/O loop, exposed on
QwpWebSocketSender:
- getTotalReconnectAttempts() — succeeded + failed (diverges from
getTotalReconnectsSucceeded() when the server is flapping)
- getTotalFramesReplayed() — frames re-sent on the post-reconnect
catch-up window; non-zero confirms replay actually fired.
Implementation: extracted the reconnect retry-with-jitter loop into a
static CursorWebSocketSendLoop.connectWithRetry helper so ensureConnected
and the I/O loop's fail() path share verbatim semantics (auth-terminal,
backoff, jitter, throttled logs, cap). Replay counter uses a snapshot of
publishedFsn at swapClient time as a target — incremented per frame
sent, cleared once we cross the boundary. Branch is cold on the
steady-state path.
Tests: InitialConnectRetryTest covers the no-retry-fails-fast path, the
retry-succeeds-when-server-comes-up path, and the retry-gives-up-after-cap
path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous test ran 8 producer threads x 50 iterations and relied on natural contention to provoke the race between deregister and the trim block in SegmentManager.serviceRing. It flaked on Azure Windows CI: under 8-way contention the SegmentManager worker fell behind on hot-spare provisioning, and a producer hit the 1 s spare-arrival deadline at iteration 38 of 50. The test never reached the totalBytes assertion, so even when the fix is correct the test reports a generic "producer thread failed" rather than the contract being verified.
Replace it with a deterministic single-ring single-tick test driven by a new package-private test seam on SegmentManager: beforeTrimSyncHook. The hook fires on the worker thread immediately before the trim block's synchronized(lock) entry. The test installs a hook that calls mgr.deregister(ring) synchronously; when the worker subsequently enters the lock, the stillRegistered re-check sees the entry removed and skips the subtract that would otherwise double-count the bytes deregister already accounted for via ring.totalSegmentBytes().
The worker is started with pollNanos=60 s so the test owns every tick boundary; ring.appendOrFsn fires the manager wakeup callback to drive spare provisioning during setup, and wakeWorker() triggers the single race tick. Test completion is observed via the worker returning to TIMED_WAITING, so there are no sleeps and no spin polls in the assertion path.
Verified the test catches the bug (Observed -64, segSize=64, bytesBeforeRace=192 - exactly one segment double-subtracted) when the stillRegistered guard is removed, and passes the fix in 70 ms.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Sender connect-string parser silently ignored unknown keys, falling back to a generic "invalid parameter" error only when the value itself was malformed. sf-client.md §4.6 mandates rejection of unknown keys -- forward-compat is governed by the spec, not by silent ignore. Since the Java client is the reference implementation per §2, silent ignore would guarantee that other-language clients drift on the same connect string.
The parser now throws "unknown configuration key [key=<name>]" on any unrecognised key. The lone documented exception, zone=, becomes an explicit branch that silently consumes the value, per failover.md §1.1 (zone= is egress-only effective; ingress is zone-blind and accepts the key so a single connect string works on both sides).
Tests cover unknown-key rejection across http, tcp, ws, and udp schemes, including the empty-value case. One pre-existing WebSocket test was relying on the silent-ignore behaviour to drive the parser past key parsing and surface "addr is missing"; it now uses a valid key (user=) to reach the same code path.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Closes five gaps against sf-client spec §20: getTotalFramesReplayed, getDroppedErrorNotifications, getTotalErrorNotificationsDelivered, getTotalBackpressureStalls, and background-drainer visibility.
Counter state lands where the work happens. CursorWebSocketSendLoop gains a totalFramesReplayed AtomicLong, bumped in trySendOne on every frame sent while replayTargetFsn is set (the post-reconnect catch-up window). BackgroundDrainerPool gains totalSucceeded and totalFailed AtomicLongs, bumped in the executor task's finally block based on the drainer's terminal DrainOutcome.
QwpWebSocketSender exposes seven new public accessors. Four are thin forwarders to existing dispatcher / engine / loop counters. Drainer visibility is three single-long counters rather than the structured-DTO snapshot the spec originally called for: getActiveBackgroundDrainers, getTotalBackgroundDrainersSucceeded, getTotalBackgroundDrainersFailed. The structured accessor allocated per call, exposed a lifecycle-bounded view through the same surface as the other cumulative counters, and overlapped with the existing BackgroundDrainerListener callback. The three counters cover the "did my orphans get adopted?" dashboard case; per-drainer event-time visibility stays on the listener. The .failed sentinels on disk remain the canonical giveup record across sender restarts.
ReconnectTest.testReplayResendsUnackedFramesAcrossReconnect picks up a getTotalFramesReplayed >= 1 assertion after its unacked-batch replay, confirming the new counter increments through the actual reconnect path.
The local design/qwp-cursor-durability.md notes are updated to match the simpler three-counter shape; the canonical sf-client.md §20 in the parent repo is updated separately.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The durable-ack keepalive PING throttle previously gated only on the timestamp of the last PING. Spec sf-client.md sec. 11 requires the gate to elapse since the last sent frame OR PING: a recently sent binary frame already prompts the OSS server's flushPendingAck path, so an immediate PING after a producer batch goes idle is wasteful. Rename lastKeepalivePingNanos to lastFrameOrPingNanos and stamp it from both call sites -- after each successful client.sendBinary in trySendOne, and after sendPing in sendDurableAckKeepaliveIfDue. The reset to 0L on reconnect is preserved, so the first PING after a fresh connect still fires immediately. The !didWork gate in ioLoop is retained. Under saturating producer traffic it is now strictly redundant with the new timestamp on the send path, but it still short-circuits when tryReceiveAcks was busy this iteration, which more directly expresses "nothing else in flight" than a time delta. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
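The gate itself, reduced to a sketch under the renamed field (interval handling and the actual call sites are simplified):

```java
// A PING is due only when the keepalive interval has elapsed since the last sent frame OR the
// last PING, whichever was later. The 0L reset on reconnect makes the first post-connect PING fire immediately.
final class KeepaliveGateSketch {
    long lastFrameOrPingNanos; // stamped after every successful sendBinary and after every sendPing; 0L on reconnect

    boolean pingDue(long nowNanos, long keepaliveIntervalNanos) {
        return nowNanos - lastFrameOrPingNanos >= keepaliveIntervalNanos;
    }
}
```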
The cursor send loop's connect retry path classified two upgrade-time failures incorrectly relative to docs/qwp/sf-client.md. QwpVersionMismatchException was bucketed alongside auth (401/403) as a terminal SECURITY_ERROR. Per section 13.3, a server advertising an X-QWP-Version outside the client's supported range is a per-endpoint transient: the round walk already moves to the next host within buildAndConnect, but the connect-loop wrapper short-circuited any further retry. On a multi-endpoint cluster mid rolling-upgrade, the sender stopped the moment the tracker landed on the one node that had moved past the client's max version, instead of waiting for the cluster to converge or for the per-outage budget to exhaust. The fix removes the typed catch so the exception falls through to the generic transient branch; if every endpoint mismatches for the full outage budget, the existing giveup path surfaces PROTOCOL_VIOLATION with the unacked FSN span -- the correct terminal classification per section 13.3. QwpDurableAckMismatchException was also classified as SECURITY_ERROR, which is the auth-failure category. Per section 8.1 the mismatch is a loud-fail per connection but it is a capability/config disagreement, not an auth event. It now surfaces as PROTOCOL_VIOLATION (still HALT policy) so an operator reading the error category gets an accurate signal. The sync-initial-connect path in connectWithRetry still immediately rethrows the typed exception to the constructor caller -- no SenderError is built there -- so its surface is unchanged. Update the exception's javadoc to describe the new transient-at-every-layer semantics. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The connect-string parser now rejects unknown keys (commit 7b583d3), which means an older client config carrying in_flight_window=... starts throwing "unknown configuration key" instead of just being ignored. The option itself was removed because the store-and-forward mechanism replaces it, but rejecting existing connect strings is a breaking change for callers that have not yet pruned the obsolete setting. Add an explicit branch that silently consumes in_flight_window= and its value, mirroring the zone= no-op pattern. Tests cover HTTP and UDP acceptance plus the empty-value case. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The sf-client spec section 4.5 declares both keys as size type with defaults of 64K and 100M, allowing K/M/G/T suffixes like sf_max_bytes. However the connect-string parser used parseIntValue for both, so a user writing max_buf_size=100m got a confusing "invalid max_buf_size [value=100m]" error. Add a parseSizeIntValue helper that delegates to the existing parseSizeValue and bounds-checks for int range -- the setters take int, so values above Integer.MAX_VALUE (e.g. 4g) are rejected at parse time rather than silently overflowing. Switch both call sites to the helper. Existing error wording is preserved for the empty, negative, and non-numeric cases, since parseSizeValue falls through to Numbers.parseLong on the digit body and emits the same "invalid X [value=...]" message. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
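A sketch of the int-range guard; the real helper delegates to the client's existing parseSizeValue (and Numbers.parseLong underneath), which is approximated here with a plain suffix parse so the example stands alone.

```java
// Illustrative only: key/value names and error wording follow the commit message.
final class SizeIntSketch {
    static int parseSizeIntValue(String key, String value) {
        long bytes = parseSize(value); // e.g. "100m" -> 104_857_600
        if (bytes < 0 || bytes > Integer.MAX_VALUE) {
            // the setters take int, so e.g. "4g" is rejected here instead of silently overflowing
            throw new IllegalArgumentException("invalid " + key + " [value=" + value + "]");
        }
        return (int) bytes;
    }

    private static long parseSize(String value) {
        String v = value.toLowerCase();
        long mult = 1;
        if (v.endsWith("b")) v = v.substring(0, v.length() - 1);
        if (v.endsWith("k")) { mult = 1L << 10; v = v.substring(0, v.length() - 1); }
        else if (v.endsWith("m")) { mult = 1L << 20; v = v.substring(0, v.length() - 1); }
        else if (v.endsWith("g")) { mult = 1L << 30; v = v.substring(0, v.length() - 1); }
        else if (v.endsWith("t")) { mult = 1L << 40; v = v.substring(0, v.length() - 1); }
        return Long.parseLong(v) * mult;
    }
}
```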
ThreadLocalRandom.nextLong(0) throws IllegalArgumentException. With reconnectInitialBackoffMillis = 0, the first iteration of the durable-ack retry loop in BackgroundDrainer would escape the inner try/catch (which only guards the connect block), bypass markFailed, kill the drainer thread, and leave the slot for the orphan scanner to respawn forever. Clamp the jitter bound with Math.max(1L, backoffMillis). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
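The fix is a one-liner; shown here in isolation because the clamp is easy to miss when the configured initial backoff is allowed to be zero.

```java
import java.util.concurrent.ThreadLocalRandom;

// nextLong(bound) requires bound > 0, so a zero backoff must be clamped before drawing jitter.
final class JitterSketch {
    static long jitteredSleepMillis(long backoffMillis) {
        return ThreadLocalRandom.current().nextLong(Math.max(1L, backoffMillis));
    }
}
```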
The QWP client test suite used a static port pattern of the form `19_500 + (int) (System.nanoTime() % 200)` to pick a base port, then added small offsets per test. With only 100-200 distinct values per file, this collides under parallel Surefire forks and across re-runs that hit TIME_WAIT. Introduce a shared TestPorts helper that hands out ephemeral ports via `new ServerSocket(0).getLocalPort()`. Migrate all sixteen affected test classes to allocate a fresh port per usage site instead of deriving offsets from a static base. Tests that previously chained offsets off port1 (e.g., `port1 + 100`) now allocate each port independently, since the base is no longer a predictable anchor. A small TOCTOU window remains between releasing the probe socket and the test rebinding, but it is orders of magnitude less likely to collide than the prior 1-in-100-to-200 modulo scheme. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
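A sketch of the helper (the class name TestPorts is from the commit; the body below is an assumed minimal shape):

```java
import java.io.IOException;
import java.net.ServerSocket;

// Bind port 0 so the kernel hands out an ephemeral port, then release it for the test to rebind.
// A small TOCTOU window remains between close() and the test's own bind.
final class TestPorts {
    static int nextFreePort() {
        try (ServerSocket probe = new ServerSocket(0)) {
            return probe.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException("no free port available", e);
        }
    }
}
```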
Address review comment H8: spec section 6 makes pre-allocation a core invariant, but the production code only called ftruncate via openCleanRW. The resulting file was logically sized but sparse, so ENOSPC would surface as a SIGBUS on a later mmap store rather than as a clean failure at create time. Files.allocate (which wraps posix_fallocate / F_PREALLOCATE) existed as a native method but had no production caller and no entry on FilesFacade, so disk-full behavior could not be fault-injected from tests either. MmapSegment.create now routes through a FilesFacade-aware overload and calls ff.allocate(fd, sizeBytes) immediately after openCleanRW. On failure the fd is closed and the partial file is unlinked before MmapSegmentException is thrown, so repeated failures under sustained disk-full pressure no longer accumulate full-size empty segments. The existing three-arg create stays as a thin shim using FilesFacade.INSTANCE, so production callers and existing tests keep working unchanged apart from the new pre-allocation step. FilesFacade gains a boolean allocate(int, long) method and DefaultFilesFacade delegates to Files.allocate. MmapSegmentTest gets three new tests exercising the create-time failure paths: allocate returning false (clean exception, fd closed, file removed), openCleanRW returning -1 (no fd opened, so no close/remove), and 50 repeated allocate failures leaving an empty tmpDir. A nested FaultyFilesFacade helper counts calls and toggles the failure mode. Out of scope: routing SegmentManager, SlotLock, OrphanScanner, and AckWatermark through FilesFacade. They currently reach for the static Files class and bypass the facade entirely; that broader cleanup is a separate concern from the H8 invariant fixed here. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
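The create-time flow, sketched against a simplified facade. FilesFacade and its boolean allocate(int, long) are real per this PR; the interface shape, openCleanRW signature, and exception types below are stand-ins.

```java
// Illustrative only: the real MmapSegment.create throws MmapSegmentException and uses native paths.
interface FilesFacadeSketch {
    int openCleanRW(String path, long size);   // returns -1 on failure
    boolean allocate(int fd, long size);       // physically reserves blocks/clusters
    void close(int fd);
    boolean remove(String path);
}

final class MmapSegmentCreateSketch {
    static int create(FilesFacadeSketch ff, String path, long sizeBytes) {
        int fd = ff.openCleanRW(path, sizeBytes);
        if (fd < 0) {
            throw new RuntimeException("could not create segment [path=" + path + "]");
        }
        if (!ff.allocate(fd, sizeBytes)) {
            // ENOSPC surfaces here, at create time, instead of as a SIGBUS on a later mmap store.
            ff.close(fd);
            ff.remove(path);                   // don't leave a full-size sparse file behind
            throw new RuntimeException("could not allocate segment [path=" + path + ", size=" + sizeBytes + "]");
        }
        return fd;
    }
}
```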
Three race tests in the QWP client used Thread.sleep as a synchronization primitive and would flake on loaded CI: SegmentManagerTotalBytesRaceTest slept 200ms to wait for in-flight serviceRing iterations to drain, InitialConnectAsyncTest polled an AtomicReference with sleep(20) over a 5s budget, and CursorWebSocketSendLoopReconnectLeakTest slept 50ms server-side before closing the wire to "hope" the client read the ACK first. Apply the deterministic-seam pattern that SegmentManagerTrimDeregisterRaceTest already uses. SegmentManager gains an analogous beforeInstallSyncHook fired between the install path's MmapSegment.create and the synchronized commit block; null in production. The bytes-race test is rewritten as a single-shot race driven by the hook (one ring, one append, one tick), with awaitParked(workerThread) replacing the drain sleep. The test runs in ~5ms instead of ~1.6s; verified to fail with "expected:<0> but was:<64>" if the stillRegistered guard is removed from the install block. InitialConnectAsyncTest gets an ErrorInbox (CountDownLatch-backed SenderErrorHandler) and AckHandler.awaitFirstAck to replace the sleep-poll loops on observedError and totalAcked. The Thread.sleep(150) that waited for the I/O thread to fail an initial connect becomes a spin on getTotalReconnectAttempts >= 1. The connection-lost test drops its sleep-based retry loop entirely: the I/O loop's tryReceiveAcks polls every 50us and discovers the peer disconnect on its own, so no producer activity is needed to trigger reconnect budget exhaustion. CursorWebSocketSendLoopReconnectLeakTest replaces the server-side sleep with a deterministic barrier: the handler waits for the client's getTotalAcks() to advance past the pre-send baseline before closing the wire, guaranteeing the ACK is processed before the disconnect. The reconnect-counter sleep-poll becomes a Thread.onSpinWait loop with a 5s deadline. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The test started the worker thread before register + hook installation,
which on slower CI hosts let the worker iterate first. Two failure
modes resulted, both observed on this branch:
- worker installed a hot spare before readTotalBytes, so
bytesAfterRegister was 2*segSize instead of segSize;
- worker installed a hot spare before the hook was set, so the
later appendOrFsn found needsHotSpare()==false and the install
path never re-entered, leaving hookDone.await to time out.
Move mgr.start() to after register and setBeforeInstallSyncHook. The
Thread.start() happens-before guarantees the worker's first iteration
sees both the registered ring and the hook. hotSpare is null at
construction, so needsHotSpare() is already true on the first tick;
the producer-side appendOrFsn (and its 32-byte scratch buffer) is no
longer needed and only added a worker-vs-producer trigger race.
If a user-thread caller (.at()/flush()) already caught the I/O loop's latched terminal error before close() runs, close()'s own flush / drain paths can rethrow the SAME instance. try-with-resources then calls primary.addSuppressed(self), which Throwable rejects with IllegalArgumentException: "Self-suppression not permitted". close() now snapshots the already-surfaced lastError up front and drops terminalError at the final rethrow if it matches that instance. The guard mirrors the intent of the existing hasUnsurfacedError() check at the explicit safety-net checkError() call, but extends it to cover the implicit rethrow paths inside flushPendingRows / sealAndSwapBuffer / drainOnClose. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
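The shape of the guard, with simplified names and types; the real close() runs its flush/drain paths between the snapshot and the final rethrow.

```java
// Remember which error instance the user thread already saw; never rethrow that exact instance
// from close(), or try-with-resources will call addSuppressed(self) and throw IllegalArgumentException.
final class CloseRethrowSketch {
    private RuntimeException lastError;        // latched terminal error, possibly already surfaced to the caller
    private RuntimeException terminalError;    // may be re-latched by flushPendingRows / sealAndSwapBuffer / drainOnClose

    void close() {
        RuntimeException alreadySurfaced = lastError;  // snapshot up front
        // ... flush / drain paths run here ...
        RuntimeException t = terminalError;
        if (t != null && t != alreadySurfaced) {       // identity check, not equals(): same instance is dropped
            throw t;
        }
    }
}
```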
SenderConnectionDispatcher, SenderErrorDispatcher and SenderProgressDispatcher all share a lazy-start pattern: offer() peeks at dispatcherThread off-lock and falls through to a synchronized startDispatcherIfNeeded() on the first null; the field is written under `lock` in startDispatcherIfNeeded() and close(). The off-lock read had no happens-before with the under-lock writes, so per the JMM there was a real (though benign) data race on a load-bearing publication field. The bug stayed hidden because the synchronized re-check inside startDispatcherIfNeeded() catches a stale-null read: a redundant re-entry costs at most one wasted monitor acquisition and never spawns a second thread. SenderProgressDispatcher carries one extra symptom -- offer() re-reads the field after the lazy-start gate to obtain the LockSupport.unpark target, and a stale-null there silently skips the unpark and delays delivery by one idle tick. Declaring the field volatile gives the off-lock reads happens-before with the under-lock writes, eliminates the redundant re-checks on the hot path, and matches the convention M1/M3 already pin reflectively in MemoryOrderingFindingsTest. Three new tests in that class assert the modifier so the invariant cannot regress; the existing tests were reordered alphabetically to match the project's member-ordering rule. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
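The lazy-start pattern in question, reduced to its essentials (queue handling elided); the volatile on the thread field is what gives the off-lock null check a happens-before edge with the under-lock writes.

```java
import java.util.concurrent.locks.LockSupport;

// Sketch of the shared dispatcher pattern; real dispatchers also drain a queue and handle close().
final class LazyDispatcherSketch {
    private final Object lock = new Object();
    private volatile Thread dispatcherThread;   // written only under lock, read off-lock in offer()

    void offer(Runnable event) {
        if (dispatcherThread == null) {         // off-lock fast path
            startDispatcherIfNeeded();
        }
        // enqueue event, then unpark the now reliably visible dispatcher thread
        Thread t = dispatcherThread;
        if (t != null) {
            LockSupport.unpark(t);
        }
    }

    private void startDispatcherIfNeeded() {
        synchronized (lock) {
            if (dispatcherThread == null) {     // re-check under lock; at most one thread is ever started
                Thread t = new Thread(() -> { /* drain queue */ }, "dispatcher");
                t.setDaemon(true);
                t.start();
                dispatcherThread = t;
            }
        }
    }
}
```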
The totalDurableTrimAdvances counter was incremented every time drainPendingDurable popped at least one entry, regardless of whether engine.acknowledge actually advanced ackedFsn. The counter's docstring promises it tracks watermark advances and the existing test asserts trimAdvances <= durableAcks, so the placement contradicts the contract. In well-behaved server flow the values happen to coincide -- wireSeqs are monotonic from a single connection, swapClient clears the queue and realigns fsnAtZero on reconnect, so engine.acknowledge always returns true when something popped. The bug surfaces when a duplicate OK frame arrives at the same wireSeq after that wireSeq was already drained: the Math.min(wireSeq, highestSent) cap does not dedupe, the entry pops because the watermark already covers it, engine.acknowledge no-ops with fsn == ackedFsn, and the counter still bumps -- overcount. Move the increment inside the if (engine.acknowledge(fsn)) block so the counter reflects actual advances. Add testTotalDurableTrimAdvancesSkipsRedundantEngineAck which delivers a duplicate OK and asserts the counter stays put and ackedFsn does not move. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The SenderProgressHandler class javadoc previously buried the DROP_AND_CONTINUE behaviour in a parenthetical inside the watermark semantics section, while the surrounding text described ackedFsn as "durable on the server side". A user who reads the docstring and wires onAcked into a durability gate -- mark outbox entries saved, release locks, delete source records, emit downstream confirmations -- silently drops data whenever the server rejects a batch under DROP_AND_CONTINUE, because the watermark advances over the rejected FSN exactly like an OK does. Add a prominent WARNING section at the top of the class javadoc that states ackedFsn is a settled watermark rather than a durable one, explains why the loop has to advance over dropped FSNs (storage trim, wire progress), names the silent-data-loss failure mode, and prescribes the required correlation pattern: register a SenderErrorHandler and exclude the [fromFsn, toFsn] range of every DROP_AND_CONTINUE error from the "durable" set derived from the watermark. Rephrase "durable" to "settled" in the watermark-semantics polling example and the "what this callback is for" section so the wording matches the new caveat. Update the onAcked method-level doc to point back at the class-level WARNING and to spell out that "settled" covers both OK frames and DROP rejections. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The Javadoc on Sender.fromConfig(CharSequence) carried two unusable @see entries. LineSenderBuilder#fromConfig(CharSequence) targets a private method, so the standard javadoc tool (default visibility protected+) renders it as a broken link. The adjacent @see #fromConfig(CharSequence) was a self-reference back to the very method being documented, which conveys nothing to the reader. Replace both with a single @see #builder(CharSequence). That is the public factory that internally calls the private builder.fromConfig, so it is the correct cross-reference for a user landing on Sender.fromConfig and wondering how to obtain a builder pre-populated from a config string. The remaining @see #fromEnv() entry still points the reader at the env-var convenience factory. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The AtomicLong consecutiveSendErrors on CursorWebSocketSendLoop had three write sites -- field initializer, reset in swapClient on every reconnect, reset in trySendOne after every successful send -- and zero read sites. It was never incremented, even in the initial happy-path commit that introduced it; the reconnect-and-replay commit only added another reset. There was never a consumer in the tree. The sf-client spec confirms the design intent. Section 13.1 (Failure detection) routes any wire error directly into the reconnect loop with no threshold, section 13.2 / 13.6 bound the loop by a time budget rather than an error count, and section 20 (Observability) lists no consecutive-error counter among the conformant metrics. The actual code matches: trySendOne's catch on client.sendBinary goes straight to fail() / connectLoop() on the first throw with no accumulation. The field is a leftover from an early design that anticipated "N consecutive send errors before reconnecting" and was abandoned for the immediate-trigger model that both the spec and the code settled on. Remove the field and its two reset sites; the per-send reset ran on the hot path of every successful frame send. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Align the private boolean field with the existing public accessor. Before this change the field was named recoveredFromDisk while the accessor returning it was wasRecoveredFromDisk(); having identical names removes the small mental hop when reading the getter body and makes the field follow a past-tense convention that fits a one-shot construction-time state. The rename also shifts the alphabetical position of the field, so the declaration moves from between ownsManager and ring (its r position) to between slotLock and watermark (its new w position), keeping the class member ordering convention. The accessor itself is unchanged in name and position; all call sites in tests already use the public getter, so this edit touches only the declaration, the constructor assignment, and the getter's return expression. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Files_allocate on Windows previously only called SetFileInformationByHandle with FileEndOfFileInfo, which on NTFS sets the logical file size but leaves clusters unallocated. The file stays sparse; clusters get allocated lazily as writes occur. If the disk fills between segment create and an mmap'd store, the cache manager raises an in-page exception on the writing thread when it flushes a mapped page -- a SIGBUS-class failure that tears down the JVM. This violates sf-client.md section 6 (lines 340-352), which mandates that ENOSPC must surface as a clean create-time failure, exactly the way posix_fallocate on Linux and F_PREALLOCATE/F_ALLOCATEALL on macOS already deliver. Switch to FILE_ALLOCATION_INFO via SetFileInformationByHandle. On NTFS this physically reserves clusters synchronously and the call returns ERROR_DISK_FULL when free space is insufficient, matching the posix_fallocate contract. Same primitive RocksDB uses for its Windows preallocation path. Match POSIX semantics by rounding the request up to the existing logical size so the call can never shrink a file the caller already extended. Extend the logical EOF in a second SetFileInformationByHandle call only when growing. Update the Javadocs on Files.allocate() and FilesFacade.allocate() to document the Windows path and the never-shrinks behaviour alongside the existing Linux/macOS notes. The prebuilt windows-x86-64/libquestdb.dll under core/src/main/resources/io/questdb/client/bin/ still needs to be rebuilt and recommitted before this change takes effect at runtime. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
[PR Coverage check] pass: 2833 / 3529 (80.28%)
TODO before merge
- Rebuild `windows-x86-64/libquestdb.dll` on a Windows host and commit it under `core/src/main/resources/io/questdb/client/bin/windows-x86-64/`. The native source change to `Files_allocate` (the FILE_ALLOCATION_INFO path) has no runtime effect until the prebuilt DLL is refreshed.
- Once the DLL is refreshed, confirm on Windows that `allocate()` now returns `false` with `ERROR_DISK_FULL` instead of letting the JVM die on an mmap store with an in-page exception. Confirms the spec's create-time ENOSPC contract (§6) on NTFS.

Summary
Opt-in store-and-forward (SF) durability for the QWP WebSocket ingest client. Outgoing batches are persisted to disk before they leave the wire; the server's cumulative ACK trims sealed segments; on transient failure or process restart the I/O thread silently reconnects and replays whatever is still on disk. User code does not see transient disconnects.
Delivery semantics
SF preserves at-least-once delivery across all failure modes (transient wire errors, server restart, sender process restart, host crash). The persisted `<slot>/.ack-watermark` (see Architecture) carries the previous sender's durable-ack high-water mark across the SIGKILL/crash boundary, so the new sender's recovery seeds `ackedFsn = max(lowestSurvivingBaseSeq - 1, watermark)` and replays only frames the previous sender hadn't yet been told were durable. The window for incidental re-replay shrinks to whatever durable acks arrived between the manager's last tick and the kill (~one tick = milliseconds).

For strict at-most-once, the target table should declare `DEDUP UPSERT KEYS(...)` covering the row identity. DEDUP absorbs the tick-window residue and any acks that landed after the manager's last write but before the kernel flushed the mmap page. The parent repo's e2e suite uses `DEDUP UPSERT KEYS(timestamp, v)` for exactly this defense-in-depth.
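The recovery seed above, as a standalone sketch (the real computation lives inside the engine's startup path; the variable names and example values here are illustrative):

```java
// The persisted watermark can only move the seed forward; it never rewinds past the oldest
// frame that still survives on disk, so replay stays FSN-contiguous.
public final class RecoverySeedSketch {
    static long seedAckedFsn(long lowestSurvivingBaseSeq, long watermark) {
        return Math.max(lowestSurvivingBaseSeq - 1, watermark);
    }

    public static void main(String[] args) {
        // Example: oldest surviving segment starts at FSN 192, watermark says 240 was already durable.
        System.out.println(seedAckedFsn(192L, 240L)); // 240 -> replay resumes at FSN 241
    }
}
```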
Architecture

- `CursorSendEngine` — central engine. The producer (user thread) appends encoded QWP frames into mmap'd ring segments; an I/O thread (`CursorWebSocketSendLoop`) walks the cursor and sends frames; ACKs trim sealed segments. FSN (Frame Sequence Number) is the monotonic-across-restarts identity of each frame.
- `MmapSegment` — fixed-size mmap'd file. A 24-byte header (magic 'SF01' | u8 ver | u8 flags | u16 reserved | u64 baseSeq | u64 createdMicros) is followed by frame envelopes of `[u32 crc32c | u32 payloadLen | payloadLen bytes]` (offsets sketched after this list). Recovery detects torn tails and silent bit-rot and truncates the active segment to the last good frame. A `memoryBacked` mode (no file, malloc'd buffer) backs memory-only async ingest when `sf_dir` is omitted — same cursor architecture, no durability across restarts.
- `SegmentRing` — append/seal/rotate ring of segments. Files are named `sf-<gen:016x>.sfa` where `gen` is a JVM-wide monotonic generation counter (not a baseSeq — spares are minted before the producer knows the eventual baseSeq). Recovery enumerates `.sfa` files, reads `baseSeq` + `frameCount` from each header, sorts by `baseSeq`, and asserts FSN contiguity.
- `SegmentManager` — provisions hot spares ahead of the producer, enforces the per-engine `sf_max_total_bytes` cap, and unmaps trimmed segments.
- `SlotLock` — advisory exclusive lock on `<sf_dir>/<sender_id>/.lock`. Released on `engine.close()` or by the kernel on process exit.
- `AckWatermark` — mmap'd 16-byte file at `<sf_dir>/<sender_id>/.ack-watermark`. Single monotonic FSN, written by the manager on each tick where `ackedFsn` advanced. Hot-path write is one 8-byte aligned `putLong` against the mapped region — no alloc, no syscall, no fsync (host-crash falls back to the segment-derived seed, no regression). Read once at engine startup to seed recovery.
- `OrphanScanner` + `BackgroundDrainerPool` — opt-in (`drain_orphans=true`) at foreground sender startup: scan `<sf_dir>/*/` for sibling slots that are unlocked and contain unacked segments, lock each one, drain it on its own connection, release. Capped at `max_background_drainers=4`. Failed drains drop a `.failed` sentinel so future scans skip the slot until the user clears it.
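The segment header and frame-envelope offsets from the `MmapSegment` bullet, written out as constants (a reading aid only, not the production layout code):

```java
// Offsets follow the layout described above: 24-byte header, then 8-byte envelopes.
final class SegmentLayoutSketch {
    // segment header
    static final int HDR_MAGIC_OFFSET          = 0;   // 4 bytes, 'SF01'
    static final int HDR_VERSION_OFFSET        = 4;   // u8
    static final int HDR_FLAGS_OFFSET          = 5;   // u8
    static final int HDR_RESERVED_OFFSET       = 6;   // u16
    static final int HDR_BASE_SEQ_OFFSET       = 8;   // u64, FSN of the first frame in the segment
    static final int HDR_CREATED_MICROS_OFFSET = 16;  // u64
    static final int HEADER_SIZE               = 24;

    // per-frame envelope: [u32 crc32c | u32 payloadLen | payloadLen bytes]
    static final int ENVELOPE_CRC_OFFSET = 0;
    static final int ENVELOPE_LEN_OFFSET = 4;
    static final int ENVELOPE_SIZE       = 8;
}
```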
Wire/disk decoupling

Wire `messageSequence` always starts at 0 each connect; SF's persistent FSN is monotonic across restarts. `fsnAtZero` is pinned at connect time so server ACKs translate back to FSN for trim. Invariant: `fsn = fsnAtZero + wireSeq`. Cursor frames are self-sufficient — every frame carries full schema and full symbol-dict delta from id 0, so wire-level replay against any fresh server connection (post-reconnect, post-restart, drainer adopting an orphan slot) is correct. Row-level uniqueness across replay requires DEDUP on the target table; see Delivery semantics.
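The invariant as a sketch (the real translation lives in the send loop's ACK handling):

```java
// Per-connection wire sequence vs. persistent FSN; fsnAtZero is pinned once per connect.
final class FsnTranslationSketch {
    long fsnAtZero; // FSN of the frame that will be sent with wireSeq == 0 on this connection

    long fsnForWireSeq(long wireSeq) {
        return fsnAtZero + wireSeq;   // invariant: fsn = fsnAtZero + wireSeq; used to trim on server ACK
    }
}
```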
Reconnect & close

- Exponential backoff, capped at `reconnect_max_backoff_millis`. Per-outage time budget `reconnect_max_duration_millis` (default 5 min).
- Auth failures (`QwpAuthFailedException`, `QwpVersionMismatchException`, `QwpDurableAckMismatchException`, non-101 upgrade reject without role hint) are immediately terminal.
- A `421` with an `X-QuestDB-Role` header (role-rejected upgrade) is treated as failover input: backoff is reset and the next configured endpoint is tried.
- `addr=` accepts a comma-separated list; the I/O loop rotates through endpoints on every reconnect attempt within a single outage budget.
- `initial_connect_retry=sync` (aliases `on`/`true`) retries on the user thread up to the reconnect cap. `initial_connect_retry=async` returns the `Sender` immediately and lets the I/O thread retry in the background, surfacing terminal failures via the error inbox.
- `flush()` returns once data is published into the engine (in-RAM for memory mode, on-disk for SF) — it never waits for server ACK.
- `close()` blocks up to `close_flush_timeout_millis` (default 5 s) waiting for `ackedFsn >= publishedFsn`; `0` or `-1` skips the drain.
- When the sender has opted into `STATUS_DURABLE_ACK` frames, it emits keepalive PINGs every `durable_ack_keepalive_interval_millis` (default 200 ms; `0` or negative disables). Workaround for a current OSS-server limitation: the server only flushes pending durable-ack frames on inbound recv events, so an idle opted-in client has to prod it. The keepalive can be retired once the server pushes durable acks asynchronously.
Backpressure

Per-engine `sf_max_total_bytes` cap. When full, `appendBlocking` spins for `sf_append_deadline_millis` (default 30 s); ACK arrival → trim → space frees → append succeeds. If the deadline fires, the call throws.
Tradeoffs

- `flush()` no longer waits for server ACK. Both memory mode and SF mode run the new cursor architecture. `flush()` returns once data is published into the engine (RAM in memory mode, disk in SF). Apps that previously relied on `flush()` as a server-side commit point need to switch to `request_durable_ack=on` + `Sender.awaitAckedFsn(...)` (see the sketch after this list), or accept the asynchronous semantics. This is a behavior change for existing users.
- Delivery is at-least-once: without `DEDUP UPSERT KEYS(...)` covering row identity, duplicates appear. See Delivery semantics.
- Under backpressure (`sf_max_total_bytes` reached, drain stalled), `appendBlocking` spins up to `sf_append_deadline_millis` (default 30 s) before throwing. Latency-sensitive callers should tune this down or pre-check engine pressure via stats; the alternative — dropping data — would violate the durability contract.
- `sf_durability=memory` is the only shipping mode. Data on disk survives a process SIGKILL (verified by the parent-repo e2e tests) but not OS crash or power loss without higher-layer guarantees. `=flush` / `=append` are deferred.
- SF keeps `.sfa` mmaps + fds open for the engine's lifetime. Memory mode trades disk for RAM (default 128 MiB cap). High-fan-out deployments should size `ulimit -n`, total mmap budget, and `sf_max_total_bytes * <slot count>` against host capacity.
- Segment-granular trim is conservative on disk: a sealed segment is retained until fully acked even though `.ack-watermark` already records which prefix is safe. With slow WAL upload and bursty traffic, disk usage can climb toward `sf_max_total_bytes` and trigger backpressure earlier than the cumulative byte count would suggest. Smaller `sf_max_bytes` mitigates by giving the durable-ack watermark finer trim granularity. See Known follow-ups for the disk-reclaim work.
- One engine slot (`sf_dir`/`sender_id`) per producer thread.
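A migration sketch for apps that treated `flush()` as a server-side commit point. The exact `awaitAckedFsn` signature, and how the caller obtains the FSN of its last row, are assumptions here and not taken from the PR's API surface.

```java
import io.questdb.client.Sender;

// Illustrative only: the awaitAckedFsn call is left as a comment because its signature is assumed.
final class DurableCommitSketch {
    static void commitPoint(Sender sender, long fsnOfLastRow) {
        sender.flush();                              // under SF this returns once rows are on disk, not on ACK
        // With request_durable_ack=on, block until the server has settled everything up to the row:
        // sender.awaitAckedFsn(fsnOfLastRow, 5_000); // hypothetical (fsn, timeoutMillis) shape
    }
}
```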
Connect-string knobs

`<size>` values (`sf_max_bytes`, `sf_max_total_bytes`) accept JVM-style unit suffixes: bare digits are bytes (`4096`), or use `k`/`m`/`g`/`t` with an optional trailing `b` (`4k`, `4kb`, `4m`, `4mb`, `4g`, `10gb`, `1t`). Case-insensitive, powers of 2 (1024-based), matches `-Xmx` conventions.
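For example (host, port, and directory are placeholders; the key set shown is only the subset discussed in this description):

```java
import io.questdb.client.Sender;

final class SfConfigExample {
    public static void main(String[] args) throws Exception {
        // 64m == 64mb == 67108864 bytes; 4g == 4294967296 bytes (1024-based, case-insensitive)
        try (Sender sender = Sender.fromConfig(
                "ws::addr=localhost:9000;sf_dir=/var/lib/qdb/sf;sf_max_bytes=64m;sf_max_total_bytes=4g;")) {
            sender.table("trades").doubleColumn("price", 42.0).atNow();
            sender.flush(); // returns once the row is on disk, not on server ACK
        }
    }
}
```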
All file I/O goes through a new native Files layer (POSIX + Win32) and a software CRC32C (Castagnoli, slice-by-8) — no `java.nio.FileChannel`, `java.util.zip.CRC32C`, or `MappedByteBuffer`.
Test plan

- Native files (`FilesTest`): write/read roundtrip, truncate, allocate, append, rename, dir iteration, exclusive lock, exists/remove, page size.
- CRC32C (`Crc32cTest`): known vector, chaining, property fuzz over random inputs and split points, single-bit-flip detection.
- Segment layer (`MmapSegmentTest`, `SegmentRingTest`, `SegmentRingRecoveryUnlinkTest`, `SegmentManagerTest`, `SegmentManagerRecoveryCapTest`, `SegmentManagerCloseRaceTest`, `SegmentManagerTrimDeregisterRaceTest`, `SegmentManagerTotalBytesRaceTest`): append/replay, multi-segment rotation, trim, torn-tail recovery, CRC mismatch, oversized length defense, FSN-gap detection, disk-cap enforcement, close/trim races.
- Slot locking and orphan drain (`SlotLockTest`, `EngineCloseSlotLockReleaseTest`, `OrphanScannerTest`, `OrphanScanIntegrationTest`, `EmptyOrphanSlotChurnTest`, `AckWatermarkTest`): lock collision, `.failed` sentinel, scanner skip-rules, end-to-end orphan drain, empty-slot churn, ack-watermark roundtrip + cross-session persistence + corrupt-file fallback.
- Cursor engine, send loop, and dispatchers (`CursorSendEngineTest`, `CursorWebSocketSendLoopCloseTest`, `CursorWebSocketSendLoopReconnectLeakTest`, `CursorWebSocketSendLoopErrorClassificationTest`, `CursorWebSocketSendLoopErrorLatchTest`, `CursorWebSocketSendLoopDurableAckTest`, `CursorWebSocketSendLoopDurableAckFuzzTest`, `BackgroundDrainerPoolRaceTest`, `BackgroundDrainerPoolListenerTest`, `BackgroundDrainerDurableAckRetryTest`, `SenderConnectionDispatcherTest`, `SenderErrorDispatcherTest`, `SenderProgressDispatcherTest`, `DefaultSenderErrorHandlerTest`, `MemoryOrderingFindingsTest`, `PrReviewRedTests`).
- Connect string (`SfFromConfigTest`): all knobs, validation rejections, disk-full backpressure, sender takes ownership of engine.
- Integration (`ReconnectTest`, `RecoveryReplayTest`, `ServerErrorAckTerminalTest`, `InitialConnectRetryTest`, `IoThreadErrorSurfacedOnRowApiTest`, `CloseDrainTest`, `CleanShutdownNoReplayTest`, `SelfSufficientFramesTest`, `BackgroundDrainerEndToEndTest`, `DurableAckIntegrationTest`): reconnect cap, terminal upgrade error, replay-on-connect, close-drain timeout, fast-close, recovery replay against fresh server.
- Benchmark: `CursorEngineAppendLatencyBenchmark`.
- End-to-end (`questdb-ent/e2e/tests/test_failover.py`): server SIGKILL with sender failover (`test_kill9_primary_failover_no_data_loss`, `test_failover_during_active_send`, `test_two_failovers_in_one_scenario`), orphan drainer (`test_orphan_drainer_durable_ack_survives_kill`), single-cycle sender SIGKILL + on-disk recovery against a still-alive primary with DEDUP on the target (`test_sender_kill9_sf_recovery_replays`), multi-cycle SIGKILL torture (`test_sender_repeated_sigkill_no_state_corruption`) -- 6 consecutive kill/restart cycles with varying settle times against the same slot, dense oracle over the union of all batches, and partial-ack-segment recovery with DEDUP collapse (`test_partial_ack_sealed_segment_replay_dedup_collapses`) -- pins the segment-granular re-replay contract. Plus `test_failover_fuzz.py`: randomised torture across batch sizes and kill timings.
- Native-memory tests run under `TestUtils.assertMemoryLeak()` so leaks surface in CI. Pure-Java dispatcher and error-classification tests skip the wrapper because they don't allocate native memory.
Known follow-ups (deliberately deferred)

- `sf_durability=flush` / `=append` (per-flush / per-append fsync). Cursor accepts only `memory` today.
- Disk reclaim inside partially acked segments: `fallocate(FALLOC_FL_PUNCH_HOLE)` on Linux (no Windows equivalent), or a `<segment>.ack-offset` sidecar consulted by the trim path. Reduces effective disk consumption under slow durable-ack cadence; does not affect correctness. See Tradeoffs -- "Segment-granular trim is conservative on disk".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>