diff --git a/src/paimon/common/fs/external_path_provider.h b/src/paimon/common/fs/external_path_provider.h index acfa1315c..54c8f8861 100644 --- a/src/paimon/common/fs/external_path_provider.h +++ b/src/paimon/common/fs/external_path_provider.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -45,12 +46,9 @@ class ExternalPathProvider { /// /// @return the next external data path std::string GetNextExternalDataPath(const std::string& file_name) { - position_++; - if (position_ == external_table_paths_.size()) { - position_ = 0; - } + size_t position = (++position_) % external_table_paths_.size(); return PathUtil::JoinPath( - PathUtil::JoinPath(external_table_paths_[position_], relative_bucket_path_), file_name); + PathUtil::JoinPath(external_table_paths_[position], relative_bucket_path_), file_name); } private: @@ -66,6 +64,6 @@ class ExternalPathProvider { private: std::vector external_table_paths_; std::string relative_bucket_path_; - size_t position_; + std::atomic position_; }; } // namespace paimon diff --git a/src/paimon/common/fs/external_path_provider_test.cpp b/src/paimon/common/fs/external_path_provider_test.cpp index 70258a8f3..d75e9fbed 100644 --- a/src/paimon/common/fs/external_path_provider_test.cpp +++ b/src/paimon/common/fs/external_path_provider_test.cpp @@ -16,7 +16,11 @@ #include "paimon/common/fs/external_path_provider.h" +#include +#include +#include #include +#include #include "gtest/gtest.h" #include "paimon/testing/utils/testharness.h" @@ -62,4 +66,49 @@ TEST(ExternalPathProviderTest, TestGetNextExternalDataPath2) { "/tmp/external_path_c/p0=1/p1=0/bucket-0/file.orc", })); } + +TEST(ExternalPathProviderTest, TestGetNextExternalDataPathConcurrently) { + std::vector external_table_paths; + external_table_paths.emplace_back("/tmp/external_path_a/"); + external_table_paths.emplace_back("/tmp/external_path_b/"); + external_table_paths.emplace_back("/tmp/external_path_c/"); + std::string relative_bucket_path = "p0=1/p1=0/bucket-0"; + + ASSERT_OK_AND_ASSIGN(std::unique_ptr provider, + ExternalPathProvider::Create(external_table_paths, relative_bucket_path)); + + const std::set expected_data_paths = { + "/tmp/external_path_a/p0=1/p1=0/bucket-0/file.orc", + "/tmp/external_path_b/p0=1/p1=0/bucket-0/file.orc", + "/tmp/external_path_c/p0=1/p1=0/bucket-0/file.orc", + }; + std::mutex mutex; + std::vector result_data_paths; + constexpr int32_t kThreadCount = 8; + constexpr int32_t kPathCountPerThread = 1000; + + std::vector> futures; + futures.reserve(kThreadCount); + for (int32_t i = 0; i < kThreadCount; ++i) { + futures.emplace_back(std::async(std::launch::async, [&]() { + std::vector local_paths; + local_paths.reserve(kPathCountPerThread); + for (int32_t j = 0; j < kPathCountPerThread; ++j) { + local_paths.push_back(provider->GetNextExternalDataPath("file.orc")); + } + + std::lock_guard lock(mutex); + result_data_paths.insert(result_data_paths.end(), local_paths.begin(), + local_paths.end()); + })); + } + for (auto& future : futures) { + future.get(); + } + + ASSERT_EQ(result_data_paths.size(), kThreadCount * kPathCountPerThread); + for (const auto& data_path : result_data_paths) { + ASSERT_TRUE(expected_data_paths.count(data_path)) << data_path; + } +} } // namespace paimon::test diff --git a/src/paimon/common/fs/file_system_test.cpp b/src/paimon/common/fs/file_system_test.cpp index bf177d1ab..0dcb3d670 100644 --- a/src/paimon/common/fs/file_system_test.cpp +++ b/src/paimon/common/fs/file_system_test.cpp @@ -159,7 +159,12 @@ class FileSystemTest : public ::testing::Test, public ::testing::WithParamInterf std::string GetTestDir() const { std::string file_system = GetParam(); if (file_system == "local") { - return paimon::test::GetDataDir(); + std::string data_dir = paimon::test::GetDataDir(); + if (data_dir.empty() || data_dir[0] != '/') { + EXPECT_OK_AND_ASSIGN(std::string current_path, PathUtil::GetWorkingDirectory()); + data_dir = PathUtil::JoinPath(current_path, data_dir); + } + return data_dir; } else if (file_system == "jindo") { return "oss://paimon-unittest/test_data/"; } @@ -209,6 +214,28 @@ TEST_P(FileSystemTest, TestCreate) { ASSERT_NOK_WITH_MSG(fs_->Create(path, /*overwrite=*/false), "already exists"); } +TEST_P(FileSystemTest, TestCreateRelativeFileInCurrentDirectory) { + if (GetParam() != "local") { + GTEST_SKIP() << "this test is only tested for the local file system"; + } + + std::string path = "relative_file_" + RandomName(); + ASSERT_OK_AND_ASSIGN(auto out, fs_->Create(path, /*overwrite=*/true)); + std::string content = "content"; + ASSERT_OK_AND_ASSIGN(int32_t write_len, out->Write(content.data(), content.size())); + ASSERT_EQ(write_len, content.size()); + ASSERT_OK_AND_ASSIGN(std::string uri, out->GetUri()); + ASSERT_FALSE(uri.empty()); + ASSERT_EQ(uri[0], '/'); + ASSERT_EQ(PathUtil::GetName(uri), path); + ASSERT_OK(out->Close()); + + std::string read_content; + ASSERT_OK(fs_->ReadFile(path, &read_content)); + ASSERT_EQ(read_content, content); + ASSERT_OK(fs_->Delete(path)); +} + // --- write&read TEST_P(FileSystemTest, TestSimpleWriteAndRead) { std::string content = "abcdefghijk"; @@ -648,6 +675,27 @@ TEST_P(FileSystemTest, TestRename) { "src file is not a dir"); } +TEST_P(FileSystemTest, TestRenameWithFileSchemeUsesNormalizedPath) { + if (GetParam() != "local") { + GTEST_SKIP() << "this test is only tested for the local file system"; + } + + const std::string src = "file:" + test_root_ + "/scheme_src.txt"; + const std::string dst = "file:" + test_root_ + "/scheme_dst.txt"; + + ASSERT_OK(fs_->WriteFile(src, "content", /*overwrite=*/false)); + ASSERT_OK(fs_->Rename(src, dst)); + + ASSERT_OK_AND_ASSIGN(bool src_exists, fs_->Exists(src)); + ASSERT_FALSE(src_exists); + ASSERT_OK_AND_ASSIGN(bool dst_exists, fs_->Exists(dst)); + ASSERT_TRUE(dst_exists); + + std::string content; + ASSERT_OK(fs_->ReadFile(dst, &content)); + ASSERT_EQ(content, "content"); +} + TEST_P(FileSystemTest, TestRename2) { { // test rename dir @@ -781,6 +829,19 @@ TEST_P(FileSystemTest, TestExists) { ASSERT_TRUE(is_exist); } +TEST_P(FileSystemTest, TestExistsInLocalFileSystem) { + if (GetParam() != "local") { + GTEST_SKIP() << "this test is only tested for the local file system"; + } + + ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists("/")); + ASSERT_TRUE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists("")); + ASSERT_TRUE(is_exist); + ASSERT_OK_AND_ASSIGN(is_exist, fs_->Exists(".")); + ASSERT_TRUE(is_exist); +} + // --- delete TEST_P(FileSystemTest, TestExistingFileDeletion) { auto check = [&](bool recursive) { @@ -985,8 +1046,16 @@ TEST_P(FileSystemTest, TestMkdir) { ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp/local/f/1")); ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1")); ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1/f2/")); +} + +TEST_P(FileSystemTest, TestMkdirInLocalFileSystem) { + if (GetParam() != "local") { + GTEST_SKIP() << "this test is only tested for the local file system"; + } + ASSERT_OK(fs_->Mkdirs("/")); - ASSERT_NOK_WITH_MSG(fs_->Mkdirs(""), "path is an empty string."); + ASSERT_OK(fs_->Mkdirs("")); + ASSERT_OK(fs_->Mkdirs(".")); } TEST_P(FileSystemTest, TestMkdir2) { @@ -1399,6 +1468,14 @@ TEST_P(FileSystemTest, TestAtomicStoreAlreadyExist) { ASSERT_TRUE(is_exist); } -INSTANTIATE_TEST_SUITE_P(UseLocal, FileSystemTest, ::testing::Values("local" /*, "jindo"*/)); +std::vector GetTestValuesForFileSystemTest() { + std::vector values; + values.emplace_back("local"); + // values.emplace_back("jindo"); + return values; +} + +INSTANTIATE_TEST_SUITE_P(FsType, FileSystemTest, + ::testing::ValuesIn(GetTestValuesForFileSystemTest())); } // namespace paimon::test diff --git a/src/paimon/common/table/special_fields.h b/src/paimon/common/table/special_fields.h index 438fb272e..e5f920185 100644 --- a/src/paimon/common/table/special_fields.h +++ b/src/paimon/common/table/special_fields.h @@ -65,7 +65,8 @@ struct SpecialFields { static bool IsSpecialFieldName(const std::string& field_name) { if (field_name == SequenceNumber().Name() || field_name == ValueKind().Name() || - field_name == RowId().Name() || field_name == IndexScore().Name()) { + field_name == RowKind().Name() || field_name == RowId().Name() || + field_name == IndexScore().Name()) { return true; } return false; diff --git a/src/paimon/common/table/special_fields_test.cpp b/src/paimon/common/table/special_fields_test.cpp index 74482fc54..b43b99310 100644 --- a/src/paimon/common/table/special_fields_test.cpp +++ b/src/paimon/common/table/special_fields_test.cpp @@ -61,6 +61,7 @@ TEST(SpecialFieldsTest, TestIsSpecialFieldName) { ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_SEQUENCE_NUMBER")); ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_VALUE_KIND")); ASSERT_FALSE(SpecialFields::IsSpecialFieldName("VALUE_KIND")); + ASSERT_TRUE(SpecialFields::IsSpecialFieldName("rowkind")); ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_ROW_ID")); ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_INDEX_SCORE")); } diff --git a/src/paimon/common/utils/path_util.cpp b/src/paimon/common/utils/path_util.cpp index f4e61f499..495e41c3f 100644 --- a/src/paimon/common/utils/path_util.cpp +++ b/src/paimon/common/utils/path_util.cpp @@ -16,8 +16,13 @@ #include "paimon/common/utils/path_util.h" +#include + +#include +#include #include #include +#include #include #include "fmt/format.h" @@ -142,6 +147,17 @@ void PathUtil::TrimLastDelim(std::string* dir_path) noexcept { } } +Result PathUtil::GetWorkingDirectory() noexcept { + char* path = getcwd(nullptr, 0); + if (path != nullptr) { + std::string ret(path); + free(path); + return ret; + } + return Status::IOError( + fmt::format("get working directory failed, ec: {}", std::strerror(errno))); +} + Result PathUtil::CreateTempPath(const std::string& path) noexcept { std::string uuid; if (!UUID::Generate(&uuid)) { diff --git a/src/paimon/common/utils/path_util.h b/src/paimon/common/utils/path_util.h index 1d3c921cb..47eefef17 100644 --- a/src/paimon/common/utils/path_util.h +++ b/src/paimon/common/utils/path_util.h @@ -44,6 +44,7 @@ class PAIMON_EXPORT PathUtil { static std::string GetParentDirPath(const std::string& path) noexcept; static std::string GetName(const std::string& path) noexcept; static void TrimLastDelim(std::string* dir_path) noexcept; + static Result GetWorkingDirectory() noexcept; static Result CreateTempPath(const std::string& path) noexcept; static Result ToPath(const std::string& path) noexcept; static Result NormalizePath(const std::string& path) noexcept; diff --git a/src/paimon/common/utils/path_util_test.cpp b/src/paimon/common/utils/path_util_test.cpp index 3058f39db..f400c0609 100644 --- a/src/paimon/common/utils/path_util_test.cpp +++ b/src/paimon/common/utils/path_util_test.cpp @@ -91,6 +91,12 @@ TEST(PathUtilsTest, TestTrimLastDelim) { } } +TEST(PathUtilsTest, TestGetWorkingDirectory) { + ASSERT_OK_AND_ASSIGN(std::string current_path, PathUtil::GetWorkingDirectory()); + ASSERT_FALSE(current_path.empty()); + ASSERT_EQ(current_path[0], '/'); +} + TEST(PathUtilsTest, TestToPath) { { std::string test_path = ""; @@ -128,6 +134,22 @@ TEST(PathUtilsTest, TestToPath) { ASSERT_EQ(path.path, "/tmp/index"); ASSERT_EQ(path.ToString(), "/tmp/index"); } + { + std::string test_path = "."; + ASSERT_OK_AND_ASSIGN(Path path, PathUtil::ToPath(test_path)); + ASSERT_EQ(path.scheme, ""); + ASSERT_EQ(path.authority, ""); + ASSERT_EQ(path.path, "."); + ASSERT_EQ(path.ToString(), "."); + } + { + std::string test_path = "relative/path"; + ASSERT_OK_AND_ASSIGN(Path path, PathUtil::ToPath(test_path)); + ASSERT_EQ(path.scheme, ""); + ASSERT_EQ(path.authority, ""); + ASSERT_EQ(path.path, "relative/path"); + ASSERT_EQ(path.ToString(), "relative/path"); + } } TEST(PathUtilsTest, TestGetName) { diff --git a/src/paimon/fs/local/local_file.cpp b/src/paimon/fs/local/local_file.cpp index d67be522f..4d8233415 100644 --- a/src/paimon/fs/local/local_file.cpp +++ b/src/paimon/fs/local/local_file.cpp @@ -27,6 +27,7 @@ #include "fmt/format.h" #include "paimon/common/factories/io_hook.h" #include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" #include "paimon/fs/local/local_file_status.h" namespace paimon { @@ -37,6 +38,26 @@ namespace paimon { PAIMON_RETURN_NOT_OK(hook_->Try(path_)); \ } +Result> LocalFile::Create(const std::string& path_string) { + if (path_string.empty()) { + PAIMON_ASSIGN_OR_RAISE(std::string current_path, PathUtil::GetWorkingDirectory()); + return std::unique_ptr(new LocalFile(current_path)); + } + + // local file system does not support path_string with scheme, e.g., "file:/tmp" will be + // rewritten to "/tmp" + PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(path_string)); + if (!path.scheme.empty() && StringUtils::ToLowerCase(path.scheme) != "file") { + return Status::Invalid(fmt::format("invalid scheme {} for local file system", path.scheme)); + } + if (path.path.empty() || path.path[0] != '/') { + PAIMON_ASSIGN_OR_RAISE(std::string current_path, PathUtil::GetWorkingDirectory()); + return std::unique_ptr( + new LocalFile(PathUtil::JoinPath(current_path, path.path))); + } + return std::unique_ptr(new LocalFile(path.path)); +} + LocalFile::LocalFile(const std::string& path) : path_(path), hook_(IOHook::GetInstance()) {} Result LocalFile::Exists() const { @@ -104,7 +125,7 @@ Status LocalFile::ListFiles(std::vector* file_list) const { std::vector file_names; PAIMON_RETURN_NOT_OK(List(&file_names)); for (const auto& file_name : file_names) { - file_list->emplace_back(PathUtil::JoinPath(path_, file_name)); + file_list->push_back(LocalFile(PathUtil::JoinPath(path_, file_name))); } return Status::OK(); } @@ -162,7 +183,7 @@ LocalFile LocalFile::GetParentFile() const { } } -const std::string& LocalFile::GetAbsolutePath() const { +const std::string& LocalFile::GetPath() const { return path_; } @@ -181,9 +202,8 @@ Result LocalFile::Read(char* buffer, uint32_t length, uint64_t offset) while (more > 0) { ret = ::pread(fd, buffer + off, more, offset + off); if (ret == -1) { - return Status::IOError( - fmt::format("pread file '{}' fail at off {}, with error {}, ec: {}", path_, off, - strerror(errno), std::strerror(errno))); + return Status::IOError(fmt::format("pread file '{}' fail at off {}, ec: {}", path_, + off, std::strerror(errno))); } if (ret == 0) { break; @@ -211,9 +231,8 @@ Result LocalFile::Read(char* buffer, uint32_t length) { while (more > 0) { ret = fread(buffer + off, 1, more, file_); if (ferror(file_) != 0) { - return Status::IOError( - fmt::format("read file '{}' fail at off {}, with error {}, ec: {}", path_, off, - strerror(errno), std::strerror(errno))); + return Status::IOError(fmt::format("read file '{}' fail at off {}, ec: {}", path_, + off, std::strerror(errno))); } more -= ret; off += ret; @@ -242,9 +261,8 @@ Result LocalFile::Write(const char* buffer, uint32_t length) { while (more > 0) { ret = fwrite(buffer + off, 1, more, file_); if (ferror(file_) != 0) { - return Status::IOError(fmt::format("write file '{}' fail, with error {}, ec: {}", - path_, off, strerror(errno), - std::strerror(errno))); + return Status::IOError(fmt::format("write file '{}' fail at off {}, ec: {}", path_, + off, std::strerror(errno))); } more -= ret; off += ret; diff --git a/src/paimon/fs/local/local_file.h b/src/paimon/fs/local/local_file.h index cb4b3532f..e7b0cf9cd 100644 --- a/src/paimon/fs/local/local_file.h +++ b/src/paimon/fs/local/local_file.h @@ -36,7 +36,7 @@ class LocalFileStatus; class LocalFile { public: - explicit LocalFile(const std::string& path); + static Result> Create(const std::string& path_string); ~LocalFile() = default; Result Exists() const; @@ -45,16 +45,13 @@ class LocalFile { Status List(std::vector* file_list) const; Status ListFiles(std::vector* file_list) const; Status Delete() const; - const std::string& GetAbsolutePath() const; + const std::string& GetPath() const; LocalFile GetParentFile() const; Result Mkdir() const; Result> GetFileStatus() const; Result Length() const; Result LastModifiedTimeMs() const; Status OpenFile(bool is_read_file); - Result Read() { - return Status::NotImplemented(""); - } Result Read(char* buffer, uint32_t length); Result Read(char* buffer, uint32_t length, uint64_t offset); Result Write(const char* buffer, uint32_t length); @@ -68,6 +65,8 @@ class LocalFile { } private: + explicit LocalFile(const std::string& path); + const std::string path_; FILE* file_ = nullptr; IOHook* hook_; diff --git a/src/paimon/fs/local/local_file_system.cpp b/src/paimon/fs/local/local_file_system.cpp index 07a3fe34c..669e85101 100644 --- a/src/paimon/fs/local/local_file_system.cpp +++ b/src/paimon/fs/local/local_file_system.cpp @@ -26,24 +26,13 @@ #include "fmt/format.h" #include "paimon/common/utils/path_util.h" -#include "paimon/common/utils/string_utils.h" #include "paimon/fs/local/local_file_status.h" namespace paimon { -Result LocalFileSystem::ToFile(const std::string& path_string) const { - // local file system does not support path_string with scheme, e.g., "file:/tmp" will be - // rewritten to "/tmp" - PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(path_string)); - if (!path.scheme.empty() && StringUtils::ToLowerCase(path.scheme) != "file") { - return Status::Invalid(fmt::format("invalid scheme {} for local file system", path.scheme)); - } - return LocalFile(path.path); -} - Result LocalFileSystem::Exists(const std::string& path) const { - PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); - return file.Exists(); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file, LocalFile::Create(path)); + return file->Exists(); } Result> LocalFileSystem::Open(const std::string& path) const { @@ -51,8 +40,8 @@ Result> LocalFileSystem::Open(const std::string& pa if (!is_exist) { return Status::NotExist(fmt::format("File '{}' not exists", path)); } - PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr in, LocalInputStream::Create(file)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file, LocalFile::Create(path)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr in, LocalInputStream::Create(*file)); return in; } @@ -63,16 +52,17 @@ Result> LocalFileSystem::Create(const std::string& return Status::Invalid( fmt::format("do not allow overwrite, but the file {} already exists", path)); } - PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); - LocalFile parent = file.GetParentFile(); - PAIMON_RETURN_NOT_OK(Mkdirs(parent.GetAbsolutePath())); - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr out, LocalOutputStream::Create(file)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file, LocalFile::Create(path)); + LocalFile parent = file->GetParentFile(); + PAIMON_RETURN_NOT_OK(Mkdirs(parent.GetPath())); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr out, + LocalOutputStream::Create(*file)); return out; } Status LocalFileSystem::Mkdirs(const std::string& path) const { - PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); - return MkdirsInternal(file); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file, LocalFile::Create(path)); + return MkdirsInternal(*file); } Status LocalFileSystem::MkdirsInternal(const LocalFile& file) const { @@ -85,8 +75,8 @@ Status LocalFileSystem::MkdirsInternal(const LocalFile& file) const { return Status::OK(); } else { // exists and is not a directory -> is a regular file - return Status::IOError(fmt::format("file {} already exists and is not a directory", - file.GetAbsolutePath())); + return Status::IOError( + fmt::format("file {} already exists and is not a directory", file.GetPath())); } } @@ -100,41 +90,40 @@ Status LocalFileSystem::MkdirsInternal(const LocalFile& file) const { if (is_dir) { return Status::OK(); } else { - return Status::IOError( - fmt::format("create directory '{}' failed", file.GetAbsolutePath())); + return Status::IOError(fmt::format("create directory '{}' failed", file.GetPath())); } } return Status::OK(); } Result> LocalFileSystem::GetFileStatus(const std::string& path) const { - PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); - PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists()); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file, LocalFile::Create(path)); + PAIMON_ASSIGN_OR_RAISE(bool is_exist, file->Exists()); if (is_exist) { - return file.GetFileStatus(); + return file->GetFileStatus(); } else { return Status::NotExist( fmt::format("File {} does not exist or the user running " "Paimon has insufficient permissions to access it.", - file.GetAbsolutePath())); + file->GetPath())); } } Status LocalFileSystem::ListDir( const std::string& directory, std::vector>* file_status_list) const { - PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(directory)); - PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists()); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file, LocalFile::Create(directory)); + PAIMON_ASSIGN_OR_RAISE(bool is_exist, file->Exists()); if (!is_exist) { return Status::OK(); } - PAIMON_ASSIGN_OR_RAISE(bool is_file, file.IsFile()); + PAIMON_ASSIGN_OR_RAISE(bool is_file, file->IsFile()); if (is_file) { return Status::IOError( - fmt::format("file {} already exists and is not a directory", file.GetAbsolutePath())); + fmt::format("file {} already exists and is not a directory", file->GetPath())); } else { std::vector file_list; - PAIMON_RETURN_NOT_OK(file.List(&file_list)); + PAIMON_RETURN_NOT_OK(file->List(&file_list)); file_status_list->reserve(file_status_list->size() + file_list.size()); for (const auto& f : file_list) { Result> file_status = @@ -152,18 +141,18 @@ Status LocalFileSystem::ListDir( Status LocalFileSystem::ListFileStatus( const std::string& path, std::vector>* file_status_list) const { - PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); - PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists()); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file, LocalFile::Create(path)); + PAIMON_ASSIGN_OR_RAISE(bool is_exist, file->Exists()); if (!is_exist) { return Status::OK(); } - PAIMON_ASSIGN_OR_RAISE(bool is_file, file.IsFile()); + PAIMON_ASSIGN_OR_RAISE(bool is_file, file->IsFile()); if (is_file) { - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_status, file.GetFileStatus()); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_status, file->GetFileStatus()); file_status_list->emplace_back(std::move(file_status)); } else { std::vector file_list; - PAIMON_RETURN_NOT_OK(file.List(&file_list)); + PAIMON_RETURN_NOT_OK(file->List(&file_list)); file_status_list->reserve(file_status_list->size() + file_list.size()); for (const auto& f : file_list) { Result> file_status = @@ -179,12 +168,12 @@ Status LocalFileSystem::ListFileStatus( } Status LocalFileSystem::Delete(const std::string& path, bool recursive) const { - PAIMON_ASSIGN_OR_RAISE(LocalFile file, ToFile(path)); - PAIMON_ASSIGN_OR_RAISE(bool is_file, file.IsFile()); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file, LocalFile::Create(path)); + PAIMON_ASSIGN_OR_RAISE(bool is_file, file->IsFile()); if (is_file) { - return file.Delete(); + return file->Delete(); } - return Delete(file, recursive); + return Delete(*file, recursive); } Status LocalFileSystem::Delete(const LocalFile& f, bool recursive) const { @@ -194,7 +183,7 @@ Status LocalFileSystem::Delete(const LocalFile& f, bool recursive) const { PAIMON_RETURN_NOT_OK(f.ListFiles(&files)); if (recursive == false && !files.empty()) { return Status::IOError( - fmt::format("cannot delete {}, directory is not empty", f.GetAbsolutePath())); + fmt::format("cannot delete {}, directory is not empty", f.GetPath())); } for (const auto& file : files) { PAIMON_RETURN_NOT_OK(Delete(file)); @@ -214,17 +203,17 @@ Status LocalFileSystem::Rename(const std::string& src, const std::string& dst) c if (is_dst_exist) { return Status::Invalid(err_msg, "dst file already exist"); } - PAIMON_ASSIGN_OR_RAISE(LocalFile src_file, ToFile(src)); - PAIMON_ASSIGN_OR_RAISE(bool is_file, src_file.IsFile()); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr src_file, LocalFile::Create(src)); + PAIMON_ASSIGN_OR_RAISE(bool is_file, src_file->IsFile()); std::string new_file_name = dst; if (is_file && new_file_name[new_file_name.length() - 1] == '/') { return Status::Invalid(err_msg, "src file is not a dir"); } - PAIMON_ASSIGN_OR_RAISE(LocalFile dst_file, ToFile(dst)); - auto parent = dst_file.GetParentFile(); - PAIMON_RETURN_NOT_OK(Mkdirs(parent.GetAbsolutePath())); - if (::rename(src.c_str(), dst.c_str()) != 0) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr dst_file, LocalFile::Create(dst)); + auto parent = dst_file->GetParentFile(); + PAIMON_RETURN_NOT_OK(Mkdirs(parent.GetPath())); + if (::rename(src_file->GetPath().c_str(), dst_file->GetPath().c_str()) != 0) { int32_t cur_errno = errno; return Status::IOError(err_msg, std::strerror(cur_errno)); } @@ -266,8 +255,8 @@ Result LocalInputStream::GetPos() const { Result LocalInputStream::Read(char* buffer, uint32_t size) { PAIMON_ASSIGN_OR_RAISE(int32_t read_length, file_.Read(buffer, size)); if (read_length != static_cast(size)) { - return Status::IOError(fmt::format("file '{}' read size {} != expected {}", - file_.GetAbsolutePath(), read_length, size)); + return Status::IOError(fmt::format("file '{}' read size {} != expected {}", file_.GetPath(), + read_length, size)); } return read_length; } @@ -275,8 +264,8 @@ Result LocalInputStream::Read(char* buffer, uint32_t size) { Result LocalInputStream::Read(char* buffer, uint32_t size, uint64_t offset) { PAIMON_ASSIGN_OR_RAISE(int32_t read_length, file_.Read(buffer, size, offset)); if (read_length != static_cast(size)) { - return Status::IOError(fmt::format("file '{}' read size {} != expected {}", - file_.GetAbsolutePath(), read_length, size)); + return Status::IOError(fmt::format("file '{}' read size {} != expected {}", file_.GetPath(), + read_length, size)); } return read_length; } diff --git a/src/paimon/fs/local/local_file_system.h b/src/paimon/fs/local/local_file_system.h index 9ab4bb1f2..892f14a6b 100644 --- a/src/paimon/fs/local/local_file_system.h +++ b/src/paimon/fs/local/local_file_system.h @@ -52,11 +52,6 @@ class LocalFileSystem : public FileSystem { std::vector>* file_status_list) const override; Result Exists(const std::string& path) const override; - /// Converts the given %Path to a File for this file system. If the path is empty, - /// we will return `new File(".")` instead of `new File("")`, since - /// the latter returns `false` for `isDirectory` judgement. - Result ToFile(const std::string& path) const; - private: // the lock to ensure atomic renaming static const std::mutex RENAME_LOCK; @@ -79,7 +74,7 @@ class LocalInputStream : public InputStream { Status Close() override; Result GetUri() const override { - return file_.GetAbsolutePath(); + return file_.GetPath(); } Result Length() const override; @@ -98,7 +93,7 @@ class LocalOutputStream : public OutputStream { Status Flush() override; Status Close() override; Result GetUri() const override { - return file_.GetAbsolutePath(); + return file_.GetPath(); } private: diff --git a/src/paimon/fs/local/local_file_test.cpp b/src/paimon/fs/local/local_file_test.cpp index 66489456a..6e8f5c3ce 100644 --- a/src/paimon/fs/local/local_file_test.cpp +++ b/src/paimon/fs/local/local_file_test.cpp @@ -29,36 +29,36 @@ TEST(LocalFileTest, TestReadWriteEmptyContent) { auto test_root_dir = UniqueTestDirectory::Create(); ASSERT_TRUE(test_root_dir); std::string test_root = test_root_dir->Str(); - LocalFile dir = LocalFile(test_root); - if (dir.Exists().ok()) { - ASSERT_TRUE(dir.Delete().ok()); + ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root)); + if (dir->Exists().ok()) { + ASSERT_TRUE(dir->Delete().ok()); } - ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir()); ASSERT_TRUE(success); std::string path = test_root + "/test.txt"; - LocalFile file = LocalFile(path); - if (file.Exists().ok()) { - ASSERT_TRUE(file.Delete().ok()); + ASSERT_OK_AND_ASSIGN(auto file, LocalFile::Create(path)); + if (file->Exists().ok()) { + ASSERT_TRUE(file->Delete().ok()); } - ASSERT_OK_AND_ASSIGN(bool is_exist, file.Exists()); + ASSERT_OK_AND_ASSIGN(bool is_exist, file->Exists()); ASSERT_FALSE(is_exist); - ASSERT_OK(file.OpenFile(/*is_read_file=*/false)); + ASSERT_OK(file->OpenFile(/*is_read_file=*/false)); const char* str = ""; const int32_t str_size = 0; - ASSERT_OK_AND_ASSIGN(int32_t write_size, file.Write(str, str_size)); + ASSERT_OK_AND_ASSIGN(int32_t write_size, file->Write(str, str_size)); ASSERT_EQ(write_size, str_size); - ASSERT_OK(file.Flush()); - ASSERT_TRUE(file.Exists().value()); + ASSERT_OK(file->Flush()); + ASSERT_TRUE(file->Exists().value()); - ASSERT_OK(file.Close()); + ASSERT_OK(file->Close()); - LocalFile file2(path); - ASSERT_OK(file2.OpenFile(/*is_read_file=*/true)); + ASSERT_OK_AND_ASSIGN(auto file2, LocalFile::Create(path)); + ASSERT_OK(file2->OpenFile(/*is_read_file=*/true)); char buffer[10]; - ASSERT_OK_AND_ASSIGN(int32_t read_len, file2.Read(buffer, 10)); + ASSERT_OK_AND_ASSIGN(int32_t read_len, file2->Read(buffer, 10)); ASSERT_EQ(0, read_len); } @@ -66,127 +66,127 @@ TEST(LocalFileTest, TestSimple) { auto test_root_dir = UniqueTestDirectory::Create(); ASSERT_TRUE(test_root_dir); std::string test_root = test_root_dir->Str(); - LocalFile dir = LocalFile(test_root); - if (dir.Exists().ok()) { - ASSERT_OK(dir.Delete()); + ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root)); + if (dir->Exists().ok()) { + ASSERT_OK(dir->Delete()); } - ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir()); ASSERT_TRUE(success); std::string path = test_root + "/test.txt"; - LocalFile file = LocalFile(path); - if (file.Exists().ok()) { - ASSERT_OK(file.Delete()); + ASSERT_OK_AND_ASSIGN(auto file, LocalFile::Create(path)); + if (file->Exists().ok()) { + ASSERT_OK(file->Delete()); } - ASSERT_OK_AND_ASSIGN(bool is_exist, file.Exists()); + ASSERT_OK_AND_ASSIGN(bool is_exist, file->Exists()); ASSERT_FALSE(is_exist); - ASSERT_OK(file.OpenFile(/*is_read_file=*/false)); + ASSERT_OK(file->OpenFile(/*is_read_file=*/false)); const char* str = "test_data"; const int32_t str_size = 9; - ASSERT_OK_AND_ASSIGN(int32_t write_size, file.Write(str, str_size)); + ASSERT_OK_AND_ASSIGN(int32_t write_size, file->Write(str, str_size)); ASSERT_EQ(write_size, str_size); - ASSERT_OK(file.Flush()); - ASSERT_OK(file.Close()); + ASSERT_OK(file->Flush()); + ASSERT_OK(file->Close()); - ASSERT_OK_AND_ASSIGN(bool is_file, file.IsFile()); + ASSERT_OK_AND_ASSIGN(bool is_file, file->IsFile()); ASSERT_TRUE(is_file); - ASSERT_OK_AND_ASSIGN(bool is_dir, file.IsDir()); + ASSERT_OK_AND_ASSIGN(bool is_dir, file->IsDir()); ASSERT_FALSE(is_dir); std::vector file_list; - ASSERT_NOK(file.List(&file_list)); + ASSERT_NOK(file->List(&file_list)); - ASSERT_OK_AND_ASSIGN(size_t len, file.Length()); + ASSERT_OK_AND_ASSIGN(size_t len, file->Length()); ASSERT_EQ(len, str_size); - LocalFile file2 = LocalFile(path); - ASSERT_OK(file2.Exists()); + ASSERT_OK_AND_ASSIGN(auto file2, LocalFile::Create(path)); + ASSERT_OK(file2->Exists()); - ASSERT_OK(file2.OpenFile(true)); + ASSERT_OK(file2->OpenFile(true)); char str_read[str_size + 1]; { - ASSERT_OK_AND_ASSIGN(int32_t read_size, file2.Read(str_read, 4)); + ASSERT_OK_AND_ASSIGN(int32_t read_size, file2->Read(str_read, 4)); ASSERT_EQ(read_size, 4); str_read[read_size] = '\0'; ASSERT_EQ(strcmp(str_read, "test"), 0); } { - ASSERT_OK_AND_ASSIGN(int64_t pos, file2.Tell()); + ASSERT_OK_AND_ASSIGN(int64_t pos, file2->Tell()); ASSERT_EQ(pos, 4); - ASSERT_OK(file2.Seek(5, SEEK_SET)); - ASSERT_OK_AND_ASSIGN(int32_t read_size, file2.Read(str_read, 4)); + ASSERT_OK(file2->Seek(5, SEEK_SET)); + ASSERT_OK_AND_ASSIGN(int32_t read_size, file2->Read(str_read, 4)); ASSERT_EQ(read_size, 4); str_read[read_size] = '\0'; ASSERT_EQ(strcmp(str_read, "data"), 0); } { - ASSERT_OK_AND_ASSIGN(int32_t read_size, file2.Read(str_read, str_size, 0)); + ASSERT_OK_AND_ASSIGN(int32_t read_size, file2->Read(str_read, str_size, 0)); ASSERT_EQ(read_size, str_size); str_read[read_size] = '\0'; ASSERT_EQ(strcmp(str_read, "test_data"), 0); } // dir already exists - ASSERT_OK_AND_ASSIGN(success, dir.Mkdir()); + ASSERT_OK_AND_ASSIGN(success, dir->Mkdir()); ASSERT_FALSE(success); - ASSERT_OK(file2.Delete()); - ASSERT_FALSE(file2.Exists().value()); + ASSERT_OK(file2->Delete()); + ASSERT_FALSE(file2->Exists().value()); } TEST(LocalFileTest, TestUsage) { std::string test_root = "local_file_test_usage"; - LocalFile dir = LocalFile(test_root); - ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root)); + ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir()); ASSERT_TRUE(success); std::vector file_list; - ASSERT_OK(dir.List(&file_list)); + ASSERT_OK(dir->List(&file_list)); std::string path_deep_dir = test_root + "/tmp2"; - LocalFile deep_dir = LocalFile(path_deep_dir); - ASSERT_OK_AND_ASSIGN(success, deep_dir.Mkdir()); + ASSERT_OK_AND_ASSIGN(auto deep_dir, LocalFile::Create(path_deep_dir)); + ASSERT_OK_AND_ASSIGN(success, deep_dir->Mkdir()); ASSERT_TRUE(success); - LocalFile parent_deep_dir = deep_dir.GetParentFile(); - ASSERT_EQ(parent_deep_dir.GetAbsolutePath(), test_root); - ASSERT_OK(deep_dir.Delete()); + LocalFile parent_deep_dir = deep_dir->GetParentFile(); + ASSERT_EQ(parent_deep_dir.GetPath(), dir->GetPath()); + ASSERT_OK(deep_dir->Delete()); ASSERT_OK(parent_deep_dir.Delete()); - ASSERT_OK(dir.Delete()); + ASSERT_OK(dir->Delete()); } TEST(LocalFileTest, TestOpenFile) { auto test_root_dir = UniqueTestDirectory::Create(); ASSERT_TRUE(test_root_dir); std::string test_root = test_root_dir->Str(); - LocalFile dir = LocalFile(test_root); - if (dir.Exists().ok()) { - ASSERT_OK(dir.Delete()); + ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root)); + if (dir->Exists().ok()) { + ASSERT_OK(dir->Delete()); } - ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir()); ASSERT_TRUE(success); std::string path = test_root + "/test.txt"; - LocalFile file = LocalFile(path); - if (file.Exists().ok()) { - ASSERT_OK(file.Delete()); + ASSERT_OK_AND_ASSIGN(auto file, LocalFile::Create(path)); + if (file->Exists().ok()) { + ASSERT_OK(file->Delete()); } - ASSERT_OK_AND_ASSIGN(bool is_exist, file.Exists()); + ASSERT_OK_AND_ASSIGN(bool is_exist, file->Exists()); ASSERT_FALSE(is_exist); - ASSERT_NOK_WITH_MSG(file.OpenFile(/*is_read_file=*/true), "file not exist"); - ASSERT_NOK_WITH_MSG(dir.OpenFile(/*is_read_file=*/true), "cannot open a directory"); + ASSERT_NOK_WITH_MSG(file->OpenFile(/*is_read_file=*/true), "file not exist"); + ASSERT_NOK_WITH_MSG(dir->OpenFile(/*is_read_file=*/true), "cannot open a directory"); std::string path3 = "test.txt"; - LocalFile file3 = LocalFile(path3); - ASSERT_OK(file3.OpenFile(/*is_read_file=*/false)); - ASSERT_OK_AND_ASSIGN(int64_t modify_time, file3.LastModifiedTimeMs()); + ASSERT_OK_AND_ASSIGN(auto file3, LocalFile::Create(path3)); + ASSERT_OK(file3->OpenFile(/*is_read_file=*/false)); + ASSERT_OK_AND_ASSIGN(int64_t modify_time, file3->LastModifiedTimeMs()); ASSERT_GE(modify_time, -1); - LocalFile dir2 = LocalFile("/"); - ASSERT_OK_AND_ASSIGN(success, dir2.Mkdir()); + ASSERT_OK_AND_ASSIGN(auto dir2, LocalFile::Create("/")); + ASSERT_OK_AND_ASSIGN(success, dir2->Mkdir()); ASSERT_FALSE(success); - LocalFile dir3 = LocalFile(test_root + "/"); - ASSERT_OK_AND_ASSIGN(success, dir3.Mkdir()); + ASSERT_OK_AND_ASSIGN(auto dir3, LocalFile::Create(test_root + "/")); + ASSERT_OK_AND_ASSIGN(success, dir3->Mkdir()); ASSERT_FALSE(success); } @@ -195,28 +195,28 @@ TEST(LocalFileTest, TestMkdir) { ASSERT_TRUE(test_root_dir); std::string test_root = test_root_dir->Str(); { - LocalFile dir = LocalFile(test_root + "tmp/local/f/1"); - ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root + "tmp/local/f/1")); + ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir()); ASSERT_FALSE(success); } { - LocalFile dir = LocalFile(test_root + "tmp1"); - ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root + "tmp1")); + ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir()); ASSERT_TRUE(success); } { - LocalFile dir = LocalFile(test_root + "tmp1/f2/"); - ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create(test_root + "tmp1/f2/")); + ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir()); ASSERT_TRUE(success); } { - LocalFile dir = LocalFile("/"); - ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create("/")); + ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir()); ASSERT_FALSE(success); } { - LocalFile dir = LocalFile(""); - ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir()); + ASSERT_OK_AND_ASSIGN(auto dir, LocalFile::Create("")); + ASSERT_OK_AND_ASSIGN(bool success, dir->Mkdir()); ASSERT_FALSE(success); } }