Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 215 additions & 0 deletions src/paimon/common/utils/data_converter_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once
#include <charconv>
#include <cstdint>
#include <functional>
#include <iomanip>
#include <memory>
#include <optional>
#include <sstream>
#include <string>
#include <utility>

#include "arrow/type.h"
#include "fmt/core.h"
#include "fmt/format.h"
#include "fmt/ranges.h"
#include "paimon/common/data/binary_row.h"
#include "paimon/common/data/binary_row_writer.h"
#include "paimon/common/data/binary_string.h"
#include "paimon/common/utils/string_utils.h"
#include "paimon/core/casting/date_to_string_cast_executor.h"
#include "paimon/defs.h"
#include "paimon/predicate/literal.h"
#include "paimon/result.h"
#include "paimon/status.h"

namespace paimon {
class MemoryPool;

#define RETURN_INVALID_WITH_FIELD_INFO(value, field_idx, value_str, type) \
if ((value) == std::nullopt) { \
return Status::Invalid( \
fmt::format("cannot convert field idx {}, field value {} to type {}", field_idx, \
value_str, type)); \
}

class DataConverterUtils {
public:
DataConverterUtils() = delete;
~DataConverterUtils() = delete;

using StrToBinaryRowConverter =
std::function<Status(const std::string&, int32_t, BinaryRowWriter*)>;
using BinaryRowFieldToStrConverter =
std::function<Result<std::string>(const BinaryRow&, int32_t)>;

static Result<StrToBinaryRowConverter> CreateDataToBinaryRowConverter(arrow::Type::type type,
MemoryPool* pool) {
StrToBinaryRowConverter converter;
switch (type) {
case arrow::Type::BOOL:
converter = [](const std::string& value_str, int32_t field_idx,
BinaryRowWriter* writer) {
auto value = StringUtils::StringToValue<bool>(value_str);
RETURN_INVALID_WITH_FIELD_INFO(value, field_idx, value_str,
arrow::internal::ToString(arrow::Type::BOOL));
writer->WriteBoolean(field_idx, value.value());
return Status::OK();
};
break;
case arrow::Type::INT8:
converter = [](const std::string& value_str, int32_t field_idx,
BinaryRowWriter* writer) {
auto value = StringUtils::StringToValue<int8_t>(value_str);
RETURN_INVALID_WITH_FIELD_INFO(value, field_idx, value_str,
arrow::internal::ToString(arrow::Type::INT8));
writer->WriteByte(field_idx, value.value());
return Status::OK();
};
break;
case arrow::Type::INT16:
converter = [](const std::string& value_str, int32_t field_idx,
BinaryRowWriter* writer) {
auto value = StringUtils::StringToValue<int16_t>(value_str);
RETURN_INVALID_WITH_FIELD_INFO(value, field_idx, value_str,
arrow::internal::ToString(arrow::Type::INT16));
writer->WriteShort(field_idx, value.value());
return Status::OK();
};
break;
case arrow::Type::INT32:
converter = [](const std::string& value_str, int32_t field_idx,
BinaryRowWriter* writer) {
auto value = StringUtils::StringToValue<int32_t>(value_str);
RETURN_INVALID_WITH_FIELD_INFO(value, field_idx, value_str,
arrow::internal::ToString(arrow::Type::INT32));
writer->WriteInt(field_idx, value.value());
return Status::OK();
};
break;
case arrow::Type::INT64:
converter = [](const std::string& value_str, int32_t field_idx,
BinaryRowWriter* writer) {
auto value = StringUtils::StringToValue<int64_t>(value_str);
RETURN_INVALID_WITH_FIELD_INFO(value, field_idx, value_str,
arrow::internal::ToString(arrow::Type::INT64));
writer->WriteLong(field_idx, value.value());
return Status::OK();
};
break;
case arrow::Type::STRING:
converter = [pool](const std::string& value_str, int32_t field_idx,
BinaryRowWriter* writer) {
BinaryString value = BinaryString::FromString(value_str, pool);
writer->WriteString(field_idx, value);
return Status::OK();
};
break;
case arrow::Type::DATE32:
converter = [](const std::string& value_str, int32_t field_idx,
BinaryRowWriter* writer) {
PAIMON_ASSIGN_OR_RAISE(int32_t date_value,
StringUtils::StringToDate(value_str));
writer->WriteInt(field_idx, date_value);
return Status::OK();
};
break;
default:
return Status::NotImplemented(
fmt::format("Do not support type {} in partition binary row",
arrow::internal::ToString(type)));
}
return converter;
}

static Result<BinaryRowFieldToStrConverter> CreateBinaryRowFieldToStringConverter(
arrow::Type::type type, bool legacy_partition_name_enabled) {
BinaryRowFieldToStrConverter converter;
switch (type) {
case arrow::Type::BOOL:
converter = [](const BinaryRow& row, int32_t field_idx) {
bool data = row.GetBoolean(field_idx);
std::string result = data ? "true" : "false";
return result;
};
break;
case arrow::Type::INT8:
converter = [](const BinaryRow& row, int32_t field_idx) {
auto data = static_cast<int8_t>(row.GetByte(field_idx));
return std::to_string(data);
};
break;
case arrow::Type::INT16:
converter = [](const BinaryRow& row, int32_t field_idx) {
auto data = row.GetShort(field_idx);
return std::to_string(data);
};
break;
case arrow::Type::INT32:
converter = [](const BinaryRow& row, int32_t field_idx) {
auto data = row.GetInt(field_idx);
return std::to_string(data);
};
break;
case arrow::Type::INT64:
converter = [](const BinaryRow& row, int32_t field_idx) {
auto data = row.GetLong(field_idx);
return std::to_string(data);
};
break;
case arrow::Type::STRING:
converter = [](const BinaryRow& row, int32_t field_idx) {
BinaryString data = row.GetString(field_idx);
return data.ToString();
};
break;
case arrow::Type::DATE32: {
if (legacy_partition_name_enabled) {
converter = [](const BinaryRow& row, int32_t field_idx) -> Result<std::string> {
int32_t data = row.GetDate(field_idx);
return std::to_string(data);
};
} else {
auto date_to_string_cast_executor =
std::make_shared<DateToStringCastExecutor>();
converter = [date_to_string_cast_executor](
const BinaryRow& row,
int32_t field_idx) -> Result<std::string> {
int32_t data = row.GetDate(field_idx);
PAIMON_ASSIGN_OR_RAISE(Literal literal,
date_to_string_cast_executor->Cast(
Literal(FieldType::DATE, data), arrow::utf8()));
return literal.GetValue<std::string>();
};
}
break;
}
default:
return Status::NotImplemented(
fmt::format("Do not support arrow {} in partition binary row",
arrow::internal::ToString(type)));
}
return converter;
}
};

} // namespace paimon
145 changes: 145 additions & 0 deletions src/paimon/common/utils/data_converter_utils_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "paimon/common/utils/data_converter_utils.h"

