diff --git a/.github/workflows/pg-extension-build.yaml b/.github/workflows/pg-extension-build.yaml index 5091bfc035..68ad7992ec 100644 --- a/.github/workflows/pg-extension-build.yaml +++ b/.github/workflows/pg-extension-build.yaml @@ -8,12 +8,11 @@ on: workflow_dispatch: inputs: pg_version: - description: "PostgreSQL version to build (16, 17, 18, or all)" + description: "PostgreSQL version to build (17, 18, or all)" required: false default: "18" type: choice options: - - "16" - "17" - "18" - all @@ -50,8 +49,8 @@ jobs: id: set-versions run: |- if [ "${PG_VERSION}" == "all" ]; then - echo "versions=[\"16\",\"17\",\"18\"]" >> "${GITHUB_OUTPUT}" - echo "versions-list=16,17,18" >> "${GITHUB_OUTPUT}" + echo "versions=[\"17\",\"18\"]" >> "${GITHUB_OUTPUT}" + echo "versions-list=17,18" >> "${GITHUB_OUTPUT}" else echo "versions=[\"${PG_VERSION}\"]" >> "${GITHUB_OUTPUT}" echo "versions-list=${PG_VERSION}" >> "${GITHUB_OUTPUT}" diff --git a/cpp/CMakeLists.pg.cmake b/cpp/CMakeLists.pg.cmake index a0dcc19c13..65e198d775 100644 --- a/cpp/CMakeLists.pg.cmake +++ b/cpp/CMakeLists.pg.cmake @@ -1,4 +1,3 @@ -option(BUILD_PG_16 "Build PostgreSQL 16 extension" OFF) option(BUILD_PG_17 "Build PostgreSQL 17 extension" OFF) option(BUILD_PG_18 "Build PostgreSQL 18 extension" ON) option(USE_DEEPLAKE_SHARED "Use shared library for deeplake_api (default: auto-detect)" OFF) @@ -6,10 +5,6 @@ option(USE_DEEPLAKE_SHARED "Use shared library for deeplake_api (default: auto-d set(PG_MODULE deeplake_pg) set(PG_VERSIONS) -if(BUILD_PG_16) - list(APPEND PG_VERSIONS 16) -endif() - if(BUILD_PG_17) list(APPEND PG_VERSIONS 17) endif() diff --git a/cpp/cmake/modules/FindPostgres.cmake b/cpp/cmake/modules/FindPostgres.cmake index 62531f6250..88e87b0c17 100644 --- a/cpp/cmake/modules/FindPostgres.cmake +++ b/cpp/cmake/modules/FindPostgres.cmake @@ -3,14 +3,12 @@ include(ExternalProject) # Define PostgreSQL versions set(postgres_versions - "REL_16_0" "REL_17_0" "REL_18_0" ) # Define corresponding SHA256 checksums for each version set(postgres_SHA256_CHECKSUMS - "37851d1fdae1f2cdd1d23bf9a4598b6c2f3f6792e18bc974d78ed780a28933bf" "16912fe4aef3c8f297b5da1b591741f132377c8b5e1b8e896e07fdd680d6bf34" "b155bd4a467b401ebe61b504643492aae2d0836981aa4a5a60f8668b94eadebc" ) @@ -47,6 +45,5 @@ foreach(postgres_version IN LISTS postgres_versions) ) endforeach() -set(postgres_INSTALL_DIR_REL_16_0 ${DEFAULT_PARENT_DIR}/.ext/postgres-REL_16_0/install) set(postgres_INSTALL_DIR_REL_17_0 ${DEFAULT_PARENT_DIR}/.ext/postgres-REL_17_0/install) set(postgres_INSTALL_DIR_REL_18_0 ${DEFAULT_PARENT_DIR}/.ext/postgres-REL_18_0/install) diff --git a/cpp/deeplake_pg/dl_catalog.cpp b/cpp/deeplake_pg/dl_catalog.cpp deleted file mode 100644 index 53db8eead6..0000000000 --- a/cpp/deeplake_pg/dl_catalog.cpp +++ /dev/null @@ -1,906 +0,0 @@ -#include "dl_catalog.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -extern "C" { -#include -#include -} - -namespace pg::dl_catalog { - -namespace { - -constexpr const char* k_catalog_dir = "__deeplake_catalog"; -constexpr const char* k_tables_name = "tables"; -constexpr const char* k_columns_name = "columns"; -constexpr const char* k_indexes_name = "indexes"; -constexpr const char* k_meta_name = "meta"; -constexpr const char* k_schemas_name = "schemas"; -constexpr const char* k_databases_name = "databases"; - -// Shared (cluster-wide) path: {root}/__deeplake_catalog/{name} -std::string join_path(const std::string& root, const std::string& name) -{ - if (!root.empty() && root.back() == '/') { - return root + k_catalog_dir + "/" + name; - } - return root + "/" + k_catalog_dir + "/" + name; -} - -// Per-database path: {root}/{db_name}/__deeplake_catalog/{name} -std::string join_db_path(const std::string& root, const std::string& db_name, const std::string& name) -{ - std::string base = root; - if (!base.empty() && base.back() == '/') { - base.pop_back(); - } - return base + "/" + db_name + "/" + k_catalog_dir + "/" + name; -} - -// Cache for catalog table handles to avoid repeated S3 opens -struct catalog_table_cache -{ - std::string root_path; - std::shared_ptr meta_table; - - static catalog_table_cache& instance() - { - static thread_local catalog_table_cache cache; - return cache; - } - - std::shared_ptr get_meta_table(const std::string& path, icm::string_map<> creds) - { - if (path != root_path || !meta_table) { - // Cache miss or path changed - open and cache - root_path = path; - const auto meta_path = join_path(path, k_meta_name); - meta_table = deeplake_api::open_catalog_table(meta_path, std::move(creds)).get_future().get(); - } - return meta_table; - } - - void invalidate() - { - root_path.clear(); - meta_table.reset(); - } -}; - -int64_t now_ms() -{ - using namespace std::chrono; - return duration_cast(system_clock::now().time_since_epoch()).count(); -} - -// Open a shared (cluster-wide) catalog table -std::shared_ptr -open_catalog_table(const std::string& root_path, const std::string& name, icm::string_map<> creds) -{ - const auto path = join_path(root_path, name); - return deeplake_api::open_catalog_table(path, std::move(creds)).get_future().get(); -} - -// Open a per-database catalog table -std::shared_ptr -open_db_catalog_table(const std::string& root_path, const std::string& db_name, const std::string& name, icm::string_map<> creds) -{ - const auto path = join_db_path(root_path, db_name, name); - return deeplake_api::open_catalog_table(path, std::move(creds)).get_future().get(); -} - -template -std::vector load_vector(const nd::array& arr) -{ - std::vector out; - out.reserve(static_cast(arr.volume())); - for (int64_t i = 0; i < arr.volume(); ++i) { - out.push_back(arr.value(i)); - } - return out; -} - -std::vector load_int64_vector(const nd::array& arr) -{ - std::vector out; - out.reserve(static_cast(arr.volume())); - bool is_numeric = false; - try { - is_numeric = nd::dtype_is_numeric(arr.dtype()); - } catch (...) { - is_numeric = false; - } - if (is_numeric) { - try { - for (int64_t i = 0; i < arr.volume(); ++i) { - out.push_back(arr.value(i)); - } - return out; - } catch (...) { - out.clear(); - } - } - for (int64_t i = 0; i < arr.volume(); ++i) { - auto v = arr.value(i); - try { - out.push_back(std::stoll(std::string(v))); - } catch (...) { - out.push_back(0); - } - } - return out; -} - -// Build the tables schema (shared between ensure_db_catalog and schema definitions) -deeplake_api::catalog_table_schema make_tables_schema() -{ - deeplake_api::catalog_table_schema schema; - schema.add("table_id", deeplake_core::type::text(codecs::compression::null)) - .add("schema_name", deeplake_core::type::text(codecs::compression::null)) - .add("table_name", deeplake_core::type::text(codecs::compression::null)) - .add("dataset_path", deeplake_core::type::text(codecs::compression::null)) - .add("state", deeplake_core::type::text(codecs::compression::null)) - .add("db_name", deeplake_core::type::text(codecs::compression::null)) - .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .set_primary_key("table_id"); - return schema; -} - -deeplake_api::catalog_table_schema make_columns_schema() -{ - deeplake_api::catalog_table_schema schema; - schema.add("column_id", deeplake_core::type::text(codecs::compression::null)) - .add("table_id", deeplake_core::type::text(codecs::compression::null)) - .add("column_name", deeplake_core::type::text(codecs::compression::null)) - .add("pg_type", deeplake_core::type::text(codecs::compression::null)) - .add("dl_type_json", deeplake_core::type::text(codecs::compression::null)) - .add("nullable", deeplake_core::type::generic(nd::type::scalar(nd::dtype::boolean))) - .add("position", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int32))) - .set_primary_key("column_id"); - return schema; -} - -deeplake_api::catalog_table_schema make_indexes_schema() -{ - deeplake_api::catalog_table_schema schema; - schema.add("table_id", deeplake_core::type::text(codecs::compression::null)) - .add("column_names", deeplake_core::type::text(codecs::compression::null)) - .add("index_type", deeplake_core::type::text(codecs::compression::null)) - .add("order_type", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int32))) - .set_primary_key("table_id"); - return schema; -} - -deeplake_api::catalog_table_schema make_schemas_schema() -{ - deeplake_api::catalog_table_schema schema; - schema.add("schema_name", deeplake_core::type::text(codecs::compression::null)) - .add("owner", deeplake_core::type::text(codecs::compression::null)) - .add("state", deeplake_core::type::text(codecs::compression::null)) - .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .set_primary_key("schema_name"); - return schema; -} - -deeplake_api::catalog_table_schema make_meta_schema() -{ - deeplake_api::catalog_table_schema schema; - schema.add("catalog_version", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .set_primary_key("catalog_version"); - return schema; -} - -deeplake_api::catalog_table_schema make_databases_schema() -{ - deeplake_api::catalog_table_schema schema; - schema.add("db_name", deeplake_core::type::text(codecs::compression::null)) - .add("owner", deeplake_core::type::text(codecs::compression::null)) - .add("encoding", deeplake_core::type::text(codecs::compression::null)) - .add("lc_collate", deeplake_core::type::text(codecs::compression::null)) - .add("lc_ctype", deeplake_core::type::text(codecs::compression::null)) - .add("template_db", deeplake_core::type::text(codecs::compression::null)) - .add("state", deeplake_core::type::text(codecs::compression::null)) - .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .set_primary_key("db_name"); - return schema; -} - -} // namespace - -int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds) -{ - if (root_path.empty()) { - return 0; - } - const auto meta_path = join_path(root_path, k_meta_name); - const auto databases_path = join_path(root_path, k_databases_name); - - try { - // Launch shared catalog table creation in parallel (meta + databases only) - icm::vector>> promises; - promises.reserve(2); - promises.push_back( - deeplake_api::open_or_create_catalog_table(meta_path, make_meta_schema(), icm::string_map<>(creds))); - promises.push_back( - deeplake_api::open_or_create_catalog_table(databases_path, make_databases_schema(), icm::string_map<>(creds))); - - // Wait for all to complete - auto results = async::combine(std::move(promises)).get_future().get(); - if (results.size() != 2) { - elog(ERROR, - "Failed to initialize shared catalog at %s: expected 2 catalog tables, got %zu", - root_path.c_str(), - static_cast(results.size())); - } - - // Initialize meta table if empty (index 0 is meta) - auto& meta_table = results[0]; - if (meta_table) { - auto snapshot = meta_table->read().get_future().get(); - if (snapshot.row_count() == 0) { - icm::string_map row; - row["catalog_version"] = nd::adapt(static_cast(1)); - row["updated_at"] = nd::adapt(now_ms()); - meta_table->insert(std::move(row)).get_future().get(); - } - } - // Get version from the meta table we already have open - if (meta_table) { - return static_cast(meta_table->version().get_future().get()); - } - return 0; - } catch (const std::exception& e) { - catalog_table_cache::instance().invalidate(); - elog(ERROR, "Failed to ensure shared catalog at %s: %s", root_path.c_str(), e.what()); - return 0; - } catch (...) { - catalog_table_cache::instance().invalidate(); - elog(ERROR, "Failed to ensure shared catalog at %s: unknown error", root_path.c_str()); - return 0; - } -} - -int64_t ensure_db_catalog(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) -{ - if (root_path.empty() || db_name.empty()) { - return 0; - } - - const auto tables_path = join_db_path(root_path, db_name, k_tables_name); - const auto columns_path = join_db_path(root_path, db_name, k_columns_name); - const auto indexes_path = join_db_path(root_path, db_name, k_indexes_name); - const auto schemas_path = join_db_path(root_path, db_name, k_schemas_name); - const auto meta_path = join_db_path(root_path, db_name, k_meta_name); - - try { - // Launch all 5 per-database catalog table creation in parallel - icm::vector>> promises; - promises.reserve(5); - promises.push_back( - deeplake_api::open_or_create_catalog_table(tables_path, make_tables_schema(), icm::string_map<>(creds))); - promises.push_back( - deeplake_api::open_or_create_catalog_table(columns_path, make_columns_schema(), icm::string_map<>(creds))); - promises.push_back( - deeplake_api::open_or_create_catalog_table(indexes_path, make_indexes_schema(), icm::string_map<>(creds))); - promises.push_back( - deeplake_api::open_or_create_catalog_table(schemas_path, make_schemas_schema(), icm::string_map<>(creds))); - promises.push_back( - deeplake_api::open_or_create_catalog_table(meta_path, make_meta_schema(), icm::string_map<>(creds))); - - auto results = async::combine(std::move(promises)).get_future().get(); - if (results.size() != 5) { - elog(ERROR, - "Failed to initialize per-db catalog at %s/%s: expected 5 catalog tables, got %zu", - root_path.c_str(), - db_name.c_str(), - static_cast(results.size())); - } - - // Initialize per-db meta table if empty (index 4 is meta) - auto& meta_table = results[4]; - if (meta_table) { - auto snapshot = meta_table->read().get_future().get(); - if (snapshot.row_count() == 0) { - icm::string_map row; - row["catalog_version"] = nd::adapt(static_cast(1)); - row["updated_at"] = nd::adapt(now_ms()); - meta_table->insert(std::move(row)).get_future().get(); - } - } - if (meta_table) { - return static_cast(meta_table->version().get_future().get()); - } - return 0; - } catch (const std::exception& e) { - elog(ERROR, "Failed to ensure per-db catalog at %s/%s: %s", root_path.c_str(), db_name.c_str(), e.what()); - return 0; - } catch (...) { - elog(ERROR, "Failed to ensure per-db catalog at %s/%s: unknown error", root_path.c_str(), db_name.c_str()); - return 0; - } -} - -std::vector load_tables(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) -{ - std::vector out; - try { - auto table = open_db_catalog_table(root_path, db_name, k_tables_name, std::move(creds)); - if (!table) { - return out; - } - auto snapshot = table->read().get_future().get(); - if (snapshot.row_count() == 0) { - return out; - } - - std::unordered_map latest; - for (const auto& row : snapshot.rows()) { - auto table_id_it = row.find("table_id"); - auto schema_it = row.find("schema_name"); - auto table_it = row.find("table_name"); - auto path_it = row.find("dataset_path"); - auto state_it = row.find("state"); - auto updated_it = row.find("updated_at"); - if (table_id_it == row.end() || schema_it == row.end() || table_it == row.end() || path_it == row.end() || - state_it == row.end() || updated_it == row.end()) { - continue; - } - - table_meta meta; - meta.table_id = deeplake_api::array_to_string(table_id_it->second); - meta.schema_name = deeplake_api::array_to_string(schema_it->second); - meta.table_name = deeplake_api::array_to_string(table_it->second); - meta.dataset_path = deeplake_api::array_to_string(path_it->second); - meta.state = deeplake_api::array_to_string(state_it->second); - meta.db_name = db_name; - auto db_name_it = row.find("db_name"); - if (db_name_it != row.end()) { - meta.db_name = deeplake_api::array_to_string(db_name_it->second); - } - auto updated_vec = load_int64_vector(updated_it->second); - meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front(); - - auto it = latest.find(meta.table_id); - if (it == latest.end() || it->second.updated_at <= meta.updated_at) { - latest[meta.table_id] = std::move(meta); - } - } - - out.reserve(latest.size()); - for (auto& [_, meta] : latest) { - if (meta.state == "ready") { - out.push_back(std::move(meta)); - } - } - return out; - } catch (const std::exception& e) { - elog(WARNING, "Failed to load catalog tables for db '%s': %s", db_name.c_str(), e.what()); - return out; - } catch (...) { - elog(WARNING, "Failed to load catalog tables for db '%s': unknown error", db_name.c_str()); - return out; - } -} - -std::vector load_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) -{ - std::vector out; - try { - auto table = open_db_catalog_table(root_path, db_name, k_columns_name, std::move(creds)); - if (!table) { - return out; - } - auto snapshot = table->read().get_future().get(); - if (snapshot.row_count() == 0) { - return out; - } - - for (const auto& row : snapshot.rows()) { - auto table_id_it = row.find("table_id"); - auto column_name_it = row.find("column_name"); - auto pg_type_it = row.find("pg_type"); - auto dl_type_it = row.find("dl_type_json"); - auto nullable_it = row.find("nullable"); - auto position_it = row.find("position"); - - if (table_id_it == row.end() || column_name_it == row.end() || pg_type_it == row.end()) { - continue; - } - - column_meta meta; - meta.table_id = deeplake_api::array_to_string(table_id_it->second); - meta.column_name = deeplake_api::array_to_string(column_name_it->second); - meta.pg_type = deeplake_api::array_to_string(pg_type_it->second); - if (dl_type_it != row.end()) { - meta.dl_type_json = deeplake_api::array_to_string(dl_type_it->second); - } - if (nullable_it != row.end()) { - try { - meta.nullable = nullable_it->second.value(0); - } catch (...) { - meta.nullable = true; - } - } - if (position_it != row.end()) { - try { - meta.position = position_it->second.value(0); - } catch (...) { - auto pos_vec = load_int64_vector(position_it->second); - meta.position = pos_vec.empty() ? 0 : static_cast(pos_vec.front()); - } - } - - out.push_back(std::move(meta)); - } - return out; - } catch (const std::exception& e) { - elog(WARNING, "Failed to load catalog columns for db '%s': %s", db_name.c_str(), e.what()); - return out; - } catch (...) { - elog(WARNING, "Failed to load catalog columns for db '%s': unknown error", db_name.c_str()); - return out; - } -} - -std::vector load_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) -{ - std::vector out; - try { - auto table = open_db_catalog_table(root_path, db_name, k_indexes_name, std::move(creds)); - if (!table) { - return out; - } - auto snapshot = table->read().get_future().get(); - if (snapshot.row_count() == 0) { - return out; - } - - for (const auto& row : snapshot.rows()) { - auto table_id_it = row.find("table_id"); - auto column_names_it = row.find("column_names"); - auto index_type_it = row.find("index_type"); - auto order_type_it = row.find("order_type"); - - if (table_id_it == row.end() || column_names_it == row.end() || index_type_it == row.end()) { - continue; - } - - index_meta meta; - meta.table_id = deeplake_api::array_to_string(table_id_it->second); - meta.column_names = deeplake_api::array_to_string(column_names_it->second); - meta.index_type = deeplake_api::array_to_string(index_type_it->second); - if (order_type_it != row.end()) { - try { - auto order_vec = load_vector(order_type_it->second); - meta.order_type = order_vec.empty() ? 0 : order_vec.front(); - } catch (...) { - meta.order_type = 0; - } - } - - out.push_back(std::move(meta)); - } - return out; - } catch (const std::exception& e) { - elog(DEBUG1, "Failed to load catalog indexes for db '%s': %s", db_name.c_str(), e.what()); - return out; - } catch (...) { - elog(DEBUG1, "Failed to load catalog indexes for db '%s': unknown error", db_name.c_str()); - return out; - } -} - -std::vector load_schemas(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) -{ - std::vector out; - try { - auto table = open_db_catalog_table(root_path, db_name, k_schemas_name, std::move(creds)); - if (!table) { - return out; - } - auto snapshot = table->read().get_future().get(); - if (snapshot.row_count() == 0) { - return out; - } - - std::unordered_map latest; - for (const auto& row : snapshot.rows()) { - auto schema_name_it = row.find("schema_name"); - auto state_it = row.find("state"); - if (schema_name_it == row.end() || state_it == row.end()) { - continue; - } - - schema_meta meta; - meta.schema_name = deeplake_api::array_to_string(schema_name_it->second); - meta.state = deeplake_api::array_to_string(state_it->second); - auto owner_it = row.find("owner"); - if (owner_it != row.end()) { - meta.owner = deeplake_api::array_to_string(owner_it->second); - } - auto updated_it = row.find("updated_at"); - if (updated_it != row.end()) { - auto updated_vec = load_int64_vector(updated_it->second); - meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front(); - } - - auto it = latest.find(meta.schema_name); - if (it == latest.end() || it->second.updated_at <= meta.updated_at) { - latest[meta.schema_name] = std::move(meta); - } - } - - out.reserve(latest.size()); - for (auto& [_, meta] : latest) { - if (meta.state == "ready") { - out.push_back(std::move(meta)); - } - } - return out; - } catch (const std::exception& e) { - elog(DEBUG1, "Failed to load catalog schemas for db '%s': %s (may be old catalog)", db_name.c_str(), e.what()); - return out; - } catch (...) { - elog(DEBUG1, "Failed to load catalog schemas for db '%s': unknown error (may be old catalog)", db_name.c_str()); - return out; - } -} - -void upsert_schema(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const schema_meta& meta) -{ - auto table = open_db_catalog_table(root_path, db_name, k_schemas_name, std::move(creds)); - icm::string_map row; - row["schema_name"] = nd::adapt(meta.schema_name); - row["owner"] = nd::adapt(meta.owner); - row["state"] = nd::adapt(meta.state); - row["updated_at"] = nd::adapt(meta.updated_at == 0 ? now_ms() : meta.updated_at); - table->upsert(std::move(row)).get_future().get(); -} - -std::pair, std::vector> -load_tables_and_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) -{ - std::vector tables_out; - std::vector columns_out; - - try { - // Open both per-database catalog tables in parallel - auto tables_promise = deeplake_api::open_catalog_table(join_db_path(root_path, db_name, k_tables_name), icm::string_map<>(creds)); - auto columns_promise = deeplake_api::open_catalog_table(join_db_path(root_path, db_name, k_columns_name), icm::string_map<>(creds)); - - icm::vector>> open_promises; - open_promises.push_back(std::move(tables_promise)); - open_promises.push_back(std::move(columns_promise)); - - auto catalog_tables = async::combine(std::move(open_promises)).get_future().get(); - - auto& tables_table = catalog_tables[0]; - auto& columns_table = catalog_tables[1]; - - if (!tables_table || !columns_table) { - return {tables_out, columns_out}; - } - - // Read both snapshots in parallel - auto tables_read_promise = tables_table->read(); - auto columns_read_promise = columns_table->read(); - - icm::vector> read_promises; - read_promises.push_back(std::move(tables_read_promise)); - read_promises.push_back(std::move(columns_read_promise)); - - auto snapshots = async::combine(std::move(read_promises)).get_future().get(); - - auto& tables_snapshot = snapshots[0]; - auto& columns_snapshot = snapshots[1]; - - // Process tables - if (tables_snapshot.row_count() > 0) { - std::unordered_map latest; - for (const auto& row : tables_snapshot.rows()) { - auto table_id_it = row.find("table_id"); - auto schema_it = row.find("schema_name"); - auto table_it = row.find("table_name"); - auto path_it = row.find("dataset_path"); - auto state_it = row.find("state"); - auto updated_it = row.find("updated_at"); - if (table_id_it == row.end() || schema_it == row.end() || table_it == row.end() || path_it == row.end() || - state_it == row.end() || updated_it == row.end()) { - continue; - } - - table_meta meta; - meta.table_id = deeplake_api::array_to_string(table_id_it->second); - meta.schema_name = deeplake_api::array_to_string(schema_it->second); - meta.table_name = deeplake_api::array_to_string(table_it->second); - meta.dataset_path = deeplake_api::array_to_string(path_it->second); - meta.state = deeplake_api::array_to_string(state_it->second); - meta.db_name = db_name; - auto db_name_it = row.find("db_name"); - if (db_name_it != row.end()) { - meta.db_name = deeplake_api::array_to_string(db_name_it->second); - } - auto updated_vec = load_int64_vector(updated_it->second); - meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front(); - - auto it = latest.find(meta.table_id); - if (it == latest.end() || it->second.updated_at <= meta.updated_at) { - latest[meta.table_id] = std::move(meta); - } - } - - tables_out.reserve(latest.size()); - for (auto& [_, meta] : latest) { - if (meta.state == "ready") { - tables_out.push_back(std::move(meta)); - } - } - } - - // Process columns - if (columns_snapshot.row_count() > 0) { - for (const auto& row : columns_snapshot.rows()) { - auto table_id_it = row.find("table_id"); - auto column_name_it = row.find("column_name"); - auto pg_type_it = row.find("pg_type"); - auto dl_type_it = row.find("dl_type_json"); - auto nullable_it = row.find("nullable"); - auto position_it = row.find("position"); - - if (table_id_it == row.end() || column_name_it == row.end() || pg_type_it == row.end()) { - continue; - } - - column_meta meta; - meta.table_id = deeplake_api::array_to_string(table_id_it->second); - meta.column_name = deeplake_api::array_to_string(column_name_it->second); - meta.pg_type = deeplake_api::array_to_string(pg_type_it->second); - if (dl_type_it != row.end()) { - meta.dl_type_json = deeplake_api::array_to_string(dl_type_it->second); - } - if (nullable_it != row.end()) { - try { - auto nullable_vec = load_vector(nullable_it->second); - meta.nullable = !nullable_vec.empty() && nullable_vec.front() != 0; - } catch (...) { - meta.nullable = true; - } - } - if (position_it != row.end()) { - try { - auto pos_vec = load_vector(position_it->second); - meta.position = pos_vec.empty() ? 0 : pos_vec.front(); - } catch (...) { - auto pos_vec = load_int64_vector(position_it->second); - meta.position = pos_vec.empty() ? 0 : static_cast(pos_vec.front()); - } - } - - columns_out.push_back(std::move(meta)); - } - } - - return {tables_out, columns_out}; - } catch (const std::exception& e) { - elog(WARNING, "Failed to load catalog tables and columns for db '%s': %s", db_name.c_str(), e.what()); - return {tables_out, columns_out}; - } catch (...) { - elog(WARNING, "Failed to load catalog tables and columns for db '%s': unknown error", db_name.c_str()); - return {tables_out, columns_out}; - } -} - -void upsert_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const table_meta& meta) -{ - auto table = open_db_catalog_table(root_path, db_name, k_tables_name, std::move(creds)); - icm::string_map row; - row["table_id"] = nd::adapt(meta.table_id); - row["schema_name"] = nd::adapt(meta.schema_name); - row["table_name"] = nd::adapt(meta.table_name); - row["dataset_path"] = nd::adapt(meta.dataset_path); - row["state"] = nd::adapt(meta.state); - row["db_name"] = nd::adapt(meta.db_name.empty() ? db_name : meta.db_name); - row["updated_at"] = nd::adapt(meta.updated_at == 0 ? now_ms() : meta.updated_at); - table->upsert(std::move(row)).get_future().get(); -} - -void upsert_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector& columns) -{ - if (columns.empty()) { - return; - } - auto table = open_db_catalog_table(root_path, db_name, k_columns_name, std::move(creds)); - icm::vector> rows; - rows.reserve(columns.size()); - for (const auto& col : columns) { - icm::string_map row; - // column_id is the composite key: table_id:column_name - row["column_id"] = nd::adapt(col.table_id + ":" + col.column_name); - row["table_id"] = nd::adapt(col.table_id); - row["column_name"] = nd::adapt(col.column_name); - row["pg_type"] = nd::adapt(col.pg_type); - row["dl_type_json"] = nd::adapt(col.dl_type_json); - row["nullable"] = nd::adapt(col.nullable); - row["position"] = nd::adapt(col.position); - rows.push_back(std::move(row)); - } - table->upsert_many(std::move(rows)).get_future().get(); -} - -void upsert_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector& indexes) -{ - if (indexes.empty()) { - return; - } - auto table = open_db_catalog_table(root_path, db_name, k_indexes_name, std::move(creds)); - icm::vector> rows; - rows.reserve(indexes.size()); - for (const auto& idx : indexes) { - icm::string_map row; - row["table_id"] = nd::adapt(idx.table_id); - row["column_names"] = nd::adapt(idx.column_names); - row["index_type"] = nd::adapt(idx.index_type); - row["order_type"] = nd::adapt(idx.order_type); - rows.push_back(std::move(row)); - } - table->upsert_many(std::move(rows)).get_future().get(); -} - -std::vector load_databases(const std::string& root_path, icm::string_map<> creds) -{ - std::vector out; - try { - auto table = open_catalog_table(root_path, k_databases_name, std::move(creds)); - if (!table) { - return out; - } - auto snapshot = table->read().get_future().get(); - if (snapshot.row_count() == 0) { - return out; - } - - std::unordered_map latest; - for (const auto& row : snapshot.rows()) { - auto db_name_it = row.find("db_name"); - auto owner_it = row.find("owner"); - auto encoding_it = row.find("encoding"); - auto lc_collate_it = row.find("lc_collate"); - auto lc_ctype_it = row.find("lc_ctype"); - auto template_it = row.find("template_db"); - auto state_it = row.find("state"); - auto updated_it = row.find("updated_at"); - if (db_name_it == row.end() || state_it == row.end()) { - continue; - } - - database_meta meta; - meta.db_name = deeplake_api::array_to_string(db_name_it->second); - if (owner_it != row.end()) meta.owner = deeplake_api::array_to_string(owner_it->second); - if (encoding_it != row.end()) meta.encoding = deeplake_api::array_to_string(encoding_it->second); - if (lc_collate_it != row.end()) meta.lc_collate = deeplake_api::array_to_string(lc_collate_it->second); - if (lc_ctype_it != row.end()) meta.lc_ctype = deeplake_api::array_to_string(lc_ctype_it->second); - if (template_it != row.end()) meta.template_db = deeplake_api::array_to_string(template_it->second); - meta.state = deeplake_api::array_to_string(state_it->second); - if (updated_it != row.end()) { - auto updated_vec = load_int64_vector(updated_it->second); - meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front(); - } - - auto it = latest.find(meta.db_name); - if (it == latest.end() || it->second.updated_at <= meta.updated_at) { - latest[meta.db_name] = std::move(meta); - } - } - - out.reserve(latest.size()); - for (auto& [_, meta] : latest) { - if (meta.state == "ready") { - out.push_back(std::move(meta)); - } - } - return out; - } catch (const std::exception& e) { - elog(WARNING, "Failed to load catalog databases: %s", e.what()); - return out; - } catch (...) { - elog(WARNING, "Failed to load catalog databases: unknown error"); - return out; - } -} - -void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta) -{ - auto table = open_catalog_table(root_path, k_databases_name, std::move(creds)); - icm::string_map row; - row["db_name"] = nd::adapt(meta.db_name); - row["owner"] = nd::adapt(meta.owner); - row["encoding"] = nd::adapt(meta.encoding); - row["lc_collate"] = nd::adapt(meta.lc_collate); - row["lc_ctype"] = nd::adapt(meta.lc_ctype); - row["template_db"] = nd::adapt(meta.template_db); - row["state"] = nd::adapt(meta.state); - row["updated_at"] = nd::adapt(meta.updated_at == 0 ? now_ms() : meta.updated_at); - table->upsert(std::move(row)).get_future().get(); -} - -int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds) -{ - try { - // Use cached meta table handle to avoid repeated S3 opens - auto table = catalog_table_cache::instance().get_meta_table(root_path, std::move(creds)); - if (!table) { - return 0; - } - // Use version() for fast HEAD request instead of reading the whole table. - // Returns a hash of the ETag which changes whenever the table is modified. - return static_cast(table->version().get_future().get()); - } catch (const std::exception& e) { - elog(WARNING, "Failed to read catalog version: %s", e.what()); - catalog_table_cache::instance().invalidate(); - return 0; - } catch (...) { - elog(WARNING, "Failed to read catalog version: unknown error"); - catalog_table_cache::instance().invalidate(); - return 0; - } -} - -void bump_catalog_version(const std::string& root_path, icm::string_map<> creds) -{ - auto table = open_catalog_table(root_path, k_meta_name, std::move(creds)); - icm::string_map row; - // Use a fixed key and upsert - the updated_at timestamp change will trigger - // a new ETag, which is what get_catalog_version() now detects via version(). - row["catalog_version"] = nd::adapt(static_cast(1)); - row["updated_at"] = nd::adapt(now_ms()); - table->upsert(std::move(row)).get_future().get(); -} - -int64_t get_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) -{ - try { - auto table = open_db_catalog_table(root_path, db_name, k_meta_name, std::move(creds)); - if (!table) { - return 0; - } - return static_cast(table->version().get_future().get()); - } catch (const std::exception& e) { - elog(WARNING, "Failed to read per-db catalog version for '%s': %s", db_name.c_str(), e.what()); - return 0; - } catch (...) { - elog(WARNING, "Failed to read per-db catalog version for '%s': unknown error", db_name.c_str()); - return 0; - } -} - -void bump_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) -{ - auto table = open_db_catalog_table(root_path, db_name, k_meta_name, std::move(creds)); - icm::string_map row; - row["catalog_version"] = nd::adapt(static_cast(1)); - row["updated_at"] = nd::adapt(now_ms()); - table->upsert(std::move(row)).get_future().get(); -} - -std::shared_ptr -open_db_meta_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) -{ - return open_db_catalog_table(root_path, db_name, k_meta_name, std::move(creds)); -} - -} // namespace pg::dl_catalog diff --git a/cpp/deeplake_pg/dl_catalog.hpp b/cpp/deeplake_pg/dl_catalog.hpp deleted file mode 100644 index 08ba6f468f..0000000000 --- a/cpp/deeplake_pg/dl_catalog.hpp +++ /dev/null @@ -1,103 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include - -namespace deeplake_api { class catalog_table; } - -namespace pg::dl_catalog { - -struct table_meta -{ - std::string table_id; - std::string schema_name; - std::string table_name; - std::string dataset_path; - std::string state; - std::string db_name; - int64_t updated_at = 0; -}; - -struct column_meta -{ - std::string table_id; - std::string column_name; - std::string pg_type; - std::string dl_type_json; - bool nullable = true; - int32_t position = 0; -}; - -struct index_meta -{ - std::string table_id; - std::string column_names; - std::string index_type; - int32_t order_type = 0; -}; - -struct schema_meta -{ - std::string schema_name; // PK - std::string owner; - std::string state; // "ready" or "dropping" - int64_t updated_at = 0; -}; - -struct database_meta -{ - std::string db_name; // PK - std::string owner; - std::string encoding; - std::string lc_collate; - std::string lc_ctype; - std::string template_db; - std::string state; // "ready" or "dropping" - int64_t updated_at = 0; -}; - -// Shared (cluster-wide) catalog: meta + databases -int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds); - -// Per-database catalog: tables + columns + indexes + meta -int64_t ensure_db_catalog(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); - -// Per-database loaders (read from {root}/{db_name}/__deeplake_catalog/) -std::vector load_tables(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); -std::vector load_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); -std::vector load_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); - -// Load tables and columns in parallel for better performance -std::pair, std::vector> -load_tables_and_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); - -// Per-database schema catalog -std::vector load_schemas(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); -void upsert_schema(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const schema_meta& meta); - -// Per-database upserts (write to {root}/{db_name}/__deeplake_catalog/) -void upsert_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const table_meta& meta); -void upsert_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector& columns); -void upsert_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector& indexes); - -// Shared (cluster-wide) database catalog -std::vector load_databases(const std::string& root_path, icm::string_map<> creds); -void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta); - -// Global (shared) catalog version -int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds); -void bump_catalog_version(const std::string& root_path, icm::string_map<> creds); - -// Per-database catalog version -int64_t get_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); -void bump_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); - -// Open the per-database meta table handle (for parallel .version() calls in sync worker) -std::shared_ptr -open_db_meta_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); - -} // namespace pg::dl_catalog diff --git a/cpp/deeplake_pg/dl_wal.cpp b/cpp/deeplake_pg/dl_wal.cpp new file mode 100644 index 0000000000..aab73be0af --- /dev/null +++ b/cpp/deeplake_pg/dl_wal.cpp @@ -0,0 +1,416 @@ +#include "dl_wal.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +extern "C" { +#include +#include +#include +#include +} + +namespace pg::dl_wal { + +namespace { + +constexpr const char* k_catalog_dir = "__deeplake_catalog"; +constexpr const char* k_databases_name = "databases"; +constexpr const char* k_ddl_log_name = "__wal_table"; + +// Shared (cluster-wide) path: {root}/__deeplake_catalog/{name} +std::string join_path(const std::string& root, const std::string& name) +{ + if (!root.empty() && root.back() == '/') { + return root + k_catalog_dir + "/" + name; + } + return root + "/" + k_catalog_dir + "/" + name; +} + +// Per-database path: {root}/{db_name}/__deeplake_catalog/{name} +std::string join_db_path(const std::string& root, const std::string& db_name, const std::string& name) +{ + std::string base = root; + if (!base.empty() && base.back() == '/') { + base.pop_back(); + } + return base + "/" + db_name + "/" + k_catalog_dir + "/" + name; +} + +int64_t now_ms() +{ + using namespace std::chrono; + return duration_cast(system_clock::now().time_since_epoch()).count(); +} + +// Open a shared (cluster-wide) catalog table +std::shared_ptr +open_catalog_table(const std::string& root_path, const std::string& name, icm::string_map<> creds) +{ + const auto path = join_path(root_path, name); + return deeplake_api::open_catalog_table(path, std::move(creds)).get_future().get(); +} + +// Create the WAL dataset with schema. Called once from ensure_db_catalog. +void create_ddl_dataset(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) +{ + const auto path = join_db_path(root_path, db_name, k_ddl_log_name); + bool exists = false; + try { + exists = deeplake_api::exists(path, icm::string_map<>(creds)).get_future().get(); + } catch (...) { + exists = false; + } + if (exists) { + return; + } + + auto ds = deeplake_api::create(path, std::move(creds)).get_future().get(); + ds->add_column("seq", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))); + ds->add_column("origin_instance_id", deeplake_core::type::text(codecs::compression::null)); + ds->add_column("search_path", deeplake_core::type::text(codecs::compression::null)); + ds->add_column("command_tag", deeplake_core::type::text(codecs::compression::null)); + ds->add_column("object_identity", deeplake_core::type::text(codecs::compression::null)); + ds->add_column("ddl_sql", deeplake_core::type::text(codecs::compression::null)); + ds->add_column("timestamp", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))); + ds->commit().get_future().get(); +} + +// Thread-local cached WAL dataset handle for append (hot path). +struct ddl_dataset_cache +{ + std::string key; // root_path + "\t" + db_name + std::shared_ptr ds; + + static ddl_dataset_cache& instance() + { + static thread_local ddl_dataset_cache cache; + return cache; + } + + std::shared_ptr get(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) + { + const auto k = root_path + "\t" + db_name; + if (k == key && ds) { + return ds; + } + const auto path = join_db_path(root_path, db_name, k_ddl_log_name); + ds = deeplake_api::open(path, std::move(creds)).get_future().get(); + key = k; + return ds; + } + + void invalidate() + { + key.clear(); + ds.reset(); + } +}; + +std::vector load_int64_vector(const nd::array& arr) +{ + std::vector out; + out.reserve(static_cast(arr.volume())); + bool is_numeric = false; + try { + is_numeric = nd::dtype_is_numeric(arr.dtype()); + } catch (...) { + is_numeric = false; + } + if (is_numeric) { + try { + for (int64_t i = 0; i < arr.volume(); ++i) { + out.push_back(arr.value(i)); + } + return out; + } catch (...) { + out.clear(); + } + } + for (int64_t i = 0; i < arr.volume(); ++i) { + auto v = arr.value(i); + try { + out.push_back(std::stoll(std::string(v))); + } catch (...) { + out.push_back(0); + } + } + return out; +} + +deeplake_api::catalog_table_schema make_databases_schema() +{ + deeplake_api::catalog_table_schema schema; + schema.add("db_name", deeplake_core::type::text(codecs::compression::null)) + .add("owner", deeplake_core::type::text(codecs::compression::null)) + .add("encoding", deeplake_core::type::text(codecs::compression::null)) + .add("lc_collate", deeplake_core::type::text(codecs::compression::null)) + .add("lc_ctype", deeplake_core::type::text(codecs::compression::null)) + .add("template_db", deeplake_core::type::text(codecs::compression::null)) + .add("state", deeplake_core::type::text(codecs::compression::null)) + .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) + .set_primary_key("db_name"); + return schema; +} + +} // namespace + +int64_t next_ddl_seq() +{ + static thread_local int64_t counter = 0; + static thread_local int64_t last_ms = 0; + const int64_t ms = now_ms(); + if (ms == last_ms) { + counter++; + } else { + counter = 0; + last_ms = ms; + } + return ms * 1000 + counter; +} + +std::string local_instance_id() +{ + char hostname[256] = {0}; + if (gethostname(hostname, sizeof(hostname) - 1) != 0) { + strlcpy(hostname, "unknown_host", sizeof(hostname)); + } + const char* port = GetConfigOption("port", true, false); + const std::string data_dir = DataDir != nullptr ? std::string(DataDir) : std::string("unknown_data_dir"); + const std::string host_str(hostname); + const std::string port_str = port != nullptr ? std::string(port) : std::string("unknown_port"); + return host_str + ":" + port_str + ":" + data_dir; +} + +void ensure_catalog(const std::string& root_path, icm::string_map<> creds) +{ + if (root_path.empty()) { + return; + } + const auto databases_path = join_path(root_path, k_databases_name); + + try { + deeplake_api::open_or_create_catalog_table(databases_path, make_databases_schema(), std::move(creds)) + .get_future() + .get(); + } catch (const std::exception& e) { + elog(ERROR, "Failed to ensure shared catalog at %s: %s", root_path.c_str(), e.what()); + } catch (...) { + elog(ERROR, "Failed to ensure shared catalog at %s: unknown error", root_path.c_str()); + } +} + +void ensure_db_catalog(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) +{ + if (root_path.empty() || db_name.empty()) { + return; + } + + try { + create_ddl_dataset(root_path, db_name, std::move(creds)); + } catch (const std::exception& e) { + elog(ERROR, "Failed to ensure per-db catalog at %s/%s: %s", root_path.c_str(), db_name.c_str(), e.what()); + } catch (...) { + elog(ERROR, "Failed to ensure per-db catalog at %s/%s: unknown error", root_path.c_str(), db_name.c_str()); + } +} + +std::vector load_databases(const std::string& root_path, icm::string_map<> creds) +{ + std::vector out; + try { + auto table = open_catalog_table(root_path, k_databases_name, std::move(creds)); + if (!table) { + return out; + } + auto snapshot = table->read().get_future().get(); + if (snapshot.row_count() == 0) { + return out; + } + + std::unordered_map latest; + for (const auto& row : snapshot.rows()) { + auto db_name_it = row.find("db_name"); + auto owner_it = row.find("owner"); + auto encoding_it = row.find("encoding"); + auto lc_collate_it = row.find("lc_collate"); + auto lc_ctype_it = row.find("lc_ctype"); + auto template_it = row.find("template_db"); + auto state_it = row.find("state"); + auto updated_it = row.find("updated_at"); + if (db_name_it == row.end() || state_it == row.end()) { + continue; + } + + database_meta meta; + meta.db_name = deeplake_api::array_to_string(db_name_it->second); + if (owner_it != row.end()) meta.owner = deeplake_api::array_to_string(owner_it->second); + if (encoding_it != row.end()) meta.encoding = deeplake_api::array_to_string(encoding_it->second); + if (lc_collate_it != row.end()) meta.lc_collate = deeplake_api::array_to_string(lc_collate_it->second); + if (lc_ctype_it != row.end()) meta.lc_ctype = deeplake_api::array_to_string(lc_ctype_it->second); + if (template_it != row.end()) meta.template_db = deeplake_api::array_to_string(template_it->second); + meta.state = deeplake_api::array_to_string(state_it->second); + if (updated_it != row.end()) { + auto updated_vec = load_int64_vector(updated_it->second); + meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front(); + } + + auto it = latest.find(meta.db_name); + if (it == latest.end() || it->second.updated_at <= meta.updated_at) { + latest[meta.db_name] = std::move(meta); + } + } + + out.reserve(latest.size()); + for (auto& [_, meta] : latest) { + if (meta.state == "ready") { + out.push_back(std::move(meta)); + } + } + return out; + } catch (const std::exception& e) { + elog(WARNING, "Failed to load catalog databases: %s", e.what()); + return out; + } catch (...) { + elog(WARNING, "Failed to load catalog databases: unknown error"); + return out; + } +} + +void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta) +{ + auto table = open_catalog_table(root_path, k_databases_name, std::move(creds)); + icm::string_map row; + row["db_name"] = nd::adapt(meta.db_name); + row["owner"] = nd::adapt(meta.owner); + row["encoding"] = nd::adapt(meta.encoding); + row["lc_collate"] = nd::adapt(meta.lc_collate); + row["lc_ctype"] = nd::adapt(meta.lc_ctype); + row["template_db"] = nd::adapt(meta.template_db); + row["state"] = nd::adapt(meta.state); + row["updated_at"] = nd::adapt(meta.updated_at == 0 ? now_ms() : meta.updated_at); + table->upsert(std::move(row)).get_future().get(); +} + +int64_t get_databases_version(const std::string& root_path, icm::string_map<> creds) +{ + try { + auto table = open_catalog_table(root_path, k_databases_name, std::move(creds)); + if (!table) { + return 0; + } + return static_cast(table->version().get_future().get()); + } catch (const std::exception& e) { + elog(WARNING, "Failed to read databases catalog version: %s", e.what()); + return 0; + } catch (...) { + elog(WARNING, "Failed to read databases catalog version: unknown error"); + return 0; + } +} + +std::shared_ptr +open_ddl_log_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) +{ + return ddl_dataset_cache::instance().get(root_path, db_name, std::move(creds)); +} + +void append_ddl_log(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, + const ddl_log_entry& entry) +{ + try { + auto ds = ddl_dataset_cache::instance().get(root_path, db_name, std::move(creds)); + ds->set_auto_commit_enabled(false).get_future().get(); + icm::string_map row; + row["seq"] = nd::adapt(entry.seq); + row["origin_instance_id"] = nd::adapt(entry.origin_instance_id); + row["search_path"] = nd::adapt(entry.search_path); + row["command_tag"] = nd::adapt(entry.command_tag); + row["object_identity"] = nd::adapt(entry.object_identity); + row["ddl_sql"] = nd::adapt(entry.ddl_sql); + row["timestamp"] = nd::adapt(entry.timestamp == 0 ? now_ms() : entry.timestamp); + ds->append_row(row).get_future().get(); + ds->commit().get_future().get(); + } catch (...) { + ddl_dataset_cache::instance().invalidate(); + throw; + } +} + +std::vector load_ddl_log(const std::string& root_path, const std::string& db_name, + icm::string_map<> creds, int64_t after_seq) +{ + std::vector out; + try { + auto ds = ddl_dataset_cache::instance().get(root_path, db_name, std::move(creds)); + if (!ds) { + return out; + } + ds->refresh().get_future().get(); + const int64_t row_count = ds->num_rows(); + if (row_count == 0) { + return out; + } + + auto seq_arr = ds->get_column("seq").request_range(0, row_count, {}).get_future().get(); + auto origin_arr = ds->get_column("origin_instance_id").request_range(0, row_count, {}).get_future().get(); + auto search_path_arr = ds->get_column("search_path").request_range(0, row_count, {}).get_future().get(); + auto tag_arr = ds->get_column("command_tag").request_range(0, row_count, {}).get_future().get(); + auto object_arr = ds->get_column("object_identity").request_range(0, row_count, {}).get_future().get(); + auto sql_arr = ds->get_column("ddl_sql").request_range(0, row_count, {}).get_future().get(); + auto ts_arr = ds->get_column("timestamp").request_range(0, row_count, {}).get_future().get(); + + auto seq_vec = load_int64_vector(seq_arr); + auto ts_vec = load_int64_vector(ts_arr); + for (int64_t i = 0; i < row_count; ++i) { + ddl_log_entry entry; + entry.seq = i < static_cast(seq_vec.size()) ? seq_vec[static_cast(i)] : 0; + if (entry.seq <= after_seq) { + continue; + } + auto read_string = [](const nd::array& arr, int64_t idx) -> std::string { + try { + auto sub = arr[idx]; + auto bytes = sub.data(); + return std::string(reinterpret_cast(bytes.data()), bytes.size()); + } catch (...) { + try { + return std::string(arr.value(idx)); + } catch (...) { + return {}; + } + } + }; + entry.origin_instance_id = read_string(origin_arr, i); + entry.search_path = read_string(search_path_arr, i); + entry.command_tag = read_string(tag_arr, i); + entry.object_identity = read_string(object_arr, i); + entry.ddl_sql = read_string(sql_arr, i); + entry.timestamp = i < static_cast(ts_vec.size()) ? ts_vec[static_cast(i)] : 0; + out.push_back(std::move(entry)); + } + std::sort(out.begin(), out.end(), [](const ddl_log_entry& a, const ddl_log_entry& b) { + return a.seq < b.seq; + }); + return out; + } catch (const std::exception& e) { + elog(WARNING, "Failed to load DDL log for db '%s': %s", db_name.c_str(), e.what()); + return out; + } catch (...) { + elog(WARNING, "Failed to load DDL log for db '%s': unknown error", db_name.c_str()); + return out; + } +} + +} // namespace pg::dl_wal diff --git a/cpp/deeplake_pg/dl_wal.hpp b/cpp/deeplake_pg/dl_wal.hpp new file mode 100644 index 0000000000..c0d386cc69 --- /dev/null +++ b/cpp/deeplake_pg/dl_wal.hpp @@ -0,0 +1,66 @@ +#pragma once + +#include + +#include +#include +#include +#include + +namespace deeplake_api { +class dataset; +} + +namespace pg::dl_wal { + +struct database_meta +{ + std::string db_name; // PK + std::string owner; + std::string encoding; + std::string lc_collate; + std::string lc_ctype; + std::string template_db; + std::string state; // "ready" or "dropping" + int64_t updated_at = 0; +}; + +struct ddl_log_entry +{ + int64_t seq = 0; // Primary key + std::string origin_instance_id; + std::string search_path; + std::string command_tag; + std::string object_identity; + std::string ddl_sql; + int64_t timestamp = 0; +}; + +// Shared (cluster-wide) catalog: databases catalog_table +void ensure_catalog(const std::string& root_path, icm::string_map<> creds); + +// Per-database catalog: __wal_table dataset +void ensure_db_catalog(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); + +// Shared (cluster-wide) database catalog +std::vector load_databases(const std::string& root_path, icm::string_map<> creds); +void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta); + +// Global version check via databases catalog_table +int64_t get_databases_version(const std::string& root_path, icm::string_map<> creds); + +std::shared_ptr +open_ddl_log_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); + +void append_ddl_log(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, + const ddl_log_entry& entry); + +std::vector load_ddl_log(const std::string& root_path, const std::string& db_name, + icm::string_map<> creds, int64_t after_seq = 0); + +int64_t next_ddl_seq(); + +// Unique identifier for this PostgreSQL instance: "hostname:port:datadir" +std::string local_instance_id(); + +} // namespace pg::dl_wal diff --git a/cpp/deeplake_pg/extension_init.cpp b/cpp/deeplake_pg/extension_init.cpp index 2d531c7537..45552de334 100644 --- a/cpp/deeplake_pg/extension_init.cpp +++ b/cpp/deeplake_pg/extension_init.cpp @@ -11,6 +11,7 @@ extern "C" { #include #include #include +#include #include #include #include @@ -27,7 +28,7 @@ extern "C" { #include "column_statistics.hpp" #include "deeplake_executor.hpp" -#include "dl_catalog.hpp" +#include "dl_wal.hpp" #include "pg_deeplake.hpp" #include "pg_version_compat.h" #include "sync_worker.hpp" @@ -44,6 +45,7 @@ extern "C" { #include #include #include +#include #include #include #include @@ -72,6 +74,58 @@ bool stateless_enabled = false; // Enable stateless catalog sync across ins namespace { +bool is_ddl_log_suppressed() +{ + if (pg::table_storage::is_catalog_only_create()) { + return true; + } + if (creating_extension) { + return true; + } + const char* app_name = GetConfigOption("application_name", true, false); + return app_name && strcmp(app_name, "pg_deeplake_sync") == 0; +} + +void append_to_ddl_log_if_needed(const char* command_tag, const char* object_identity, const char* query_string) +{ + if (!pg::stateless_enabled || is_ddl_log_suppressed()) { + return; + } + if (query_string == nullptr || query_string[0] == '\0') { + return; + } + + auto root_path = pg::session_credentials::get_root_path(); + if (root_path.empty()) { + root_path = pg::utils::get_deeplake_root_directory(); + } + if (root_path.empty()) { + return; + } + + try { + auto creds = pg::session_credentials::get_credentials(); + const char* dbname = get_database_name(MyDatabaseId); + std::string db_name = dbname ? dbname : "postgres"; + if (dbname) { + pfree(const_cast(dbname)); + } + + pg::dl_wal::ddl_log_entry entry; + entry.seq = pg::dl_wal::next_ddl_seq(); + entry.origin_instance_id = pg::dl_wal::local_instance_id(); + const char* current_search_path = GetConfigOption("search_path", true, false); + entry.search_path = current_search_path != nullptr ? current_search_path : ""; + entry.command_tag = command_tag != nullptr ? command_tag : ""; + entry.object_identity = object_identity != nullptr ? object_identity : ""; + entry.ddl_sql = query_string; + + pg::dl_wal::append_ddl_log(root_path, db_name, creds, entry); + } catch (const std::exception& e) { + elog(WARNING, "pg_deeplake: failed to append DDL to WAL log: %s", e.what()); + } +} + bool is_count_star(TargetEntry* node) { if (node == nullptr || node->expr == nullptr || !IsA(node->expr, Aggref)) { @@ -589,34 +643,6 @@ static void process_utility(PlannedStmt* pstmt, } } - // Mark schema as "dropping" in the S3 catalog - if (pg::stateless_enabled) { - try { - auto root_path = pg::session_credentials::get_root_path(); - if (root_path.empty()) { - root_path = pg::utils::get_deeplake_root_directory(); - } - if (!root_path.empty()) { - auto creds = pg::session_credentials::get_credentials(); - const char* dbname = get_database_name(MyDatabaseId); - std::string db_name = dbname ? dbname : "postgres"; - if (dbname) pfree(const_cast(dbname)); - - pg::dl_catalog::ensure_catalog(root_path, creds); - pg::dl_catalog::ensure_db_catalog(root_path, db_name, creds); - - pg::dl_catalog::schema_meta s_meta; - s_meta.schema_name = schema_name; - s_meta.state = "dropping"; - pg::dl_catalog::upsert_schema(root_path, db_name, creds, s_meta); - - pg::dl_catalog::bump_db_catalog_version(root_path, db_name, pg::session_credentials::get_credentials()); - pg::dl_catalog::bump_catalog_version(root_path, pg::session_credentials::get_credentials()); - } - } catch (const std::exception& e) { - elog(WARNING, "pg_deeplake: failed to mark schema '%s' as dropping in catalog: %s", schema_name, e.what()); - } - } } } else if (stmt->removeType == OBJECT_DATABASE) { const char* query = "SELECT nspname, relname " @@ -722,12 +748,11 @@ static void process_utility(PlannedStmt* pstmt, } if (!root_path.empty()) { auto creds = pg::session_credentials::get_credentials(); - pg::dl_catalog::ensure_catalog(root_path, creds); - pg::dl_catalog::database_meta db_meta; + pg::dl_wal::ensure_catalog(root_path, creds); + pg::dl_wal::database_meta db_meta; db_meta.db_name = dbstmt->dbname; db_meta.state = "dropping"; - pg::dl_catalog::upsert_database(root_path, creds, db_meta); - pg::dl_catalog::bump_catalog_version(root_path, creds); + pg::dl_wal::upsert_database(root_path, creds, db_meta); elog(LOG, "pg_deeplake: marked database '%s' as dropping in catalog", dbstmt->dbname); } } catch (const std::exception& e) { @@ -759,8 +784,8 @@ static void process_utility(PlannedStmt* pstmt, } if (!root_path.empty()) { auto creds = pg::session_credentials::get_credentials(); - pg::dl_catalog::ensure_catalog(root_path, creds); - pg::dl_catalog::database_meta db_meta; + pg::dl_wal::ensure_catalog(root_path, creds); + pg::dl_wal::database_meta db_meta; db_meta.db_name = dbstmt->dbname; db_meta.state = "ready"; @@ -781,8 +806,7 @@ static void process_utility(PlannedStmt* pstmt, } } - pg::dl_catalog::upsert_database(root_path, creds, db_meta); - pg::dl_catalog::bump_catalog_version(root_path, creds); + pg::dl_wal::upsert_database(root_path, creds, db_meta); elog(DEBUG1, "pg_deeplake: recorded CREATE DATABASE '%s' in catalog", dbstmt->dbname); } } catch (const std::exception& e) { @@ -791,37 +815,11 @@ static void process_utility(PlannedStmt* pstmt, } } - // Post-hook: record CREATE SCHEMA in S3 catalog for multi-instance sync - if (IsA(pstmt->utilityStmt, CreateSchemaStmt) && pg::stateless_enabled) { + // Post-hook: record CREATE SCHEMA in DDL WAL log + if (IsA(pstmt->utilityStmt, CreateSchemaStmt)) { CreateSchemaStmt* schemastmt = (CreateSchemaStmt*)pstmt->utilityStmt; - try { - auto root_path = pg::session_credentials::get_root_path(); - if (root_path.empty()) { - root_path = pg::utils::get_deeplake_root_directory(); - } - if (!root_path.empty() && schemastmt->schemaname != nullptr) { - auto creds = pg::session_credentials::get_credentials(); - const char* dbname = get_database_name(MyDatabaseId); - std::string db_name = dbname ? dbname : "postgres"; - if (dbname) pfree(const_cast(dbname)); - - pg::dl_catalog::ensure_catalog(root_path, creds); - pg::dl_catalog::ensure_db_catalog(root_path, db_name, creds); - - pg::dl_catalog::schema_meta s_meta; - s_meta.schema_name = schemastmt->schemaname; - s_meta.state = "ready"; - if (schemastmt->authrole != nullptr) { - s_meta.owner = schemastmt->authrole->rolename; - } - pg::dl_catalog::upsert_schema(root_path, db_name, creds, s_meta); - - pg::dl_catalog::bump_db_catalog_version(root_path, db_name, pg::session_credentials::get_credentials()); - pg::dl_catalog::bump_catalog_version(root_path, pg::session_credentials::get_credentials()); - elog(DEBUG1, "pg_deeplake: recorded CREATE SCHEMA '%s' in catalog", schemastmt->schemaname); - } - } catch (const std::exception& e) { - elog(DEBUG1, "pg_deeplake: failed to record CREATE SCHEMA in catalog: %s", e.what()); + if (schemastmt->schemaname != nullptr) { + append_to_ddl_log_if_needed("CREATE SCHEMA", schemastmt->schemaname, queryString); } } @@ -1323,6 +1321,59 @@ static void process_utility(PlannedStmt* pstmt, } } } + + if (nodeTag(pstmt->utilityStmt) == T_DropStmt) { + DropStmt* stmt = (DropStmt*)pstmt->utilityStmt; + if (stmt->removeType == OBJECT_SCHEMA) { + append_to_ddl_log_if_needed("DROP SCHEMA", nullptr, queryString); + } else if (stmt->removeType == OBJECT_TABLE) { + append_to_ddl_log_if_needed("DROP TABLE", nullptr, queryString); + } else if (stmt->removeType == OBJECT_INDEX) { + append_to_ddl_log_if_needed("DROP INDEX", nullptr, queryString); + } else if (stmt->removeType == OBJECT_VIEW) { + append_to_ddl_log_if_needed("DROP VIEW", nullptr, queryString); + } + } + + if (IsA(pstmt->utilityStmt, CreateStmt)) { + CreateStmt* stmt = (CreateStmt*)pstmt->utilityStmt; + if (stmt->accessMethod != nullptr && std::strcmp(stmt->accessMethod, "deeplake") == 0) { + const char* schema_name = stmt->relation->schemaname ? stmt->relation->schemaname : "public"; + std::string object_id = std::string(schema_name) + "." + stmt->relation->relname; + append_to_ddl_log_if_needed("CREATE TABLE", object_id.c_str(), queryString); + } + } + + if (IsA(pstmt->utilityStmt, AlterTableStmt)) { + if (queryString != nullptr && strncasecmp(queryString, "ALTER TABLE", 11) == 0) { + AlterTableStmt* stmt = (AlterTableStmt*)pstmt->utilityStmt; + const char* schema_name = stmt->relation->schemaname ? stmt->relation->schemaname : "public"; + std::string object_id = std::string(schema_name) + "." + stmt->relation->relname; + append_to_ddl_log_if_needed("ALTER TABLE", object_id.c_str(), queryString); + } + } + + if (IsA(pstmt->utilityStmt, IndexStmt)) { + if (queryString && strncasecmp(queryString, "CREATE", 6) == 0 && + strncasecmp(queryString, "CREATE TABLE", 12) != 0) { + IndexStmt* stmt = (IndexStmt*)pstmt->utilityStmt; + const char* schema_name = stmt->relation->schemaname ? stmt->relation->schemaname : "public"; + std::string object_id = std::string(schema_name) + "." + stmt->relation->relname; + append_to_ddl_log_if_needed("CREATE INDEX", object_id.c_str(), queryString); + } + } + + if (IsA(pstmt->utilityStmt, ViewStmt)) { + ViewStmt* stmt = (ViewStmt*)pstmt->utilityStmt; + const char* schema_name = stmt->view->schemaname ? stmt->view->schemaname : "public"; + std::string object_id = std::string(schema_name) + "." + stmt->view->relname; + append_to_ddl_log_if_needed("CREATE VIEW", object_id.c_str(), queryString); + } + + if (IsA(pstmt->utilityStmt, RenameStmt)) { + append_to_ddl_log_if_needed("RENAME", nullptr, queryString); + } + if (IsA(pstmt->utilityStmt, VariableSetStmt)) { VariableSetStmt* vstmt = (VariableSetStmt*)pstmt->utilityStmt; if (vstmt->name != nullptr && pg_strcasecmp(vstmt->name, "search_path") == 0) { @@ -1600,26 +1651,6 @@ static void executor_end(QueryDesc* query_desc) } if (pg::query_info::is_in_executor_context(query_desc)) { - if (query_desc->operation == CMD_INSERT || query_desc->operation == CMD_UPDATE || - query_desc->operation == CMD_DELETE || query_desc->operation == CMD_UTILITY) { - // Use PG_TRY/CATCH to handle errors during flush without cascading aborts - PG_TRY(); - { - if (!pg::table_storage::instance().flush_all()) { - pg::table_storage::instance().rollback_all(); - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to flush table storage"))); - } - } - PG_CATCH(); - { - // Error occurred during flush - rollback and suppress to prevent cascade - // This prevents "Deeplake does not support transaction aborts" cascade - pg::table_storage::instance().rollback_all(); - // Don't re-throw - let the transaction abort naturally - FlushErrorState(); - } - PG_END_TRY(); - } pg::query_info::pop_context(query_desc); pg::table_storage::instance().reset_requested_columns(); } diff --git a/cpp/deeplake_pg/pg_deeplake.cpp b/cpp/deeplake_pg/pg_deeplake.cpp index 1453800c9e..405fe08e12 100644 --- a/cpp/deeplake_pg/pg_deeplake.cpp +++ b/cpp/deeplake_pg/pg_deeplake.cpp @@ -1,5 +1,5 @@ #include "pg_deeplake.hpp" -#include "dl_catalog.hpp" +#include "dl_wal.hpp" #include "logger.hpp" #include "table_storage.hpp" #include "utils.hpp" @@ -11,6 +11,7 @@ extern "C" { #endif +#include #include #include #include @@ -264,41 +265,6 @@ void save_index_metadata(Oid oid) if (SPI_execute(buf.data, false, 0) != SPI_OK_INSERT) { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to save metadata"))); } - - // Persist index to shared catalog for stateless multi-instance sync. - // Skip when in catalog-only mode — the table was synced FROM the catalog, - // so writing back would be redundant and cause version bump loops. - if (pg::stateless_enabled && !pg::table_storage::is_catalog_only_create()) { - try { - auto root_dir = pg::session_credentials::get_root_path(); - if (root_dir.empty()) { - root_dir = pg::utils::get_deeplake_root_directory(); - } - if (!root_dir.empty()) { - auto creds = pg::session_credentials::get_credentials(); - const char* dbname = get_database_name(MyDatabaseId); - std::string db_name = dbname ? dbname : "postgres"; - if (dbname) pfree(const_cast(dbname)); - - const std::string& table_id = idx_info.table_name(); // already schema-qualified - - pg::dl_catalog::index_meta idx_meta; - idx_meta.table_id = table_id; - idx_meta.column_names = idx_info.get_column_names_string(); - idx_meta.index_type = std::string(deeplake_core::deeplake_index_type::to_string(idx_info.index_type())); - idx_meta.order_type = static_cast(idx_info.order_type()); - - std::vector indexes = {idx_meta}; - pg::dl_catalog::upsert_indexes(root_dir, db_name, creds, indexes); - pg::dl_catalog::bump_db_catalog_version(root_dir, db_name, creds); - pg::dl_catalog::bump_catalog_version(root_dir, creds); - } - } catch (const std::exception& e) { - elog(DEBUG1, "pg_deeplake: failed to persist index to shared catalog: %s", e.what()); - } catch (...) { - elog(DEBUG1, "pg_deeplake: failed to persist index to shared catalog: unknown error"); - } - } } void load_index_metadata() @@ -392,6 +358,23 @@ void deeplake_xact_callback(XactEvent event, void *arg) } } +void deeplake_subxact_callback(SubXactEvent event, + SubTransactionId my_subid, + SubTransactionId parent_subid, + void* arg) +{ + switch (event) { + case SUBXACT_EVENT_ABORT_SUB: + pg::table_storage::instance().rollback_subxact(my_subid); + break; + case SUBXACT_EVENT_COMMIT_SUB: + pg::table_storage::instance().commit_subxact(my_subid, parent_subid); + break; + default: + break; + } +} + void init_deeplake() { static bool initialized = false; @@ -416,6 +399,7 @@ void init_deeplake() pg::table_storage::instance(); /// initialize table storage RegisterXactCallback(deeplake_xact_callback, nullptr); + RegisterSubXactCallback(deeplake_subxact_callback, nullptr); } } // namespace pg diff --git a/cpp/deeplake_pg/pg_version_compat.h b/cpp/deeplake_pg/pg_version_compat.h index efb09e90f4..3b65c53fab 100644 --- a/cpp/deeplake_pg/pg_version_compat.h +++ b/cpp/deeplake_pg/pg_version_compat.h @@ -1,6 +1,5 @@ #pragma once -#define PG_VERSION_NUM_16 160000 #define PG_VERSION_NUM_17 170000 #define PG_VERSION_NUM_18 180000 diff --git a/cpp/deeplake_pg/sync_worker.cpp b/cpp/deeplake_pg/sync_worker.cpp index f56fbd935f..1b328b8d52 100644 --- a/cpp/deeplake_pg/sync_worker.cpp +++ b/cpp/deeplake_pg/sync_worker.cpp @@ -28,7 +28,7 @@ extern "C" { #include "sync_worker.hpp" -#include "dl_catalog.hpp" +#include "dl_wal.hpp" #include "table_storage.hpp" #include "utils.hpp" @@ -37,7 +37,11 @@ extern "C" { #include #include +#include #include +#include +#include +#include #include #include @@ -130,6 +134,75 @@ void pending_install_queue::drain_and_install() namespace { +std::string wal_checkpoint_file_path() +{ + if (DataDir == nullptr) { + return "/tmp/pg_deeplake_wal_checkpoints.tsv"; + } + return std::string(DataDir) + "/pg_deeplake_wal_checkpoints.tsv"; +} + +std::string checkpoint_key(const std::string& root_path, const std::string& db_name) +{ + return root_path + "\t" + db_name; +} + +void load_wal_checkpoints(std::unordered_map& checkpoints) +{ + checkpoints.clear(); + std::ifstream in(wal_checkpoint_file_path()); + if (!in.is_open()) { + return; + } + + std::string line; + while (std::getline(in, line)) { + if (line.empty()) { + continue; + } + std::istringstream ss(line); + std::string root_path; + std::string db_name; + std::string seq_str; + if (!std::getline(ss, root_path, '\t') || + !std::getline(ss, db_name, '\t') || + !std::getline(ss, seq_str)) { + continue; + } + try { + checkpoints[checkpoint_key(root_path, db_name)] = std::stoll(seq_str); + } catch (...) { + } + } +} + +void persist_wal_checkpoints(const std::unordered_map& checkpoints) +{ + const std::string path = wal_checkpoint_file_path(); + const std::string tmp_path = path + ".tmp"; + + std::ofstream out(tmp_path, std::ios::trunc); + if (!out.is_open()) { + elog(WARNING, "pg_deeplake sync: failed to open WAL checkpoint tmp file: %s", tmp_path.c_str()); + return; + } + + for (const auto& kv : checkpoints) { + const size_t sep = kv.first.find('\t'); + if (sep == std::string::npos) { + continue; + } + out << kv.first.substr(0, sep) << '\t' + << kv.first.substr(sep + 1) << '\t' + << kv.second << '\n'; + } + out.close(); + + if (std::rename(tmp_path.c_str(), path.c_str()) != 0) { + elog(WARNING, "pg_deeplake sync: failed to persist WAL checkpoints to %s", path.c_str()); + } +} + // Worker state - use sig_atomic_t for signal safety volatile sig_atomic_t got_sigterm = false; volatile sig_atomic_t got_sighup = false; @@ -217,7 +290,7 @@ bool execute_via_libpq(const char* dbname, const char* sql) */ void deeplake_sync_databases_from_catalog(const std::string& root_path, icm::string_map<> creds) { - auto catalog_databases = pg::dl_catalog::load_databases(root_path, creds); + auto catalog_databases = pg::dl_wal::load_databases(root_path, creds); for (const auto& db : catalog_databases) { // Skip system databases @@ -309,170 +382,114 @@ void deeplake_sync_databases_from_catalog(const std::string& root_path, icm::str } } -/** - * Sync schemas for a specific database from pre-loaded catalog data via libpq. - * Creates missing schemas in the target database. - */ -void deeplake_sync_schemas_for_db(const std::string& db_name, - const std::vector& schemas) +bool is_harmless_replay_error(const char* sqlstate) { - for (const auto& meta : schemas) { - if (meta.state == "dropping") { - continue; - } - - // Skip system schemas - if (meta.schema_name == "public" || meta.schema_name == "pg_catalog" || - meta.schema_name == "information_schema" || - meta.schema_name.substr(0, 3) == "pg_") { - continue; - } - - StringInfoData buf; - initStringInfo(&buf); - appendStringInfo(&buf, "CREATE SCHEMA IF NOT EXISTS %s", - quote_identifier(meta.schema_name.c_str())); - - if (execute_via_libpq(db_name.c_str(), buf.data)) { - elog(LOG, "pg_deeplake sync: created schema '%s' in database '%s'", - meta.schema_name.c_str(), db_name.c_str()); - } - - pfree(buf.data); + if (sqlstate == nullptr) { + return false; } + return strcmp(sqlstate, "42P07") == 0 || // duplicate_table + strcmp(sqlstate, "42P06") == 0 || // duplicate_schema + strcmp(sqlstate, "42701") == 0 || // duplicate_column + strcmp(sqlstate, "42710") == 0 || // duplicate_object + strcmp(sqlstate, "42704") == 0 || // undefined_object + strcmp(sqlstate, "42P01") == 0 || // undefined_table + strcmp(sqlstate, "3F000") == 0; // invalid_schema_name } -/** - * Sync tables for a specific database from pre-loaded catalog data via libpq. - * Creates missing tables in the target database. - */ -/** - * Parse comma-separated column names string into a vector. - * The column_names string uses trailing comma format: "col1,col2," - */ -std::vector parse_column_names(const std::string& column_names) +void deeplake_replay_ddl_log_for_db(const std::string& db_name, const std::string& root_path, + icm::string_map<> creds, int64_t& last_seq) { - std::vector result; - std::string current; - for (char c : column_names) { - if (c == ',') { - if (!current.empty()) { - result.push_back(current); - current.clear(); - } - } else { - current += c; + auto entries = pg::dl_wal::load_ddl_log(root_path, db_name, creds, last_seq); + for (const auto& entry : entries) { + if (entry.seq > last_seq) { + last_seq = entry.seq; } - } - if (!current.empty()) { - result.push_back(current); - } - return result; -} - -void deeplake_sync_tables_for_db(const std::string& db_name, - const std::vector& tables, - const std::vector& columns, - const std::vector& indexes) -{ - for (const auto& meta : tables) { - if (meta.state == "dropping") { + if (entry.origin_instance_id == pg::dl_wal::local_instance_id()) { continue; } - const std::string qualified_name = meta.schema_name + "." + meta.table_name; - - // Gather columns for this table, sorted by position - std::vector table_columns; - for (const auto& col : columns) { - if (col.table_id == meta.table_id) { - table_columns.push_back(col); - } + StringInfoData sql; + initStringInfo(&sql); + appendStringInfo(&sql, "SET application_name = 'pg_deeplake_sync'; "); + if (!entry.search_path.empty()) { + appendStringInfo(&sql, + "SELECT pg_catalog.set_config('search_path', %s, false); ", + quote_literal_cstr(entry.search_path.c_str())); } - std::sort(table_columns.begin(), table_columns.end(), - [](const auto& a, const auto& b) { return a.position < b.position; }); + appendStringInfoString(&sql, entry.ddl_sql.c_str()); - if (table_columns.empty()) { - elog(DEBUG1, "pg_deeplake sync: no columns for %s in db %s, skipping", - qualified_name.c_str(), db_name.c_str()); - continue; - } + const char* port = GetConfigOption("port", true, false); + const char* socket_dir = GetConfigOption("unix_socket_directories", true, false); - // Find indexes for this table - std::vector table_indexes; - for (const auto& idx : indexes) { - if (idx.table_id == meta.table_id) { - table_indexes.push_back(idx); - } + StringInfoData conninfo; + initStringInfo(&conninfo); + appendStringInfo(&conninfo, "dbname=%s", db_name.c_str()); + if (port != nullptr) { + appendStringInfo(&conninfo, " port=%s", port); } - - // Determine which columns are part of a primary key (inverted_index on non-nullable columns) - // The primary key columns are stored as comma-separated names in column_names - std::vector pk_columns; - for (const auto& idx : table_indexes) { - if (idx.index_type == "inverted_index") { - pk_columns = parse_column_names(idx.column_names); - break; + if (socket_dir != nullptr) { + char* dir_copy = pstrdup(socket_dir); + char* comma = strchr(dir_copy, ','); + if (comma != nullptr) { + *comma = '\0'; + } + char* dir = dir_copy; + while (*dir == ' ') { + dir++; } + appendStringInfo(&conninfo, " host=%s", dir); + pfree(dir_copy); } - const char* qschema = quote_identifier(meta.schema_name.c_str()); - const char* qtable = quote_identifier(meta.table_name.c_str()); - - // Combine schema + table creation into a single SQL statement - StringInfoData buf; - initStringInfo(&buf); - appendStringInfo(&buf, "CREATE SCHEMA IF NOT EXISTS %s; ", qschema); - appendStringInfo(&buf, "CREATE TABLE IF NOT EXISTS %s.%s (", qschema, qtable); - - bool first = true; - for (const auto& col : table_columns) { - if (!first) { - appendStringInfoString(&buf, ", "); - } - first = false; - appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str()); + PGconn* conn = PQconnectdb(conninfo.data); + pfree(conninfo.data); + if (PQstatus(conn) != CONNECTION_OK) { + elog(WARNING, "pg_deeplake sync: libpq connect failed for '%s': %s", db_name.c_str(), PQerrorMessage(conn)); + PQfinish(conn); + pfree(sql.data); + continue; } - // Add PRIMARY KEY table constraint if we have PK columns - if (!pk_columns.empty()) { - appendStringInfoString(&buf, ", PRIMARY KEY ("); - for (size_t i = 0; i < pk_columns.size(); ++i) { - if (i > 0) { - appendStringInfoString(&buf, ", "); - } - appendStringInfoString(&buf, quote_identifier(pk_columns[i].c_str())); + PGresult* res = PQexec(conn, sql.data); + const ExecStatusType status = PQresultStatus(res); + bool ok = status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK; + if (!ok) { + const char* sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + if (!is_harmless_replay_error(sqlstate)) { + elog(WARNING, + "pg_deeplake sync: DDL WAL replay failed in '%s' [%s]: %s (SQL: %.200s)", + db_name.c_str(), + entry.command_tag.c_str(), + PQerrorMessage(conn), + entry.ddl_sql.c_str()); + } else { + ok = true; } - appendStringInfoChar(&buf, ')'); } - - appendStringInfo(&buf, ") USING deeplake"); - - if (execute_via_libpq(db_name.c_str(), buf.data)) { - elog(LOG, "pg_deeplake sync: created table %s in database %s", - qualified_name.c_str(), db_name.c_str()); + if (ok) { + elog(LOG, "pg_deeplake sync: replayed %s in '%s'", entry.command_tag.c_str(), db_name.c_str()); } - - pfree(buf.data); + PQclear(res); + PQfinish(conn); + pfree(sql.data); } } /** - * Sync all databases: check per-db versions in parallel, load changed ones, - * create missing tables via libpq. + * Sync all databases: check per-db versions in parallel, replay new DDL WAL entries. * * Called OUTSIDE transaction context. */ void sync_all_databases( const std::string& root_path, icm::string_map<> creds, - std::unordered_map& last_db_versions) + std::unordered_map& last_db_seqs) { // Step 1: Sync databases (create missing ones, install extension) deeplake_sync_databases_from_catalog(root_path, creds); // Step 2: Get list of all databases from the shared catalog - auto databases = pg::dl_catalog::load_databases(root_path, creds); + auto databases = pg::dl_wal::load_databases(root_path, creds); // Always include "postgres" which may not be in the databases catalog bool has_postgres = false; @@ -480,78 +497,37 @@ void sync_all_databases( if (db.db_name == "postgres") { has_postgres = true; break; } } if (!has_postgres) { - pg::dl_catalog::database_meta pg_meta; + pg::dl_wal::database_meta pg_meta; pg_meta.db_name = "postgres"; pg_meta.state = "ready"; databases.push_back(std::move(pg_meta)); } - // Step 3: Open per-db meta tables and check versions in parallel - std::vector db_names; - std::vector> meta_handles; - + // Step 3: For each database, replay DDL WAL entries + // (cheap if no new entries due to after_seq filtering) + bool checkpoints_updated = false; for (const auto& db : databases) { if (db.db_name == "template0" || db.db_name == "template1") { continue; } try { - auto handle = pg::dl_catalog::open_db_meta_table(root_path, db.db_name, creds); - if (handle) { - db_names.push_back(db.db_name); - meta_handles.push_back(std::move(handle)); + const std::string key = checkpoint_key(root_path, db.db_name); + int64_t& last_seq = last_db_seqs[key]; + const int64_t prev_seq = last_seq; + deeplake_replay_ddl_log_for_db(db.db_name, root_path, creds, last_seq); + if (last_seq != prev_seq) { + checkpoints_updated = true; } + elog(LOG, "pg_deeplake sync: replayed DDL WAL for '%s' (last_seq=%ld)", db.db_name.c_str(), last_seq); + } catch (const std::exception& e) { + elog(WARNING, "pg_deeplake sync: failed to sync database '%s': %s", db.db_name.c_str(), e.what()); } catch (...) { - // Per-db catalog may not exist yet — skip silently - elog(DEBUG1, "pg_deeplake sync: no per-db catalog for '%s', skipping", db.db_name.c_str()); - } - } - - if (db_names.empty()) { - return; - } - - // Fire all version() promises in parallel (1 round-trip wall-clock) - icm::vector> version_promises; - version_promises.reserve(db_names.size()); - for (auto& handle : meta_handles) { - version_promises.push_back(handle->version()); - } - auto versions = async::combine(std::move(version_promises)).get_future().get(); - - // Step 4: Identify databases whose version changed since last sync - std::vector changed_dbs; - for (size_t i = 0; i < db_names.size(); ++i) { - int64_t ver = static_cast(versions[i]); - auto it = last_db_versions.find(db_names[i]); - if (it == last_db_versions.end() || it->second != ver) { - changed_dbs.push_back(db_names[i]); - last_db_versions[db_names[i]] = ver; + elog(WARNING, "pg_deeplake sync: failed to sync database '%s': unknown error", db.db_name.c_str()); } } - if (changed_dbs.empty()) { - return; - } - - // Step 5: For each changed database, load schemas first, then tables+columns and sync - for (const auto& db_name : changed_dbs) { - try { - // Sync schemas before tables so CREATE TABLE can find the target schema - auto schemas = pg::dl_catalog::load_schemas(root_path, db_name, creds); - if (!schemas.empty()) { - deeplake_sync_schemas_for_db(db_name, schemas); - } - - auto [tables, columns] = pg::dl_catalog::load_tables_and_columns(root_path, db_name, creds); - auto indexes = pg::dl_catalog::load_indexes(root_path, db_name, creds); - deeplake_sync_tables_for_db(db_name, tables, columns, indexes); - elog(LOG, "pg_deeplake sync: synced %zu schemas, %zu tables, %zu indexes for database '%s'", - schemas.size(), tables.size(), indexes.size(), db_name.c_str()); - } catch (const std::exception& e) { - elog(WARNING, "pg_deeplake sync: failed to sync database '%s': %s", db_name.c_str(), e.what()); - } catch (...) { - elog(WARNING, "pg_deeplake sync: failed to sync database '%s': unknown error", db_name.c_str()); - } + if (checkpoints_updated) { + persist_wal_checkpoints(last_db_seqs); } } @@ -575,8 +551,8 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) int64_t last_catalog_version = 0; std::string last_root_path; // Track root_path to detect changes - std::unordered_map last_db_versions; - + std::unordered_map last_db_seqs; + load_wal_checkpoints(last_db_seqs); while (!got_sigterm) { // Process pending interrupts (including ProcSignalBarrier from DROP DATABASE) @@ -588,6 +564,10 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) ProcessConfigFile(PGC_SIGHUP); } + // Always drain pending extension installs first so CREATE DATABASE + // async installs are not starved behind expensive sync work. + pg::pending_install_queue::drain_and_install(); + // Variables to carry state across transaction boundaries // (declared before goto target to avoid crossing initialization) std::string sync_root_path; @@ -622,13 +602,13 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) if (!last_root_path.empty()) { pg::table_storage::instance().reset_and_load_table_metadata(); last_catalog_version = 0; - last_db_versions.clear(); + last_db_seqs.clear(); } last_root_path = root_path; } - // Fast global version check (single HEAD request via cached meta table) - int64_t current_version = pg::dl_catalog::get_catalog_version(root_path, creds); + // Fast global version check via databases catalog_table + int64_t current_version = pg::dl_wal::get_databases_version(root_path, creds); if (current_version != last_catalog_version) { // Save state for sync (which happens outside transaction) @@ -653,7 +633,7 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) // All sync happens OUTSIDE transaction context via libpq if (need_sync && !sync_root_path.empty()) { try { - sync_all_databases(sync_root_path, sync_creds, last_db_versions); + sync_all_databases(sync_root_path, sync_creds, last_db_seqs); elog(DEBUG1, "pg_deeplake sync: completed (global version %ld)", last_catalog_version); } catch (const std::exception& e) { elog(WARNING, "pg_deeplake sync: sync failed: %s", e.what()); @@ -664,9 +644,6 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) pgstat_report_stat(true); - // Drain any databases queued for async extension install - pg::pending_install_queue::drain_and_install(); - wait_for_latch: // Wait for latch or timeout (void)WaitLatch(MyLatch, diff --git a/cpp/deeplake_pg/table_am.cpp b/cpp/deeplake_pg/table_am.cpp index 5625826e2d..cc0242a19e 100644 --- a/cpp/deeplake_pg/table_am.cpp +++ b/cpp/deeplake_pg/table_am.cpp @@ -313,7 +313,6 @@ bool deeplake_scan_analyze_next_tuple( return true; } -#if PG_VERSION_NUM >= PG_VERSION_NUM_17 bool deeplake_scan_analyze_next_block(TableScanDesc scan, ReadStream* stream) { DeeplakeScanData* scan_data = get_scan_data(scan); @@ -349,7 +348,6 @@ bool deeplake_scan_analyze_next_block(TableScanDesc scan, ReadStream* stream) return true; // Indicate we have data to process } -#endif double deeplake_index_build_range_scan(Relation heap_rel, Relation index_rel, @@ -657,9 +655,7 @@ void deeplake_table_am_routine::initialize() routine.relation_size = deeplake_relation_size; routine.relation_estimate_size = deeplake_estimate_rel_size; routine.scan_analyze_next_tuple = deeplake_scan_analyze_next_tuple; -#if PG_VERSION_NUM >= PG_VERSION_NUM_17 routine.scan_analyze_next_block = deeplake_scan_analyze_next_block; -#endif routine.parallelscan_initialize = parallelscan_initialize; routine.parallelscan_estimate = parallelscan_estimate; diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp index 2bd45b63f3..a3bbd9115e 100644 --- a/cpp/deeplake_pg/table_data.hpp +++ b/cpp/deeplake_pg/table_data.hpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -35,6 +36,14 @@ using string_stream_array_holder = nd::string_stream_array_holder; struct table_data { + struct tx_snapshot + { + icm::string_map insert_rows_sizes; + size_t delete_rows_size = 0; + size_t update_rows_size = 0; + int64_t num_total_rows = 0; + }; + inline table_data(Oid table_oid, const std::string& table_name, TupleDesc tupdesc, @@ -79,6 +88,9 @@ struct table_data inline void clear_delete_rows() noexcept; inline void add_update_row(int64_t row_id, icm::string_map update_row); inline void clear_update_rows() noexcept; + inline tx_snapshot capture_tx_snapshot() const; + inline void restore_tx_snapshot(const tx_snapshot& snapshot); + inline void rollback(); inline Oid get_table_oid() const noexcept; inline bool flush(); diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index a88bb6e0e8..4575ef2c0f 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -84,12 +84,19 @@ inline table_data::table_data( inline void table_data::commit(bool show_progress) { - if (dataset_ == nullptr || !dataset_->has_uncommitted_changes()) { + if (dataset_ == nullptr) { + return; + } + const bool has_pending_ops = !insert_rows_.empty() || !delete_rows_.empty() || + !update_rows_.empty() || !insert_promises_.empty(); + if (!has_pending_ops && !dataset_->has_uncommitted_changes()) { return; } try { flush(); - impl::commit_dataset(get_dataset(), show_progress); + if (dataset_->has_uncommitted_changes()) { + impl::commit_dataset(get_dataset(), show_progress); + } } catch (const std::exception& e) { reset_insert_rows(); clear_delete_rows(); @@ -105,6 +112,66 @@ inline void table_data::commit(bool show_progress) force_refresh(); } +inline void table_data::rollback() +{ + // Clear any local staged operations first. + reset_insert_rows(); + clear_delete_rows(); + clear_update_rows(); + streamers_.reset(); + + // Reopen dataset handles to drop any in-memory uncommitted state associated + // with the current object instance. + if (dataset_) { + dataset_.reset(); + refreshing_dataset_.reset(); + open_dataset(false); + } + + // Align cached row count with the current committed dataset state. + if (dataset_) { + num_total_rows_ = dataset_->num_rows(); + } +} + +inline table_data::tx_snapshot table_data::capture_tx_snapshot() const +{ + tx_snapshot snapshot; + snapshot.delete_rows_size = delete_rows_.size(); + snapshot.update_rows_size = update_rows_.size(); + snapshot.num_total_rows = num_total_rows_; + + for (const auto& [column_name, values] : insert_rows_) { + snapshot.insert_rows_sizes[column_name] = values.size(); + } + return snapshot; +} + +inline void table_data::restore_tx_snapshot(const tx_snapshot& snapshot) +{ + // Remove or truncate staged inserts to the savepoint snapshot. + for (auto it = insert_rows_.begin(); it != insert_rows_.end();) { + auto snapshot_it = snapshot.insert_rows_sizes.find(it->first); + if (snapshot_it == snapshot.insert_rows_sizes.end()) { + it = insert_rows_.erase(it); + continue; + } + if (it->second.size() > snapshot_it->second) { + it->second.resize(snapshot_it->second); + } + ++it; + } + + if (delete_rows_.size() > snapshot.delete_rows_size) { + delete_rows_.resize(snapshot.delete_rows_size); + } + if (update_rows_.size() > snapshot.update_rows_size) { + update_rows_.resize(snapshot.update_rows_size); + } + num_total_rows_ = snapshot.num_total_rows; + streamers_.reset(); +} + inline void table_data::open_dataset(bool create) { elog(DEBUG1, "Opening dataset at path: %s (create=%s)", dataset_path_.url().c_str(), create ? "true" : "false"); @@ -116,6 +183,9 @@ inline void table_data::open_dataset(bool create) dataset_ = deeplake_api::open(dataset_path_, std::move(creds)).get_future().get(); } ASSERT(dataset_ != nullptr); + // PostgreSQL transaction boundaries must control durability. Disable + // DeepLake auto-commit so writes are only persisted on explicit commit(). + dataset_->set_auto_commit_enabled(false).get_future().get(); num_total_rows_ = dataset_->num_rows(); // Enable logging if GUC parameter is set @@ -347,7 +417,7 @@ inline void table_data::add_insert_slots(int32_t nslots, TupleTableSlot** slots) } num_total_rows_ += nslots; const auto num_inserts = insert_rows_.begin()->second.size(); - if (num_inserts >= 512) { + if (num_inserts >= 512 && GetCurrentTransactionNestLevel() <= 1) { flush_inserts(); } } diff --git a/cpp/deeplake_pg/table_storage.cpp b/cpp/deeplake_pg/table_storage.cpp index b18b84b406..61e2acf474 100644 --- a/cpp/deeplake_pg/table_storage.cpp +++ b/cpp/deeplake_pg/table_storage.cpp @@ -32,11 +32,12 @@ extern "C" { #include "table_storage.hpp" -#include "dl_catalog.hpp" +#include "dl_wal.hpp" #include "exceptions.hpp" #include "logger.hpp" #include "memory_tracker.hpp" #include "nd_utils.hpp" +#include "pg_version_compat.h" #include "table_ddl_lock.hpp" #include "table_scan.hpp" #include "utils.hpp" @@ -239,74 +240,6 @@ void table_storage::save_table_metadata(const pg::table_data& table_data) } return true; }); - - // Also write into Deep Lake catalog for stateless multi-instance support. - // Skip when in catalog-only mode — the data was synced FROM the S3 catalog, - // so writing back would be redundant and add unnecessary S3 latency. - if (pg::stateless_enabled && !is_catalog_only_create()) { - const auto root_dir = []() { - auto root = session_credentials::get_root_path(); - if (root.empty()) { - root = pg::utils::get_deeplake_root_directory(); - } - return root; - }(); - if (root_dir.empty()) { - return; - } - auto creds = session_credentials::get_credentials(); - const auto db_name = get_current_database_name(); - pg::dl_catalog::ensure_catalog(root_dir, creds); - pg::dl_catalog::ensure_db_catalog(root_dir, db_name, creds); - - auto [schema_name, simple_table_name] = split_table_name(table_name); - const std::string table_id = schema_name + "." + simple_table_name; - - pg::dl_catalog::table_meta meta; - meta.table_id = table_id; - meta.schema_name = schema_name; - meta.table_name = simple_table_name; - meta.dataset_path = ds_path; - meta.state = "ready"; - meta.db_name = db_name; - pg::dl_catalog::upsert_table(root_dir, db_name, creds, meta); - - // Save column metadata to catalog - TupleDesc tupdesc = table_data.get_tuple_descriptor(); - std::vector columns; - for (int i = 0; i < tupdesc->natts; i++) { - Form_pg_attribute attr = TupleDescAttr(tupdesc, i); - if (attr->attisdropped) { - continue; - } - pg::dl_catalog::column_meta col; - col.table_id = table_id; - col.column_name = NameStr(attr->attname); - col.pg_type = format_type_with_typemod(attr->atttypid, attr->atttypmod); - col.nullable = !attr->attnotnull; - col.position = i; - columns.push_back(std::move(col)); - } - pg::dl_catalog::upsert_columns(root_dir, db_name, creds, columns); - - // Belt-and-suspenders: ensure the schema is recorded even if - // the CREATE SCHEMA hook was missed (e.g., schema created before - // the extension was loaded, or via a different code path). - if (schema_name != "public") { - try { - pg::dl_catalog::schema_meta s_meta; - s_meta.schema_name = schema_name; - s_meta.state = "ready"; - pg::dl_catalog::upsert_schema(root_dir, db_name, creds, s_meta); - } catch (...) { - elog(DEBUG1, "pg_deeplake: failed to upsert schema '%s' in catalog (non-fatal)", schema_name.c_str()); - } - } - - pg::dl_catalog::bump_db_catalog_version(root_dir, db_name, session_credentials::get_credentials()); - pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials()); - catalog_version_ = pg::dl_catalog::get_db_catalog_version(root_dir, db_name, session_credentials::get_credentials()); - } } void table_storage::load_table_metadata() @@ -328,204 +261,74 @@ void table_storage::load_table_metadata() }(); auto creds = session_credentials::get_credentials(); - // Stateless catalog sync (only when enabled and root_dir is configured) + // Stateless sync via DDL WAL replay (only when enabled and root_dir is configured) if (pg::stateless_enabled && !root_dir.empty()) { const auto db_name = get_current_database_name(); - // Fast path: if already loaded, just check per-db version - if (tables_loaded_) { - const auto current_version = pg::dl_catalog::get_db_catalog_version(root_dir, db_name, creds); - if (current_version == catalog_version_) { - return; - } - // Version changed, need to reload - tables_.clear(); - views_.clear(); - tables_loaded_ = false; - catalog_version_ = current_version; - } - // Ensure both shared and per-database catalogs exist - pg::dl_catalog::ensure_catalog(root_dir, creds); - const auto version = pg::dl_catalog::ensure_db_catalog(root_dir, db_name, creds); - if (catalog_version_ == 0) { - catalog_version_ = version; - } - tables_loaded_ = true; - - // Load tables, columns, and indexes from per-database path - auto [catalog_tables, catalog_columns] = pg::dl_catalog::load_tables_and_columns(root_dir, db_name, creds); - auto catalog_indexes = pg::dl_catalog::load_indexes(root_dir, db_name, creds); - - if (!catalog_tables.empty()) { - for (const auto& meta : catalog_tables) { - if (meta.state == "dropping") { + pg::dl_wal::ensure_catalog(root_dir, creds); + pg::dl_wal::ensure_db_catalog(root_dir, db_name, creds); + + auto is_sync_replay_backend = []() { + const char* app_name = GetConfigOption("application_name", true, false); + return app_name != nullptr && strcmp(app_name, "pg_deeplake_sync") == 0; + }; + if (!in_ddl_context() && !AmBackgroundWorkerProcess() && !is_sync_replay_backend()) { + auto entries = pg::dl_wal::load_ddl_log(root_dir, db_name, creds, ddl_log_last_seq_); + if (!entries.empty()) { + elog(LOG, "pg_deeplake: DDL WAL replay: %zu entries to process for db '%s' (after seq %ld)", + entries.size(), db_name.c_str(), ddl_log_last_seq_); + } + for (const auto& entry : entries) { + if (entry.seq > ddl_log_last_seq_) { + ddl_log_last_seq_ = entry.seq; + } + if (entry.origin_instance_id == pg::dl_wal::local_instance_id()) { continue; } - const std::string qualified_name = meta.schema_name + "." + meta.table_name; - auto* rel = makeRangeVar(pstrdup(meta.schema_name.c_str()), pstrdup(meta.table_name.c_str()), -1); - Oid relid = RangeVarGetRelid(rel, NoLock, true); - if (!OidIsValid(relid)) { - // Table exists in catalog but not in PostgreSQL. - if (in_ddl_context()) { - // During DDL (CREATE TABLE), skip auto-creation to avoid races. - // The table might be in the middle of being created by another backend. - continue; - } - // Gather columns for this table, sorted by position - std::vector table_columns; - for (const auto& col : catalog_columns) { - if (col.table_id == meta.table_id) { - table_columns.push_back(col); - } - } - std::sort(table_columns.begin(), table_columns.end(), - [](const auto& a, const auto& b) { return a.position < b.position; }); - - if (table_columns.empty()) { - elog(WARNING, "No columns found for catalog table %s, skipping", qualified_name.c_str()); - continue; - } - - // Find primary key columns from indexes - std::vector pk_columns; - for (const auto& idx : catalog_indexes) { - if (idx.table_id == meta.table_id && idx.index_type == "inverted_index") { - // Parse comma-separated column names - std::string current; - for (char c : idx.column_names) { - if (c == ',') { - if (!current.empty()) { - pk_columns.push_back(current); - current.clear(); - } - } else { - current += c; - } - } - if (!current.empty()) { - pk_columns.push_back(current); - } - break; - } - } - - // Build CREATE TABLE IF NOT EXISTS from catalog metadata. - // Wrap in a subtransaction so that if another backend concurrently - // creates the same table (race on composite type), the error is - // caught and we continue instead of aborting the session. - const char* qschema = quote_identifier(meta.schema_name.c_str()); - const char* qtable = quote_identifier(meta.table_name.c_str()); - - StringInfoData buf; - initStringInfo(&buf); - appendStringInfo(&buf, "CREATE TABLE IF NOT EXISTS %s.%s (", qschema, qtable); - - bool first = true; - for (const auto& col : table_columns) { - if (!first) { - appendStringInfoString(&buf, ", "); - } - first = false; - appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str()); - } - - // Add PRIMARY KEY table constraint if found in catalog indexes - if (!pk_columns.empty()) { - appendStringInfoString(&buf, ", PRIMARY KEY ("); - for (size_t i = 0; i < pk_columns.size(); ++i) { - if (i > 0) { - appendStringInfoString(&buf, ", "); - } - appendStringInfoString(&buf, quote_identifier(pk_columns[i].c_str())); - } - appendStringInfoChar(&buf, ')'); - } - - appendStringInfo(&buf, ") USING deeplake"); - - MemoryContext saved_context = CurrentMemoryContext; - ResourceOwner saved_owner = CurrentResourceOwner; - - BeginInternalSubTransaction(NULL); - PG_TRY(); - { - table_storage::set_catalog_only_create(true); - if (SPI_connect() != SPI_OK_CONNECT) { - elog(ERROR, "Could not connect to SPI manager"); - } - bool pushed_snapshot = false; - if (!ActiveSnapshotSet()) { - PushActiveSnapshot(GetTransactionSnapshot()); - pushed_snapshot = true; - } - - // Create schema if needed - StringInfoData schema_buf; - initStringInfo(&schema_buf); - appendStringInfo(&schema_buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema); - SPI_execute(schema_buf.data, false, 0); - pfree(schema_buf.data); - - SPI_execute(buf.data, false, 0); - - if (pushed_snapshot) { - PopActiveSnapshot(); - } - - SPI_finish(); - table_storage::set_catalog_only_create(false); - ReleaseCurrentSubTransaction(); + MemoryContext saved_context = CurrentMemoryContext; + ResourceOwner saved_owner = CurrentResourceOwner; + BeginInternalSubTransaction(nullptr); + PG_TRY(); + { + set_catalog_only_create(true); + pg::utils::spi_connector connector; + bool pushed_snapshot = false; + if (!ActiveSnapshotSet()) { + PushActiveSnapshot(GetTransactionSnapshot()); + pushed_snapshot = true; } - PG_CATCH(); - { - // Another backend created this table concurrently — not an error. - table_storage::set_catalog_only_create(false); - MemoryContextSwitchTo(saved_context); - CurrentResourceOwner = saved_owner; - RollbackAndReleaseCurrentSubTransaction(); - FlushErrorState(); - elog(DEBUG1, "Concurrent table creation for %s, skipping", qualified_name.c_str()); + SPI_execute(entry.ddl_sql.c_str(), false, 0); + if (pushed_snapshot) { + PopActiveSnapshot(); } - PG_END_TRY(); - - pfree(buf.data); - - relid = RangeVarGetRelid(rel, NoLock, true); - } - if (!OidIsValid(relid)) { - elog(WARNING, "Catalog table %s does not exist in PG instance", qualified_name.c_str()); - continue; - } - Relation relation = try_relation_open(relid, NoLock); - if (relation == nullptr) { - elog(WARNING, "Could not open relation for table %s", qualified_name.c_str()); - continue; + set_catalog_only_create(false); + ReleaseCurrentSubTransaction(); } + PG_CATCH(); { - pg::utils::memory_context_switcher context_switcher(TopMemoryContext); - table_data td( - relid, qualified_name, CreateTupleDescCopy(RelationGetDescr(relation)), meta.dataset_path, creds); - auto it2status = tables_.emplace(relid, std::move(td)); - up_to_date_ = false; - ASSERT(it2status.second); + set_catalog_only_create(false); + MemoryContextSwitchTo(saved_context); + CurrentResourceOwner = saved_owner; + RollbackAndReleaseCurrentSubTransaction(); + FlushErrorState(); + elog(WARNING, "pg_deeplake: DDL WAL replay failed (seq=%ld, tag=%s): %.200s", + entry.seq, entry.command_tag.c_str(), entry.ddl_sql.c_str()); } - relation_close(relation, NoLock); + PG_END_TRY(); } - - load_schema_name(); - return; } } - // Non-stateless path: load from local pg_deeplake_tables + // Load from local pg_deeplake_tables if (tables_loaded_) { return; } tables_loaded_ = true; if (!pg::utils::check_table_exists("pg_deeplake_tables")) { + elog(LOG, "pg_deeplake: pg_deeplake_tables does not exist, skipping local scan"); return; } @@ -600,8 +403,6 @@ void table_storage::load_table_metadata() creds = session_credentials::get_credentials(); std::vector invalid_table_oids; - bool catalog_seeded = false; - for (auto i = 0; i < proc; ++i) { HeapTuple tuple = tuptable->vals[i]; bool is_null = false; @@ -617,21 +418,6 @@ void table_storage::load_table_metadata() continue; } try { - // Seed the DL catalog with legacy metadata (only when stateless is enabled). - if (pg::stateless_enabled && !root_dir.empty()) { - const auto db_name = get_current_database_name(); - auto [schema_name, simple_table_name] = split_table_name(table_name); - pg::dl_catalog::table_meta meta; - meta.table_id = schema_name + "." + simple_table_name; - meta.schema_name = schema_name; - meta.table_name = simple_table_name; - meta.dataset_path = ds_path; - meta.state = "ready"; - meta.db_name = db_name; - pg::dl_catalog::upsert_table(root_dir, db_name, creds, meta); - catalog_seeded = true; - } - // Get the relation and its tuple descriptor Relation rel = try_relation_open(relid, NoLock); if (rel == nullptr) { @@ -666,12 +452,6 @@ void table_storage::load_table_metadata() base::log_channel::generic, "Failed to delete invalid table metadata for table_oid: {}", invalid_oid); } } - if (catalog_seeded && pg::stateless_enabled && !root_dir.empty()) { - const auto db_name = get_current_database_name(); - pg::dl_catalog::bump_db_catalog_version(root_dir, db_name, session_credentials::get_credentials()); - pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials()); - catalog_version_ = pg::dl_catalog::get_db_catalog_version(root_dir, db_name, session_credentials::get_credentials()); - } load_views(); load_schema_name(); } @@ -1073,33 +853,6 @@ void table_storage::drop_table(const std::string& table_name) auto& table_data = get_table_data(table_name); auto creds = session_credentials::get_credentials(); - // Update stateless catalog if enabled - if (pg::stateless_enabled) { - const auto root_dir = []() { - auto root = session_credentials::get_root_path(); - if (root.empty()) { - root = pg::utils::get_deeplake_root_directory(); - } - return root; - }(); - if (!root_dir.empty()) { - const auto db_name = get_current_database_name(); - pg::dl_catalog::ensure_catalog(root_dir, creds); - pg::dl_catalog::ensure_db_catalog(root_dir, db_name, creds); - auto [schema_name, simple_table_name] = split_table_name(table_name); - pg::dl_catalog::table_meta meta; - meta.table_id = schema_name + "." + simple_table_name; - meta.schema_name = schema_name; - meta.table_name = simple_table_name; - meta.dataset_path = table_data.get_dataset_path().url(); - meta.state = "dropping"; - meta.db_name = db_name; - pg::dl_catalog::upsert_table(root_dir, db_name, creds, meta); - pg::dl_catalog::bump_db_catalog_version(root_dir, db_name, session_credentials::get_credentials()); - pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials()); - catalog_version_ = pg::dl_catalog::get_db_catalog_version(root_dir, db_name, session_credentials::get_credentials()); - } - } try { table_data.commit(); // Ensure all changes are committed before deletion @@ -1124,14 +877,64 @@ void table_storage::insert_slot(Oid table_id, TupleTableSlot* slot) insert_slots(table_id, 1, &slot); } +void table_storage::mark_subxact_change(Oid table_id) +{ + if (GetCurrentTransactionNestLevel() <= 1) { + return; + } + + auto& table_data = get_table_data(table_id); + const SubTransactionId sub_id = GetCurrentSubTransactionId(); + auto& sub_snapshots = subxact_snapshots_[sub_id]; + if (!sub_snapshots.contains(table_id)) { + sub_snapshots.emplace(table_id, table_data.capture_tx_snapshot()); + } +} + +void table_storage::rollback_subxact(SubTransactionId sub_id) +{ + auto it = subxact_snapshots_.find(sub_id); + if (it == subxact_snapshots_.end()) { + return; + } + + for (const auto& [table_id, snapshot] : it->second) { + auto table_it = tables_.find(table_id); + if (table_it != tables_.end()) { + table_it->second.restore_tx_snapshot(snapshot); + } + } + subxact_snapshots_.erase(it); +} + +void table_storage::commit_subxact(SubTransactionId sub_id, SubTransactionId parent_sub_id) +{ + auto it = subxact_snapshots_.find(sub_id); + if (it == subxact_snapshots_.end()) { + return; + } + + if (parent_sub_id != InvalidSubTransactionId) { + auto& parent_snapshots = subxact_snapshots_[parent_sub_id]; + for (const auto& [table_id, snapshot] : it->second) { + if (!parent_snapshots.contains(table_id)) { + parent_snapshots.emplace(table_id, snapshot); + } + } + } + subxact_snapshots_.erase(it); +} + void table_storage::insert_slots(Oid table_id, int32_t nslots, TupleTableSlot** slots) { + mark_subxact_change(table_id); auto& table_data = get_table_data(table_id); table_data.add_insert_slots(nslots, slots); } bool table_storage::delete_tuple(Oid table_id, ItemPointer tid) { + mark_subxact_change(table_id); auto& table_data = get_table_data(table_id); try { const auto row_number = utils::tid_to_row_number(tid); @@ -1154,6 +957,7 @@ bool table_storage::delete_tuple(Oid table_id, ItemPointer tid) bool table_storage::update_tuple(Oid table_id, ItemPointer tid, HeapTuple new_tuple) { + mark_subxact_change(table_id); auto& table_data = get_table_data(table_id); TupleDesc tupdesc = table_data.get_tuple_descriptor(); diff --git a/cpp/deeplake_pg/table_storage.hpp b/cpp/deeplake_pg/table_storage.hpp index 62ee23ecb1..3e6383d4dc 100644 --- a/cpp/deeplake_pg/table_storage.hpp +++ b/cpp/deeplake_pg/table_storage.hpp @@ -129,7 +129,10 @@ class table_storage inline void refresh_table(Oid table_id) { - tables_.at(table_id).refresh(); + auto it = tables_.find(table_id); + if (it != tables_.end()) { + it->second.refresh(); + } } // Data operations @@ -138,6 +141,9 @@ class table_storage bool delete_tuple(Oid table_id, ItemPointer tid); bool update_tuple(Oid table_id, ItemPointer tid, HeapTuple new_tuple); bool fetch_tuple(Oid table_id, ItemPointer tid, TupleTableSlot* slot); + void mark_subxact_change(Oid table_id); + void rollback_subxact(SubTransactionId sub_id); + void commit_subxact(SubTransactionId sub_id, SubTransactionId parent_sub_id); bool flush_all() { @@ -156,15 +162,16 @@ class table_storage for (auto& [_, table_data] : tables_) { table_data.commit(); } + subxact_snapshots_.clear(); } void rollback_all() { // Rollback all changes in all tables for (auto& [_, table_data] : tables_) { - table_data.reset_insert_rows(); - table_data.clear_delete_rows(); + table_data.rollback(); } + subxact_snapshots_.clear(); } inline auto& get_tables() noexcept @@ -195,7 +202,7 @@ class table_storage tables_.clear(); views_.clear(); tables_loaded_ = false; - catalog_version_ = 0; + ddl_log_last_seq_ = 0; load_table_metadata(); } void mark_metadata_stale() noexcept @@ -312,11 +319,12 @@ class table_storage void erase_table_metadata(const std::string& table_name); std::unordered_map tables_; + std::unordered_map> subxact_snapshots_; std::unordered_map> views_; std::string schema_name_ = "public"; bool tables_loaded_ = false; bool up_to_date_ = true; - int64_t catalog_version_ = 0; + int64_t ddl_log_last_seq_ = 0; }; } // namespace pg diff --git a/postgres/scripts/build_deb.sh b/postgres/scripts/build_deb.sh index 24d3d1c073..0e50671a47 100644 --- a/postgres/scripts/build_deb.sh +++ b/postgres/scripts/build_deb.sh @@ -42,10 +42,10 @@ usage() { echo -e " repository: Repository directory path" echo -e " architecture: amd64 or arm64" echo -e " gpg-keyid: GPG key ID for signing" - echo -e " supported-versions: Comma-separated PostgreSQL versions (e.g., 16,17,18)" + echo -e " supported-versions: Comma-separated PostgreSQL versions (e.g., 17,18)" echo -e "\nExamples:" - echo -e " bash $0 4.4.4-1 /tmp/repo arm64 1F8B584DBEA11E9D 16,17,18" - echo -e " bash $0 /tmp/repo arm64 1F8B584DBEA11E9D 16,17,18 # Auto-detect version" + echo -e " bash $0 4.4.4-1 /tmp/repo arm64 1F8B584DBEA11E9D 17,18" + echo -e " bash $0 /tmp/repo arm64 1F8B584DBEA11E9D 17,18 # Auto-detect version" } # Error handling function diff --git a/postgres/scripts/install.sh b/postgres/scripts/install.sh index 7e92983922..d405c9860d 100644 --- a/postgres/scripts/install.sh +++ b/postgres/scripts/install.sh @@ -63,11 +63,11 @@ check_postgres() { if command_exists psql; then psql_version=$(psql -V | awk '{print $3}' | cut -d'.' -f1) case "$psql_version" in - 14 | 15 | 16 | 17) + 17 | 18) log "PostgreSQL version $psql_version detected." ;; *) - handle_error 1 "Unsupported PostgreSQL version: $psql_version. Only versions 14, 15, 16, and 17 are supported." + handle_error 1 "Unsupported PostgreSQL version: $psql_version. Only versions 17 and 18 are supported." ;; esac else diff --git a/postgres/tests/py_tests/test_create_database.py b/postgres/tests/py_tests/test_create_database.py index 9c0c20f6a6..78f767a74e 100644 --- a/postgres/tests/py_tests/test_create_database.py +++ b/postgres/tests/py_tests/test_create_database.py @@ -231,7 +231,7 @@ async def test_create_multiple_databases(pg_server): # --------------------------------------------------------------------------- -async def poll_for_extension(conn, timeout=10.0): +async def poll_for_extension(conn, timeout=20.0): """Poll for pg_deeplake extension. Returns True if found, False on timeout.""" deadline = asyncio.get_event_loop().time() + timeout while asyncio.get_event_loop().time() < deadline: @@ -265,7 +265,7 @@ async def test_async_extension_auto_install(pg_server): installed = await poll_for_extension(target_conn) assert installed, ( - "Sync worker should have auto-installed pg_deeplake within 10s" + "Sync worker should have auto-installed pg_deeplake within 20s" ) finally: if target_conn is not None: @@ -295,7 +295,7 @@ async def test_async_extension_auto_install_multiple(pg_server): # Poll all databases for extension remaining = set(db_names) - deadline = asyncio.get_event_loop().time() + 10.0 + deadline = asyncio.get_event_loop().time() + 20.0 while remaining and asyncio.get_event_loop().time() < deadline: for db in list(remaining): ext = await conns[db].fetchval( @@ -308,7 +308,7 @@ async def test_async_extension_auto_install_multiple(pg_server): assert not remaining, ( f"Sync worker should have installed pg_deeplake in all databases " - f"within 10s, still missing: {remaining}" + f"within 20s, still missing: {remaining}" ) finally: for c in conns.values(): diff --git a/postgres/tests/py_tests/test_stateless_reserved_schema.py b/postgres/tests/py_tests/test_stateless_reserved_schema.py index 744762c2ed..1685c3abb1 100644 --- a/postgres/tests/py_tests/test_stateless_reserved_schema.py +++ b/postgres/tests/py_tests/test_stateless_reserved_schema.py @@ -75,8 +75,17 @@ async def primary_conn(pg_server): try: await conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") await conn.execute("CREATE EXTENSION pg_deeplake") + # Clean up any leftover reserved schemas from previous failed test runs + for schema in ["default", "user", "table"]: + await conn.execute(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE') yield conn finally: + # Best-effort cleanup of reserved schemas on teardown + for schema in ["default", "user", "table"]: + try: + await conn.execute(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE') + except Exception: + pass await conn.close() diff --git a/postgres/tests/py_tests/test_transaction_abort_handling.py b/postgres/tests/py_tests/test_transaction_abort_handling.py index be3c259b9f..462cab28cd 100644 --- a/postgres/tests/py_tests/test_transaction_abort_handling.py +++ b/postgres/tests/py_tests/test_transaction_abort_handling.py @@ -28,7 +28,6 @@ @pytest.mark.asyncio -@pytest.mark.skip(reason="pg_deeplake does not handle rollback yet.") async def test_error_during_insert_with_abort(db_conn: asyncpg.Connection): """ Test that errors during INSERT operations don't cause cascading aborts. @@ -104,7 +103,6 @@ async def test_guc_parameter_error_handling(db_conn: asyncpg.Connection): @pytest.mark.asyncio -@pytest.mark.skip(reason="pg_deeplake does not handle rollback yet.") async def test_query_error_with_pending_changes(db_conn: asyncpg.Connection): """ Test that query errors with pending changes are handled correctly. @@ -163,7 +161,6 @@ async def test_query_error_with_pending_changes(db_conn: asyncpg.Connection): @pytest.mark.asyncio -@pytest.mark.skip(reason="pg_deeplake does not handle rollback yet.") async def test_multiple_errors_in_sequence(db_conn: asyncpg.Connection): """ Test that multiple errors in sequence don't cause cascading issues. @@ -203,7 +200,6 @@ async def test_multiple_errors_in_sequence(db_conn: asyncpg.Connection): @pytest.mark.asyncio -@pytest.mark.skip(reason="pg_deeplake does not handle rollback yet.") async def test_nested_transaction_abort(db_conn: asyncpg.Connection): """ Test nested transaction (savepoint) abort handling. @@ -252,6 +248,253 @@ async def test_nested_transaction_abort(db_conn: asyncpg.Connection): pass +@pytest.mark.asyncio +async def test_nested_transaction_abort_large_inner_insert(db_conn: asyncpg.Connection): + """ + Test savepoint rollback when inner transaction has enough rows to trigger insert buffering thresholds. + """ + try: + await db_conn.execute(""" + CREATE TABLE test_nested_abort_large ( + id SERIAL PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_nested_abort_large (value) VALUES (1)") + + try: + async with db_conn.transaction(): + await db_conn.execute(""" + INSERT INTO test_nested_abort_large (value) + SELECT generate_series(2, 700) + """) + await db_conn.execute("SELECT * FROM nonexistent_table_large") + except asyncpg.exceptions.UndefinedTableError: + pass + + await db_conn.execute("INSERT INTO test_nested_abort_large (value) VALUES (701)") + + count = await db_conn.fetchval("SELECT COUNT(*) FROM test_nested_abort_large") + assert count == 2, f"Expected 2 rows (values 1 and 701), got {count}" + + values = await db_conn.fetch("SELECT value FROM test_nested_abort_large ORDER BY value") + assert [r['value'] for r in values] == [1, 701], "Should have values 1 and 701" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_nested_abort_large") + except: + pass + + +@pytest.mark.asyncio +async def test_nested_transaction_abort_delete(db_conn: asyncpg.Connection): + """Inner savepoint DELETEs should rollback while outer transaction changes commit.""" + try: + await db_conn.execute(""" + CREATE TABLE test_nested_abort_delete ( + id SERIAL PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + await db_conn.execute(""" + INSERT INTO test_nested_abort_delete (value) + SELECT generate_series(1, 5) + """) + + async with db_conn.transaction(): + try: + async with db_conn.transaction(): + await db_conn.execute("DELETE FROM test_nested_abort_delete WHERE value IN (2, 3)") + await db_conn.execute("SELECT * FROM nonexistent_table_delete") + except asyncpg.exceptions.UndefinedTableError: + pass + + await db_conn.execute("DELETE FROM test_nested_abort_delete WHERE value = 5") + + values = await db_conn.fetch("SELECT value FROM test_nested_abort_delete ORDER BY value") + assert [r['value'] for r in values] == [1, 2, 3, 4], "Inner DELETEs must rollback, outer DELETE must commit" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_nested_abort_delete") + except: + pass + + +@pytest.mark.asyncio +async def test_nested_transaction_abort_update(db_conn: asyncpg.Connection): + """Inner savepoint UPDATEs should rollback while outer transaction updates commit.""" + try: + await db_conn.execute(""" + CREATE TABLE test_nested_abort_update ( + id INTEGER PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + await db_conn.execute(""" + INSERT INTO test_nested_abort_update (id, value) VALUES (1, 10), (2, 20) + """) + + async with db_conn.transaction(): + await db_conn.execute("UPDATE test_nested_abort_update SET value = 11 WHERE id = 1") + try: + async with db_conn.transaction(): + await db_conn.execute("UPDATE test_nested_abort_update SET value = 22 WHERE id = 2") + await db_conn.execute("SELECT * FROM nonexistent_table_update") + except asyncpg.exceptions.UndefinedTableError: + pass + await db_conn.execute("UPDATE test_nested_abort_update SET value = 12 WHERE id = 1") + + rows = await db_conn.fetch("SELECT id, value FROM test_nested_abort_update ORDER BY id") + assert [(r['id'], r['value']) for r in rows] == [(1, 12), (2, 20)], \ + "Inner UPDATE must rollback, outer UPDATE must persist" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_nested_abort_update") + except: + pass + + +@pytest.mark.asyncio +async def test_nested_transaction_abort_copy_like_insert(db_conn: asyncpg.Connection): + """COPY path in inner savepoint should rollback only inner rows on error.""" + try: + await db_conn.execute(""" + CREATE TABLE test_nested_abort_copy ( + id SERIAL PRIMARY KEY, + value INTEGER NOT NULL + ) USING deeplake + """) + + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_nested_abort_copy (value) VALUES (1)") + try: + async with db_conn.transaction(): + await db_conn.copy_records_to_table( + "test_nested_abort_copy", + records=[(2,), (None,), (3,)], + columns=["value"], + ) + except asyncpg.exceptions.NotNullViolationError: + pass + await db_conn.execute("INSERT INTO test_nested_abort_copy (value) VALUES (4)") + + values = await db_conn.fetch("SELECT value FROM test_nested_abort_copy ORDER BY value") + assert [r['value'] for r in values] == [1, 4], "Inner COPY rows must rollback on error" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_nested_abort_copy") + except: + pass + + +@pytest.mark.asyncio +async def test_multi_level_savepoint_abort(db_conn: asyncpg.Connection): + """Abort at second-level savepoint must preserve first-level savepoint and outer transaction.""" + try: + await db_conn.execute(""" + CREATE TABLE test_multi_level_savepoint ( + id SERIAL PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_multi_level_savepoint (value) VALUES (1)") + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_multi_level_savepoint (value) VALUES (2)") + try: + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_multi_level_savepoint (value) VALUES (3)") + await db_conn.execute("SELECT * FROM nonexistent_table_sp2") + except asyncpg.exceptions.UndefinedTableError: + pass + await db_conn.execute("INSERT INTO test_multi_level_savepoint (value) VALUES (4)") + await db_conn.execute("INSERT INTO test_multi_level_savepoint (value) VALUES (5)") + + values = await db_conn.fetch("SELECT value FROM test_multi_level_savepoint ORDER BY value") + assert [r['value'] for r in values] == [1, 2, 4, 5], "Only deepest savepoint changes should rollback" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_multi_level_savepoint") + except: + pass + + +@pytest.mark.asyncio +async def test_top_level_abort_after_inner_savepoint_success(db_conn: asyncpg.Connection): + """Top-level abort must rollback both outer and successfully committed inner savepoint changes.""" + try: + await db_conn.execute(""" + CREATE TABLE test_top_level_abort_after_inner ( + id SERIAL PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + + try: + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_top_level_abort_after_inner (value) VALUES (1)") + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_top_level_abort_after_inner (value) VALUES (2)") + await db_conn.execute("SELECT * FROM nonexistent_table_outer_abort") + except asyncpg.exceptions.UndefinedTableError: + pass + + count = await db_conn.fetchval("SELECT COUNT(*) FROM test_top_level_abort_after_inner") + assert count == 0, "Top-level abort must clear all changes including inner savepoint changes" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_top_level_abort_after_inner") + except: + pass + + +@pytest.mark.asyncio +async def test_nested_transaction_abort_cross_table(db_conn: asyncpg.Connection): + """Savepoint rollback should restore staged state across multiple deeplake tables.""" + try: + await db_conn.execute(""" + CREATE TABLE test_nested_abort_cross_a ( + id SERIAL PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + await db_conn.execute(""" + CREATE TABLE test_nested_abort_cross_b ( + id SERIAL PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_nested_abort_cross_a (value) VALUES (1)") + await db_conn.execute("INSERT INTO test_nested_abort_cross_b (value) VALUES (1)") + + try: + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_nested_abort_cross_a (value) VALUES (2)") + await db_conn.execute("INSERT INTO test_nested_abort_cross_b (value) VALUES (2)") + await db_conn.execute("SELECT * FROM nonexistent_table_cross") + except asyncpg.exceptions.UndefinedTableError: + pass + + await db_conn.execute("INSERT INTO test_nested_abort_cross_a (value) VALUES (3)") + await db_conn.execute("INSERT INTO test_nested_abort_cross_b (value) VALUES (3)") + + values_a = await db_conn.fetch("SELECT value FROM test_nested_abort_cross_a ORDER BY value") + values_b = await db_conn.fetch("SELECT value FROM test_nested_abort_cross_b ORDER BY value") + assert [r['value'] for r in values_a] == [1, 3], "Table A must rollback inner savepoint rows" + assert [r['value'] for r in values_b] == [1, 3], "Table B must rollback inner savepoint rows" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_nested_abort_cross_a") + await db_conn.execute("DROP TABLE IF EXISTS test_nested_abort_cross_b") + except: + pass + + @pytest.mark.asyncio async def test_abort_during_schema_operation(db_conn: asyncpg.Connection): """ diff --git a/scripts/build_pg_ext.py b/scripts/build_pg_ext.py index 01e2e62198..ba0ee156a9 100644 --- a/scripts/build_pg_ext.py +++ b/scripts/build_pg_ext.py @@ -11,8 +11,8 @@ Usage: python3 scripts/build_pg_ext.py prod #Release build Usage: python3 scripts/build_pg_ext.py debug --deeplake-shared #Debug build with shared deeplake_api linking Usage: python3 scripts/build_pg_ext.py debug --deeplake-static #Debug build with static deeplake_api linking (force) -Usage: python3 scripts/build_pg_ext.py dev --pg-versions 16,17,18 #Build for PostgreSQL 16, 17, and 18 -Usage: python3 scripts/build_pg_ext.py dev --pg-versions 16 #Build for PostgreSQL 16 only +Usage: python3 scripts/build_pg_ext.py dev --pg-versions 17,18 #Build for PostgreSQL 17 and 18 +Usage: python3 scripts/build_pg_ext.py dev --pg-versions 17 #Build for PostgreSQL 17 only Usage: python3 scripts/build_pg_ext.py prod --pg-versions all #Build for all supported PostgreSQL versions """ @@ -141,7 +141,7 @@ def run(mode: str, incremental: bool, deeplake_link_type: str = None, pg_version # Add PostgreSQL version options if specified if pg_versions is not None: - supported_versions = {16, 17, 18} + supported_versions = {17, 18} # Set all versions to OFF by default for ver in supported_versions: if ver in pg_versions: @@ -234,20 +234,20 @@ def write_mode(mode: str): i += 1 elif arg == "--pg-versions": if i + 1 >= len(sys.argv): - raise Exception("--pg-versions requires a value (e.g., '16,17,18' or 'all')") + raise Exception("--pg-versions requires a value (e.g., '17,18' or 'all')") versions_str = sys.argv[i + 1] if versions_str == "all": - pg_versions = [16, 17, 18] + pg_versions = [17, 18] else: try: pg_versions = [int(v.strip()) for v in versions_str.split(',')] # Validate versions - supported = {16, 17, 18} + supported = {17, 18} invalid = set(pg_versions) - supported if invalid: raise Exception(f"Invalid PostgreSQL versions: {invalid}. Supported: {supported}") except ValueError: - raise Exception(f"Invalid --pg-versions format: '{versions_str}'. Use comma-separated numbers (e.g., '16,17,18') or 'all'") + raise Exception(f"Invalid --pg-versions format: '{versions_str}'. Use comma-separated numbers (e.g., '17,18') or 'all'") i += 2 else: raise Exception(f"Invalid option '{arg}'. Use --deeplake-shared, --deeplake-static, or --pg-versions")