22 changes: 7 additions & 15 deletions cmake_modules/IcebergThirdpartyToolchain.cmake
@@ -191,24 +191,16 @@ function(resolve_avro_dependency)
                          NAMES
                          avro-cpp
                          CONFIG)
-  elseif(DEFINED ENV{ICEBERG_AVRO_GIT_URL})
-    # Support custom git URL for mirrors
-    fetchcontent_declare(avro-cpp
-                         ${FC_DECLARE_COMMON_OPTIONS}
-                         GIT_REPOSITORY $ENV{ICEBERG_AVRO_GIT_URL}
-                         GIT_TAG e6c308780e876b4c11a470b9900995947f7b0fb5
-                         SOURCE_SUBDIR
-                         lang/c++
-                         FIND_PACKAGE_ARGS
-                         NAMES
-                         avro-cpp
-                         CONFIG)
   else()
-    # Default to GitHub - uses unreleased version
+    if(DEFINED ENV{ICEBERG_AVRO_GIT_URL})
+      set(AVRO_GIT_REPOSITORY "$ENV{ICEBERG_AVRO_GIT_URL}")
+    else()
+      set(AVRO_GIT_REPOSITORY "https://github.com/apache/avro.git")
+    endif()
     fetchcontent_declare(avro-cpp
                          ${FC_DECLARE_COMMON_OPTIONS}
-                         GIT_REPOSITORY https://github.com/apache/avro.git
-                         GIT_TAG e6c308780e876b4c11a470b9900995947f7b0fb5
+                         GIT_REPOSITORY ${AVRO_GIT_REPOSITORY}
+                         GIT_TAG 11fb55500bed9fbe9af53b85112cd13887f0ce80
                          SOURCE_SUBDIR
                          lang/c++
                          FIND_PACKAGE_ARGS
6 changes: 3 additions & 3 deletions src/iceberg/avro/avro_reader.cc
@@ -226,17 +226,17 @@ class AvroReader::Impl {
       return InvalidArgument("Projected schema is required by Avro reader");
     }

-    batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
+    batch_size_ = options.properties.Get(ReaderProperties::kBatchSize);
     read_schema_ = options.projection;

     // Open the input stream and adapt to the avro interface.
     ICEBERG_ASSIGN_OR_RAISE(
         auto input_stream,
         CreateInputStream(options,
-                          options.properties->Get(ReaderProperties::kAvroBufferSize)));
+                          options.properties.Get(ReaderProperties::kAvroBufferSize)));

     // Create the appropriate backend based on configuration
-    if (options.properties->Get(ReaderProperties::kAvroSkipDatum)) {
+    if (options.properties.Get(ReaderProperties::kAvroSkipDatum)) {
       backend_ = std::make_unique<DirectDecoderBackend>();
     } else {
       backend_ = std::make_unique<GenericDatumBackend>();
57 changes: 46 additions & 11 deletions src/iceberg/avro/avro_writer.cc
@@ -52,12 +52,41 @@ Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions
   return std::make_unique<AvroOutputStream>(output, buffer_size);
 }

+Result<::avro::Codec> ParseCodec(const WriterProperties& properties) {
+  const auto& codec_name = properties.Get(WriterProperties::kAvroCompression);
+  ::avro::Codec codec;
+  if (codec_name == "uncompressed") {
+    codec = ::avro::NULL_CODEC;
+  } else if (codec_name == "gzip") {
+    codec = ::avro::DEFLATE_CODEC;
+  } else if (codec_name == "snappy") {
+    codec = ::avro::SNAPPY_CODEC;
+  } else if (codec_name == "zstd") {
+    codec = ::avro::ZSTD_CODEC;
+  } else {
+    return InvalidArgument("Unsupported Avro codec: {}", codec_name);
+  }
+  ICEBERG_PRECHECK(::avro::isCodecAvailable(codec),
+                   "Avro codec {} is not available in the current build", codec_name);
+  return codec;
+}
+
+Result<std::optional<int32_t>> ParseCodecLevel(const WriterProperties& properties) {
+  auto level_str = properties.Get(WriterProperties::kAvroCompressionLevel);
+  if (level_str.empty()) {
+    return std::nullopt;
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto level, StringUtils::ParseInt<int32_t>(level_str));
+  return level;
+}
+
 // Abstract base class for Avro write backends.
 class AvroWriteBackend {
  public:
   virtual ~AvroWriteBackend() = default;
   virtual Status Init(std::unique_ptr<AvroOutputStream> output_stream,
                       const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
+                      ::avro::Codec codec, std::optional<int32_t> compression_level,
                       const std::map<std::string, std::vector<uint8_t>>& metadata) = 0;
   virtual Status WriteRow(const Schema& write_schema, const ::arrow::Array& array,
                           int64_t row_index) = 0;
@@ -70,10 +99,11 @@ class DirectEncoderBackend : public AvroWriteBackend {
  public:
   Status Init(std::unique_ptr<AvroOutputStream> output_stream,
               const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
+              ::avro::Codec codec, std::optional<int32_t> compression_level,
               const std::map<std::string, std::vector<uint8_t>>& metadata) override {
-    writer_ = std::make_unique<::avro::DataFileWriterBase>(std::move(output_stream),
-                                                           avro_schema, sync_interval,
-                                                           ::avro::NULL_CODEC, metadata);
+    writer_ = std::make_unique<::avro::DataFileWriterBase>(
+        std::move(output_stream), avro_schema, sync_interval, codec, metadata,
+        compression_level);
     avro_root_node_ = avro_schema.root();
     return {};
   }
@@ -111,10 +141,11 @@ class GenericDatumBackend : public AvroWriteBackend {
  public:
   Status Init(std::unique_ptr<AvroOutputStream> output_stream,
               const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
+              ::avro::Codec codec, std::optional<int32_t> compression_level,
               const std::map<std::string, std::vector<uint8_t>>& metadata) override {
     writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
-        std::move(output_stream), avro_schema, sync_interval, ::avro::NULL_CODEC,
-        metadata);
+        std::move(output_stream), avro_schema, sync_interval, codec, metadata,
+        compression_level);
     datum_ = std::make_unique<::avro::GenericDatum>(avro_schema);
     return {};
   }
@@ -158,7 +189,7 @@ class AvroWriter::Impl {
     ::avro::NodePtr root;
     ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root));
     if (const auto& schema_name =
-            options.properties->Get(WriterProperties::kAvroSchemaName);
+            options.properties.Get(WriterProperties::kAvroSchemaName);
         !schema_name.empty()) {
       root->setName(::avro::Name(schema_name));
     }
@@ -169,7 +200,7 @@ class AvroWriter::Impl {
     ICEBERG_ASSIGN_OR_RAISE(
         auto output_stream,
         CreateOutputStream(options,
-                           options.properties->Get(WriterProperties::kAvroBufferSize)));
+                           options.properties.Get(WriterProperties::kAvroBufferSize)));
     arrow_output_stream_ = output_stream->arrow_output_stream();

     std::map<std::string, std::vector<uint8_t>> metadata;
@@ -181,15 +212,19 @@ class AvroWriter::Impl {
     }

     // Create the appropriate backend based on configuration
-    if (options.properties->Get(WriterProperties::kAvroSkipDatum)) {
+    if (options.properties.Get(WriterProperties::kAvroSkipDatum)) {
       backend_ = std::make_unique<DirectEncoderBackend>();
     } else {
       backend_ = std::make_unique<GenericDatumBackend>();
     }

-    ICEBERG_RETURN_UNEXPECTED(backend_->Init(
-        std::move(output_stream), *avro_schema_,
-        options.properties->Get(WriterProperties::kAvroSyncInterval), metadata));
+    ICEBERG_ASSIGN_OR_RAISE(auto codec, ParseCodec(options.properties));
+    ICEBERG_ASSIGN_OR_RAISE(auto compression_level, ParseCodecLevel(options.properties));
+
+    ICEBERG_RETURN_UNEXPECTED(
+        backend_->Init(std::move(output_stream), *avro_schema_,
+                       options.properties.Get(WriterProperties::kAvroSyncInterval),
+                       codec, compression_level, metadata));

     ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
     return {};
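Note: the compression level rides on a newer avro-cpp DataFileWriter overload taking (stream, schema, sync interval, codec, metadata, level), which is why the GIT_TAG bump in the toolchain file above matters. A minimal standalone sketch against avro-cpp alone, mirroring that argument order — ::avro::fileOutputStream and the exact overload are assumptions about the pinned avro-cpp revision, not something this PR defines:

#include <avro/DataFile.hh>
#include <avro/Generic.hh>
#include <avro/Stream.hh>
#include <avro/ValidSchema.hh>

#include <cstdint>
#include <map>
#include <string>
#include <vector>

void WriteEmptyZstdFile(const ::avro::ValidSchema& schema) {
  // File metadata keys map to raw bytes, matching the std::map used in the backends.
  std::map<std::string, std::vector<uint8_t>> metadata;
  // 16 KiB sync interval (the kAvroSyncInterval default), zstd at explicit level 3.
  ::avro::DataFileWriter<::avro::GenericDatum> writer(
      ::avro::fileOutputStream("out.avro"), schema, 16 * 1024, ::avro::ZSTD_CODEC,
      metadata, 3);
  writer.write(::avro::GenericDatum(schema));  // one all-default datum
  writer.close();
}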
10 changes: 3 additions & 7 deletions src/iceberg/file_reader.cc
@@ -59,14 +59,10 @@ Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Open(
   return reader;
 }

-std::unique_ptr<ReaderProperties> ReaderProperties::default_properties() {
-  return std::unique_ptr<ReaderProperties>(new ReaderProperties());
-}
-
-std::unique_ptr<ReaderProperties> ReaderProperties::FromMap(
+ReaderProperties ReaderProperties::FromMap(
     const std::unordered_map<std::string, std::string>& properties) {
-  auto reader_properties = std::unique_ptr<ReaderProperties>(new ReaderProperties());
-  reader_properties->configs_ = properties;
+  ReaderProperties reader_properties;
+  reader_properties.configs_ = properties;
   return reader_properties;
 }
12 changes: 2 additions & 10 deletions src/iceberg/file_reader.h
@@ -75,24 +75,16 @@ class ReaderProperties : public ConfigBase<ReaderProperties> {

   /// \brief The batch size to read.
   inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};

   /// \brief Skip GenericDatum in Avro reader for better performance.
   /// When true, decode directly from Avro to Arrow without GenericDatum intermediate.
   /// Default: true (skip GenericDatum for better performance).
   inline static Entry<bool> kAvroSkipDatum{"read.avro.skip-datum", true};

   /// \brief The buffer size used by Avro input stream.
   inline static Entry<int64_t> kAvroBufferSize{"read.avro.buffer-size", 1024 * 1024};

-  /// \brief Create a default ReaderProperties instance.
-  static std::unique_ptr<ReaderProperties> default_properties();
-
   /// \brief Create a ReaderProperties instance from a map of key-value pairs.
-  static std::unique_ptr<ReaderProperties> FromMap(
+  static ReaderProperties FromMap(
       const std::unordered_map<std::string, std::string>& properties);
-
- private:
-  ReaderProperties() = default;
 };

 /// \brief Options for creating a reader.
@@ -116,7 +108,7 @@ struct ICEBERG_EXPORT ReaderOptions {
   /// that may have different field names than the current schema.
   std::shared_ptr<class NameMapping> name_mapping;
   /// \brief Format-specific or implementation-specific properties.
-  std::shared_ptr<ReaderProperties> properties = ReaderProperties::default_properties();
+  ReaderProperties properties;
 };

 /// \brief Factory function to create a reader of a specific file format.
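With properties now a plain value on ReaderOptions, configuration is a copy rather than a shared pointer. A short usage sketch, illustrative only — it assumes this PR's headers under an iceberg namespace and uses only the property keys declared above:

// Unset keys fall back to the Entry defaults declared in ReaderProperties.
iceberg::ReaderOptions options;
options.properties = iceberg::ReaderProperties::FromMap({
    {"read.batch-size", "8192"},        // kBatchSize, default 4096
    {"read.avro.skip-datum", "false"},  // kAvroSkipDatum: use the GenericDatum backend
});
// Typed lookup goes through the Entry objects; no pointer indirection remains.
int64_t batch_size = options.properties.Get(iceberg::ReaderProperties::kBatchSize);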
10 changes: 3 additions & 7 deletions src/iceberg/file_writer.cc
@@ -59,14 +59,10 @@ Result<std::unique_ptr<Writer>> WriterFactoryRegistry::Open(
   return writer;
 }

-std::unique_ptr<WriterProperties> WriterProperties::default_properties() {
-  return std::unique_ptr<WriterProperties>(new WriterProperties());
-}
-
-std::unique_ptr<WriterProperties> WriterProperties::FromMap(
+WriterProperties WriterProperties::FromMap(
     const std::unordered_map<std::string, std::string>& properties) {
-  auto writer_properties = std::unique_ptr<WriterProperties>(new WriterProperties());
-  writer_properties->configs_ = properties;
+  WriterProperties writer_properties;
+  writer_properties.configs_ = properties;
   return writer_properties;
 }
22 changes: 11 additions & 11 deletions src/iceberg/file_writer.h
@@ -42,28 +42,28 @@ class WriterProperties : public ConfigBase<WriterProperties> {

   /// \brief The name of the Avro root node schema to write.
   inline static Entry<std::string> kAvroSchemaName{"write.avro.schema-name", ""};

   /// \brief The buffer size used by Avro output stream.
   inline static Entry<int64_t> kAvroBufferSize{"write.avro.buffer-size", 1024 * 1024};

   /// \brief The sync interval used by Avro writer.
   inline static Entry<int64_t> kAvroSyncInterval{"write.avro.sync-interval", 16 * 1024};

   /// \brief Whether to skip GenericDatum and use direct encoder for Avro writing.
   /// When true, uses direct encoder (faster). When false, uses GenericDatum.
   inline static Entry<bool> kAvroSkipDatum{"write.avro.skip-datum", true};
+  inline static Entry<std::string> kAvroCompression{"write.avro.compression-codec",
+                                                    "gzip"};
+  inline static Entry<std::string> kAvroCompressionLevel{"write.avro.compression-level",
+                                                         ""};

-  /// TODO(gangwu): add more properties, like compression codec, compression level, etc.
+  inline static Entry<std::string> kParquetCompression{"write.parquet.compression-codec",
+                                                       "zstd"};
+  inline static Entry<std::string> kParquetCompressionLevel{
+      "write.parquet.compression-level", ""};

-  /// \brief Create a default WriterProperties instance.
-  static std::unique_ptr<WriterProperties> default_properties();
+  /// TODO(gangwu): add table properties with write.avro|parquet|orc.*

   /// \brief Create a WriterProperties instance from a map of key-value pairs.
-  static std::unique_ptr<WriterProperties> FromMap(
+  static WriterProperties FromMap(
       const std::unordered_map<std::string, std::string>& properties);
-
- private:
-  WriterProperties() = default;
 };

 /// \brief Options for creating a writer.
@@ -79,7 +79,7 @@ struct ICEBERG_EXPORT WriterOptions {
   /// \brief Metadata to write to the file.
   std::unordered_map<std::string, std::string> metadata;
   /// \brief Format-specific or implementation-specific properties.
-  std::shared_ptr<WriterProperties> properties = WriterProperties::default_properties();
+  WriterProperties properties;
 };

 /// \brief Base writer class to write data from different file formats.
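The new codec entries are ordinary string properties, so enabling compression is symmetric with the reader-side sketch earlier. Another illustrative sketch under the same assumptions; the accepted codec names are the ones ParseCodec in avro_writer.cc handles, and the level is parsed with StringUtils::ParseInt when non-empty:

iceberg::WriterOptions options;
options.properties = iceberg::WriterProperties::FromMap({
    {"write.avro.compression-codec", "zstd"},  // uncompressed | gzip | snappy | zstd
    {"write.avro.compression-level", "3"},     // "" (the default) means codec default
});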
2 changes: 0 additions & 2 deletions src/iceberg/json_internal.cc
@@ -1119,8 +1119,6 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
   if (json.contains(kProperties)) {
     ICEBERG_ASSIGN_OR_RAISE(auto properties, FromJsonMap(json, kProperties));
     table_metadata->properties = TableProperties::FromMap(std::move(properties));
-  } else {
-    table_metadata->properties = TableProperties::default_properties();
   }

   // This field is optional, but internally we set this to -1 when not set
4 changes: 2 additions & 2 deletions src/iceberg/manifest/manifest_writer.cc
@@ -256,9 +256,9 @@ Result<std::unique_ptr<Writer>> OpenFileWriter(
     std::string_view location, std::shared_ptr<Schema> schema,
     std::shared_ptr<FileIO> file_io,
     std::unordered_map<std::string, std::string> metadata, std::string_view schema_name) {
-  auto writer_properties = WriterProperties::default_properties();
+  WriterProperties writer_properties;
   if (!schema_name.empty()) {
-    writer_properties->Set(WriterProperties::kAvroSchemaName, std::string(schema_name));
+    writer_properties.Set(WriterProperties::kAvroSchemaName, std::string(schema_name));
   }
   ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open(
                                            FileFormatType::kAvro,
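The same configuration object also works without a map: default-construct it and Set individual entries, as OpenFileWriter does above. A two-line sketch; the schema name string here is a hypothetical example, not taken from this PR:

iceberg::WriterProperties writer_properties;  // every entry at its default
writer_properties.Set(iceberg::WriterProperties::kAvroSchemaName,
                      std::string("manifest_entry"));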
2 changes: 1 addition & 1 deletion src/iceberg/parquet/parquet_reader.cc
@@ -124,7 +124,7 @@ class ParquetReader::Impl {
     ::parquet::ReaderProperties reader_properties(pool_);
     ::parquet::ArrowReaderProperties arrow_reader_properties;
     arrow_reader_properties.set_batch_size(
-        options.properties->Get(ReaderProperties::kBatchSize));
+        options.properties.Get(ReaderProperties::kBatchSize));
     arrow_reader_properties.set_arrow_extensions_enabled(true);

     // Open the Parquet file reader
39 changes: 37 additions & 2 deletions src/iceberg/parquet/parquet_writer.cc
@@ -46,13 +46,48 @@ Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(
   return output;
 }

+Result<::arrow::Compression::type> ParseCompression(const WriterProperties& properties) {
+  const auto& compression_name = properties.Get(WriterProperties::kParquetCompression);
+  if (compression_name == "uncompressed") {
+    return ::arrow::Compression::UNCOMPRESSED;
+  } else if (compression_name == "snappy") {
+    return ::arrow::Compression::SNAPPY;
+  } else if (compression_name == "gzip") {
+    return ::arrow::Compression::GZIP;
+  } else if (compression_name == "brotli") {
+    return ::arrow::Compression::BROTLI;
+  } else if (compression_name == "lz4") {
+    return ::arrow::Compression::LZ4;
+  } else if (compression_name == "zstd") {
+    return ::arrow::Compression::ZSTD;
+  } else {
+    return InvalidArgument("Unsupported Parquet compression codec: {}", compression_name);
+  }
+}
+
+Result<std::optional<int32_t>> ParseCodecLevel(const WriterProperties& properties) {
+  auto level_str = properties.Get(WriterProperties::kParquetCompressionLevel);
+  if (level_str.empty()) {
+    return std::nullopt;
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto level, StringUtils::ParseInt<int32_t>(level_str));
+  return level;
+}
+
 }  // namespace

 class ParquetWriter::Impl {
  public:
   Status Open(const WriterOptions& options) {
-    auto writer_properties =
-        ::parquet::WriterProperties::Builder().memory_pool(pool_)->build();
+    ICEBERG_ASSIGN_OR_RAISE(auto compression, ParseCompression(options.properties));
+    ICEBERG_ASSIGN_OR_RAISE(auto compression_level, ParseCodecLevel(options.properties));
+
+    auto properties_builder = ::parquet::WriterProperties::Builder();
+    properties_builder.compression(compression);
+    if (compression_level.has_value()) {
+      properties_builder.compression_level(compression_level.value());
+    }
+    auto writer_properties = properties_builder.memory_pool(pool_)->build();
     auto arrow_writer_properties = ::parquet::default_arrow_writer_properties();

     ArrowSchema c_schema;
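The builder calls above are plain Apache Arrow Parquet API; a self-contained sketch of just that mapping step, independent of the Iceberg wrapper (only Arrow/Parquet headers assumed):

#include <arrow/util/compression.h>
#include <parquet/properties.h>

#include <cstdint>
#include <memory>
#include <optional>

std::shared_ptr<::parquet::WriterProperties> MakeWriterProperties(
    ::arrow::Compression::type compression, std::optional<int32_t> level) {
  ::parquet::WriterProperties::Builder builder;
  builder.compression(compression);  // default codec applied to all columns
  if (level.has_value()) {
    builder.compression_level(level.value());  // meaning is codec-specific
  }
  return builder.build();
}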
2 changes: 0 additions & 2 deletions src/iceberg/table_properties.cc
@@ -38,8 +38,6 @@ const std::unordered_set<std::string>& TableProperties::commit_properties() {
   return kCommitProperties;
 }

-TableProperties TableProperties::default_properties() { return {}; }
-
 TableProperties TableProperties::FromMap(
     std::unordered_map<std::string, std::string> properties) {
   TableProperties table_properties;
5 changes: 0 additions & 5 deletions src/iceberg/table_properties.h
@@ -291,11 +291,6 @@ class ICEBERG_EXPORT TableProperties : public ConfigBase<TableProperties> {
   /// \brief Get the set of commit table property keys.
   static const std::unordered_set<std::string>& commit_properties();

-  /// \brief Create a default TableProperties instance.
-  ///
-  /// \return A unique pointer to a TableProperties instance with default values
-  static TableProperties default_properties();
-
   /// \brief Create a TableProperties instance from a map of key-value pairs.
   ///
   /// \param properties The map containing property key-value pairs