diff --git a/integration/copy_data/init.sql b/integration/copy_data/init.sql
index 9fdd1fb33..5e8dfb34f 100644
--- a/integration/copy_data/init.sql
+++ b/integration/copy_data/init.sql
@@ -3,6 +3,6 @@ DROP SCHEMA IF EXISTS copy_data CASCADE;
 \c pgdog2
 DROP SCHEMA IF EXISTS copy_data CASCADE;
 \c pgdog
--- DROP SCHEMA IF EXISTS copy_data CASCADE;
+DROP SCHEMA IF EXISTS copy_data CASCADE;
 SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;
 \i setup.sql
diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs
index c30ea3b03..f98463a9a 100644
--- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs
+++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs
@@ -239,6 +239,7 @@ impl Publisher {
     pub async fn data_sync(&mut self, dest: &Cluster) -> Result<(), Error> {
         // Create replication slots.
         self.create_slots().await?;
+        let mut handles = vec![];
 
         for (number, shard) in self.cluster.shards().iter().enumerate() {
             let mut primary = shard.primary(&Request::default()).await?;
@@ -272,8 +273,17 @@ impl Publisher {
                 resharding_only
             };
 
-            let manager = ParallelSyncManager::new(tables, replicas, dest)?;
-            let tables = manager.run().await?;
+            let dest = dest.clone();
+            handles.push(spawn(async move {
+                let manager = ParallelSyncManager::new(tables, replicas, &dest)?;
+                let tables = manager.run().await?;
+
+                Ok::<Vec<_>, Error>(tables)
+            }));
+        }
+
+        for (number, handle) in handles.into_iter().enumerate() {
+            let tables = handle.await??;
 
             info!(
                 "table sync for {} tables complete [{}, shard: {}]",
diff --git a/pgdog/src/backend/schema/sync/error.rs b/pgdog/src/backend/schema/sync/error.rs
index e7e4ed629..5c8dbe843 100644
--- a/pgdog/src/backend/schema/sync/error.rs
+++ b/pgdog/src/backend/schema/sync/error.rs
@@ -1,4 +1,5 @@
 use thiserror::Error;
+use tokio::task::JoinError;
 
 #[derive(Debug, Error)]
 pub enum Error {
@@ -37,6 +38,9 @@ pub enum Error {
 
     #[error("publication \"{0}\" has no tables")]
     PublicationNoTables(String),
+
+    #[error("tokio task join error")]
+    JoinError(#[from] JoinError),
 }
 
 impl From for Error {
diff --git a/pgdog/src/backend/schema/sync/pg_dump.rs b/pgdog/src/backend/schema/sync/pg_dump.rs
index 9c584cc91..8e080b994 100644
--- a/pgdog/src/backend/schema/sync/pg_dump.rs
+++ b/pgdog/src/backend/schema/sync/pg_dump.rs
@@ -4,9 +4,11 @@ use std::{
     collections::{HashMap, HashSet},
     ops::Deref,
     str::from_utf8,
+    sync::Arc,
 };
 use lazy_static::lazy_static;
+use parking_lot::Mutex;
 use pg_query::{
     protobuf::{AlterTableType, ConstrType, ObjectType, ParseResult, RangeVar, String as PgString},
     Node, NodeEnum,
 };
@@ -110,7 +112,7 @@ fn should_convert_to_bigint<'a>(
     is_integer_type(sval.as_str())
 }
 
-use tokio::process::Command;
+use tokio::{process::Command, spawn};
 
 #[derive(Debug, Clone)]
 pub struct PgDump {
@@ -1102,14 +1104,22 @@ impl PgDumpOutput {
         state: SyncState,
     ) -> Result<(), Error> {
         let stmts = self.statements(state)?;
-        let mut trackers = (0..dest.shards().len())
-            .map(|shard| {
-                stmts
-                    .iter()
-                    .map(|stmt| (stmt.sql(), SchemaStatement::new(dest, stmt, shard, state)))
-                    .collect::<HashMap<_, _>>()
-            })
-            .collect::<Vec<_>>();
+        let trackers = Arc::new(Mutex::new(
+            (0..dest.shards().len())
+                .map(|shard| {
+                    stmts
+                        .iter()
+                        .map(|stmt| {
+                            (
+                                stmt.sql().to_string(),
+                                SchemaStatement::new(dest, stmt, shard, state),
+                            )
+                        })
+                        .collect::<HashMap<_, _>>()
+                })
+                .collect::<Vec<_>>(),
+        ));
+        let mut handles = vec![];
 
         for (num, shard) in dest.shards().iter().enumerate() {
             let mut primary = shard.primary(&Request::default()).await?;
@@ -1121,51 +1131,68 @@ impl PgDumpOutput {
                 dest.name()
             );
 
-            let mut progress = Progress::new(stmts.len());
+            let trackers = trackers.clone();
+            let output = self.clone();
 
-            for stmt in &stmts {
-                progress.next(stmt);
+            handles.push(spawn(async move {
+                let stmts = output.statements(state)?;
 
-                let mut tracker = trackers
-                    .get_mut(num)
-                    .and_then(|trackers| trackers.remove(stmt.sql()));
+                let mut progress = Progress::new(stmts.len());
 
-                if let Some(ref mut tracker) = tracker {
-                    tracker.running();
-                }
+                for stmt in &stmts {
+                    progress.next(stmt);
+
+                    let mut tracker = trackers
+                        .lock()
+                        .get_mut(num)
+                        .and_then(|trackers| trackers.remove(stmt.sql()));
+
+                    if let Some(ref mut tracker) = tracker {
+                        tracker.running();
+                    }
 
-                if let Err(err) = primary.execute(stmt.deref()).await {
-                    if let backend::Error::ExecutionError(ref err) = err {
-                        let code = &err.code;
-
-                        if let Statement::Other { idempotent, .. } = stmt {
-                            if !idempotent {
-                                if matches!(code.as_str(), "42P16" | "42710" | "42809" | "42P07") {
-                                    warn!("entity already exists, skipping");
-                                    continue;
-                                } else if !ignore_errors {
-                                    if let Some(ref mut tracker) = tracker {
-                                        tracker.error(err);
+                    if let Err(err) = primary.execute(stmt.deref()).await {
+                        if let backend::Error::ExecutionError(ref err) = err {
+                            let code = &err.code;
+
+                            if let Statement::Other { idempotent, .. } = stmt {
+                                if !idempotent {
+                                    if matches!(
+                                        code.as_str(),
+                                        "42P16" | "42710" | "42809" | "42P07"
+                                    ) {
+                                        warn!("entity already exists, skipping");
+                                        continue;
+                                    } else if !ignore_errors {
+                                        if let Some(ref mut tracker) = tracker {
+                                            tracker.error(err);
+                                        }
+                                        return Err(Error::Backend(
+                                            backend::Error::ExecutionError(err.clone()),
+                                        ));
+                                    } else {
+                                        warn!("skipping: {}", err);
                                     }
-                                    return Err(Error::Backend(backend::Error::ExecutionError(
-                                        err.clone(),
-                                    )));
-                                } else {
-                                    warn!("skipping: {}", err);
                                 }
                             }
+                        } else {
+                            return Err(err.into());
+                        }
+                        if ignore_errors {
+                            warn!("skipping: {}", err);
+                        } else {
+                            return Err(err.into());
                         }
-                    } else {
-                        return Err(err.into());
-                    }
-                    if ignore_errors {
-                        warn!("skipping: {}", err);
-                    } else {
-                        return Err(err.into());
                     }
+                    progress.done();
                 }
-                progress.done();
-            }
+
+                Ok::<(), Error>(())
+            }));
+        }
+
+        for handle in handles {
+            handle.await??;
+        }
 
         Ok(())
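
Both data_sync() and the pg_dump restore path now share the same fan-out shape: spawn one tokio task per shard, collect the JoinHandles, then drain them with `handle.await??`, where the first `?` surfaces a panicked or cancelled task through the new JoinError variant and the second propagates the task's own Result. A minimal, self-contained sketch of that shape, with `sync_shard` and this simplified `Error` as illustrative stand-ins rather than pgdog's actual API:

// Sketch of the per-shard fan-out; names are stand-ins, not pgdog's API.
use thiserror::Error;
use tokio::task::JoinError;

#[derive(Debug, Error)]
enum Error {
    // `#[from]` mirrors the JoinError variant added to sync/error.rs: it lets
    // `handle.await??` turn a panicked or cancelled task into a plain error.
    #[error("tokio task join error")]
    Join(#[from] JoinError),
}

// Stand-in for the real per-shard work (ParallelSyncManager::run, etc.).
async fn sync_shard(shard: usize) -> Result<usize, Error> {
    Ok(shard)
}

async fn sync_all_shards(shards: usize) -> Result<(), Error> {
    let mut handles = vec![];
    for shard in 0..shards {
        // Each task must own its inputs, which is why the diff clones
        // `dest` and `self` before moving them into `async move`.
        handles.push(tokio::spawn(async move { sync_shard(shard).await }));
    }
    for handle in handles {
        // First `?`: JoinError (panic/cancellation). Second `?`: the task's own Result.
        let _result = handle.await??;
    }
    Ok(())
}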
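The pg_dump.rs hunk also moves the per-shard statement trackers behind Arc<parking_lot::Mutex<...>> so every spawned task can pull its shard's SchemaStatement, and switches the map keys from borrowed `stmt.sql()` to owned Strings so the structure is 'static and can move into `spawn`. A sketch of that access pattern, with `u32` standing in for SchemaStatement:

// Sketch of the shared-tracker lookup; `u32` is a stand-in for SchemaStatement.
use std::{collections::HashMap, sync::Arc};

use parking_lot::Mutex;

type Trackers = Arc<Mutex<Vec<HashMap<String, u32>>>>;

fn take_tracker(trackers: &Trackers, shard: usize, sql: &str) -> Option<u32> {
    // parking_lot's lock() never returns a Result, so there is no unwrap.
    // The guard is a temporary dropped at the end of this statement, so the
    // lock is held only while the tracker is removed, not while the
    // statement later executes against the shard's primary.
    trackers.lock().get_mut(shard).and_then(|t| t.remove(sql))
}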