From ad4464666b8509cf8e2cf4c5b14e407a07bb6141 Mon Sep 17 00:00:00 2001
From: Gang Wu
Date: Thu, 22 Jan 2026 09:59:07 +0800
Subject: [PATCH] feat: add compression config to writer properties

- refactor writer and reader properties not to use pointers
- bump avro-cpp to use codec enum
- adapt parquet and avro writers to respect codec config
---
 .../IcebergThirdpartyToolchain.cmake          | 22 +++----
 src/iceberg/avro/avro_reader.cc               |  6 +-
 src/iceberg/avro/avro_writer.cc               | 57 +++++++++++++++----
 src/iceberg/file_reader.cc                    | 10 +---
 src/iceberg/file_reader.h                     | 12 +---
 src/iceberg/file_writer.cc                    | 10 +---
 src/iceberg/file_writer.h                     | 22 +++----
 src/iceberg/json_internal.cc                  |  2 -
 src/iceberg/manifest/manifest_writer.cc       |  4 +-
 src/iceberg/parquet/parquet_reader.cc         |  2 +-
 src/iceberg/parquet/parquet_writer.cc         | 39 ++++++++++++-
 src/iceberg/table_properties.cc               |  2 -
 src/iceberg/table_properties.h                |  5 --
 src/iceberg/test/avro_test.cc                 | 30 +++++-----
 src/iceberg/test/parquet_test.cc              | 34 +++++++----
 src/iceberg/test/snapshot_util_test.cc        |  1 -
 .../test/table_metadata_builder_test.cc       |  1 -
 .../test/update_partition_spec_test.cc        |  1 -
 18 files changed, 153 insertions(+), 107 deletions(-)

diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake
index 3d2c9b3a7..289caa400 100644
--- a/cmake_modules/IcebergThirdpartyToolchain.cmake
+++ b/cmake_modules/IcebergThirdpartyToolchain.cmake
@@ -191,24 +191,16 @@ function(resolve_avro_dependency)
                          NAMES
                          avro-cpp
                          CONFIG)
-  elseif(DEFINED ENV{ICEBERG_AVRO_GIT_URL})
-    # Support custom git URL for mirrors
-    fetchcontent_declare(avro-cpp
-                         ${FC_DECLARE_COMMON_OPTIONS}
-                         GIT_REPOSITORY $ENV{ICEBERG_AVRO_GIT_URL}
-                         GIT_TAG e6c308780e876b4c11a470b9900995947f7b0fb5
-                         SOURCE_SUBDIR
-                         lang/c++
-                         FIND_PACKAGE_ARGS
-                         NAMES
-                         avro-cpp
-                         CONFIG)
   else()
-    # Default to GitHub - uses unreleased version
+    if(DEFINED ENV{ICEBERG_AVRO_GIT_URL})
+      set(AVRO_GIT_REPOSITORY "$ENV{ICEBERG_AVRO_GIT_URL}")
+    else()
+      set(AVRO_GIT_REPOSITORY "https://github.com/apache/avro.git")
+    endif()
     fetchcontent_declare(avro-cpp
                          ${FC_DECLARE_COMMON_OPTIONS}
-                         GIT_REPOSITORY https://github.com/apache/avro.git
-                         GIT_TAG e6c308780e876b4c11a470b9900995947f7b0fb5
+                         GIT_REPOSITORY ${AVRO_GIT_REPOSITORY}
+                         GIT_TAG 11fb55500bed9fbe9af53b85112cd13887f0ce80
                          SOURCE_SUBDIR
                          lang/c++
                          FIND_PACKAGE_ARGS
diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc
index 9b3a1e252..f4985d9ac 100644
--- a/src/iceberg/avro/avro_reader.cc
+++ b/src/iceberg/avro/avro_reader.cc
@@ -226,17 +226,17 @@ class AvroReader::Impl {
       return InvalidArgument("Projected schema is required by Avro reader");
     }

-    batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
+    batch_size_ = options.properties.Get(ReaderProperties::kBatchSize);
     read_schema_ = options.projection;

     // Open the input stream and adapt to the avro interface.
    ICEBERG_ASSIGN_OR_RAISE(
        auto input_stream,
        CreateInputStream(options,
-                          options.properties->Get(ReaderProperties::kAvroBufferSize)));
+                          options.properties.Get(ReaderProperties::kAvroBufferSize)));

     // Create the appropriate backend based on configuration
-    if (options.properties->Get(ReaderProperties::kAvroSkipDatum)) {
+    if (options.properties.Get(ReaderProperties::kAvroSkipDatum)) {
       backend_ = std::make_unique();
     } else {
       backend_ = std::make_unique();
     }
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index 307f2fd62..91b2a93f6 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -52,12 +52,41 @@ Result> CreateOutputStream(const WriterOptions
   return std::make_unique(output, buffer_size);
 }

+Result<::avro::Codec> ParseCodec(const WriterProperties& properties) {
+  const auto& codec_name = properties.Get(WriterProperties::kAvroCompression);
+  ::avro::Codec codec;
+  if (codec_name == "uncompressed") {
+    codec = ::avro::NULL_CODEC;
+  } else if (codec_name == "gzip") {
+    codec = ::avro::DEFLATE_CODEC;
+  } else if (codec_name == "snappy") {
+    codec = ::avro::SNAPPY_CODEC;
+  } else if (codec_name == "zstd") {
+    codec = ::avro::ZSTD_CODEC;
+  } else {
+    return InvalidArgument("Unsupported Avro codec: {}", codec_name);
+  }
+  ICEBERG_PRECHECK(::avro::isCodecAvailable(codec),
+                   "Avro codec {} is not available in the current build", codec_name);
+  return codec;
+}
+
+Result<std::optional<int>> ParseCodecLevel(const WriterProperties& properties) {
+  auto level_str = properties.Get(WriterProperties::kAvroCompressionLevel);
+  if (level_str.empty()) {
+    return std::nullopt;
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto level, StringUtils::ParseInt(level_str));
+  return level;
+}
+
 // Abstract base class for Avro write backends.
 class AvroWriteBackend {
  public:
   virtual ~AvroWriteBackend() = default;
   virtual Status Init(std::unique_ptr<::avro::OutputStream> output_stream,
                       const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
+                      ::avro::Codec codec, std::optional<int> compression_level,
                       const std::map<std::string, std::vector<uint8_t>>& metadata) = 0;
   virtual Status WriteRow(const Schema& write_schema, const ::arrow::Array& array,
                           int64_t row_index) = 0;
@@ -70,10 +99,11 @@ class DirectEncoderBackend : public AvroWriteBackend {
  public:
   Status Init(std::unique_ptr<::avro::OutputStream> output_stream,
               const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
+              ::avro::Codec codec, std::optional<int> compression_level,
               const std::map<std::string, std::vector<uint8_t>>& metadata) override {
-    writer_ = std::make_unique<::avro::DataFileWriterBase>(std::move(output_stream),
-                                                           avro_schema, sync_interval,
-                                                           ::avro::NULL_CODEC, metadata);
+    writer_ = std::make_unique<::avro::DataFileWriterBase>(
+        std::move(output_stream), avro_schema, sync_interval, codec, metadata,
+        compression_level);
     avro_root_node_ = avro_schema.root();
     return {};
   }
@@ -111,10 +141,11 @@ class GenericDatumBackend : public AvroWriteBackend {
  public:
   Status Init(std::unique_ptr<::avro::OutputStream> output_stream,
               const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
+              ::avro::Codec codec, std::optional<int> compression_level,
               const std::map<std::string, std::vector<uint8_t>>& metadata) override {
     writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
-        std::move(output_stream), avro_schema, sync_interval, ::avro::NULL_CODEC,
-        metadata);
+        std::move(output_stream), avro_schema, sync_interval, codec, metadata,
+        compression_level);
     datum_ = std::make_unique<::avro::GenericDatum>(avro_schema);
     return {};
   }
@@ -158,7 +189,7 @@ class AvroWriter::Impl {
    ::avro::NodePtr root;
    ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root));
    if (const auto& schema_name =
-            options.properties->Get(WriterProperties::kAvroSchemaName);
+            options.properties.Get(WriterProperties::kAvroSchemaName);
        !schema_name.empty()) {
      root->setName(::avro::Name(schema_name));
    }
@@ -169,7 +200,7 @@ class AvroWriter::Impl {
    ICEBERG_ASSIGN_OR_RAISE(
        auto output_stream,
        CreateOutputStream(options,
-                           options.properties->Get(WriterProperties::kAvroBufferSize)));
+                           options.properties.Get(WriterProperties::kAvroBufferSize)));
    arrow_output_stream_ = output_stream->arrow_output_stream();

    std::map<std::string, std::vector<uint8_t>> metadata;
@@ -181,15 +212,19 @@ class AvroWriter::Impl {
    }

    // Create the appropriate backend based on configuration
-    if (options.properties->Get(WriterProperties::kAvroSkipDatum)) {
+    if (options.properties.Get(WriterProperties::kAvroSkipDatum)) {
      backend_ = std::make_unique<DirectEncoderBackend>();
    } else {
      backend_ = std::make_unique<GenericDatumBackend>();
    }

-    ICEBERG_RETURN_UNEXPECTED(backend_->Init(
-        std::move(output_stream), *avro_schema_,
-        options.properties->Get(WriterProperties::kAvroSyncInterval), metadata));
+    ICEBERG_ASSIGN_OR_RAISE(auto codec, ParseCodec(options.properties));
+    ICEBERG_ASSIGN_OR_RAISE(auto compression_level, ParseCodecLevel(options.properties));
+
+    ICEBERG_RETURN_UNEXPECTED(
+        backend_->Init(std::move(output_stream), *avro_schema_,
+                       options.properties.Get(WriterProperties::kAvroSyncInterval), codec,
+                       compression_level, metadata));

    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
    return {};
diff --git a/src/iceberg/file_reader.cc b/src/iceberg/file_reader.cc
index 022002a55..31b7b39d6 100644
--- a/src/iceberg/file_reader.cc
+++ b/src/iceberg/file_reader.cc
@@ -59,14 +59,10 @@ Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Open(
   return reader;
 }

-std::unique_ptr<ReaderProperties> ReaderProperties::default_properties() {
-  return std::unique_ptr<ReaderProperties>(new ReaderProperties());
-}
-
-std::unique_ptr<ReaderProperties> ReaderProperties::FromMap(
+ReaderProperties ReaderProperties::FromMap(
     const std::unordered_map<std::string, std::string>& properties) {
-  auto reader_properties = std::unique_ptr<ReaderProperties>(new ReaderProperties());
-  reader_properties->configs_ = properties;
+  ReaderProperties reader_properties;
+  reader_properties.configs_ = properties;
   return reader_properties;
 }
diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h
index a54d2ee78..923ac6bdb 100644
--- a/src/iceberg/file_reader.h
+++ b/src/iceberg/file_reader.h
@@ -75,24 +75,16 @@ class ReaderProperties : public ConfigBase {
   /// \brief The batch size to read.
   inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};

-
   /// \brief Skip GenericDatum in Avro reader for better performance.
   /// When true, decode directly from Avro to Arrow without GenericDatum intermediate.
   /// Default: true (skip GenericDatum for better performance).
   inline static Entry<bool> kAvroSkipDatum{"read.avro.skip-datum", true};

-
   /// \brief The buffer size used by Avro input stream.
   inline static Entry<int64_t> kAvroBufferSize{"read.avro.buffer-size", 1024 * 1024};

-  /// \brief Create a default ReaderProperties instance.
-  static std::unique_ptr<ReaderProperties> default_properties();
-
   /// \brief Create a ReaderProperties instance from a map of key-value pairs.
-  static std::unique_ptr<ReaderProperties> FromMap(
+  static ReaderProperties FromMap(
       const std::unordered_map<std::string, std::string>& properties);
-
- private:
-  ReaderProperties() = default;
 };

 /// \brief Options for creating a reader.
@@ -116,7 +108,7 @@ struct ICEBERG_EXPORT ReaderOptions {
   /// that may have different field names than the current schema.
   std::shared_ptr<NameMapping> name_mapping;
   /// \brief Format-specific or implementation-specific properties.
-  std::shared_ptr<ReaderProperties> properties = ReaderProperties::default_properties();
+  ReaderProperties properties;
 };

 /// \brief Factory function to create a reader of a specific file format.
diff --git a/src/iceberg/file_writer.cc b/src/iceberg/file_writer.cc
index ef0c84bd2..df397975f 100644
--- a/src/iceberg/file_writer.cc
+++ b/src/iceberg/file_writer.cc
@@ -59,14 +59,10 @@ Result<std::unique_ptr<Writer>> WriterFactoryRegistry::Open(
   return writer;
 }

-std::unique_ptr<WriterProperties> WriterProperties::default_properties() {
-  return std::unique_ptr<WriterProperties>(new WriterProperties());
-}
-
-std::unique_ptr<WriterProperties> WriterProperties::FromMap(
+WriterProperties WriterProperties::FromMap(
     const std::unordered_map<std::string, std::string>& properties) {
-  auto writer_properties = std::unique_ptr<WriterProperties>(new WriterProperties());
-  writer_properties->configs_ = properties;
+  WriterProperties writer_properties;
+  writer_properties.configs_ = properties;
   return writer_properties;
 }
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index 959ca72cd..8913bed1a 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -42,28 +42,28 @@ class WriterProperties : public ConfigBase {
   /// \brief The name of the Avro root node schema to write.
   inline static Entry<std::string> kAvroSchemaName{"write.avro.schema-name", ""};

-
   /// \brief The buffer size used by Avro output stream.
   inline static Entry<int64_t> kAvroBufferSize{"write.avro.buffer-size", 1024 * 1024};

-
   /// \brief The sync interval used by Avro writer.
   inline static Entry<int64_t> kAvroSyncInterval{"write.avro.sync-interval", 16 * 1024};

-
   /// \brief Whether to skip GenericDatum and use direct encoder for Avro writing.
   /// When true, uses direct encoder (faster). When false, uses GenericDatum.
   inline static Entry<bool> kAvroSkipDatum{"write.avro.skip-datum", true};

+  inline static Entry<std::string> kAvroCompression{"write.avro.compression-codec",
+                                                    "gzip"};
+  inline static Entry<std::string> kAvroCompressionLevel{"write.avro.compression-level",
+                                                         ""};

-  /// TODO(gangwu): add more properties, like compression codec, compression level, etc.
+  inline static Entry<std::string> kParquetCompression{"write.parquet.compression-codec",
+                                                       "zstd"};
+  inline static Entry<std::string> kParquetCompressionLevel{
+      "write.parquet.compression-level", ""};

-  /// \brief Create a default WriterProperties instance.
-  static std::unique_ptr<WriterProperties> default_properties();
+  /// TODO(gangwu): add table properties with write.avro|parquet|orc.*

   /// \brief Create a WriterProperties instance from a map of key-value pairs.
-  static std::unique_ptr<WriterProperties> FromMap(
+  static WriterProperties FromMap(
       const std::unordered_map<std::string, std::string>& properties);
-
- private:
-  WriterProperties() = default;
 };

 /// \brief Options for creating a writer.
@@ -79,7 +79,7 @@ struct ICEBERG_EXPORT WriterOptions {
   /// \brief Metadata to write to the file.
   std::unordered_map<std::string, std::string> metadata;
   /// \brief Format-specific or implementation-specific properties.
-  std::shared_ptr<WriterProperties> properties = WriterProperties::default_properties();
+  WriterProperties properties;
 };

 /// \brief Base writer class to write data from different file formats.
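Note: with WriterOptions::properties now held by value, compression is configured
per writer roughly as in the following minimal sketch (the output path is a
hypothetical stand-in, and the schema/file_io variables are assumed to be in
scope; Set, the Entry constants, and the designated-initializer style are the
ones declared above and used by the tests at the end of this patch):

  WriterProperties writer_properties;
  writer_properties.Set(WriterProperties::kAvroCompression, std::string("zstd"));
  writer_properties.Set(WriterProperties::kAvroCompressionLevel, std::string("3"));
  auto writer_result = WriterFactoryRegistry::Open(
      FileFormatType::kAvro, {.path = "/tmp/example.avro",  // hypothetical path
                              .schema = schema,             // assumed in scope
                              .io = file_io,                // assumed in scope
                              .properties = std::move(writer_properties)});

An unrecognized codec name is rejected at open time by ParseCodec (Avro, above)
and ParseCompression (Parquet, below) rather than being silently ignored.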
diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc
index a614a4b9d..056af671c 100644
--- a/src/iceberg/json_internal.cc
+++ b/src/iceberg/json_internal.cc
@@ -1119,8 +1119,6 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
   if (json.contains(kProperties)) {
     ICEBERG_ASSIGN_OR_RAISE(auto properties, FromJsonMap(json, kProperties));
     table_metadata->properties = TableProperties::FromMap(std::move(properties));
-  } else {
-    table_metadata->properties = TableProperties::default_properties();
   }

   // This field is optional, but internally we set this to -1 when not set
diff --git a/src/iceberg/manifest/manifest_writer.cc b/src/iceberg/manifest/manifest_writer.cc
index 0045e2c06..36b34b8bc 100644
--- a/src/iceberg/manifest/manifest_writer.cc
+++ b/src/iceberg/manifest/manifest_writer.cc
@@ -256,9 +256,9 @@ Result<std::unique_ptr<Writer>> OpenFileWriter(
     std::string_view location, std::shared_ptr<Schema> schema,
     std::shared_ptr<FileIO> file_io,
     std::unordered_map<std::string, std::string> metadata, std::string_view schema_name) {
-  auto writer_properties = WriterProperties::default_properties();
+  WriterProperties writer_properties;
   if (!schema_name.empty()) {
-    writer_properties->Set(WriterProperties::kAvroSchemaName, std::string(schema_name));
+    writer_properties.Set(WriterProperties::kAvroSchemaName, std::string(schema_name));
   }
   ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open(
                                            FileFormatType::kAvro,
diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc
index 4f4ac8e41..d13df0eb8 100644
--- a/src/iceberg/parquet/parquet_reader.cc
+++ b/src/iceberg/parquet/parquet_reader.cc
@@ -124,7 +124,7 @@ class ParquetReader::Impl {
     ::parquet::ReaderProperties reader_properties(pool_);
     ::parquet::ArrowReaderProperties arrow_reader_properties;
     arrow_reader_properties.set_batch_size(
-        options.properties->Get(ReaderProperties::kBatchSize));
+        options.properties.Get(ReaderProperties::kBatchSize));
     arrow_reader_properties.set_arrow_extensions_enabled(true);

     // Open the Parquet file reader
diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc
index 886348f03..bac25c9ef 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -46,13 +46,48 @@ Result> OpenOutputStream(
   return output;
 }

+Result<::arrow::Compression::type> ParseCompression(const WriterProperties& properties) {
+  const auto& compression_name = properties.Get(WriterProperties::kParquetCompression);
+  if (compression_name == "uncompressed") {
+    return ::arrow::Compression::UNCOMPRESSED;
+  } else if (compression_name == "snappy") {
+    return ::arrow::Compression::SNAPPY;
+  } else if (compression_name == "gzip") {
+    return ::arrow::Compression::GZIP;
+  } else if (compression_name == "brotli") {
+    return ::arrow::Compression::BROTLI;
+  } else if (compression_name == "lz4") {
+    return ::arrow::Compression::LZ4;
+  } else if (compression_name == "zstd") {
+    return ::arrow::Compression::ZSTD;
+  } else {
+    return InvalidArgument("Unsupported Parquet compression codec: {}", compression_name);
+  }
+}
+
+Result<std::optional<int>> ParseCodecLevel(const WriterProperties& properties) {
+  auto level_str = properties.Get(WriterProperties::kParquetCompressionLevel);
+  if (level_str.empty()) {
+    return std::nullopt;
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto level, StringUtils::ParseInt(level_str));
+  return level;
+}
+
 }  // namespace

 class ParquetWriter::Impl {
  public:
   Status Open(const WriterOptions& options) {
-    auto writer_properties =
-        ::parquet::WriterProperties::Builder().memory_pool(pool_)->build();
+    ICEBERG_ASSIGN_OR_RAISE(auto compression, ParseCompression(options.properties));
+    ICEBERG_ASSIGN_OR_RAISE(auto compression_level, ParseCodecLevel(options.properties));
+
+    auto properties_builder = ::parquet::WriterProperties::Builder();
+    properties_builder.compression(compression);
+    if (compression_level.has_value()) {
+      properties_builder.compression_level(compression_level.value());
+    }
+    auto writer_properties = properties_builder.memory_pool(pool_)->build();
     auto arrow_writer_properties = ::parquet::default_arrow_writer_properties();

     ArrowSchema c_schema;
diff --git a/src/iceberg/table_properties.cc b/src/iceberg/table_properties.cc
index db6adedcf..3a074a755 100644
--- a/src/iceberg/table_properties.cc
+++ b/src/iceberg/table_properties.cc
@@ -38,8 +38,6 @@ const std::unordered_set<std::string>& TableProperties::commit_properties() {
   return kCommitProperties;
 }

-TableProperties TableProperties::default_properties() { return {}; }
-
 TableProperties TableProperties::FromMap(
     std::unordered_map<std::string, std::string> properties) {
   TableProperties table_properties;
diff --git a/src/iceberg/table_properties.h b/src/iceberg/table_properties.h
index 5d5c17db0..6b92ea1ff 100644
--- a/src/iceberg/table_properties.h
+++ b/src/iceberg/table_properties.h
@@ -291,11 +291,6 @@ class ICEBERG_EXPORT TableProperties : public ConfigBase {
   /// \brief Get the set of commit table property keys.
   static const std::unordered_set<std::string>& commit_properties();

-  /// \brief Create a default TableProperties instance.
-  ///
-  /// \return A unique pointer to a TableProperties instance with default values
-  static TableProperties default_properties();
-
   /// \brief Create a TableProperties instance from a map of key-value pairs.
   ///
   /// \param properties The map containing property key-value pairs
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index 404f763aa..518948d30 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -142,8 +142,8 @@ class AvroReaderTest : public ::testing::Test {
     ASSERT_THAT(writer->Close(), IsOk());
     ICEBERG_UNWRAP_OR_FAIL(auto written_length, writer->length());

-    auto reader_properties = ReaderProperties::default_properties();
-    reader_properties->Set(ReaderProperties::kAvroSkipDatum, skip_datum_);
+    ReaderProperties reader_properties;
+    reader_properties.Set(ReaderProperties::kAvroSkipDatum, skip_datum_);
     auto reader_result = ReaderFactoryRegistry::Open(
         FileFormatType::kAvro, {.path = temp_avro_file_,
@@ -210,8 +210,8 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) {
   auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
       SchemaField::MakeRequired(1, "id", std::make_shared())});

-  auto reader_properties = ReaderProperties::default_properties();
-  reader_properties->Set(ReaderProperties::kBatchSize, int64_t{2});
+  ReaderProperties reader_properties;
+  reader_properties.Set(ReaderProperties::kBatchSize, int64_t{2});
   auto reader_result = ReaderFactoryRegistry::Open(
       FileFormatType::kAvro, {.path = temp_avro_file_,
@@ -228,25 +228,25 @@ TEST_F(AvroReaderTest, BufferSizeConfiguration) {
   // Test default buffer size
-  auto properties1 = ReaderProperties::default_properties();
-  ASSERT_EQ(properties1->Get(ReaderProperties::kAvroBufferSize), 1024 * 1024);
+  ReaderProperties properties1;
+  ASSERT_EQ(properties1.Get(ReaderProperties::kAvroBufferSize), 1024 * 1024);

   // Test setting custom buffer size
-  auto properties2 = ReaderProperties::default_properties();
+  ReaderProperties properties2;
   constexpr int64_t kCustomBufferSize = 2 * 1024 * 1024;  // 2MB
-  properties2->Set(ReaderProperties::kAvroBufferSize, kCustomBufferSize);
-  ASSERT_EQ(properties2->Get(ReaderProperties::kAvroBufferSize), kCustomBufferSize);
+  properties2.Set(ReaderProperties::kAvroBufferSize, kCustomBufferSize);
+  ASSERT_EQ(properties2.Get(ReaderProperties::kAvroBufferSize), kCustomBufferSize);

   // Test setting via FromMap
   std::unordered_map<std::string, std::string> config_map = {
       {"read.avro.buffer-size", "4194304"}  // 4MB
   };
   auto properties3 = ReaderProperties::FromMap(config_map);
-  ASSERT_EQ(properties3->Get(ReaderProperties::kAvroBufferSize), 4194304);
+  ASSERT_EQ(properties3.Get(ReaderProperties::kAvroBufferSize), 4194304);

   // Test that unset returns to default
-  properties2->Unset(ReaderProperties::kAvroBufferSize);
-  ASSERT_EQ(properties2->Get(ReaderProperties::kAvroBufferSize), 1024 * 1024);
+  properties2.Unset(ReaderProperties::kAvroBufferSize);
+  ASSERT_EQ(properties2.Get(ReaderProperties::kAvroBufferSize), 1024 * 1024);
 }

 // Parameterized test fixture for testing both DirectDecoder and GenericDatum modes
@@ -665,10 +665,10 @@ class AvroWriterTest : public ::testing::Test,
     std::unordered_map<std::string, std::string> metadata = {
         {"writer_test", "direct_encoder"}};

-    auto writer_properties = WriterProperties::default_properties();
-    writer_properties->Set(WriterProperties::kAvroSkipDatum, skip_datum_);
+    WriterProperties writer_properties;
+    writer_properties.Set(WriterProperties::kAvroSkipDatum, skip_datum_);
     for (const auto& [key, value] : extra_properties) {
-      writer_properties->mutable_configs().emplace(key, value);
+      writer_properties.mutable_configs().emplace(key, value);
     }

     auto writer_result = WriterFactoryRegistry::Open(
diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc
index a5e056d48..0d983db58 100644
--- a/src/iceberg/test/parquet_test.cc
+++ b/src/iceberg/test/parquet_test.cc
@@ -94,9 +94,16 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
   std::unordered_map<std::string, std::string> metadata = {{"k1", "v1"},
                                                            {"k2", "v2"}};

+  WriterProperties writer_properties;
+  writer_properties.Set(WriterProperties::kParquetCompression,
+                        std::string("uncompressed"));
+
   auto writer_data = WriterFactoryRegistry::Open(
-      FileFormatType::kParquet,
-      {.path = basePath, .schema = schema, .io = file_io, .metadata = metadata});
+      FileFormatType::kParquet, {.path = basePath,
+                                 .schema = schema,
+                                 .io = file_io,
+                                 .metadata = metadata,
+                                 .properties = std::move(writer_properties)});
   ASSERT_THAT(writer_data, IsOk())
       << "Failed to create writer: " << writer_data.error().message;
   auto writer = std::move(writer_data.value());
@@ -139,8 +146,14 @@ class ParquetReaderTest : public ::testing::Test {
                                  R"([[1, "Foo"],[2, "Bar"],[3, "Baz"]])")
                      .ValueOrDie();

-    ASSERT_TRUE(WriteArray(
-        array, {.path = temp_parquet_file_, .schema = schema, .io = file_io_}));
+    WriterProperties writer_properties;
+    writer_properties.Set(WriterProperties::kParquetCompression,
+                          std::string("uncompressed"));
+
+    ASSERT_TRUE(WriteArray(array, {.path = temp_parquet_file_,
+                                   .schema = schema,
+                                   .io = file_io_,
+                                   .properties = std::move(writer_properties)}));
   }

   void CreateSplitParquetFile() {
@@ -244,8 +257,8 @@ TEST_F(ParquetReaderTest, ReadWithBatchSize) {
   auto schema = std::make_shared<Schema>(
       std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});

-  auto reader_properties = ReaderProperties::default_properties();
-  reader_properties->Set(ReaderProperties::kBatchSize, int64_t{2});
+  ReaderProperties reader_properties;
+  reader_properties.Set(ReaderProperties::kBatchSize, int64_t{2});
   auto reader_result =
       ReaderFactoryRegistry::Open(FileFormatType::kParquet,
                                   {.path = temp_parquet_file_,
@@ -289,9 +302,8 @@ TEST_F(ParquetReaderTest, ReadSplit) {
       R"([[1, 0], [2, 1], [3, 2]])", R"([[1, 0], [2, 1]])", R"([[3, 2]])",
       "",
       "",
   };
-  std::shared_ptr<ReaderProperties> reader_properties =
-      ReaderProperties::default_properties();
-  reader_properties->Set(ReaderProperties::kBatchSize, int64_t{100});
+  ReaderProperties reader_properties;
+  reader_properties.Set(ReaderProperties::kBatchSize, int64_t{100});
   for (size_t i = 0; i < splits.size(); ++i) {
     auto reader_result =
         ReaderFactoryRegistry::Open(FileFormatType::kParquet,
@@ -345,8 +357,8 @@ TEST_F(ParquetReaderTest, ReadRowPositionWithBatchSize) {
       MetadataColumns::kRowPosition,
   });

-  auto reader_properties = ReaderProperties::default_properties();
-  reader_properties->Set(ReaderProperties::kBatchSize, int64_t{2});
+  ReaderProperties reader_properties;
+  reader_properties.Set(ReaderProperties::kBatchSize, int64_t{2});
   ICEBERG_UNWRAP_OR_FAIL(auto reader,
                          ReaderFactoryRegistry::Open(
                              FileFormatType::kParquet,
diff --git a/src/iceberg/test/snapshot_util_test.cc b/src/iceberg/test/snapshot_util_test.cc
index e4e17251e..a47b403da 100644
--- a/src/iceberg/test/snapshot_util_test.cc
+++ b/src/iceberg/test/snapshot_util_test.cc
@@ -83,7 +83,6 @@ std::shared_ptr<TableMetadata> CreateTableMetadataWithSnapshots(
   metadata->default_sort_order_id = SortOrder::kInitialSortOrderId;
   metadata->sort_orders.push_back(SortOrder::Unsorted());
   metadata->next_row_id = TableMetadata::kInitialRowId;
-  metadata->properties = TableProperties::default_properties();

   // Create snapshots: base -> main1 -> main2
   auto base_snapshot = CreateSnapshot(base_snapshot_id, std::nullopt, 1, base_timestamp);
diff --git a/src/iceberg/test/table_metadata_builder_test.cc b/src/iceberg/test/table_metadata_builder_test.cc
index 22df74302..4146cb01f 100644
--- a/src/iceberg/test/table_metadata_builder_test.cc
+++ b/src/iceberg/test/table_metadata_builder_test.cc
@@ -84,7 +84,6 @@ std::unique_ptr<TableMetadata> CreateBaseMetadata(
   metadata->default_sort_order_id = SortOrder::kUnsortedOrderId;
   metadata->sort_orders.push_back(SortOrder::Unsorted());
   metadata->next_row_id = TableMetadata::kInitialRowId;
-  metadata->properties = TableProperties::default_properties();

   return metadata;
 }
diff --git a/src/iceberg/test/update_partition_spec_test.cc b/src/iceberg/test/update_partition_spec_test.cc
index 914a28e8b..fc316aae2 100644
--- a/src/iceberg/test/update_partition_spec_test.cc
+++ b/src/iceberg/test/update_partition_spec_test.cc
@@ -134,7 +134,6 @@ class UpdatePartitionSpecTest : public ::testing::TestWithParam {
     metadata->default_sort_order_id = SortOrder::kUnsortedOrderId;
     metadata->sort_orders.push_back(SortOrder::Unsorted());
     metadata->next_row_id = TableMetadata::kInitialRowId;
-    metadata->properties = TableProperties::default_properties();
     metadata->partition_specs.push_back(std::move(spec));
     return metadata;
   }
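Note: the string-keyed route that BufferSizeConfiguration exercises above covers
the new codec entries as well; a minimal sketch for the writer side (the keys are
the property names registered in file_writer.h; the level stays a string and is
only parsed by ParseCodecLevel when the writer opens; "7" is an arbitrary example
level, not a default from this patch):

  auto writer_properties = WriterProperties::FromMap({
      {"write.parquet.compression-codec", "zstd"},
      {"write.parquet.compression-level", "7"},
  });
  ASSERT_EQ(writer_properties.Get(WriterProperties::kParquetCompression), "zstd");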