#include <cstddef>
#include <memory>
#include <vector>

#include "arrow/type_fwd.h"
#include "gtest/gtest.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/testing/utils/testharness.h"

namespace paimon::test {
TEST(DataConverterUtilsTest, TestDataToBinaryRowConverterWithLegacyPartitionName) {
auto pool = GetDefaultPool();
std::vector<std::pair<std::string, arrow::Type::type>> data = {
{"true", arrow::Type::BOOL},
{"10", arrow::Type::INT8},
{"-20", arrow::Type::INT8},
{"1556", arrow::Type::INT16},
{"-2556", arrow::Type::INT16},
{"348489", arrow::Type::INT32},
{"-448489", arrow::Type::INT32},
{"279039", arrow::Type::INT64},
{"1234567", arrow::Type::INT64},
{"abcde", arrow::Type::STRING},
{"这是一个很长很长的中文", arrow::Type::STRING},
{"10440", arrow::Type::DATE32}};

std::vector<DataConverterUtils::StrToBinaryRowConverter> converters;
std::vector<DataConverterUtils::BinaryRowFieldToStrConverter> reconverters;
for (const auto& [value, type] : data) {
ASSERT_OK_AND_ASSIGN(auto converter,
DataConverterUtils::CreateDataToBinaryRowConverter(type, pool.get()));
converters.emplace_back(std::move(converter));
ASSERT_OK_AND_ASSIGN(auto reconverter,
DataConverterUtils::CreateBinaryRowFieldToStringConverter(
type, /*legacy_partition_name_enabled=*/true));
reconverters.emplace_back(reconverter);
}
// test not implement type
ASSERT_NOK(DataConverterUtils::CreateDataToBinaryRowConverter(arrow::Type::LIST, pool.get()));

BinaryRow row(data.size());
BinaryRowWriter writer(&row, 0, pool.get());
for (size_t idx = 0; idx < data.size(); idx++) {
ASSERT_OK(converters[idx](data[idx].first, idx, &writer));
}
// test invalid str
ASSERT_NOK(converters[0]("abc", /*idx=*/0, &writer));
writer.Complete();

ASSERT_EQ(data.size(), row.GetFieldCount());
ASSERT_EQ(true, row.GetBoolean(0));
ASSERT_EQ(10, row.GetByte(1));
ASSERT_EQ(-20, row.GetByte(2));
ASSERT_EQ(1556, row.GetShort(3));
ASSERT_EQ(-2556, row.GetShort(4));
ASSERT_EQ(348489, row.GetInt(5));
ASSERT_EQ(-448489, row.GetInt(6));
ASSERT_EQ(279039, row.GetLong(7));
ASSERT_EQ(1234567, row.GetLong(8));
ASSERT_EQ("abcde", row.GetString(9).ToString());
ASSERT_EQ("这是一个很长很长的中文", row.GetString(10).ToString());
ASSERT_EQ(10440, row.GetDate(11));

for (size_t idx = 0; idx < data.size(); idx++) {
ASSERT_OK_AND_ASSIGN(auto partition_field_str, reconverters[idx](row, idx));
ASSERT_EQ(data[idx].first, partition_field_str);
}
}

TEST(DataConverterUtilsTest, TestDataToBinaryRowConverterWithNoLegacyPartitionName) {
auto pool = GetDefaultPool();
std::vector<std::pair<std::string, arrow::Type::type>> data = {
{"true", arrow::Type::BOOL},
{"10", arrow::Type::INT8},
{"-20", arrow::Type::INT8},
{"1556", arrow::Type::INT16},
{"-2556", arrow::Type::INT16},
{"348489", arrow::Type::INT32},
{"-448489", arrow::Type::INT32},
{"279039", arrow::Type::INT64},
{"1234567", arrow::Type::INT64},
{"abcde", arrow::Type::STRING},
{"这是一个很长很长的中文", arrow::Type::STRING},
{"1998-08-02", arrow::Type::DATE32}};

std::vector<DataConverterUtils::StrToBinaryRowConverter> converters;
std::vector<DataConverterUtils::BinaryRowFieldToStrConverter> reconverters;
for (const auto& [value, type] : data) {
ASSERT_OK_AND_ASSIGN(auto converter,
DataConverterUtils::CreateDataToBinaryRowConverter(type, pool.get()));
converters.emplace_back(std::move(converter));
ASSERT_OK_AND_ASSIGN(auto reconverter,
DataConverterUtils::CreateBinaryRowFieldToStringConverter(
type, /*legacy_partition_name_enabled=*/false));
reconverters.emplace_back(reconverter);
}
BinaryRow row(data.size());
BinaryRowWriter writer(&row, 0, pool.get());
for (size_t idx = 0; idx < data.size(); idx++) {
ASSERT_OK(converters[idx](data[idx].first, idx, &writer));
}
writer.Complete();

ASSERT_EQ(data.size(), row.GetFieldCount());
ASSERT_EQ(true, row.GetBoolean(0));
ASSERT_EQ(10, row.GetByte(1));
ASSERT_EQ(-20, row.GetByte(2));
ASSERT_EQ(1556, row.GetShort(3));
ASSERT_EQ(-2556, row.GetShort(4));
ASSERT_EQ(348489, row.GetInt(5));
ASSERT_EQ(-448489, row.GetInt(6));
ASSERT_EQ(279039, row.GetLong(7));
ASSERT_EQ(1234567, row.GetLong(8));
ASSERT_EQ("abcde", row.GetString(9).ToString());
ASSERT_EQ("这是一个很长很长的中文", row.GetString(10).ToString());
ASSERT_EQ(10440, row.GetDate(11));

for (size_t idx = 0; idx < data.size(); idx++) {
ASSERT_OK_AND_ASSIGN(auto partition_field_str, reconverters[idx](row, idx));
ASSERT_EQ(data[idx].first, partition_field_str);
}
}

} // namespace paimon::test
Loading