Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,14 @@ resources/*.xml
*.o
.vscode
cpp/pixels-retina/third_party/

# AI tools
.codex
.claude/
.cursor/
.continue/
.aider*
.ai/
.notes/
CLAUDE.local.md
AGENTS.md.local
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Joiner;
import io.pixelsdb.pixels.cli.Main;
import io.pixelsdb.pixels.common.exception.MetadataException;
import io.pixelsdb.pixels.common.exception.RetinaException;
import io.pixelsdb.pixels.common.metadata.MetadataService;
import io.pixelsdb.pixels.common.metadata.domain.Compact;
Expand Down Expand Up @@ -261,7 +262,10 @@ public void execute(Namespace ns, String command) throws Exception
// Issue #192: wait for the compaction to complete.
compactExecutor.shutdown();
while (!compactExecutor.awaitTermination(100, TimeUnit.SECONDS));
metadataService.addFiles(compactFiles);
if (!metadataService.addFiles(compactFiles))
{
throw new MetadataException("failed to add compact files to metadata");
}

if (retinaService.isEnabled())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.pixelsdb.pixels.cli.executor;

import com.google.common.collect.ImmutableList;
import io.pixelsdb.pixels.common.exception.MetadataException;
import io.pixelsdb.pixels.common.metadata.MetadataService;
import io.pixelsdb.pixels.common.metadata.domain.File;
import io.pixelsdb.pixels.common.metadata.domain.Layout;
Expand Down Expand Up @@ -67,7 +68,10 @@ public void execute(Namespace ns, String command) throws Exception
try
{
List<File> importFiles = getImportFiles(ordered, writableLayout);
metadataService.addFiles(importFiles);
if (!metadataService.addFiles(importFiles))
{
throw new MetadataException("failed to import pixels files into metadata");
}
System.out.println(command + " is successful");
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ public void execute(Namespace ns, String command) throws Exception
{
File file = loadedInfo.loadedFile;
Path path = loadedInfo.loadedPath;
metadataService.updateFile(file);
if (!metadataService.updateFile(file))
{
throw new MetadataException("failed to publish loaded file " + file.getName());
}
try
{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,14 @@ private void cleanupTemporaryFiles()
{
for (File tmpFile : tmpFiles)
{
if (tmpFile.getType() == File.Type.TEMPORARY)
if (tmpFile.getType() == File.Type.TEMPORARY_INGEST)
{
try
{
metadataService.deleteFiles(Collections.singletonList((tmpFile.getId())));
if (!metadataService.deleteFiles(Collections.singletonList((tmpFile.getId()))))
{
throw new MetadataException("failed to delete temporary load file " + tmpFile.getId());
}
} catch (MetadataException e)
{
e.printStackTrace();
Expand Down Expand Up @@ -207,11 +210,14 @@ protected File openTmpFile(String fileName, Path filePath) throws MetadataExcept
{
File file = new File();
file.setName(fileName);
file.setType(File.Type.TEMPORARY);
file.setType(File.Type.TEMPORARY_INGEST);
file.setNumRowGroup(1);
file.setPathId(filePath.getId());
String tmpFilePath = filePath.getUri() + "/" + fileName;
this.metadataService.addFiles(Collections.singletonList(file));
if (!this.metadataService.addFiles(Collections.singletonList(file)))
{
throw new MetadataException("failed to add temporary load file " + tmpFilePath);
}
file.setId(metadataService.getFileId(tmpFilePath));
return file;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,40 @@ public class MainIndexBuffer implements Closeable
private final MainIndexCache indexCache;
private boolean populateCache = false;

public static final class FlushSnapshot
{
private final long fileId;
private final int entryCount;
private final List<RowIdRange> rowIdRanges;

private FlushSnapshot(long fileId, int entryCount, List<RowIdRange> rowIdRanges)
{
this.fileId = fileId;
this.entryCount = entryCount;
this.rowIdRanges = Collections.unmodifiableList(new ArrayList<>(rowIdRanges));
}

public long getFileId()
{
return fileId;
}

public int getEntryCount()
{
return entryCount;
}

public List<RowIdRange> getRowIdRanges()
{
return rowIdRanges;
}

public boolean isEmpty()
{
return entryCount == 0;
}
}

/**
* Create a main index buffer and bind the main index cache to it.
* Entries put into this buffer will also be put into the cache.
Expand Down Expand Up @@ -143,20 +177,19 @@ public IndexProto.RowLocation lookup(long rowId) throws MainIndexException
}

/**
* Flush the (row id -> row location) mappings of the given file id into ranges and remove them from the buffer.
* This method does not evict the main index cache bind to this buffer as the cached entries are not out of date.
* However, this method may disable synchronous cache population and clear the cache if remaining file ids in the
* buffer is below or equals to the {@link #CACHE_POP_ENABLE_THRESHOLD}.
* Build a stable snapshot of the (row id -> row location) mappings of the given file id.
* This method must not mutate the buffer or cache; callers should only discard the buffered
* entries after the snapshot has been durably committed.
* @param fileId the given file id to flush
* @return the flushed row id ranges to be persisited into the storage
* @return the row id range snapshot to be persisted into the storage
* @throws MainIndexException
*/
public List<RowIdRange> flush(long fileId) throws MainIndexException
public FlushSnapshot snapshotForFlush(long fileId) throws MainIndexException
{
Map<Long, IndexProto.RowLocation> fileBuffer = this.indexBuffer.get(fileId);
if (fileBuffer == null)
{
return null;
return new FlushSnapshot(fileId, 0, Collections.emptyList());
}
ImmutableList.Builder<RowIdRange> ranges = ImmutableList.builder();
RowIdRange.Builder currRangeBuilder = new RowIdRange.Builder();
Expand Down Expand Up @@ -210,16 +243,34 @@ public List<RowIdRange> flush(long fileId) throws MainIndexException
// release the flushed file index buffer
if(fileBuffer.size() != rowIds.length)
{
throw new MainIndexException("FileBuffer Changed while flush");
throw new MainIndexException("FileBuffer changed while building flush snapshot");
}
return new FlushSnapshot(fileId, rowIds.length, ranges.build());
}

