Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration/copy_data/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ DROP SCHEMA IF EXISTS copy_data CASCADE;
\c pgdog2
DROP SCHEMA IF EXISTS copy_data CASCADE;
\c pgdog
-- DROP SCHEMA IF EXISTS copy_data CASCADE;
DROP SCHEMA IF EXISTS copy_data CASCADE;
SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;
\i setup.sql
14 changes: 12 additions & 2 deletions pgdog/src/backend/replication/logical/publisher/publisher_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ impl Publisher {
pub async fn data_sync(&mut self, dest: &Cluster) -> Result<(), Error> {
// Create replication slots.
self.create_slots().await?;
let mut handles = vec![];

for (number, shard) in self.cluster.shards().iter().enumerate() {
let mut primary = shard.primary(&Request::default()).await?;
Expand Down Expand Up @@ -272,8 +273,17 @@ impl Publisher {
resharding_only
};

let manager = ParallelSyncManager::new(tables, replicas, dest)?;
let tables = manager.run().await?;
let dest = dest.clone();
handles.push(spawn(async move {
let manager = ParallelSyncManager::new(tables, replicas, &dest)?;
let tables = manager.run().await?;

Ok::<Vec<Table>, Error>(tables)
}));
}

for (number, handle) in handles.into_iter().enumerate() {
let tables = handle.await??;

info!(
"table sync for {} tables complete [{}, shard: {}]",
Expand Down
4 changes: 4 additions & 0 deletions pgdog/src/backend/schema/sync/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use thiserror::Error;
use tokio::task::JoinError;

#[derive(Debug, Error)]
pub enum Error {
Expand Down Expand Up @@ -37,6 +38,9 @@ pub enum Error {

#[error("publication \"{0}\" has no tables")]
PublicationNoTables(String),

#[error("tokio task join error")]
JoinError(#[from] JoinError),
}

impl From<crate::backend::replication::logical::Error> for Error {
Expand Down
115 changes: 71 additions & 44 deletions pgdog/src/backend/schema/sync/pg_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ use std::{
collections::{HashMap, HashSet},
ops::Deref,
str::from_utf8,
sync::Arc,
};

use lazy_static::lazy_static;
use parking_lot::Mutex;
use pg_query::{
protobuf::{AlterTableType, ConstrType, ObjectType, ParseResult, RangeVar, String as PgString},
Node, NodeEnum,
Expand Down Expand Up @@ -110,7 +112,7 @@ fn should_convert_to_bigint<'a>(
is_integer_type(sval.as_str())
}

use tokio::process::Command;
use tokio::{process::Command, spawn};

#[derive(Debug, Clone)]
pub struct PgDump {
Expand Down Expand Up @@ -1102,14 +1104,22 @@ impl PgDumpOutput {
state: SyncState,
) -> Result<(), Error> {
let stmts = self.statements(state)?;
let mut trackers = (0..dest.shards().len())
.map(|shard| {
stmts
.iter()
.map(|stmt| (stmt.sql(), SchemaStatement::new(dest, stmt, shard, state)))
.collect::<HashMap<_, _>>()
})
.collect::<Vec<_>>();
let trackers = Arc::new(Mutex::new(
(0..dest.shards().len())
.map(|shard| {
stmts
.iter()
.map(|stmt| {
(
stmt.sql().to_string(),
SchemaStatement::new(dest, stmt, shard, state),
)
})
.collect::<HashMap<_, _>>()
})
.collect::<Vec<_>>(),
));
let mut handles = vec![];

for (num, shard) in dest.shards().iter().enumerate() {
let mut primary = shard.primary(&Request::default()).await?;
Expand All @@ -1121,51 +1131,68 @@ impl PgDumpOutput {
dest.name()
);

let mut progress = Progress::new(stmts.len());
let trackers = trackers.clone();
let output = self.clone();

for stmt in &stmts {
progress.next(stmt);
handles.push(spawn(async move {
let stmts = output.statements(state)?;

let mut tracker = trackers
.get_mut(num)
.and_then(|trackers| trackers.remove(stmt.sql()));
let mut progress = Progress::new(stmts.len());

if let Some(ref mut tracker) = tracker {
tracker.running();
}
for stmt in &stmts {
progress.next(stmt);

let mut tracker = trackers
.lock()
.get_mut(num)
.and_then(|trackers| trackers.remove(stmt.sql()));

if let Some(ref mut tracker) = tracker {
tracker.running();
}

if let Err(err) = primary.execute(stmt.deref()).await {
if let backend::Error::ExecutionError(ref err) = err {
let code = &err.code;

if let Statement::Other { idempotent, .. } = stmt {
if !idempotent {
if matches!(code.as_str(), "42P16" | "42710" | "42809" | "42P07") {
warn!("entity already exists, skipping");
continue;
} else if !ignore_errors {
if let Some(ref mut tracker) = tracker {
tracker.error(err);
if let Err(err) = primary.execute(stmt.deref()).await {
if let backend::Error::ExecutionError(ref err) = err {
let code = &err.code;

if let Statement::Other { idempotent, .. } = stmt {
if !idempotent {
if matches!(
code.as_str(),
"42P16" | "42710" | "42809" | "42P07"
) {
warn!("entity already exists, skipping");
continue;
} else if !ignore_errors {
if let Some(ref mut tracker) = tracker {
tracker.error(err);
}
return Err(Error::Backend(
backend::Error::ExecutionError(err.clone()),
));
} else {
warn!("skipping: {}", err);
}
return Err(Error::Backend(backend::Error::ExecutionError(
err.clone(),
)));
} else {
warn!("skipping: {}", err);
}
}
} else {
return Err(err.into());
}
if ignore_errors {
warn!("skipping: {}", err);
} else {
return Err(err.into());
}
} else {
return Err(err.into());
}
if ignore_errors {
warn!("skipping: {}", err);
} else {
return Err(err.into());
}
progress.done();
}
progress.done();
}

Ok::<(), Error>(())
}));
}

for handle in handles {
handle.await??;
}

Ok(())
Expand Down
Loading