Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/paimon/common/fs/external_path_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <atomic>
#include <cstddef>
#include <memory>
#include <random>
Expand Down Expand Up @@ -45,12 +46,9 @@ class ExternalPathProvider {
///
/// @return the next external data path
std::string GetNextExternalDataPath(const std::string& file_name) {
position_++;
if (position_ == external_table_paths_.size()) {
position_ = 0;
}
size_t position = (++position_) % external_table_paths_.size();
return PathUtil::JoinPath(
PathUtil::JoinPath(external_table_paths_[position_], relative_bucket_path_), file_name);
PathUtil::JoinPath(external_table_paths_[position], relative_bucket_path_), file_name);
}

private:
Expand All @@ -66,6 +64,6 @@ class ExternalPathProvider {
private:
std::vector<std::string> external_table_paths_;
std::string relative_bucket_path_;
size_t position_;
std::atomic<size_t> position_;
};
} // namespace paimon
49 changes: 49 additions & 0 deletions src/paimon/common/fs/external_path_provider_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

#include "paimon/common/fs/external_path_provider.h"

#include <cstdint>
#include <future>
#include <mutex>
#include <set>
#include <vector>

#include "gtest/gtest.h"
#include "paimon/testing/utils/testharness.h"
Expand Down Expand Up @@ -62,4 +66,49 @@ TEST(ExternalPathProviderTest, TestGetNextExternalDataPath2) {
"/tmp/external_path_c/p0=1/p1=0/bucket-0/file.orc",
}));
}