/**
* Discard a flush snapshot after the backing store has durably committed it.
* @param snapshot the committed snapshot
* @throws MainIndexException if the buffer no longer matches the committed snapshot
*/
public void discardFlushed(FlushSnapshot snapshot) throws MainIndexException
{
if (snapshot.isEmpty())
{
return;
}
Map<Long, IndexProto.RowLocation> fileBuffer = this.indexBuffer.get(snapshot.getFileId());
if (fileBuffer == null || fileBuffer.size() != snapshot.getEntryCount())
{
throw new MainIndexException("FileBuffer changed before committed flush discard");
}
fileBuffer.clear();
this.indexBuffer.remove(fileId);
this.indexBuffer.remove(snapshot.getFileId());
if (this.indexBuffer.size() <= CACHE_POP_ENABLE_THRESHOLD)
{
this.populateCache = false;
this.indexCache.evictAllEntries();
}
return ranges.build();
}

public List<Long> cachedFileIds()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1361,7 +1361,7 @@ public boolean addFiles(Collection<File> files) throws MetadataException
{
throw new MetadataException("failed to add file", e);
}
return false;
return true;
}

/**
Expand Down Expand Up @@ -1420,14 +1420,17 @@ public File.Type getFileType(String filePathUri) throws MetadataException
{
throw new MetadataException("response token does not match.");
}
return File.Type.valueOf(response.getFileType().getNumber());
return File.Type.valueOf(response.getFileTypeValue());
}
catch (Exception e)
{
throw new MetadataException("failed to get file type", e);
}
}

/**
* Return query-visible REGULAR files under the path.
*/
public List<File> getFiles(long pathId) throws MetadataException
{
String token = UUID.randomUUID().toString();
Expand Down Expand Up @@ -1476,7 +1479,7 @@ public boolean updateFile(File file) throws MetadataException
{
throw new MetadataException("failed to update file", e);
}
return false;
return true;
}

