diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs index 50e8e5c..911d294 100644 --- a/crates/core/src/schema/mod.rs +++ b/crates/core/src/schema/mod.rs @@ -4,7 +4,9 @@ mod management; mod raw_table; mod table_info; -use alloc::{rc::Rc, vec::Vec}; +use core::{ffi::c_void, fmt::Write}; + +use alloc::{format, rc::Rc, vec::Vec}; pub use common::{ColumnFilter, SchemaTable}; use powersync_sqlite_nostd::{self as sqlite, Connection, Context, Value, args}; pub use raw_table::InferredSchemaCache; @@ -17,9 +19,12 @@ pub use table_info::{ use crate::{ error::{PSResult, PowerSyncError}, - schema::raw_table::generate_raw_table_trigger, + ext::SafeManagedStmt, + schema::raw_table::{InferredTableStructure, generate_raw_table_trigger}, state::DatabaseState, - utils::WriteType, + sync::RawTableWithCachedStatements, + utils::{SqlBuffer, WriteType, verify_in_transaction}, + views::table_columns_to_json_object, }; #[derive(Deserialize, Default)] @@ -29,27 +34,159 @@ pub struct Schema { pub raw_tables: Vec, } -pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<(), ResultCode> { - management::register(db, state)?; +impl Schema { + pub fn find_raw_table(&self, name: &str) -> Option<&table_info::RawTable> { + self.raw_tables.iter().find(|tbl| tbl.name == name) + } +} - { - fn create_trigger( - context: *mut sqlite::context, - args: &[*mut sqlite::value], - ) -> Result<(), PowerSyncError> { - // Args: Table (JSON), trigger_name, write_type - let table: RawTable = - serde_json::from_str(args[0].text()).map_err(PowerSyncError::as_argument_error)?; - let trigger_name = args[1].text(); - let write_type: WriteType = args[2].text().parse()?; - - let db = context.db_handle(); - let create_trigger_stmt = - generate_raw_table_trigger(db, &table, trigger_name, write_type)?; - db.exec_safe(&create_trigger_stmt).into_db_result(db)?; - Ok(()) +fn create_trigger( + context: *mut sqlite::context, + args: &[*mut sqlite::value], +) -> Result<(), PowerSyncError> { + // Args: Table (JSON), trigger_name, write_type + let table: RawTable = + serde_json::from_str(args[0].text()).map_err(PowerSyncError::as_argument_error)?; + let trigger_name = args[1].text(); + let write_type: WriteType = args[2].text().parse()?; + + let db = context.db_handle(); + let create_trigger_stmt = generate_raw_table_trigger(db, &table, trigger_name, write_type)?; + db.exec_safe(&create_trigger_stmt).into_db_result(db)?; + Ok(()) +} + +enum RawTableMigration<'a> { + CreateFromUntyped { table_name: &'a str }, + DropIntoUntyped { table: RawTable }, +} + +impl<'a> RawTableMigration<'a> { + fn from_args(args: &'a [*mut sqlite::value]) -> Result { + match args[0].text() { + "create" => Ok(RawTableMigration::CreateFromUntyped { + table_name: args[1].text(), + }), + "drop" => { + let table: RawTable = serde_json::from_str(args[1].text()) + .map_err(PowerSyncError::as_argument_error)?; + Ok(RawTableMigration::DropIntoUntyped { table }) + } + _ => Err(PowerSyncError::argument_error( + "Unknown action for powersync_raw_table_migrate", + )), } + } +} + +/// Utility to help with migrations involving raw tables. +/// +/// `powersync_raw_table_migrate('create', $name)` moves existing rows matching the table from +/// `ps_untyped` into the raw table (to be called after adding the raw table to the schema). +/// +/// `powersync_raw_table_migrate('drop', $json)` moves rows from the raw table back into +/// `ps_untyped` (call before removing the raw table from the schema), where `$json` is a JSON +/// representation of the [`RawTable`] being dropped. +fn raw_table_migration( + context: *mut sqlite::context, + args: &[*mut sqlite::value], +) -> Result<(), PowerSyncError> { + let db = context.db_handle(); + verify_in_transaction(db)?; + + let context = unsafe { DatabaseState::from_context(&context) }; + let action = RawTableMigration::from_args(args)?; + + // Disable triggers forwarding writes to ps_crud. + let _guard = context.sync_local_guard(); + + let schema_version = InferredSchemaCache::current_schema_version(db)?; + + match action { + RawTableMigration::CreateFromUntyped { table_name } => { + // Move data from ps_untyped into this raw table. + let Some(schema) = context.view_schema() else { + return Err(PowerSyncError::state_error("Schema not initialized")); + }; + + let Some(table) = schema.find_raw_table(table_name) else { + return Err(PowerSyncError::argument_error(format!( + "Raw table {table_name} not found" + ))); + }; + let delete_untyped = + db.prepare_v2("DELETE FROM ps_untyped WHERE type = ? RETURNING id, data")?; + delete_untyped.bind_text(1, table_name, powersync_sqlite_nostd::Destructor::STATIC)?; + let mut table_statements = RawTableWithCachedStatements::new(table); + while let ResultCode::ROW = delete_untyped.step()? { + let put = table_statements.put_statement( + db, + schema_version, + &context.inferred_schema_cache, + )?; + + let id = delete_untyped.column_text(0)?; + let data_text = delete_untyped.column_text(1)?; + let parsed: serde_json::Value = + serde_json::from_str(data_text).map_err(PowerSyncError::json_local_error)?; + let json_object = parsed.as_object().ok_or_else(|| { + PowerSyncError::argument_error("expected oplog data to be an object") + })?; + + let rest = put.render_rest_object(&json_object)?; + put.bind_for_put(id, &json_object, &rest)?; + put.exec(db, table_name, id, Some(&parsed))?; + } + } + RawTableMigration::DropIntoUntyped { table } => { + // Copy data from the raw table into ps_untyped, ignoring local-only columns. + let local_table_name = table.require_table_name()?; + let resolved_table = InferredTableStructure::read_from_database( + local_table_name, + db, + &table.schema.synced_columns, + )?; + let as_schema_table = SchemaTable::Raw { + definition: &table, + schema: &resolved_table, + }; + + let sql = { + // Generate an INSERT INTO ps_untyped with a SELECT source transforming existing + // rows into JSON objects. + let mut buffer = SqlBuffer::new(); + let fragment = table_columns_to_json_object(local_table_name, &as_schema_table)?; + buffer.push_str("INSERT INTO ps_untyped (type, id, data) SELECT ?, id, "); + buffer.push_str(&fragment); + buffer.push_str(" FROM "); + let _ = buffer.identifier().write_str(local_table_name); + buffer.push_char(';'); + buffer.sql + }; + { + let stmt = db.prepare_v2(&sql).into_db_result(db)?; + stmt.bind_text(1, &table.name, powersync_sqlite_nostd::Destructor::STATIC)?; + stmt.exec().into_db_result(db)?; + } + + // Then, delete raw table contents. + { + let mut truncate_buf = SqlBuffer::new(); + truncate_buf.push_str("DELETE FROM "); + let _ = truncate_buf.identifier().write_str(local_table_name); + db.exec_safe(&truncate_buf.sql).into_db_result(db)?; + } + } + } + + Ok(()) +} + +pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<(), ResultCode> { + management::register(db, state.clone())?; + + { extern "C" fn create_raw_trigger_sqlite( context: *mut sqlite::context, argc: i32, @@ -61,6 +198,17 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() } } + extern "C" fn raw_table_migrate_sqlite( + context: *mut sqlite::context, + argc: i32, + args: *mut *mut sqlite::value, + ) { + let args = args!(argc, args); + if let Err(e) = raw_table_migration(context, args) { + e.apply_to_ctx("powersync_raw_table_migrate", context); + } + } + db.create_function_v2( "powersync_create_raw_table_crud_trigger", 3, @@ -71,6 +219,18 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() None, None, )?; + + db.create_function_v2( + "powersync_raw_table_migrate", + 2, + sqlite::UTF8, + Some(Rc::into_raw(state) as *mut c_void), + Some(raw_table_migrate_sqlite), + None, + None, + Some(DatabaseState::destroy_rc), + )?; } + Ok(()) } diff --git a/crates/core/src/sync/mod.rs b/crates/core/src/sync/mod.rs index 874eb29..1823d19 100644 --- a/crates/core/src/sync/mod.rs +++ b/crates/core/src/sync/mod.rs @@ -16,6 +16,7 @@ mod sync_status; pub use bucket_priority::BucketPriority; pub use checksum::Checksum; +pub use sync_local::RawTableWithCachedStatements; use crate::state::DatabaseState; pub use streaming_sync::SyncClient; diff --git a/crates/core/src/sync/sync_local.rs b/crates/core/src/sync/sync_local.rs index 1293ef3..00d88e3 100644 --- a/crates/core/src/sync/sync_local.rs +++ b/crates/core/src/sync/sync_local.rs @@ -433,13 +433,21 @@ struct ParsedSchemaTable<'a> { raw: Option>, } -struct RawTableWithCachedStatements<'a> { +pub struct RawTableWithCachedStatements<'a> { definition: &'a RawTable, cached_put: Option, cached_delete: Option, } impl<'a> RawTableWithCachedStatements<'a> { + pub fn new(definition: &'a RawTable) -> Self { + Self { + definition, + cached_put: None, + cached_delete: None, + } + } + fn prepare_lazily( db: *mut sqlite::sqlite3, slot: &mut Option, @@ -454,7 +462,7 @@ impl<'a> RawTableWithCachedStatements<'a> { }) } - fn put_statement( + pub fn put_statement( &'_ mut self, db: *mut sqlite::sqlite3, schema_version: usize, @@ -494,16 +502,12 @@ impl<'a> ParsedSchemaTable<'a> { pub fn raw(definition: &'a RawTable) -> Self { Self { - raw: Some(RawTableWithCachedStatements { - definition, - cached_put: None, - cached_delete: None, - }), + raw: Some(RawTableWithCachedStatements::new(definition)), } } } -struct PreparedPendingStatement { +pub struct PreparedPendingStatement { stmt: ManagedStmt, definition: Rc, } diff --git a/dart/test/error_test.dart b/dart/test/error_test.dart index 74da34a..8409e2b 100644 --- a/dart/test/error_test.dart +++ b/dart/test/error_test.dart @@ -40,6 +40,8 @@ void main() { 'SELECT powersync_clear(?)', 'powersync_clear', [0]); expectErrorOutsideOfTransaction( 'SELECT powersync_trigger_resync(TRUE)', 'powersync_trigger_resync'); + expectErrorOutsideOfTransaction('SELECT powersync_raw_table_migrate(?,?)', + 'powersync_raw_table_migrate', ['create', 'a']); }); test('contain inner SQLite descriptions', () { diff --git a/dart/test/schema_test.dart b/dart/test/schema_test.dart index dfbd4ff..9ac1a1b 100644 --- a/dart/test/schema_test.dart +++ b/dart/test/schema_test.dart @@ -322,6 +322,164 @@ END''', test('#$i', () => testCase.testWith(db)); } }); + + group('powersync_raw_table_migrate', () { + const syncName = 'synced_table'; + + Object singleRawTableSchema(Object rawTable) { + return { + 'tables': [], + 'raw_tables': [rawTable] + }; + } + + setUp(() => db.executeInTx('SELECT powersync_init();')); + + group('create', () { + void createUntypedItem(String id, Object? json) { + db.execute( + 'INSERT INTO ps_untyped (type, id, data) VALUES (?, ?, ?)', + [syncName, id, jsonEncode(json)], + ); + } + + test('inferred statements', () { + createUntypedItem('foo', {'hello': 'world'}); + createUntypedItem('bar', {'hello': 'again'}); + + db.executeInTx('SELECT powersync_replace_schema(?)', [ + json.encode(singleRawTableSchema( + {'name': syncName, 'table_name': 'greetings'})), + ]); + db.execute( + 'CREATE TABLE greetings (id TEXT PRIMARY KEY, hello TEXT) STRICT;'); + + db.executeInTx( + 'SELECT powersync_raw_table_migrate(?, ?)', ['create', syncName]); + expect(db.select('select * from ps_untyped'), isEmpty); + expect(db.select('select * from greetings'), [ + {'id': 'foo', 'hello': 'world'}, + {'id': 'bar', 'hello': 'again'}, + ]); + }); + + test('custom put statement', () { + createUntypedItem('foo', {'hello': 'world'}); + createUntypedItem('bar', {'hello': 'again'}); + + db.executeInTx('SELECT powersync_replace_schema(?)', [ + json.encode(singleRawTableSchema({ + 'name': syncName, + 'put': { + 'sql': 'INSERT INTO greetings (id, hello) VALUES (?, ?)', + 'params': [ + 'Id', + {'Column': 'hello'}, + ], + }, + 'delete': {'sql': 'unused', 'params': []} + })), + ]); + db.execute( + 'CREATE TABLE greetings (id TEXT PRIMARY KEY, hello TEXT) STRICT;'); + + db.executeInTx( + 'SELECT powersync_raw_table_migrate(?, ?)', ['create', syncName]); + expect(db.select('select * from ps_untyped'), isEmpty); + expect(db.select('select * from greetings'), [ + {'id': 'foo', 'hello': 'world'}, + {'id': 'bar', 'hello': 'again'}, + ]); + }); + + test('errors when table is not in schema', () { + db.executeInTx('SELECT powersync_replace_schema(?)', [ + json.encode(singleRawTableSchema( + {'name': syncName, 'table_name': 'greetings'})), + ]); + + expect( + () => db.executeInTx('SELECT powersync_raw_table_migrate(?, ?)', + ['create', 'unknown_table']), + throwsA(isA()), + ); + }); + }); + + test('drop', () { + db + ..execute( + 'CREATE TABLE greetings (id TEXT PRIMARY KEY, hello TEXT, local TEXT) STRICT;') + ..execute( + 'INSERT INTO greetings (id, hello, local) VALUES (?, ?, uuid())', + ['id_0', 'first'], + ) + ..execute( + 'INSERT INTO greetings (id, hello, local) VALUES (?, ?, uuid())', + ['id_1', 'second'], + ) + ..executeInTx('SELECT powersync_raw_table_migrate(?, ?)', [ + 'drop', + json.encode({ + 'name': syncName, + 'table_name': 'greetings', + 'synced_columns': ['hello'] + }) + ]); + expect(db.select('select * from ps_untyped'), [ + {'type': 'synced_table', 'id': 'id_0', 'data': '{"hello":"first"}'}, + {'type': 'synced_table', 'id': 'id_1', 'data': '{"hello":"second"}'}, + ]); + expect(db.select('select * from greetings'), isEmpty); + }); + + test('crud triggers are not invoked during migration', () { + final tableJson = + json.encode({'name': syncName, 'table_name': 'greetings'}); + + db.executeInTx('SELECT powersync_replace_schema(?)', [ + json.encode(singleRawTableSchema( + {'name': syncName, 'table_name': 'greetings'})), + ]); + db.execute( + 'CREATE TABLE greetings (id TEXT PRIMARY KEY, hello TEXT) STRICT;'); + + db.execute(''' +SELECT + powersync_create_raw_table_crud_trigger(?1, 'greetings_insert', 'INSERT'), + powersync_create_raw_table_crud_trigger(?1, 'greetings_update', 'UPDATE'), + powersync_create_raw_table_crud_trigger(?1, 'greetings_delete', 'DELETE') +''', [tableJson]); + + db + ..execute( + 'INSERT INTO ps_untyped (type, id, data) VALUES (?, ?, ?)', [ + syncName, + 'id_1', + json.encode({'hello': 'world'}) + ]) + ..execute( + 'INSERT INTO ps_untyped (type, id, data) VALUES (?, ?, ?)', [ + syncName, + 'id_2', + json.encode({'hello': 'again'}) + ]); + + // create migration: ps_untyped -> raw table; triggers must not fire + db.execute('BEGIN'); + db.execute( + 'SELECT powersync_raw_table_migrate(?, ?)', ['create', syncName]); + expect(db.select('SELECT * FROM ps_crud'), isEmpty); + + // drop migration: raw table -> ps_untyped; triggers must not fire + db.execute('SELECT powersync_raw_table_migrate(?, ?)', [ + 'drop', + json.encode({'name': syncName, 'table_name': 'greetings'}), + ]); + db.execute('COMMIT'); + expect(db.select('SELECT * FROM ps_crud'), isEmpty); + }); + }); }); }