// Calls GetNextExternalDataPath() on one provider from several threads at once and checks that
//   1. every returned path is one of the three configured locations, and
//   2. the paths are handed out evenly, i.e. no cursor step was lost or duplicated.
TEST(ExternalPathProviderTest, TestGetNextExternalDataPathConcurrently) {
    std::vector<std::string> external_table_paths;
    external_table_paths.emplace_back("/tmp/external_path_a/");
    external_table_paths.emplace_back("/tmp/external_path_b/");
    external_table_paths.emplace_back("/tmp/external_path_c/");
    std::string relative_bucket_path = "p0=1/p1=0/bucket-0";

    ASSERT_OK_AND_ASSIGN(std::unique_ptr<ExternalPathProvider> provider,
                         ExternalPathProvider::Create(external_table_paths, relative_bucket_path));

    const std::set<std::string> expected_data_paths = {
        "/tmp/external_path_a/p0=1/p1=0/bucket-0/file.orc",
        "/tmp/external_path_b/p0=1/p1=0/bucket-0/file.orc",
        "/tmp/external_path_c/p0=1/p1=0/bucket-0/file.orc",
    };
    std::mutex mutex;
    std::vector<std::string> result_data_paths;
    constexpr int32_t kThreadCount = 8;
    constexpr int32_t kPathCountPerThread = 1000;
    // Same quantity as kThreadCount * kPathCountPerThread, but unsigned so that it can be
    // compared against container sizes without a signed/unsigned mismatch.
    const size_t expected_total =
        static_cast<size_t>(kThreadCount) * static_cast<size_t>(kPathCountPerThread);

    std::vector<std::future<void>> futures;
    futures.reserve(kThreadCount);
    for (int32_t i = 0; i < kThreadCount; ++i) {
        // Capturing by reference is safe: every future is joined below before any captured
        // object goes out of scope.
        futures.emplace_back(std::async(std::launch::async, [&]() {
            // Collect locally first so the mutex is taken once per thread and does not
            // serialize the calls under test.
            std::vector<std::string> local_paths;
            local_paths.reserve(kPathCountPerThread);
            for (int32_t j = 0; j < kPathCountPerThread; ++j) {
                local_paths.push_back(provider->GetNextExternalDataPath("file.orc"));
            }

            std::lock_guard<std::mutex> lock(mutex);
            result_data_paths.insert(result_data_paths.end(), local_paths.begin(),
                                     local_paths.end());
        }));
    }
    for (auto& future : futures) {
        future.get();
    }

    ASSERT_EQ(result_data_paths.size(), expected_total);
    for (const auto& data_path : result_data_paths) {
        ASSERT_TRUE(expected_data_paths.count(data_path) > 0) << data_path;
    }

    // A round-robin cursor that is advanced exactly once per call yields consecutive slots,
    // so the per-path totals can differ by at most one. A larger spread means increments were
    // lost or duplicated under contention.
    size_t min_count = expected_total;
    size_t max_count = 0;
    for (const auto& expected_path : expected_data_paths) {
        size_t count = 0;
        for (const auto& data_path : result_data_paths) {
            if (data_path == expected_path) {
                ++count;
            }
        }
        if (count < min_count) {
            min_count = count;
        }
        if (count > max_count) {
            max_count = count;
        }
    }
    ASSERT_LE(max_count - min_count, static_cast<size_t>(1));
}
} // namespace paimon::test
83 changes: 80 additions & 3 deletions src/paimon/common/fs/file_system_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,12 @@ class FileSystemTest : public ::testing::Test, public ::testing::WithParamInterf
std::string GetTestDir() const {
std::string file_system = GetParam();
if (file_system == "local") {
return paimon::test::GetDataDir();
std::string data_dir = paimon::test::GetDataDir();
if (data_dir.empty() || data_dir[0] != '/') {
EXPECT_OK_AND_ASSIGN(std::string current_path, PathUtil::GetCurrentPath());
data_dir = PathUtil::JoinPath(current_path, data_dir);
}
return data_dir;
} else if (file_system == "jindo") {
return "oss://paimon-unittest/test_data/";
}
Expand Down Expand Up @@ -209,6 +214,28 @@ TEST_P(FileSystemTest, TestCreate) {
ASSERT_NOK_WITH_MSG(fs_->Create(path, /*overwrite=*/false), "already exists");
}

// Creates a file through a bare relative name and checks that the local file system reports
// an absolute URI for it, keeps the file name, and can read the content back through the same
// relative name.
TEST_P(FileSystemTest, TestCreateRelativeFileInCurrentDirectory) {
    if (GetParam() != "local") {
        GTEST_SKIP() << "this test is only tested for the local file system";
    }

    // Random suffix so repeated or parallel runs do not collide on the same file name.
    std::string path = "relative_file_" + RandomName();
    ASSERT_OK_AND_ASSIGN(auto out, fs_->Create(path, /*overwrite=*/true));
    std::string content = "content";
    ASSERT_OK_AND_ASSIGN(int32_t write_len, out->Write(content.data(), content.size()));
    // Compare in a single (signed) type; comparing int32_t against size_t directly triggers a
    // signed/unsigned comparison inside the assertion.
    ASSERT_EQ(write_len, static_cast<int32_t>(content.size()));
    ASSERT_OK_AND_ASSIGN(std::string uri, out->GetUri());
    ASSERT_FALSE(uri.empty());
    // The reported URI must be absolute even though the input was relative.
    ASSERT_EQ(uri[0], '/');
    ASSERT_EQ(PathUtil::GetName(uri), path);
    ASSERT_OK(out->Close());

    std::string read_content;
    ASSERT_OK(fs_->ReadFile(path, &read_content));
    ASSERT_EQ(read_content, content);
    // Remove the file again so it is not left behind in the directory it was created in.
    ASSERT_OK(fs_->Delete(path));
}

// --- write&read
TEST_P(FileSystemTest, TestSimpleWriteAndRead) {
std::string content = "abcdefghijk";
Expand Down Expand Up @@ -648,6 +675,27 @@ TEST_P(FileSystemTest, TestRename) {
"src file is not a dir");
}

// Renames a file whose source and destination are both given with a "file:" scheme prefix and
// verifies that the source is gone, the destination exists, and the content is preserved.
TEST_P(FileSystemTest, TestRenameWithFileSchemeUsesNormalizedPath) {
    if (GetParam() != "local") {
        GTEST_SKIP() << "this test is only tested for the local file system";
    }

    const std::string scheme_prefix = "file:";
    const std::string src = scheme_prefix + test_root_ + "/scheme_src.txt";
    const std::string dst = scheme_prefix + test_root_ + "/scheme_dst.txt";

    // Arrange: a source file addressed through the scheme-prefixed path.
    ASSERT_OK(fs_->WriteFile(src, "content", /*overwrite=*/false));

    // Act.
    ASSERT_OK(fs_->Rename(src, dst));

    // Assert: the file moved ...
    ASSERT_OK_AND_ASSIGN(bool src_exists, fs_->Exists(src));
    ASSERT_FALSE(src_exists);
    ASSERT_OK_AND_ASSIGN(bool dst_exists, fs_->Exists(dst));
    ASSERT_TRUE(dst_exists);

    // ... and its bytes are unchanged.
    std::string content;
    ASSERT_OK(fs_->ReadFile(dst, &content));
    ASSERT_EQ(content, "content");
}

TEST_P(FileSystemTest, TestRename2) {
{
// test rename dir
Expand Down Expand Up @@ -781,6 +829,19 @@ TEST_P(FileSystemTest, TestExists) {
ASSERT_TRUE(is_exist);
}

// The root directory, the empty path and "." must all be reported as existing by the local
// file system.
TEST_P(FileSystemTest, TestExistsInLocalFileSystem) {
    if (GetParam() != "local") {
        GTEST_SKIP() << "this test is only tested for the local file system";
    }

    const char* const always_present[] = {"/", "", "."};
    for (const char* candidate : always_present) {
        ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(candidate));
        ASSERT_TRUE(is_exist);
    }
}

// --- delete
TEST_P(FileSystemTest, TestExistingFileDeletion) {
auto check = [&](bool recursive) {
Expand Down Expand Up @@ -985,8 +1046,16 @@ TEST_P(FileSystemTest, TestMkdir) {
ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp/local/f/1"));
ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1"));
ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1/f2/"));
}

// Mkdirs() on paths that already exist for the local file system -- the root, the empty path
// and "." -- must succeed.
TEST_P(FileSystemTest, TestMkdirInLocalFileSystem) {
    if (GetParam() != "local") {
        GTEST_SKIP() << "this test is only tested for the local file system";
    }

    ASSERT_OK(fs_->Mkdirs("/"));
    // The empty path is expected to succeed here. The earlier expectation that it fails with
    // "path is an empty string." cannot hold at the same time and has been dropped.
    // NOTE(review): presumably "" is resolved to the current working directory -- confirm
    // against the local file system implementation.
    ASSERT_OK(fs_->Mkdirs(""));
    ASSERT_OK(fs_->Mkdirs("."));
}

TEST_P(FileSystemTest, TestMkdir2) {
Expand Down Expand Up @@ -1399,6 +1468,14 @@ TEST_P(FileSystemTest, TestAtomicStoreAlreadyExist) {
ASSERT_TRUE(is_exist);
}

INSTANTIATE_TEST_SUITE_P(UseLocal, FileSystemTest, ::testing::Values("local" /*, "jindo"*/));
// Returns the file system identifiers the parameterized FileSystemTest suite is run against.
// Only "local" is enabled; the "jindo" entry is currently commented out.
std::vector<std::string> GetTestValuesForFileSystemTest() {
    std::vector<std::string> enabled_file_systems{
        "local",
        // "jindo",
    };
    return enabled_file_systems;
}

INSTANTIATE_TEST_SUITE_P(FsType, FileSystemTest,
::testing::ValuesIn(GetTestValuesForFileSystemTest()));

} // namespace paimon::test
3 changes: 2 additions & 1 deletion src/paimon/common/table/special_fields.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ struct SpecialFields {

static bool IsSpecialFieldName(const std::string& field_name) {
if (field_name == SequenceNumber().Name() || field_name == ValueKind().Name() ||
field_name == RowId().Name() || field_name == IndexScore().Name()) {
field_name == RowKind().Name() || field_name == RowId().Name() ||
field_name == IndexScore().Name()) {
return true;
}
return false;
Expand Down
1 change: 1 addition & 0 deletions src/paimon/common/table/special_fields_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ TEST(SpecialFieldsTest, TestIsSpecialFieldName) {
ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_SEQUENCE_NUMBER"));
ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_VALUE_KIND"));
ASSERT_FALSE(SpecialFields::IsSpecialFieldName("VALUE_KIND"));
ASSERT_TRUE(SpecialFields::IsSpecialFieldName("rowkind"));
ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_ROW_ID"));
ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_INDEX_SCORE"));
}
Expand Down
13 changes: 13 additions & 0 deletions src/paimon/common/utils/path_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@

#include "paimon/common/utils/path_util.h"

#include <unistd.h>

#include <cerrno>
#include <climits>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <utility>

#include "fmt/format.h"
Expand Down Expand Up @@ -142,6 +147,14 @@ void PathUtil::TrimLastDelim(std::string* dir_path) noexcept {
}
}

/// Returns the process's current working directory as reported by `getcwd`.
///
/// @return the working directory on success; otherwise an IOError status whose message
///         contains the `strerror` text of the failure.
Result<std::string> PathUtil::GetCurrentPath() noexcept {
    // PATH_MAX is only an initial guess. When the working directory does not fit, getcwd()
    // fails with ERANGE; that is retried with a larger buffer instead of being reported as an
    // I/O error.
    std::string buffer(PATH_MAX, '\0');
    while (::getcwd(buffer.data(), buffer.size()) == nullptr) {
        // Read errno once, before any other call has a chance to overwrite it.
        const int error_code = errno;
        if (error_code != ERANGE) {
            return Status::IOError(
                fmt::format("get current path failed, ec: {}", std::strerror(error_code)));
        }
        buffer.resize(buffer.size() * 2);
    }
    // getcwd() wrote a NUL-terminated string; copy only that part, not the padding.
    return std::string(buffer.c_str());
}
Comment on lines +150 to +156

Result<std::string> PathUtil::CreateTempPath(const std::string& path) noexcept {
std::string uuid;
if (!UUID::Generate(&uuid)) {
Expand Down
1 change: 1 addition & 0 deletions src/paimon/common/utils/path_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class PAIMON_EXPORT PathUtil {
static std::string GetParentDirPath(const std::string& path) noexcept;
static std::string GetName(const std::string& path) noexcept;
static void TrimLastDelim(std::string* dir_path) noexcept;
static Result<std::string> GetCurrentPath() noexcept;
static Result<std::string> CreateTempPath(const std::string& path) noexcept;
static Result<Path> ToPath(const std::string& path) noexcept;
static Result<std::string> NormalizePath(const std::string& path) noexcept;
Expand Down
22 changes: 22 additions & 0 deletions src/paimon/common/utils/path_util_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ TEST(PathUtilsTest, TestTrimLastDelim) {
}
}

// GetCurrentPath() must succeed and return a non-empty path that starts with '/'.
TEST(PathUtilsTest, TestGetCurrentPath) {
    ASSERT_OK_AND_ASSIGN(std::string working_dir, PathUtil::GetCurrentPath());
    ASSERT_FALSE(working_dir.empty());
    ASSERT_EQ(working_dir.front(), '/');
}

TEST(PathUtilsTest, TestToPath) {
{
std::string test_path = "";
Expand Down Expand Up @@ -128,6 +134,22 @@ TEST(PathUtilsTest, TestToPath) {
ASSERT_EQ(path.path, "/tmp/index");
ASSERT_EQ(path.ToString(), "/tmp/index");
}
{
std::string test_path = ".";
ASSERT_OK_AND_ASSIGN(Path path, PathUtil::ToPath(test_path));
ASSERT_EQ(path.scheme, "");
ASSERT_EQ(path.authority, "");
ASSERT_EQ(path.path, ".");
ASSERT_EQ(path.ToString(), ".");
}
{
std::string test_path = "relative/path";
ASSERT_OK_AND_ASSIGN(Path path, PathUtil::ToPath(test_path));
ASSERT_EQ(path.scheme, "");
ASSERT_EQ(path.authority, "");
ASSERT_EQ(path.path, "relative/path");
ASSERT_EQ(path.ToString(), "relative/path");
}
}

TEST(PathUtilsTest, TestGetName) {
Expand Down
39 changes: 28 additions & 11 deletions src/paimon/fs/local/local_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "fmt/format.h"
#include "paimon/common/factories/io_hook.h"
#include "paimon/common/utils/path_util.h"
#include "paimon/common/utils/string_utils.h"
#include "paimon/fs/local/local_file_status.h"

namespace paimon {
Expand All @@ -37,6 +38,25 @@ namespace paimon {
PAIMON_RETURN_NOT_OK(hook_->Try(path_)); \
}

// Builds a LocalFile from a user-supplied path string.
//
// - An empty string maps to the current working directory.
// - A scheme, if present, must be "file" (case-insensitive) and is dropped, e.g. "file:/tmp"
//   becomes "/tmp"; any other scheme yields Status::Invalid.
// - A path that does not start with '/' is joined onto the current working directory.
//
// Errors from PathUtil::GetCurrentPath() and PathUtil::ToPath() are propagated.
Result<LocalFile> LocalFile::Create(const std::string& path_string) {
    if (path_string.empty()) {
        PAIMON_ASSIGN_OR_RAISE(std::string working_dir, PathUtil::GetCurrentPath());
        return LocalFile(working_dir);
    }

    PAIMON_ASSIGN_OR_RAISE(Path parsed, PathUtil::ToPath(path_string));

    // The local file system only understands scheme-less paths or the "file" scheme.
    const bool has_scheme = !parsed.scheme.empty();
    if (has_scheme && StringUtils::ToLowerCase(parsed.scheme) != "file") {
        return Status::Invalid(
            fmt::format("invalid scheme {} for local file system", parsed.scheme));
    }

    const bool is_absolute = !parsed.path.empty() && parsed.path[0] == '/';
    if (is_absolute) {
        return LocalFile(parsed.path);
    }

    // Relative (or empty) path component: anchor it at the current working directory.
    PAIMON_ASSIGN_OR_RAISE(std::string working_dir, PathUtil::GetCurrentPath());
    return LocalFile(PathUtil::JoinPath(working_dir, parsed.path));
}

// Stores `path` unchanged and takes the hook returned by IOHook::GetInstance().
LocalFile::LocalFile(const std::string& path)
    : path_(path),
      hook_(IOHook::GetInstance()) {}

Result<bool> LocalFile::Exists() const {
Expand Down Expand Up @@ -104,7 +124,7 @@ Status LocalFile::ListFiles(std::vector<LocalFile>* file_list) const {
std::vector<std::string> file_names;
PAIMON_RETURN_NOT_OK(List(&file_names));
for (const auto& file_name : file_names) {
file_list->emplace_back(PathUtil::JoinPath(path_, file_name));
file_list->push_back(LocalFile(PathUtil::JoinPath(path_, file_name)));
}
return Status::OK();
}
Expand Down Expand Up @@ -162,7 +182,7 @@ LocalFile LocalFile::GetParentFile() const {
}
}

const std::string& LocalFile::GetAbsolutePath() const {
const std::string& LocalFile::GetPath() const {
return path_;
}

Expand All @@ -181,9 +201,8 @@ Result<int32_t> LocalFile::Read(char* buffer, uint32_t length, uint64_t offset)
while (more > 0) {
ret = ::pread(fd, buffer + off, more, offset + off);
if (ret == -1) {
return Status::IOError(
fmt::format("pread file '{}' fail at off {}, with error {}, ec: {}", path_, off,
strerror(errno), std::strerror(errno)));
return Status::IOError(fmt::format("pread file '{}' fail at off {}, ec: {}", path_,
off, std::strerror(errno)));
}
if (ret == 0) {
break;
Expand Down Expand Up @@ -211,9 +230,8 @@ Result<int32_t> LocalFile::Read(char* buffer, uint32_t length) {
while (more > 0) {
ret = fread(buffer + off, 1, more, file_);
if (ferror(file_) != 0) {
return Status::IOError(
fmt::format("read file '{}' fail at off {}, with error {}, ec: {}", path_, off,
strerror(errno), std::strerror(errno)));
return Status::IOError(fmt::format("read file '{}' fail at off {}, ec: {}", path_,
off, std::strerror(errno)));
}
more -= ret;
off += ret;
Expand Down Expand Up @@ -242,9 +260,8 @@ Result<int32_t> LocalFile::Write(const char* buffer, uint32_t length) {
while (more > 0) {
ret = fwrite(buffer + off, 1, more, file_);
if (ferror(file_) != 0) {
return Status::IOError(fmt::format("write file '{}' fail, with error {}, ec: {}",
path_, off, strerror(errno),
std::strerror(errno)));
return Status::IOError(fmt::format("write file '{}' fail at off {}, ec: {}", path_,
off, std::strerror(errno)));
}
more -= ret;
off += ret;
Expand Down
Loading
Loading