Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 181 additions & 21 deletions crates/core/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ mod management;
mod raw_table;
mod table_info;

use alloc::{rc::Rc, vec::Vec};
use core::{ffi::c_void, fmt::Write};

use alloc::{format, rc::Rc, vec::Vec};
pub use common::{ColumnFilter, SchemaTable};
use powersync_sqlite_nostd::{self as sqlite, Connection, Context, Value, args};
pub use raw_table::InferredSchemaCache;
Expand All @@ -17,9 +19,12 @@ pub use table_info::{

use crate::{
error::{PSResult, PowerSyncError},
schema::raw_table::generate_raw_table_trigger,
ext::SafeManagedStmt,
schema::raw_table::{InferredTableStructure, generate_raw_table_trigger},
state::DatabaseState,
utils::WriteType,
sync::RawTableWithCachedStatements,
utils::{SqlBuffer, WriteType, verify_in_transaction},
views::table_columns_to_json_object,
};

#[derive(Deserialize, Default)]
Expand All @@ -29,27 +34,159 @@ pub struct Schema {
pub raw_tables: Vec<table_info::RawTable>,
}

pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<(), ResultCode> {
management::register(db, state)?;
impl Schema {
pub fn find_raw_table(&self, name: &str) -> Option<&table_info::RawTable> {
self.raw_tables.iter().find(|tbl| tbl.name == name)
}
}

{
fn create_trigger(
context: *mut sqlite::context,
args: &[*mut sqlite::value],
) -> Result<(), PowerSyncError> {
// Args: Table (JSON), trigger_name, write_type
let table: RawTable =
serde_json::from_str(args[0].text()).map_err(PowerSyncError::as_argument_error)?;
let trigger_name = args[1].text();
let write_type: WriteType = args[2].text().parse()?;

let db = context.db_handle();
let create_trigger_stmt =
generate_raw_table_trigger(db, &table, trigger_name, write_type)?;
db.exec_safe(&create_trigger_stmt).into_db_result(db)?;
Ok(())
fn create_trigger(
context: *mut sqlite::context,
args: &[*mut sqlite::value],
) -> Result<(), PowerSyncError> {
// Args: Table (JSON), trigger_name, write_type
let table: RawTable =
serde_json::from_str(args[0].text()).map_err(PowerSyncError::as_argument_error)?;
let trigger_name = args[1].text();
let write_type: WriteType = args[2].text().parse()?;

let db = context.db_handle();
let create_trigger_stmt = generate_raw_table_trigger(db, &table, trigger_name, write_type)?;
db.exec_safe(&create_trigger_stmt).into_db_result(db)?;
Ok(())
}

enum RawTableMigration<'a> {
CreateFromUntyped { table_name: &'a str },
DropIntoUntyped { table: RawTable },
}

impl<'a> RawTableMigration<'a> {
fn from_args(args: &'a [*mut sqlite::value]) -> Result<Self, PowerSyncError> {
match args[0].text() {
"create" => Ok(RawTableMigration::CreateFromUntyped {
table_name: args[1].text(),
}),
"drop" => {
let table: RawTable = serde_json::from_str(args[1].text())
.map_err(PowerSyncError::as_argument_error)?;
Ok(RawTableMigration::DropIntoUntyped { table })
}
_ => Err(PowerSyncError::argument_error(
"Unknown action for powersync_raw_table_migrate",
)),
}
}
}

/// Utility to help with migrations involving raw tables.
///
/// `powersync_raw_table_migrate('create', $name)` moves existing rows matching the table from
/// `ps_untyped` into the raw table (to be called after adding the raw table to the schema).
///
/// `powersync_raw_table_migrate('drop', $json)` moves rows from the raw table back into
/// `ps_untyped` (call before removing the raw table from the schema), where `$json` is a JSON
/// representation of the [`RawTable`] being dropped.
fn raw_table_migration(
context: *mut sqlite::context,
args: &[*mut sqlite::value],
) -> Result<(), PowerSyncError> {
let db = context.db_handle();
verify_in_transaction(db)?;

let context = unsafe { DatabaseState::from_context(&context) };
let action = RawTableMigration::from_args(args)?;

// Disable triggers forwarding writes to ps_crud.
let _guard = context.sync_local_guard();

let schema_version = InferredSchemaCache::current_schema_version(db)?;

match action {
RawTableMigration::CreateFromUntyped { table_name } => {
// Move data from ps_untyped into this raw table.
let Some(schema) = context.view_schema() else {
return Err(PowerSyncError::state_error("Schema not initialized"));
};

let Some(table) = schema.find_raw_table(table_name) else {
return Err(PowerSyncError::argument_error(format!(
"Raw table {table_name} not found"
)));
};
let delete_untyped =
db.prepare_v2("DELETE FROM ps_untyped WHERE type = ? RETURNING id, data")?;
delete_untyped.bind_text(1, table_name, powersync_sqlite_nostd::Destructor::STATIC)?;
let mut table_statements = RawTableWithCachedStatements::new(table);

while let ResultCode::ROW = delete_untyped.step()? {
let put = table_statements.put_statement(
db,
schema_version,
&context.inferred_schema_cache,
)?;

let id = delete_untyped.column_text(0)?;
let data_text = delete_untyped.column_text(1)?;
let parsed: serde_json::Value =
serde_json::from_str(data_text).map_err(PowerSyncError::json_local_error)?;
let json_object = parsed.as_object().ok_or_else(|| {
PowerSyncError::argument_error("expected oplog data to be an object")
})?;

let rest = put.render_rest_object(&json_object)?;
put.bind_for_put(id, &json_object, &rest)?;
put.exec(db, table_name, id, Some(&parsed))?;
}
}
RawTableMigration::DropIntoUntyped { table } => {
// Copy data from the raw table into ps_untyped, ignoring local-only columns.
let local_table_name = table.require_table_name()?;
let resolved_table = InferredTableStructure::read_from_database(
local_table_name,
db,
&table.schema.synced_columns,
)?;
let as_schema_table = SchemaTable::Raw {
definition: &table,
schema: &resolved_table,
};

let sql = {
// Generate an INSERT INTO ps_untyped with a SELECT source transforming existing
// rows into JSON objects.
let mut buffer = SqlBuffer::new();
let fragment = table_columns_to_json_object(local_table_name, &as_schema_table)?;
buffer.push_str("INSERT INTO ps_untyped (type, id, data) SELECT ?, id, ");
buffer.push_str(&fragment);
buffer.push_str(" FROM ");
let _ = buffer.identifier().write_str(local_table_name);
buffer.push_char(';');
buffer.sql
};
{
let stmt = db.prepare_v2(&sql).into_db_result(db)?;
stmt.bind_text(1, &table.name, powersync_sqlite_nostd::Destructor::STATIC)?;
stmt.exec().into_db_result(db)?;
}

// Then, delete raw table contents.
{
let mut truncate_buf = SqlBuffer::new();
truncate_buf.push_str("DELETE FROM ");
let _ = truncate_buf.identifier().write_str(local_table_name);
db.exec_safe(&truncate_buf.sql).into_db_result(db)?;
}
}
}

Ok(())
}

pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<(), ResultCode> {
management::register(db, state.clone())?;

{
extern "C" fn create_raw_trigger_sqlite(
context: *mut sqlite::context,
argc: i32,
Expand All @@ -61,6 +198,17 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()
}
}

extern "C" fn raw_table_migrate_sqlite(
context: *mut sqlite::context,
argc: i32,
args: *mut *mut sqlite::value,
) {
let args = args!(argc, args);
if let Err(e) = raw_table_migration(context, args) {
e.apply_to_ctx("powersync_raw_table_migrate", context);
}
}

db.create_function_v2(
"powersync_create_raw_table_crud_trigger",
3,
Expand All @@ -71,6 +219,18 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()
None,
None,
)?;

db.create_function_v2(
"powersync_raw_table_migrate",
2,
sqlite::UTF8,
Some(Rc::into_raw(state) as *mut c_void),
Some(raw_table_migrate_sqlite),
None,
None,
Some(DatabaseState::destroy_rc),
)?;
}

Ok(())
}
1 change: 1 addition & 0 deletions crates/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod sync_status;

pub use bucket_priority::BucketPriority;
pub use checksum::Checksum;
pub use sync_local::RawTableWithCachedStatements;

use crate::state::DatabaseState;
pub use streaming_sync::SyncClient;
Expand Down
20 changes: 12 additions & 8 deletions crates/core/src/sync/sync_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,21 @@ struct ParsedSchemaTable<'a> {
raw: Option<RawTableWithCachedStatements<'a>>,
}

struct RawTableWithCachedStatements<'a> {
pub struct RawTableWithCachedStatements<'a> {
definition: &'a RawTable,
cached_put: Option<PreparedPendingStatement>,
cached_delete: Option<PreparedPendingStatement>,
}

impl<'a> RawTableWithCachedStatements<'a> {
pub fn new(definition: &'a RawTable) -> Self {
Self {
definition,
cached_put: None,
cached_delete: None,
}
}

fn prepare_lazily(
db: *mut sqlite::sqlite3,
slot: &mut Option<PreparedPendingStatement>,
Expand All @@ -454,7 +462,7 @@ impl<'a> RawTableWithCachedStatements<'a> {
})
}

fn put_statement(
pub fn put_statement(
&'_ mut self,
db: *mut sqlite::sqlite3,
schema_version: usize,
Expand Down Expand Up @@ -494,16 +502,12 @@ impl<'a> ParsedSchemaTable<'a> {

pub fn raw(definition: &'a RawTable) -> Self {
Self {
raw: Some(RawTableWithCachedStatements {
definition,
cached_put: None,
cached_delete: None,
}),
raw: Some(RawTableWithCachedStatements::new(definition)),
}
}
}

struct PreparedPendingStatement {
pub struct PreparedPendingStatement {
stmt: ManagedStmt,
definition: Rc<PendingStatement>,
}
Expand Down
2 changes: 2 additions & 0 deletions dart/test/error_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ void main() {
'SELECT powersync_clear(?)', 'powersync_clear', [0]);
expectErrorOutsideOfTransaction(
'SELECT powersync_trigger_resync(TRUE)', 'powersync_trigger_resync');
expectErrorOutsideOfTransaction('SELECT powersync_raw_table_migrate(?,?)',
'powersync_raw_table_migrate', ['create', 'a']);
});

test('contain inner SQLite descriptions', () {
Expand Down
Loading
Loading