Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions include/paimon/io/buffered_input_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once
#include <cstdint>
#include <functional>
#include <memory>
#include <string>

#include "paimon/fs/file_system.h"
#include "paimon/result.h"
#include "paimon/status.h"
#include "paimon/visibility.h"

namespace paimon {
class Bytes;
class MemoryPool;

/// A buffered input stream that wraps another `InputStream` to provide buffering capabilities.
///
/// `BufferedInputStream` improves I/O performance by reducing the number of system calls
/// through internal buffering. It reads data from the underlying stream in larger chunks
/// and serves subsequent read requests from the internal buffer when possible.
class PAIMON_EXPORT BufferedInputStream : public InputStream {
public:
/// Creates a new buffered input stream that wraps the provided input stream.
/// The buffer is allocated from the specified memory pool.
///
/// @param in The underlying input stream to wrap.
/// @param buffer_size Size of the internal buffer in bytes.
/// @param pool Memory pool for buffer allocation.
BufferedInputStream(const std::shared_ptr<InputStream>& in, int32_t buffer_size,
MemoryPool* pool);

~BufferedInputStream() noexcept override;

Status Seek(int64_t offset, SeekOrigin origin) override;

Result<int64_t> GetPos() const override;

Result<int32_t> Read(char* buffer, uint32_t size) override;

Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) override;

void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
std::function<void(Status)>&& callback) override;

Result<uint64_t> Length() const override;

Status Close() override;

Result<std::string> GetUri() const override;

static constexpr int32_t DEFAULT_BUFFER_SIZE = 8192;

private:
/// Fill the internal buffer from the underlying stream.
Status Fill();

/// Internal read implementation.
/// @pre size > 0
Result<int32_t> InnerRead(char* buffer, int32_t size);

/// Validate that the expected number of bytes were read.
Status AssertReadLength(int32_t read_length, int32_t actual_read_length) const;

private:
int32_t buffer_size_;
int32_t pos_ = 0;
int32_t count_ = 0;
std::unique_ptr<Bytes> buffer_;
std::shared_ptr<InputStream> in_;
};

} // namespace paimon
67 changes: 67 additions & 0 deletions include/paimon/io/byte_array_input_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <cstdint>
#include <functional>
#include <string>

#include "paimon/fs/file_system.h"
#include "paimon/result.h"
#include "paimon/status.h"
#include "paimon/visibility.h"

namespace paimon {
/// Input stream for memory buffer, inherits from `InputStream`.
class PAIMON_EXPORT ByteArrayInputStream : public InputStream {
public:
ByteArrayInputStream(const char* buffer, uint64_t length);
~ByteArrayInputStream() override = default;

/// @return The raw data pointer of current pos.
const char* GetRawData() const;

Status Seek(int64_t offset, SeekOrigin origin) override;

Result<int64_t> GetPos() const override {
return position_;
}

Result<int32_t> Read(char* buffer, uint32_t size) override;

Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) override;

void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
std::function<void(Status)>&& callback) override;

Result<uint64_t> Length() const override {
return length_;
}

Status Close() override;

Result<std::string> GetUri() const override;

private:
const char* buffer_;
const uint64_t length_;
int64_t position_;
};
} // namespace paimon
97 changes: 97 additions & 0 deletions include/paimon/io/data_input_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <cstdint>
#include <memory>
#include <string>

#include "paimon/io/byte_order.h"
#include "paimon/result.h"
#include "paimon/status.h"
#include "paimon/visibility.h"

namespace paimon {
class Bytes;
class InputStream;

/// `DataInputStream` provides a convenient wrapper around `InputStream` for reading typed data.
/// @note The default byte order is big-endian to maintain compatibility with the Java
/// implementation.
class PAIMON_EXPORT DataInputStream {
public:
/// Constructs a `DataInputStream` wrapping the given `InputStream`.
/// @param input_stream The underlying input stream to read from.
explicit DataInputStream(const std::shared_ptr<InputStream>& input_stream);

/// Seek to a specific position in the underlying input stream.
/// @param offset The absolute byte offset to seek to.
Status Seek(int64_t offset) const;

/// Read a typed value from the stream.
/// @return Result containing the read value or an error status.
template <typename T>
Result<T> ReadValue() const;

/// Read some bytes to a `Bytes` object from the stream. The length of bytes is the number of
/// bytes read from the stream.
/// @param bytes Buffer to store the read bytes.
Status ReadBytes(Bytes* bytes) const;

/// Read raw data of specified size from the stream.
/// @param data Buffer to store the read data.
/// @param size Number of bytes to read.
Status Read(char* data, uint32_t size) const;

/// Read string from the stream.
/// @note First read length (int16), then read string bytes.
Result<std::string> ReadString() const;

/// Get the current position in the underlying input stream.
Result<int64_t> GetPos() const;

/// Get the total length of the underlying input stream.
Result<uint64_t> Length() const;

/// Set the byte order for endianness conversion.
/// @param order The byte order to use `PAIMON_BIG_ENDIAN` or `PAIMON_LITTLE_ENDIAN`.
void SetOrder(ByteOrder order) {
byte_order_ = order;
}

private:
/// Validate that the expected number of bytes were read.
/// @param read_length Expected number of bytes to read.
/// @param actual_read_length Actual number of bytes read.
Status AssertReadLength(int32_t read_length, int32_t actual_read_length) const;

/// Check if there are enough bytes available to read.
/// @param need_length Number of bytes needed.
Status AssertBoundary(int32_t need_length) const;

/// Determine if byte swapping is needed based on current byte order and system endianness.
/// @return `true` if byte swapping is required, `false` otherwise.
bool NeedSwap() const;

private:
std::shared_ptr<InputStream> input_stream_;
ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN;
};
} // namespace paimon
Loading