Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,30 @@ public static MmapSegment create(String path, long baseSeq, long sizeBytes) {
* that filesystem only.
*/
public static MmapSegment create(FilesFacade ff, String path, long baseSeq, long sizeBytes) {
    // Materialise the path as a native pointer once, hand it to the
    // pointer-based overload, and release it on every exit path --
    // normal return or exception alike.
    final long nativePathPtr = ff.allocNativePath(path);
    try {
        // The String is passed along as the display path, which the
        // pointer overload uses for exception messages and path().
        return create(ff, nativePathPtr, path, baseSeq, sizeBytes);
    } finally {
        ff.freeNativePath(nativePathPtr);
    }
}

/**
* Variant of {@link #create(FilesFacade, String, long, long)} that takes a
* pre-encoded native UTF-8 path pointer plus a parallel String for use in
* exception messages and {@link #path()}. The pointer must be a
* null-terminated UTF-8 path, typically built into a reused
* {@code DirectUtf8Sink} by the rotation hot path so it does not incur a
* per-call {@code byte[]} + native-malloc the way the String overload does.
*/
public static MmapSegment create(FilesFacade ff, long pathPtr, String displayPath, long baseSeq, long sizeBytes) {
if (sizeBytes < HEADER_SIZE + FRAME_HEADER_SIZE + 1) {
throw new IllegalArgumentException(
"sizeBytes too small for header + one minimal frame: " + sizeBytes);
}
int fd = ff.openCleanRW(path, sizeBytes);
int fd = ff.openCleanRW(pathPtr, sizeBytes);
if (fd < 0) {
throw new MmapSegmentException("openCleanRW failed for " + path);
throw new MmapSegmentException("openCleanRW failed for " + displayPath);
}
// Reserve real disk blocks so ENOSPC surfaces here, before the
// producer thread starts writing frames into the mapping. The
Expand All @@ -165,15 +182,14 @@ public static MmapSegment create(FilesFacade ff, String path, long baseSeq, long
// empty file does not survive the failure. Under sustained
// disk-full pressure with the manager polling, hundreds would
// otherwise accumulate.
//noinspection ResultOfMethodCallIgnored
ff.remove(path);
throw new MmapSegmentException("pre-allocation failed for " + path);
ff.remove(pathPtr);
throw new MmapSegmentException("pre-allocation failed for " + displayPath);
}
long addr = Files.FAILED_MMAP_ADDRESS;
try {
addr = Files.mmap(fd, sizeBytes, 0, Files.MAP_RW, MemoryTag.MMAP_DEFAULT);
if (addr == Files.FAILED_MMAP_ADDRESS) {
throw new MmapSegmentException("mmap failed for " + path);
throw new MmapSegmentException("mmap failed for " + displayPath);
}
// Header goes straight into the mapping — no separate write syscall.
Unsafe.getUnsafe().putInt(addr, FILE_MAGIC);
Expand All @@ -182,7 +198,7 @@ public static MmapSegment create(FilesFacade ff, String path, long baseSeq, long
Unsafe.getUnsafe().putShort(addr + 6, (short) 0); // reserved
Unsafe.getUnsafe().putLong(addr + 8, baseSeq);
Unsafe.getUnsafe().putLong(addr + 16, Os.currentTimeMicros());
return new MmapSegment(path, fd, addr, sizeBytes, baseSeq, HEADER_SIZE, 0, false, 0L);
return new MmapSegment(displayPath, fd, addr, sizeBytes, baseSeq, HEADER_SIZE, 0, false, 0L);
} catch (Throwable t) {
if (addr != Files.FAILED_MMAP_ADDRESS) {
Files.munmap(addr, sizeBytes, MemoryTag.MMAP_DEFAULT);
Expand All @@ -191,8 +207,7 @@ public static MmapSegment create(FilesFacade ff, String path, long baseSeq, long
// mmap (or header writes) failed after a successful allocate —
// best-effort unlink to keep the directory from accumulating
// full-size empty segments under repeated failures.
//noinspection ResultOfMethodCallIgnored
ff.remove(path);
ff.remove(pathPtr);
throw t;
}
}
Expand Down Expand Up @@ -419,7 +434,11 @@ public long tryAppend(long payloadAddr, int payloadLen) {
}
Unsafe.getUnsafe().putInt(mmapAddress + offset, crc);
appendCursor = offset + total;
frameCount++;
// Plain read + write of the volatile field. `frameCount++` would
// trip the "non-atomic increment of volatile" inspection, but the
// single-writer invariant (only the producer thread mutates it) makes
// the read-modify-write race-free by design.
frameCount = frameCount + 1;
// Publish last. Until this volatile write retires, the consumer
// cannot see any of the bytes we just wrote.
publishedCursor = appendCursor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
package io.questdb.client.cutlass.qwp.client.sf.cursor;

import io.questdb.client.std.Files;
import io.questdb.client.std.FilesFacade;
import io.questdb.client.std.Numbers;
import io.questdb.client.std.ObjList;
import io.questdb.client.std.QuietCloseable;
import io.questdb.client.std.str.StringSink;
import io.questdb.client.std.str.DirectUtf8Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -70,8 +71,13 @@ public final class SegmentManager implements QuietCloseable {
private final Object lock = new Object();
private final long maxTotalBytes;
// Reused by the manager worker thread to build spare-segment paths
// without per-rotation String.format / concat allocation.
private final StringSink pathSink = new StringSink();
// directly into native memory. Each rotation writes the path bytes plus
// a trailing NUL terminator into the same buffer, and passes the
// pointer to the long-ptr Files / MmapSegment overloads -- eliminating
// the byte[] + native malloc pair that Files.pathPtr(String) would
// otherwise allocate per call. Sized for typical SF directory paths;
// grows on demand if a longer path is registered. Closed in close().
private final DirectUtf8Sink pathScratch = new DirectUtf8Sink(256);
private final long pollNanos;
// Reused by the worker thread each tick to snapshot `rings` under the
// lock without per-tick allocation. Owned exclusively by workerLoop().
Expand Down Expand Up @@ -160,6 +166,10 @@ public synchronized void close() {
}
workerThread = null;
}
// Free the rotation-path native scratch buffer. Safe to do here
// (after the worker has joined) since the buffer is only touched
// on the worker thread.
pathScratch.close();
}

/**
Expand Down Expand Up @@ -307,13 +317,27 @@ private static long scanMaxGeneration(String dir) {
* provisional at create time (SegmentRing.appendOrFsn rebases it at
* rotation). Pattern: {@code <dir>/sf-<gen:016x>.sfa}. Recovery
* discovers segments by extension + header magic, not by filename.
* <p>
* Builds the path bytes directly into {@link #pathScratch} and writes
* a trailing NUL so the same buffer can be handed to the long-ptr
* {@link FilesFacade} overloads (no per-rotation byte[] + native
* malloc). The String returned is the same path without the NUL --
* captured before terminating so it is suitable for {@link MmapSegment#path()}
* and exception messages.
*/
private String nextSparePath(String dir) {
    pathScratch.clear();
    // `dir` is caller-supplied and may contain non-ASCII characters, so it
    // goes through put(CharSequence) to be UTF-8 encoded into the sink.
    // putAscii is reserved for the pieces known to be ASCII (the literal
    // fragments and the hex generation); using it for `dir` would
    // presumably write a mangled native path for a non-ASCII directory
    // -- NOTE(review): confirm against DirectUtf8Sink.putAscii.
    pathScratch.put(dir);
    pathScratch.putAscii("/sf-");
    // Each call consumes one generation number, so successive spare
    // paths never collide within this manager.
    Numbers.appendHex(pathScratch, fileGeneration.getAndIncrement(), true);
    pathScratch.putAscii(".sfa");
    // Capture the display String BEFORE terminating: toString() reads the
    // full sink contents and would otherwise include the NUL.
    final String displayPath = pathScratch.toString();
    // Trailing NUL makes the buffer usable as a native path pointer.
    // putAny is used so the assertion in DirectUtf8Sink.put(byte) does
    // not trip on a non-negative byte.
    pathScratch.putAny((byte) 0);
    return displayPath;
}

private void serviceRing(RingEntry e) {
Expand Down Expand Up @@ -355,7 +379,14 @@ private void serviceRing(RingEntry e) {
spare = MmapSegment.createInMemory(e.ring.nextSeqHint(), segmentSizeBytes);
} else {
path = nextSparePath(e.dir);
spare = MmapSegment.create(path, e.ring.nextSeqHint(), segmentSizeBytes);
// Native path bytes (NUL-terminated) live in pathScratch
// from the call above. Hand them straight to MmapSegment.create
// via its long-ptr overload, bypassing the byte[] + native
// malloc that the String overload would incur on every
// rotation.
spare = MmapSegment.create(FilesFacade.INSTANCE,
pathScratch.ptr(), path,
e.ring.nextSeqHint(), segmentSizeBytes);
}
Runnable installHook = beforeInstallSyncHook;
if (installHook != null) {
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/io/questdb/client/std/DefaultFilesFacade.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,26 @@ public int openCleanRW(String path, long size) {
return Files.openCleanRW(path, size);
}

/**
* Pointer-based counterpart of {@link #openCleanRW(String, long)}: forwards
* the pre-encoded native path pointer and {@code size} to
* {@link Files#openCleanRW(long, long)} and returns its result unchanged.
*/
@Override
public int openCleanRW(long pathPtr, long size) {
return Files.openCleanRW(pathPtr, size);
}

/**
* Forwards {@code path} to {@link Files#openRW(String)} and returns its
* result unchanged.
*/
@Override
public int openRW(String path) {
return Files.openRW(path);
}

/**
* Pointer-based counterpart of {@link #openRW(String)}: forwards the
* pre-encoded native path pointer to {@link Files#openRW(long)} and returns
* its result unchanged.
*/
@Override
public int openRW(long pathPtr) {
return Files.openRW(pathPtr);
}

/**
* Forwards the pre-encoded native path pointer to
* {@link Files#length(long)} and returns its result unchanged.
*/
@Override
public long length(long pathPtr) {
return Files.length(pathPtr);
}

@Override
public long read(int fd, long addr, long len, long offset) {
return Files.read(fd, addr, len, offset);
Expand Down
29 changes: 29 additions & 0 deletions core/src/main/java/io/questdb/client/std/Files.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,17 @@ public static int openRW(String path) {
}
}

/**
* Variant of {@link #openRW(String)} that takes a pre-encoded native UTF-8
* path pointer (from {@link #allocNativePath(String)} or a reused
* {@code DirectUtf8Sink}). Lets callers in hot paths avoid the {@code
* byte[]} + native-malloc allocations that {@link #pathPtr(String)} incurs
* on every call. The pointer must reference a null-terminated UTF-8 string.
* <p>
* The pointer is passed straight to {@code openRW0} and is not freed here,
* so the caller remains responsible for its lifetime.
*
* @param pathPtr address of a null-terminated UTF-8 path
* @return the value produced by {@code openRW0} (NOTE(review): presumably a
* file descriptor, negative on failure, matching {@link #openRW(String)}
* -- confirm)
*/
public static int openRW(long pathPtr) {
return openRW0(pathPtr);
}

/**
* Opens {@code path} for append-only writes, creating it (mode 0644) if
* absent. Every {@link #append(int, long, long)} writes at end-of-file
Expand Down Expand Up @@ -174,6 +185,16 @@ public static int openCleanRW(String path, long size) {
}
}

/**
* Variant of {@link #openCleanRW(String, long)} taking a pre-encoded
* native UTF-8 path pointer; lets callers cache the encoded path and
* skip the per-call {@code byte[]} + native-malloc that
* {@link #pathPtr(String)} incurs.
* <p>
* Both arguments are passed straight to {@code openCleanRW0}; the pointer
* is not freed here, so the caller remains responsible for its lifetime.
*
* @param pathPtr pre-encoded native UTF-8 path pointer
* @param size forwarded unchanged to {@code openCleanRW0} (NOTE(review):
* presumably the target file size in bytes -- confirm against
* {@link #openCleanRW(String, long)})
* @return the value produced by {@code openCleanRW0} (NOTE(review):
* presumably a file descriptor, negative on failure -- confirm)
*/
public static int openCleanRW(long pathPtr, long size) {
return openCleanRW0(pathPtr, size);
}

/**
* Returns the on-disk size of {@code path} via {@code stat}, or -1 if
* the path does not exist or is otherwise unreadable.
Expand All @@ -187,6 +208,14 @@ public static long length(String path) {
}
}

/**
* Variant of {@link #length(String)} taking a pre-encoded native UTF-8
* path pointer; same allocation-elision rationale as {@link #openRW(long)}.
* <p>
* The pointer is passed straight to {@code length0} and is not freed here,
* so the caller remains responsible for its lifetime.
*
* @param pathPtr pre-encoded native UTF-8 path pointer
* @return the value produced by {@code length0} (NOTE(review): presumably
* the same contract as {@link #length(String)} -- on-disk size, or -1 when
* the path is missing or unreadable -- confirm)
*/
public static long length(long pathPtr) {
return length0(pathPtr);
}

/**
* Creates a directory at {@code path} with the given mode (POSIX-style
* permission bits, e.g. {@link #DIR_MODE_DEFAULT}). Returns 0 on success,
Expand Down
17 changes: 17 additions & 0 deletions core/src/main/java/io/questdb/client/std/FilesFacade.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,25 @@ public interface FilesFacade {

int openCleanRW(String path, long size);

/**
* Variant of {@link #openCleanRW(String, long)} taking a pre-encoded
* native UTF-8 path pointer; lets callers in hot paths cache the encoded
* path (e.g. via a reused {@code DirectUtf8Sink}) and skip the per-call
* {@code byte[]} + native-malloc allocation.
*/
int openCleanRW(long pathPtr, long size);

int openRW(String path);

/** Variant of {@link #openRW(String)} taking a pre-encoded native UTF-8 path pointer. */
int openRW(long pathPtr);

/**
* Variant of {@code length(String)} taking a pre-encoded native UTF-8 path
* pointer; same allocation-elision rationale as {@link #openRW(long)}.
*/
long length(long pathPtr);

long read(int fd, long addr, long len, long offset);

boolean remove(String path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void testCreateFailsCleanlyWhenAllocateReturnsFalse() throws Exception {
FaultyFilesFacade ff = new FaultyFilesFacade();
ff.failOnAllocate = true;
try {
MmapSegment.create(ff, path, 0L, sizeBytes);
MmapSegment.create(ff, path, 0L, sizeBytes).close();
fail("expected MmapSegmentException from failed pre-allocation");
} catch (MmapSegmentException expected) {
assertTrue(expected.getMessage(),
Expand All @@ -168,7 +168,7 @@ public void testCreateFailsCleanlyWhenOpenCleanRWReturnsMinusOne() throws Except
FaultyFilesFacade ff = new FaultyFilesFacade();
ff.failOnOpenCleanRW = true;
try {
MmapSegment.create(ff, path, 0L, sizeBytes);
MmapSegment.create(ff, path, 0L, sizeBytes).close();
fail("expected MmapSegmentException from openCleanRW returning -1");
} catch (MmapSegmentException expected) {
assertTrue(expected.getMessage(),
Expand Down Expand Up @@ -196,7 +196,7 @@ public void testCreateRepeatedAllocateFailuresDoNotAccumulateOrphans() throws Ex
int attempts = 50;
for (int i = 0; i < attempts; i++) {
try {
MmapSegment.create(ff, tmpDir + "/seg-" + i + ".sfa", 0L, sizeBytes);
MmapSegment.create(ff, tmpDir + "/seg-" + i + ".sfa", 0L, sizeBytes).close();
fail("expected MmapSegmentException on iteration " + i);
} catch (MmapSegmentException ignored) {
// expected
Expand Down Expand Up @@ -604,11 +604,30 @@ public int openCleanRW(String path, long size) {
return INSTANCE.openCleanRW(path, size);
}

@Override
public int openCleanRW(long pathPtr, long size) {
    // Every attempt is counted, including the ones this facade is
    // configured to fail.
    openCleanRWCalls++;
    // With the fault armed, report -1 without delegating; otherwise
    // forward to INSTANCE.
    return failOnOpenCleanRW ? -1 : INSTANCE.openCleanRW(pathPtr, size);
}

// No fault injection for this call: forwards straight to INSTANCE.
@Override
public int openRW(String path) {
return INSTANCE.openRW(path);
}

// No fault injection for the pointer-based overload: forwards straight to INSTANCE.
@Override
public int openRW(long pathPtr) {
return INSTANCE.openRW(pathPtr);
}

// No fault injection for this call: forwards straight to INSTANCE.
@Override
public long length(long pathPtr) {
return INSTANCE.length(pathPtr);
}

@Override
public long read(int fd, long addr, long len, long offset) {
return INSTANCE.read(fd, addr, len, offset);
Expand All @@ -622,6 +641,7 @@ public boolean remove(String path) {

// Pointer-based remove: records the call in removeCalls before delegating to
// INSTANCE, so invocations made through this overload are counted.
@Override
public boolean remove(long pathPtr) {
removeCalls++;
return INSTANCE.remove(pathPtr);
}

Expand Down
Loading