diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/MmapSegment.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/MmapSegment.java index 2eea695a..6584c085 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/MmapSegment.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/MmapSegment.java @@ -144,13 +144,30 @@ public static MmapSegment create(String path, long baseSeq, long sizeBytes) { * that filesystem only. */ public static MmapSegment create(FilesFacade ff, String path, long baseSeq, long sizeBytes) { + long pathPtr = ff.allocNativePath(path); + try { + return create(ff, pathPtr, path, baseSeq, sizeBytes); + } finally { + ff.freeNativePath(pathPtr); + } + } + + /** + * Variant of {@link #create(FilesFacade, String, long, long)} that takes a + * pre-encoded native UTF-8 path pointer plus a parallel String for use in + * exception messages and {@link #path()}. The pointer must be a + * null-terminated UTF-8 path, typically built into a reused + * {@code DirectUtf8Sink} by the rotation hot path so it does not incur a + * per-call {@code byte[]} + native-malloc the way the String overload does. + */ + public static MmapSegment create(FilesFacade ff, long pathPtr, String displayPath, long baseSeq, long sizeBytes) { if (sizeBytes < HEADER_SIZE + FRAME_HEADER_SIZE + 1) { throw new IllegalArgumentException( "sizeBytes too small for header + one minimal frame: " + sizeBytes); } - int fd = ff.openCleanRW(path, sizeBytes); + int fd = ff.openCleanRW(pathPtr, sizeBytes); if (fd < 0) { - throw new MmapSegmentException("openCleanRW failed for " + path); + throw new MmapSegmentException("openCleanRW failed for " + displayPath); } // Reserve real disk blocks so ENOSPC surfaces here, before the // producer thread starts writing frames into the mapping. The @@ -165,15 +182,14 @@ public static MmapSegment create(FilesFacade ff, String path, long baseSeq, long // empty file does not survive the failure. Under sustained // disk-full pressure with the manager polling, hundreds would // otherwise accumulate. - //noinspection ResultOfMethodCallIgnored - ff.remove(path); - throw new MmapSegmentException("pre-allocation failed for " + path); + ff.remove(pathPtr); + throw new MmapSegmentException("pre-allocation failed for " + displayPath); } long addr = Files.FAILED_MMAP_ADDRESS; try { addr = Files.mmap(fd, sizeBytes, 0, Files.MAP_RW, MemoryTag.MMAP_DEFAULT); if (addr == Files.FAILED_MMAP_ADDRESS) { - throw new MmapSegmentException("mmap failed for " + path); + throw new MmapSegmentException("mmap failed for " + displayPath); } // Header goes straight into the mapping — no separate write syscall. Unsafe.getUnsafe().putInt(addr, FILE_MAGIC); @@ -182,7 +198,7 @@ public static MmapSegment create(FilesFacade ff, String path, long baseSeq, long Unsafe.getUnsafe().putShort(addr + 6, (short) 0); // reserved Unsafe.getUnsafe().putLong(addr + 8, baseSeq); Unsafe.getUnsafe().putLong(addr + 16, Os.currentTimeMicros()); - return new MmapSegment(path, fd, addr, sizeBytes, baseSeq, HEADER_SIZE, 0, false, 0L); + return new MmapSegment(displayPath, fd, addr, sizeBytes, baseSeq, HEADER_SIZE, 0, false, 0L); } catch (Throwable t) { if (addr != Files.FAILED_MMAP_ADDRESS) { Files.munmap(addr, sizeBytes, MemoryTag.MMAP_DEFAULT); @@ -191,8 +207,7 @@ public static MmapSegment create(FilesFacade ff, String path, long baseSeq, long // mmap (or header writes) failed after a successful allocate — // best-effort unlink to keep the directory from accumulating // full-size empty segments under repeated failures. - //noinspection ResultOfMethodCallIgnored - ff.remove(path); + ff.remove(pathPtr); throw t; } } @@ -419,7 +434,11 @@ public long tryAppend(long payloadAddr, int payloadLen) { } Unsafe.getUnsafe().putInt(mmapAddress + offset, crc); appendCursor = offset + total; - frameCount++; + // Plain read + write of the volatile field. `frameCount++` would + // trip the "non-atomic increment of volatile" inspection, but + // single-writer invariant (only the producer thread mutates) makes + // the RMW race-free by design. + frameCount = frameCount + 1; // Publish last. Until this volatile write retires, the consumer // cannot see any of the bytes we just wrote. publishedCursor = appendCursor; diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java index aa22f598..82b81cbf 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java @@ -25,10 +25,11 @@ package io.questdb.client.cutlass.qwp.client.sf.cursor; import io.questdb.client.std.Files; +import io.questdb.client.std.FilesFacade; import io.questdb.client.std.Numbers; import io.questdb.client.std.ObjList; import io.questdb.client.std.QuietCloseable; -import io.questdb.client.std.str.StringSink; +import io.questdb.client.std.str.DirectUtf8Sink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,8 +71,13 @@ public final class SegmentManager implements QuietCloseable { private final Object lock = new Object(); private final long maxTotalBytes; // Reused by the manager worker thread to build spare-segment paths - // without per-rotation String.format / concat allocation. - private final StringSink pathSink = new StringSink(); + // directly into native memory. Each rotation writes the path bytes plus + // a trailing NUL terminator into the same buffer, and passes the + // pointer to the long-ptr Files / MmapSegment overloads -- eliminating + // the byte[] + native malloc pair that Files.pathPtr(String) would + // otherwise allocate per call. Sized for typical SF directory paths; + // grows on demand if a longer path is registered. Closed in close(). + private final DirectUtf8Sink pathScratch = new DirectUtf8Sink(256); private final long pollNanos; // Reused by the worker thread each tick to snapshot `rings` under the // lock without per-tick allocation. Owned exclusively by workerLoop(). @@ -160,6 +166,10 @@ public synchronized void close() { } workerThread = null; } + // Free the rotation-path native scratch buffer. Safe to do here + // (after the worker has joined) since the buffer is only touched + // on the worker thread. + pathScratch.close(); } /** @@ -307,13 +317,27 @@ private static long scanMaxGeneration(String dir) { * provisional at create time (SegmentRing.appendOrFsn rebases it at * rotation). Pattern: {@code /sf-.sfa}. Recovery * discovers segments by extension + header magic, not by filename. + *

+ * Builds the path bytes directly into {@link #pathScratch} and writes + * a trailing NUL so the same buffer can be handed to the long-ptr + * {@link FilesFacade} overloads (no per-rotation byte[] + native + * malloc). The String returned is the same path without the NUL -- + * captured before terminating so it is suitable for {@link MmapSegment#path()} + * and exception messages. */ private String nextSparePath(String dir) { - pathSink.clear(); - pathSink.put(dir).put("/sf-"); - Numbers.appendHex(pathSink, fileGeneration.getAndIncrement(), true); - pathSink.put(".sfa"); - return pathSink.toString(); + pathScratch.clear(); + pathScratch.putAscii(dir).putAscii("/sf-"); + Numbers.appendHex(pathScratch, fileGeneration.getAndIncrement(), true); + pathScratch.putAscii(".sfa"); + String displayPath = pathScratch.toString(); + // Trailing NUL must be appended AFTER toString() captures the path + // text -- DirectUtf8Sink.toString reads the full sink contents and + // would otherwise include the terminator in the result. Use putAny + // so the assertion in DirectUtf8Sink.put(byte) does not trip on a + // non-negative byte. + pathScratch.putAny((byte) 0); + return displayPath; } private void serviceRing(RingEntry e) { @@ -355,7 +379,14 @@ private void serviceRing(RingEntry e) { spare = MmapSegment.createInMemory(e.ring.nextSeqHint(), segmentSizeBytes); } else { path = nextSparePath(e.dir); - spare = MmapSegment.create(path, e.ring.nextSeqHint(), segmentSizeBytes); + // Native path bytes (NUL-terminated) live in pathScratch + // from the call above. Hand them straight to MmapSegment.create + // via its long-ptr overload, bypassing the byte[] + native + // malloc that the String overload would incur on every + // rotation. + spare = MmapSegment.create(FilesFacade.INSTANCE, + pathScratch.ptr(), path, + e.ring.nextSeqHint(), segmentSizeBytes); } Runnable installHook = beforeInstallSyncHook; if (installHook != null) { diff --git a/core/src/main/java/io/questdb/client/std/DefaultFilesFacade.java b/core/src/main/java/io/questdb/client/std/DefaultFilesFacade.java index 8172a95e..e8b06075 100644 --- a/core/src/main/java/io/questdb/client/std/DefaultFilesFacade.java +++ b/core/src/main/java/io/questdb/client/std/DefaultFilesFacade.java @@ -106,11 +106,26 @@ public int openCleanRW(String path, long size) { return Files.openCleanRW(path, size); } + @Override + public int openCleanRW(long pathPtr, long size) { + return Files.openCleanRW(pathPtr, size); + } + @Override public int openRW(String path) { return Files.openRW(path); } + @Override + public int openRW(long pathPtr) { + return Files.openRW(pathPtr); + } + + @Override + public long length(long pathPtr) { + return Files.length(pathPtr); + } + @Override public long read(int fd, long addr, long len, long offset) { return Files.read(fd, addr, len, offset); diff --git a/core/src/main/java/io/questdb/client/std/Files.java b/core/src/main/java/io/questdb/client/std/Files.java index 7735fb40..681d68b5 100644 --- a/core/src/main/java/io/questdb/client/std/Files.java +++ b/core/src/main/java/io/questdb/client/std/Files.java @@ -143,6 +143,17 @@ public static int openRW(String path) { } } + /** + * Variant of {@link #openRW(String)} that takes a pre-encoded native UTF-8 + * path pointer (from {@link #allocNativePath(String)} or a reused + * {@code DirectUtf8Sink}). Lets callers in hot paths avoid the {@code + * byte[]} + native-malloc allocations that {@link #pathPtr(String)} incurs + * on every call. The pointer must reference a null-terminated UTF-8 string. + */ + public static int openRW(long pathPtr) { + return openRW0(pathPtr); + } + /** * Opens {@code path} for append-only writes, creating it (mode 0644) if * absent. Every {@link #append(int, long, long)} writes at end-of-file @@ -174,6 +185,16 @@ public static int openCleanRW(String path, long size) { } } + /** + * Variant of {@link #openCleanRW(String, long)} taking a pre-encoded + * native UTF-8 path pointer; lets callers cache the encoded path and + * skip the per-call {@code byte[]} + native-malloc that + * {@link #pathPtr(String)} incurs. + */ + public static int openCleanRW(long pathPtr, long size) { + return openCleanRW0(pathPtr, size); + } + /** * Returns the on-disk size of {@code path} via {@code stat}, or -1 if * the path does not exist or is otherwise unreadable. @@ -187,6 +208,14 @@ public static long length(String path) { } } + /** + * Variant of {@link #length(String)} taking a pre-encoded native UTF-8 + * path pointer; same allocation-elision rationale as {@link #openRW(long)}. + */ + public static long length(long pathPtr) { + return length0(pathPtr); + } + /** * Creates a directory at {@code path} with the given mode (POSIX-style * permission bits, e.g. {@link #DIR_MODE_DEFAULT}). Returns 0 on success, diff --git a/core/src/main/java/io/questdb/client/std/FilesFacade.java b/core/src/main/java/io/questdb/client/std/FilesFacade.java index 2aea8e5a..b64a9f7d 100644 --- a/core/src/main/java/io/questdb/client/std/FilesFacade.java +++ b/core/src/main/java/io/questdb/client/std/FilesFacade.java @@ -89,8 +89,25 @@ public interface FilesFacade { int openCleanRW(String path, long size); + /** + * Variant of {@link #openCleanRW(String, long)} taking a pre-encoded + * native UTF-8 path pointer; lets callers in hot paths cache the encoded + * path (e.g. via a reused {@code DirectUtf8Sink}) and skip the per-call + * {@code byte[]} + native-malloc allocation. + */ + int openCleanRW(long pathPtr, long size); + int openRW(String path); + /** Variant of {@link #openRW(String)} taking a pre-encoded native UTF-8 path pointer. */ + int openRW(long pathPtr); + + /** + * Variant of {@code length(String)} taking a pre-encoded native UTF-8 path + * pointer; same allocation-elision rationale as {@link #openRW(long)}. + */ + long length(long pathPtr); + long read(int fd, long addr, long len, long offset); boolean remove(String path); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/MmapSegmentTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/MmapSegmentTest.java index 8cbed98c..34be6745 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/MmapSegmentTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/MmapSegmentTest.java @@ -144,7 +144,7 @@ public void testCreateFailsCleanlyWhenAllocateReturnsFalse() throws Exception { FaultyFilesFacade ff = new FaultyFilesFacade(); ff.failOnAllocate = true; try { - MmapSegment.create(ff, path, 0L, sizeBytes); + MmapSegment.create(ff, path, 0L, sizeBytes).close(); fail("expected MmapSegmentException from failed pre-allocation"); } catch (MmapSegmentException expected) { assertTrue(expected.getMessage(), @@ -168,7 +168,7 @@ public void testCreateFailsCleanlyWhenOpenCleanRWReturnsMinusOne() throws Except FaultyFilesFacade ff = new FaultyFilesFacade(); ff.failOnOpenCleanRW = true; try { - MmapSegment.create(ff, path, 0L, sizeBytes); + MmapSegment.create(ff, path, 0L, sizeBytes).close(); fail("expected MmapSegmentException from openCleanRW returning -1"); } catch (MmapSegmentException expected) { assertTrue(expected.getMessage(), @@ -196,7 +196,7 @@ public void testCreateRepeatedAllocateFailuresDoNotAccumulateOrphans() throws Ex int attempts = 50; for (int i = 0; i < attempts; i++) { try { - MmapSegment.create(ff, tmpDir + "/seg-" + i + ".sfa", 0L, sizeBytes); + MmapSegment.create(ff, tmpDir + "/seg-" + i + ".sfa", 0L, sizeBytes).close(); fail("expected MmapSegmentException on iteration " + i); } catch (MmapSegmentException ignored) { // expected @@ -604,11 +604,30 @@ public int openCleanRW(String path, long size) { return INSTANCE.openCleanRW(path, size); } + @Override + public int openCleanRW(long pathPtr, long size) { + openCleanRWCalls++; + if (failOnOpenCleanRW) { + return -1; + } + return INSTANCE.openCleanRW(pathPtr, size); + } + @Override public int openRW(String path) { return INSTANCE.openRW(path); } + @Override + public int openRW(long pathPtr) { + return INSTANCE.openRW(pathPtr); + } + + @Override + public long length(long pathPtr) { + return INSTANCE.length(pathPtr); + } + @Override public long read(int fd, long addr, long len, long offset) { return INSTANCE.read(fd, addr, len, offset); @@ -622,6 +641,7 @@ public boolean remove(String path) { @Override public boolean remove(long pathPtr) { + removeCalls++; return INSTANCE.remove(pathPtr); }