Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions include/paimon/fs/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class PAIMON_EXPORT InputStream : public Stream {
/// @return Result containing the actual number of bytes read on success, or an error status on
/// failure.
/// @note The stream position advances by the number of bytes actually read.
virtual Result<int32_t> Read(char* buffer, uint32_t size) = 0;
virtual Result<int64_t> Read(char* buffer, int64_t size) = 0;

/// Read data from given position in the stream.
///
Expand All @@ -83,7 +83,7 @@ class PAIMON_EXPORT InputStream : public Stream {
/// @param[out] buffer The buffer to store the read content.
/// @param size The number of bytes to read.
/// @param offset The position in the stream to read from.
virtual Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) = 0;
virtual Result<int64_t> Read(char* buffer, int64_t size, int64_t offset) = 0;

/// Asynchronously read data from the input stream.
///
Expand All @@ -98,7 +98,7 @@ class PAIMON_EXPORT InputStream : public Stream {
/// @param callback The callback function to be invoked upon completion of the read operation.
/// The callback will receive a Status object indicating the success or failure
/// of the read operation.
virtual void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
virtual void ReadAsync(char* buffer, int64_t size, int64_t offset,
std::function<void(Status)>&& callback) = 0;

/// Get an identifier that uniquely identify the underlying content.
Expand All @@ -107,7 +107,7 @@ class PAIMON_EXPORT InputStream : public Stream {
virtual Result<std::string> GetUri() const = 0;

/// Get the total length of the file in bytes.
virtual Result<uint64_t> Length() const = 0;
virtual Result<int64_t> Length() const = 0;
};

/// Abstract class for output stream operations.
Expand All @@ -121,7 +121,7 @@ class PAIMON_EXPORT OutputStream : public Stream {
/// @return Result containing the actual number of bytes written on success, or an error status
/// on failure.
/// @note The stream position advances by the number of bytes actually written.
virtual Result<int32_t> Write(const char* buffer, uint32_t size) = 0;
virtual Result<int64_t> Write(const char* buffer, int64_t size) = 0;

/// Flush pending data to the disk.
virtual Status Flush() = 0;
Expand Down Expand Up @@ -160,7 +160,7 @@ class PAIMON_EXPORT FileStatus {

/// Get the size of the file in bytes.
/// @note For directories, this method is undefined behavior.
virtual uint64_t GetLen() const = 0;
virtual int64_t GetLen() const = 0;

/// Check if this entry represents a directory.
virtual bool IsDir() const = 0;
Expand Down
22 changes: 11 additions & 11 deletions include/paimon/io/buffered_input_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class PAIMON_EXPORT BufferedInputStream : public InputStream {
/// @param in The underlying input stream to wrap.
/// @param buffer_size Size of the internal buffer in bytes.
/// @param pool Memory pool for buffer allocation.
BufferedInputStream(const std::shared_ptr<InputStream>& in, int32_t buffer_size,
BufferedInputStream(const std::shared_ptr<InputStream>& in, uint64_t buffer_size,
MemoryPool* pool);

~BufferedInputStream() noexcept override;
Expand All @@ -51,36 +51,36 @@ class PAIMON_EXPORT BufferedInputStream : public InputStream {

Result<int64_t> GetPos() const override;

Result<int32_t> Read(char* buffer, uint32_t size) override;
Result<int64_t> Read(char* buffer, int64_t size) override;

Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) override;
Result<int64_t> Read(char* buffer, int64_t size, int64_t offset) override;

void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
void ReadAsync(char* buffer, int64_t size, int64_t offset,
std::function<void(Status)>&& callback) override;

Result<uint64_t> Length() const override;
Result<int64_t> Length() const override;

Status Close() override;

Result<std::string> GetUri() const override;

static constexpr int32_t DEFAULT_BUFFER_SIZE = 8192;
static constexpr uint64_t DEFAULT_BUFFER_SIZE = 8192;

private:
/// Fill the internal buffer from the underlying stream.
Status Fill();

/// Internal read implementation.
/// @pre size > 0
Result<int32_t> InnerRead(char* buffer, int32_t size);
Result<int64_t> InnerRead(char* buffer, int64_t size);

/// Validate that the expected number of bytes were read.
Status AssertReadLength(int32_t read_length, int32_t actual_read_length) const;
Status AssertReadLength(int64_t read_length, int64_t actual_read_length) const;

private:
int32_t buffer_size_;
int32_t pos_ = 0;
int32_t count_ = 0;
uint64_t buffer_size_;
uint64_t pos_ = 0;
uint64_t count_ = 0;
std::unique_ptr<Bytes> buffer_;
std::shared_ptr<InputStream> in_;
};
Expand Down
10 changes: 5 additions & 5 deletions include/paimon/io/byte_array_input_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ class PAIMON_EXPORT ByteArrayInputStream : public InputStream {
return position_;
}

Result<int32_t> Read(char* buffer, uint32_t size) override;
Result<int64_t> Read(char* buffer, int64_t size) override;

Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) override;
Result<int64_t> Read(char* buffer, int64_t size, int64_t offset) override;

void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
void ReadAsync(char* buffer, int64_t size, int64_t offset,
std::function<void(Status)>&& callback) override;

Result<uint64_t> Length() const override {
Result<int64_t> Length() const override {
return length_;
}

Expand All @@ -59,6 +59,6 @@ class PAIMON_EXPORT ByteArrayInputStream : public InputStream {
private:
const char* buffer_;
const uint64_t length_;
int64_t position_;
uint64_t position_;
};
} // namespace paimon
8 changes: 4 additions & 4 deletions include/paimon/io/data_input_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class PAIMON_EXPORT DataInputStream {
/// Read raw data of specified size from the stream.
/// @param data Buffer to store the read data.
/// @param size Number of bytes to read.
Status Read(char* data, uint32_t size) const;
Status Read(char* data, int64_t size) const;

/// Read string from the stream.
/// @note First read length (int16), then read string bytes.
Expand All @@ -65,7 +65,7 @@ class PAIMON_EXPORT DataInputStream {
Result<int64_t> GetPos() const;

/// Get the total length of the underlying input stream.
Result<uint64_t> Length() const;
Result<int64_t> Length() const;

/// Set the byte order for endianness conversion.
/// @param order The byte order to use `PAIMON_BIG_ENDIAN` or `PAIMON_LITTLE_ENDIAN`.
Expand All @@ -77,11 +77,11 @@ class PAIMON_EXPORT DataInputStream {
/// Validate that the expected number of bytes were read.
/// @param read_length Expected number of bytes to read.
/// @param actual_read_length Actual number of bytes read.
Status AssertReadLength(int32_t read_length, int32_t actual_read_length) const;
Status AssertReadLength(int64_t read_length, int64_t actual_read_length) const;

/// Check if there are enough bytes available to read.
/// @param need_length Number of bytes needed.
Status AssertBoundary(int32_t need_length) const;
Status AssertBoundary(int64_t need_length) const;

/// Determine if byte swapping is needed based on current byte order and system endianness.
/// @return `true` if byte swapping is required, `false` otherwise.
Expand Down
2 changes: 1 addition & 1 deletion include/paimon/memory/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class PAIMON_EXPORT Bytes {
/// @param length Number of bytes to allocate.
/// @param pool Memory pool to use for allocation.
/// @return Unique pointer to the newly allocated Bytes object.
static PAIMON_UNIQUE_PTR<Bytes> AllocateBytes(int32_t length, MemoryPool* pool);
static PAIMON_UNIQUE_PTR<Bytes> AllocateBytes(uint64_t length, MemoryPool* pool);

/// Allocate a new Bytes object from string data.
///
Expand Down
16 changes: 14 additions & 2 deletions src/paimon/common/data/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,20 @@ Result<std::unique_ptr<InputStream>> Blob::NewInputStream(
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> file,
fs->Open(impl_->GetDescriptor()->Uri()));

return OffsetInputStream::Create(std::move(file), impl_->GetDescriptor()->Length(),
impl_->GetDescriptor()->Offset());
int64_t blob_length = impl_->GetDescriptor()->Length();
int64_t blob_offset = impl_->GetDescriptor()->Offset();

PAIMON_ASSIGN_OR_RAISE(int64_t total_length, file->Length());
if (PAIMON_UNLIKELY(blob_offset > total_length)) {
return Status::Invalid(
fmt::format("offset {} exceed total length {}", blob_offset, total_length));
}
if (blob_length == -1) {
// blob_length == -1 means it's dynamic length, should read to the end
blob_length = total_length - blob_offset;
}

return OffsetInputStream::Create(std::move(file), blob_length, blob_offset);
}

Result<PAIMON_UNIQUE_PTR<Bytes>> Blob::ToData(const std::shared_ptr<FileSystem>& fs,
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/common/data/blob_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ TEST_F(BlobTest, TestNewInputStreamWithOffsetAndLength) {
ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(file_system_));
ASSERT_TRUE(input_stream);

ASSERT_OK_AND_ASSIGN(uint64_t length, input_stream->Length());
ASSERT_OK_AND_ASSIGN(int64_t length, input_stream->Length());
ASSERT_EQ(6, length);

// Test reading with offset and length applied
Expand All @@ -133,7 +133,7 @@ TEST_F(BlobTest, TestNewInputStreamWithDynamicLength) {
ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(file_system_));
ASSERT_TRUE(input_stream);

ASSERT_OK_AND_ASSIGN(uint64_t length, input_stream->Length());
ASSERT_OK_AND_ASSIGN(int64_t length, input_stream->Length());
ASSERT_EQ(12, length);

// Test reading from offset to end (should read "cdefghijklmn")
Expand Down
3 changes: 2 additions & 1 deletion src/paimon/common/data/serializer/binary_row_serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ Status BinaryRowSerializer::SerializeWithoutLength(const BinaryRow& record,
Result<BinaryRow> BinaryRowSerializer::Deserialize(DataInputStream* source) const {
BinaryRow row(num_fields_);
PAIMON_ASSIGN_OR_RAISE(int32_t read_length, source->ReadValue<int32_t>());
std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes(read_length, pool_.get());
std::shared_ptr<Bytes> bytes =
Bytes::AllocateBytes(static_cast<uint64_t>(read_length), pool_.get());
PAIMON_RETURN_NOT_OK(source->ReadBytes(bytes.get()));
row.PointTo(MemorySegment::Wrap(bytes), 0, read_length);
return row;
Comment on lines 44 to 51
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ TEST_F(BitmapIndexTest, TestHighCardinalityForCompatibility) {
auto file_system = std::make_unique<LocalFileSystem>();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream,
file_system->Open(index_file_name));
ASSERT_OK_AND_ASSIGN(uint64_t length, input_stream->Length());
ASSERT_OK_AND_ASSIGN(int64_t length, input_stream->Length());

BitmapFileIndex file_index({});
ASSERT_OK_AND_ASSIGN(auto reader, file_index.CreateReader(CreateArrowSchema(type).get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ Result<std::shared_ptr<FileIndexReader>> BloomFilterFileIndex::CreateReader(

PAIMON_RETURN_NOT_OK(input_stream->Seek(start, SeekOrigin::FS_SEEK_SET));
auto bytes = std::make_shared<Bytes>(length, pool.get());
PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_len,
PAIMON_ASSIGN_OR_RAISE(uint64_t actual_read_len,
input_stream->Read(bytes->data(), bytes->size()));
if (static_cast<size_t>(actual_read_len) != bytes->size()) {
if (actual_read_len != bytes->size()) {
return Status::Invalid(
fmt::format("create reader for BloomFilterFileIndex failed, expected read len "
"{}, actual read len {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ Result<std::shared_ptr<FileIndexReader>> BitSliceIndexBitmapFileIndex::CreateRea

PAIMON_RETURN_NOT_OK(input_stream->Seek(start, SeekOrigin::FS_SEEK_SET));
auto bytes = std::make_unique<Bytes>(length, pool.get());
PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_len,
PAIMON_ASSIGN_OR_RAISE(uint64_t actual_read_len,
input_stream->Read(bytes->data(), bytes->size()));
if (static_cast<size_t>(actual_read_len) != bytes->size()) {
if (actual_read_len != bytes->size()) {
return Status::Invalid(
fmt::format("create reader for BitSliceIndexBitmapFileIndex failed, expected read len "
"{}, actual read len {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <algorithm>
#include <cmath>
#include <limits>
#include <string>

#include "paimon/common/io/memory_segment_output_stream.h"
Expand All @@ -43,7 +44,7 @@ Result<std::unique_ptr<BitSliceIndexBitmap>> BitSliceIndexBitmap::Create(
PAIMON_ASSIGN_OR_RAISE(int8_t slices_size, data_in->ReadValue<int8_t>());
PAIMON_ASSIGN_OR_RAISE(int32_t ebm_size, data_in->ReadValue<int32_t>());
PAIMON_ASSIGN_OR_RAISE(int32_t indexes_length, data_in->ReadValue<int32_t>());
auto indexes = Bytes::AllocateBytes(indexes_length, pool.get());
auto indexes = Bytes::AllocateBytes(static_cast<uint64_t>(indexes_length), pool.get());
PAIMON_RETURN_NOT_OK(data_in->Read(indexes->data(), indexes_length));
Comment on lines 44 to 48
Comment on lines 44 to 48
return std::unique_ptr<BitSliceIndexBitmap>(new BitSliceIndexBitmap(
Comment on lines 44 to 49
indexes_length, std::move(indexes), ebm_size, slices_size, input_stream,
Expand Down Expand Up @@ -82,7 +83,7 @@ BitSliceIndexBitmap::BitSliceIndexBitmap(int32_t indexes_length, PAIMON_UNIQUE_P
Result<const RoaringBitmap32*> BitSliceIndexBitmap::GetExistenceBitmap() {
if (!ebm_.has_value()) {
PAIMON_RETURN_NOT_OK(input_stream_->Seek(body_offset_, FS_SEEK_SET));
const auto bytes = Bytes::AllocateBytes(ebm_length_, pool_.get());
const auto bytes = Bytes::AllocateBytes(static_cast<uint64_t>(ebm_length_), pool_.get());
PAIMON_RETURN_NOT_OK(input_stream_->Read(bytes->data(), ebm_length_));
RoaringBitmap32 bitmap;
PAIMON_RETURN_NOT_OK(bitmap.Deserialize(bytes->data(), ebm_length_));
Expand Down Expand Up @@ -118,8 +119,8 @@ Status BitSliceIndexBitmap::LoadSlices(int32_t start, int32_t end) {
length += slice_length;
}
PAIMON_RETURN_NOT_OK(input_stream_->Seek(body_offset_ + ebm_length_ + offset, FS_SEEK_SET));
const auto bytes = Bytes::AllocateBytes(length, pool_.get());
PAIMON_RETURN_NOT_OK(input_stream_->Read(bytes->data(), length));
const auto bytes = Bytes::AllocateBytes(static_cast<uint64_t>(length), pool_.get());
PAIMON_RETURN_NOT_OK(input_stream_->Read(bytes->data(), static_cast<uint64_t>(length)));
int32_t byte_position = 0;
for (int32_t i = start; i < end; ++i) {
const int32_t slice_length = lengths[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ TEST_F(RangeBitmapIoTest, TestSimple) {
ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
fs_->Create(file_path, /*overwrite=*/false));
ASSERT_OK_AND_ASSIGN(
int32_t write_len,
uint64_t write_len,
out->Write(reinterpret_cast<char*>(serialized_bytes->data()), serialized_bytes->size()));
ASSERT_EQ(write_len, serialized_bytes->size());
ASSERT_OK(out->Flush());
Expand Down
18 changes: 12 additions & 6 deletions src/paimon/common/fs/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <utility>

#include "fmt/format.h"
#include "paimon/common/utils/math.h"
#include "paimon/common/utils/path_util.h"
#include "paimon/common/utils/scope_guard.h"
#include "paimon/common/utils/string_utils.h"
Expand Down Expand Up @@ -48,10 +49,14 @@ Status FileSystem::ReadFile(const std::string& path, std::string* content) {
Status s = in->Close();
(void)s;
});
PAIMON_ASSIGN_OR_RAISE(uint64_t length, in->Length());
content->resize(length);
PAIMON_ASSIGN_OR_RAISE(int32_t read_length, in->Read(content->data(), length));
if (read_length != static_cast<int32_t>(length)) {
PAIMON_ASSIGN_OR_RAISE(int64_t length, in->Length());
if (length < 0) {
return Status::Invalid(
fmt::format("path {}, file length {} is less than 0", path, length));
}
content->resize(static_cast<std::string::size_type>(length));
PAIMON_ASSIGN_OR_RAISE(int64_t read_length, in->Read(content->data(), length));
if (read_length != length) {
return Status::IOError(fmt::format("path {}, expect read len {}, actual read len {}",
path, length, read_length));
}
Expand All @@ -69,8 +74,9 @@ Status FileSystem::WriteFile(const std::string& path, const std::string& content
Status s = out->Close();
(void)s;
});
int32_t length = content.size();
PAIMON_ASSIGN_OR_RAISE(int32_t write_length, out->Write(content.data(), length));
PAIMON_RETURN_NOT_OK(ValidateValueInRange<int64_t>(content.size(), "content length"));
int64_t length = static_cast<int64_t>(content.size());
PAIMON_ASSIGN_OR_RAISE(int64_t write_length, out->Write(content.data(), length));
if (write_length != length) {
return Status::IOError(fmt::format("path {}, expect write len {}, actual write len {}",
path, length, write_length));
Expand Down
Loading
Loading