Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ jniLibs/
.DS_Store
Thumbs.db
CLAUDE.md
*.o
75 changes: 47 additions & 28 deletions src/cloudsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ struct cloudsync_context {
void *aux_data;

// stmts and context values
bool pragma_checked; // we need to check PRAGMAs only once per transaction
dbvm_t *schema_version_stmt;
dbvm_t *data_version_stmt;
dbvm_t *db_version_stmt;
Expand Down Expand Up @@ -255,13 +254,15 @@ const char *cloudsync_algo_name (table_algo algo) {
// MARK: - DBVM Utils -

DBVM_VALUE dbvm_execute (dbvm_t *stmt, cloudsync_context *data) {
if (!stmt) return DBVM_VALUE_ERROR;

int rc = databasevm_step(stmt);
if (rc != DBRES_ROW && rc != DBRES_DONE) {
if (data) DEBUG_DBERROR(rc, "stmt_execute", data);
databasevm_reset(stmt);
return DBVM_VALUE_ERROR;
}

DBVM_VALUE result = DBVM_VALUE_CHANGED;
if (stmt == data->data_version_stmt) {
int version = (int)database_column_int(stmt, 0);
Expand Down Expand Up @@ -365,12 +366,17 @@ int cloudsync_dbversion_rebuild (cloudsync_context *data) {
int cloudsync_dbversion_rerun (cloudsync_context *data) {
DBVM_VALUE schema_changed = dbvm_execute(data->schema_version_stmt, data);
if (schema_changed == DBVM_VALUE_ERROR) return -1;

if (schema_changed == DBVM_VALUE_CHANGED) {
int rc = cloudsync_dbversion_rebuild(data);
if (rc != DBRES_OK) return -1;
}


if (!data->db_version_stmt) {
data->db_version = CLOUDSYNC_MIN_DB_VERSION;
return 0;
}

DBVM_VALUE rc = dbvm_execute(data->db_version_stmt, data);
if (rc == DBVM_VALUE_ERROR) return -1;
return 0;
Expand Down Expand Up @@ -559,7 +565,7 @@ void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
}

void cloudsync_set_schema (cloudsync_context *data, const char *schema) {
if (data->current_schema == schema) return;
if (data->current_schema && schema && strcmp(data->current_schema, schema) == 0) return;
if (data->current_schema) cloudsync_memory_free(data->current_schema);
data->current_schema = NULL;
if (schema) data->current_schema = cloudsync_string_dup_lowercase(schema);
Expand Down Expand Up @@ -748,7 +754,7 @@ int table_add_stmts (cloudsync_table_context *table, int ncols) {
if (rc != DBRES_OK) goto cleanup;

// precompile the insert local sentinel statement
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref, table->meta_ref, table->meta_ref);
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);

Expand Down Expand Up @@ -920,37 +926,44 @@ int table_remove (cloudsync_context *data, cloudsync_table_context *table) {
int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
cloudsync_table_context *table = (cloudsync_table_context *)xdata;
cloudsync_context *data = table->context;

int index = table->ncols;
for (int i=0; i<ncols; i+=2) {
const char *name = values[i];
int cid = (int)strtol(values[i+1], NULL, 0);

table->col_id[index] = cid;
table->col_name[index] = cloudsync_string_dup_lowercase(name);
if (!table->col_name[index]) return 1;
if (!table->col_name[index]) goto error;

char *sql = table_build_mergeinsert_sql(table, name);
if (!sql) return DBRES_NOMEM;
if (!sql) goto error;
DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);

int rc = databasevm_prepare(data, sql, (void **)&table->col_merge_stmt[index], DBFLAG_PERSISTENT);
cloudsync_memory_free(sql);
if (rc != DBRES_OK) return rc;
if (!table->col_merge_stmt[index]) return DBRES_MISUSE;
if (rc != DBRES_OK) goto error;
if (!table->col_merge_stmt[index]) goto error;

sql = table_build_value_sql(table, name);
if (!sql) return DBRES_NOMEM;
if (!sql) goto error;
DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);

rc = databasevm_prepare(data, sql, (void **)&table->col_value_stmt[index], DBFLAG_PERSISTENT);
cloudsync_memory_free(sql);
if (rc != DBRES_OK) return rc;
if (!table->col_value_stmt[index]) return DBRES_MISUSE;
if (rc != DBRES_OK) goto error;
if (!table->col_value_stmt[index]) goto error;
}
table->ncols += 1;

return 0;

error:
// clean up partially-initialized entry at index
if (table->col_name[index]) {cloudsync_memory_free(table->col_name[index]); table->col_name[index] = NULL;}
if (table->col_merge_stmt[index]) {databasevm_finalize(table->col_merge_stmt[index]); table->col_merge_stmt[index] = NULL;}
if (table->col_value_stmt[index]) {databasevm_finalize(table->col_value_stmt[index]); table->col_value_stmt[index] = NULL;}
return 1;
}

bool table_ensure_capacity (cloudsync_context *data) {
Expand Down Expand Up @@ -992,7 +1005,7 @@ bool table_add_to_context (cloudsync_context *data, table_algo algo, const char
table->npks = count;
if (table->npks == 0) {
#if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
return false;
goto abort_add_table;
#else
table->rowid_only = true;
table->npks = 1; // rowid
Expand Down Expand Up @@ -1039,7 +1052,8 @@ bool table_add_to_context (cloudsync_context *data, table_algo algo, const char

dbvm_t *cloudsync_colvalue_stmt (cloudsync_context *data, const char *tbl_name, bool *persistent) {
dbvm_t *vm = NULL;

*persistent = false;

cloudsync_table_context *table = table_lookup(data, tbl_name);
if (table) {
char *col_name = NULL;
Expand Down Expand Up @@ -1082,7 +1096,7 @@ const char *table_colname (cloudsync_table_context *table, int index) {
bool table_pk_exists (cloudsync_table_context *table, const char *value, size_t len) {
// check if a row with the same primary key already exists
// if so, this means the row might have been previously deleted (sentinel)
return (bool)dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB);
return (dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB) > 0);
}

char **table_pknames (cloudsync_table_context *table) {
Expand Down Expand Up @@ -1373,6 +1387,10 @@ int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table,
rc = databasevm_step(vm);
if (rc == DBRES_ROW) {
const void *local_site_id = database_column_blob(vm, 0);
if (!local_site_id) {
dbvm_reset(vm);
return cloudsync_set_error(data, "NULL site_id in cloudsync table, table is probably corrupted", DBRES_ERROR);
}
ret = memcmp(site_id, local_site_id, site_len);
*didwin_flag = (ret > 0);
dbvm_reset(vm);
Expand Down Expand Up @@ -1929,6 +1947,7 @@ int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name)
rc = databasevm_step(vm);
if (rc == DBRES_ROW) {
const char *pk = (const char *)database_column_text(vm, 0);
if (!pk) { rc = DBRES_ERROR; break; }
size_t pklen = strlen(pk);
rc = local_mark_insert_or_update_meta(table, pk, pklen, col_name, db_version, cloudsync_bumpseq(data));
} else if (rc == DBRES_DONE) {
Expand Down Expand Up @@ -2448,8 +2467,8 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
if (in_savepoint && db_version_changed) {
rc = database_commit_savepoint(data, "cloudsync_payload_apply");
if (rc != DBRES_OK) {
if (clone) cloudsync_memory_free(clone);
return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
goto cleanup;
}
in_savepoint = false;
}
Expand All @@ -2459,8 +2478,8 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
if (!in_transaction && db_version_changed) {
rc = database_begin_savepoint(data, "cloudsync_payload_apply");
if (rc != DBRES_OK) {
if (clone) cloudsync_memory_free(clone);
return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
goto cleanup;
}
last_payload_db_version = decoded_context.db_version;
in_savepoint = true;
Expand Down Expand Up @@ -2548,7 +2567,7 @@ int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size,
if (rc != DBRES_OK) return rc;

// exit if there is no data to send
if (blob == NULL || *blob_size == 0) return DBRES_OK;
if (*blob == NULL || *blob_size == 0) return DBRES_OK;
return rc;
}

Expand Down
2 changes: 1 addition & 1 deletion src/cloudsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
extern "C" {
#endif

#define CLOUDSYNC_VERSION "0.9.110"
#define CLOUDSYNC_VERSION "0.9.111"
#define CLOUDSYNC_MAX_TABLENAME_LEN 512

#define CLOUDSYNC_VALUE_NOTSET -1
Expand Down
12 changes: 7 additions & 5 deletions src/dbutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -411,14 +411,16 @@ int dbutils_settings_init (cloudsync_context *data) {
if (rc != DBRES_OK) return rc;

// library version
char sql[1024];
snprintf(sql, sizeof(sql), SQL_INSERT_SETTINGS_STR_FORMAT, CLOUDSYNC_KEY_LIBVERSION, CLOUDSYNC_VERSION);
char *sql = cloudsync_memory_mprintf(SQL_INSERT_SETTINGS_STR_FORMAT, CLOUDSYNC_KEY_LIBVERSION, CLOUDSYNC_VERSION);
if (!sql) return DBRES_NOMEM;
rc = database_exec(data, sql);
cloudsync_memory_free(sql);
if (rc != DBRES_OK) return rc;

// schema version
snprintf(sql, sizeof(sql), SQL_INSERT_SETTINGS_INT_FORMAT, CLOUDSYNC_KEY_SCHEMAVERSION, (long long)database_schema_version(data));
rc = database_exec(data, sql);
char sql_int[1024];
snprintf(sql_int, sizeof(sql_int), SQL_INSERT_SETTINGS_INT_FORMAT, CLOUDSYNC_KEY_SCHEMAVERSION, (long long)database_schema_version(data));
rc = database_exec(data, sql_int);
if (rc != DBRES_OK) return rc;
}

Expand Down
2 changes: 1 addition & 1 deletion src/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ void cloudsync_network_logout (sqlite3_context *context, int argc, sqlite3_value
}

// run everything in a savepoint
rc = database_begin_savepoint(data, "cloudsync_logout_savepoint;");
rc = database_begin_savepoint(data, "cloudsync_logout_savepoint");
if (rc != SQLITE_OK) {
errmsg = cloudsync_memory_mprintf("Unable to create cloudsync_logout savepoint %s", cloudsync_errmsg(data));
goto finalize;
Expand Down
49 changes: 31 additions & 18 deletions src/postgresql/cloudsync_postgresql.c
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ Datum pg_cloudsync_terminate (PG_FUNCTION_ARGS) {
PG_END_TRY();

if (spi_connected) SPI_finish();
PG_RETURN_INT32(rc);
PG_RETURN_BOOL(rc == DBRES_OK);
}

// MARK: - Settings Functions -
Expand Down Expand Up @@ -820,8 +820,7 @@ Datum cloudsync_payload_encode_transfn (PG_FUNCTION_ARGS) {
// Get or allocate aggregate state
if (PG_ARGISNULL(0)) {
MemoryContext oldContext = MemoryContextSwitchTo(aggContext);
payload = (cloudsync_payload_context *)cloudsync_memory_alloc(cloudsync_payload_context_size(NULL));
memset(payload, 0, cloudsync_payload_context_size(NULL));
payload = (cloudsync_payload_context *)palloc0(cloudsync_payload_context_size(NULL));
MemoryContextSwitchTo(oldContext);
} else {
payload = (cloudsync_payload_context *)PG_GETARG_POINTER(0);
Expand Down Expand Up @@ -1819,13 +1818,16 @@ static Oid get_column_oid(const char *schema, const char *table_name, const char
pfree(DatumGetPointer(values[1]));
if (schema) pfree(DatumGetPointer(values[2]));

if (ret != SPI_OK_SELECT || SPI_processed == 0) return InvalidOid;
if (ret != SPI_OK_SELECT || SPI_processed == 0) {
if (SPI_tuptable) SPI_freetuptable(SPI_tuptable);
return InvalidOid;
}

bool isnull;
Datum col_oid = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull);
if (isnull) return InvalidOid;

return DatumGetObjectId(col_oid);
Oid result = isnull ? InvalidOid : DatumGetObjectId(col_oid);
SPI_freetuptable(SPI_tuptable);
return result;
}

// Decode encoded bytea into a pgvalue_t with the decoded base type.
Expand Down Expand Up @@ -1958,23 +1960,34 @@ Datum cloudsync_col_value(PG_FUNCTION_ARGS) {
}

// execute vm
Datum d = (Datum)0;
int rc = databasevm_step(vm);
if (rc == DBRES_DONE) {
rc = DBRES_OK;
PG_RETURN_CSTRING(CLOUDSYNC_RLS_RESTRICTED_VALUE);
databasevm_reset(vm);
// row not found (RLS or genuinely missing) — return the RLS sentinel as bytea
const char *rls = CLOUDSYNC_RLS_RESTRICTED_VALUE;
size_t rls_len = strlen(rls);
bytea *result = (bytea *)palloc(VARHDRSZ + rls_len);
SET_VARSIZE(result, VARHDRSZ + rls_len);
memcpy(VARDATA(result), rls, rls_len);
PG_RETURN_BYTEA_P(result);
} else if (rc == DBRES_ROW) {
// store value result
rc = DBRES_OK;
d = database_column_datum(vm, 0);
}

if (rc != DBRES_OK) {
// copy value before reset invalidates SPI tuple memory
const void *blob = database_column_blob(vm, 0);
int blob_len = database_column_bytes(vm, 0);
bytea *result = NULL;
if (blob && blob_len > 0) {
result = (bytea *)palloc(VARHDRSZ + blob_len);
SET_VARSIZE(result, VARHDRSZ + blob_len);
memcpy(VARDATA(result), blob, blob_len);
}
databasevm_reset(vm);
ereport(ERROR, (errmsg("cloudsync_col_value error: %s", cloudsync_errmsg(data))));
if (result) PG_RETURN_BYTEA_P(result);
PG_RETURN_NULL();
}

databasevm_reset(vm);
PG_RETURN_DATUM(d);
ereport(ERROR, (errmsg("cloudsync_col_value error: %s", cloudsync_errmsg(data))));
PG_RETURN_NULL(); // unreachable, silences compiler
}

// Track SRF execution state across calls
Expand Down
Loading