public boolean deleteFiles(List<Long> fileIds) throws MetadataException
Expand All @@ -1502,7 +1505,7 @@ public boolean deleteFiles(List<Long> fileIds) throws MetadataException
{
throw new MetadataException("failed to delete files", e);
}
return false;
return true;
}

/**
Expand Down Expand Up @@ -1537,8 +1540,8 @@ public File getFileById(long fileId) throws MetadataException
}

/**
* Atomically promote a TEMPORARY file to REGULAR and delete the old files.
* @param newFileId the id of the new TEMPORARY file to promote
* Atomically promote a temporary file to REGULAR and delete the old files.
* @param newFileId the id of the new temporary file to promote
* @param oldFileIds the ids of old files to delete
* @throws MetadataException if the request fails
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,37 @@
*/
public class File extends Base
{
/**
* Files such as loaded and compacted are marked as REGULAR, while file
* created by pixelsWriterImpl during build are marked as TEMPORARY.
*/
public enum Type
{
TEMPORARY, REGULAR;
TEMPORARY_INGEST(0),
REGULAR(1),
TEMPORARY_GC(2),
RETIRED(3);

private final int number;

Type(int number)
{
this.number = number;
}

public int getNumber()
{
return number;
}

public static Type valueOf(int number)
{
switch (number)
{
case 0:
return TEMPORARY;
return TEMPORARY_INGEST;
case 1:
return REGULAR;
case 2:
return TEMPORARY_GC;
case 3:
return RETIRED;
default:
throw new InvalidArgumentException("invalid number for File.Type");
}
Expand All @@ -61,6 +76,7 @@ public static Type valueOf(int number)
private long minRowId;
private long maxRowId;
private long pathId;
private Long cleanupAt;

public File()
{
Expand All @@ -70,11 +86,12 @@ public File(MetadataProto.File file)
{
this.setId(file.getId());
this.name = file.getName();
this.type = Type.valueOf(file.getType().getNumber());
this.type = Type.valueOf(file.getTypeValue());
this.numRowGroup = file.getNumRowGroup();
this.minRowId = file.getMinRowId();
this.maxRowId = file.getMaxRowId();
this.pathId = file.getPathId();
this.cleanupAt = file.hasCleanupAt() ? file.getCleanupAt() : null;
}

public String getName()
Expand Down Expand Up @@ -137,6 +154,16 @@ public void setPathId(long pathId)
this.pathId = pathId;
}

public Long getCleanupAt()
{
return cleanupAt;
}

public void setCleanupAt(Long cleanupAt)
{
this.cleanupAt = cleanupAt;
}

public static List<File> convertFiles(List<MetadataProto.File> protoFiles)
{
requireNonNull(protoFiles, "protoFiles is null");
Expand Down Expand Up @@ -182,8 +209,14 @@ public static String getFilePath(Path path, File file)
@Override
public MetadataProto.File toProto()
{
return MetadataProto.File.newBuilder().setId(this.getId()).setName(this.name)
.setTypeValue(this.type.ordinal()).setNumRowGroup(this.numRowGroup)
.setMinRowId(this.minRowId).setMaxRowId(this.maxRowId).setPathId(this.pathId).build();
MetadataProto.File.Builder builder = MetadataProto.File.newBuilder()
.setId(this.getId()).setName(this.name)
.setTypeValue(this.type.getNumber()).setNumRowGroup(this.numRowGroup)
.setMinRowId(this.minRowId).setMaxRowId(this.maxRowId).setPathId(this.pathId);
if (this.cleanupAt != null)
{
builder.setCleanupAt(this.cleanupAt);
}
return builder.build();
}
}
Loading
Loading