From 78d48a7f2c76139cc7d036e47894d3b4d535655b Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Sun, 5 Apr 2026 08:58:56 -0700 Subject: [PATCH 1/4] fix: replicate omni tables correctly --- integration/copy_data/pgbench.sql | 30 +++++++++++ integration/copy_data/setup.sql | 37 +++++++++++++ .../logical/publisher/publisher_impl.rs | 54 +++++++++++++++---- .../replication/logical/publisher/table.rs | 20 ++++++- .../replication/logical/subscriber/stream.rs | 13 ++++- 5 files changed, 142 insertions(+), 12 deletions(-) diff --git a/integration/copy_data/pgbench.sql b/integration/copy_data/pgbench.sql index f0ab53571..e0942c802 100644 --- a/integration/copy_data/pgbench.sql +++ b/integration/copy_data/pgbench.sql @@ -31,6 +31,32 @@ INSERT INTO copy_data.log_actions (id, tenant_id, action) VALUES (:log_id, :tenant_id, 'bench') ON CONFLICT (id) DO NOTHING; +\set country_id random(1, 10) +\set currency_id random(1, 10) +\set category_id random(1, 10) + +-- Upsert a country. +INSERT INTO copy_data.countries (id, code, name) +VALUES (:country_id, 'X' || :country_id, 'Bench Country ' || :country_id) +ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name; + +-- Read a country. +SELECT id, code, name FROM copy_data.countries WHERE id = :country_id; + +-- Update a currency symbol. +UPDATE copy_data.currencies SET symbol = '$' WHERE id = :currency_id; + +-- Read a currency. +SELECT id, code, name, symbol FROM copy_data.currencies WHERE id = :currency_id; + +-- Upsert a category. +INSERT INTO copy_data.categories (id, name, parent_id) +VALUES (:category_id, 'Bench Category ' || :category_id, NULL) +ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name; + +-- Read a category. +SELECT id, name, parent_id FROM copy_data.categories WHERE id = :category_id; + -- Clean up everything we created. DELETE FROM copy_data.orders WHERE id = :order_id; @@ -38,3 +64,7 @@ DELETE FROM copy_data.log_actions WHERE id = :log_id; DELETE FROM copy_data.users WHERE id = :user_id AND tenant_id = :tenant_id AND email = 'bench_' || :user_id || '@example.com'; + +DELETE FROM copy_data.countries WHERE id = :country_id AND code = 'X' || :country_id; + +DELETE FROM copy_data.categories WHERE id = :category_id AND name = 'Bench Category ' || :category_id; diff --git a/integration/copy_data/setup.sql b/integration/copy_data/setup.sql index fa6114e5e..14358a665 100644 --- a/integration/copy_data/setup.sql +++ b/integration/copy_data/setup.sql @@ -45,6 +45,26 @@ CREATE TABLE copy_data.with_identity( tenant_id BIGINT NOT NULL ); +-- Omni (non-sharded) tables: no tenant_id column. +CREATE TABLE IF NOT EXISTS copy_data.countries ( + id SERIAL PRIMARY KEY, + code CHAR(2) NOT NULL UNIQUE, + name VARCHAR NOT NULL +); + +CREATE TABLE IF NOT EXISTS copy_data.currencies ( + id SERIAL PRIMARY KEY, + code CHAR(3) NOT NULL UNIQUE, + name VARCHAR NOT NULL, + symbol VARCHAR(5) +); + +CREATE TABLE IF NOT EXISTS copy_data.categories ( + id SERIAL PRIMARY KEY, + name VARCHAR NOT NULL, + parent_id INT REFERENCES copy_data.categories(id) +); + DROP PUBLICATION IF EXISTS pgdog; CREATE PUBLICATION pgdog FOR TABLES IN SCHEMA copy_data; @@ -154,3 +174,20 @@ FROM generate_series(1, 10000); INSERT INTO copy_data.with_identity (tenant_id) SELECT floor(random() * 10000)::bigint FROM generate_series(1, 10000); + +INSERT INTO copy_data.countries (code, name) VALUES + ('US', 'United States'), ('GB', 'United Kingdom'), ('DE', 'Germany'), + ('FR', 'France'), ('JP', 'Japan'), ('CA', 'Canada'), + ('AU', 'Australia'), ('BR', 'Brazil'), ('IN', 'India'), ('CN', 'China'); + +INSERT INTO copy_data.currencies (code, name, symbol) VALUES + ('USD', 'US Dollar', '$'), ('EUR', 'Euro', '€'), ('GBP', 'British Pound', '£'), + ('JPY', 'Japanese Yen', '¥'), ('CAD', 'Canadian Dollar', 'C$'), + ('AUD', 'Australian Dollar', 'A$'), ('BRL', 'Brazilian Real', 'R$'), + ('INR', 'Indian Rupee', '₹'), ('CNY', 'Chinese Yuan', '¥'), ('CHF', 'Swiss Franc', 'Fr'); + +INSERT INTO copy_data.categories (name, parent_id) VALUES + ('Electronics', NULL), ('Clothing', NULL), ('Books', NULL), + ('Home & Garden', NULL), ('Sports', NULL); +INSERT INTO copy_data.categories (name, parent_id) VALUES + ('Phones', 1), ('Laptops', 1), ('Shirts', 2), ('Pants', 2), ('Fiction', 3); diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index f98463a9a..e96fe7ae9 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; @@ -70,14 +70,47 @@ impl Publisher { } /// Synchronize tables for all shards. - pub async fn sync_tables(&mut self) -> Result<(), Error> { + pub async fn sync_tables(&mut self, data_sync: bool) -> Result<(), Error> { + let sharding_tables = &self.cluster.sharding_schema().tables; + let mut omnisharded = HashSet::new(); + for (number, shard) in self.cluster.shards().iter().enumerate() { // Load tables from publication. let mut primary = shard.primary(&Request::default()).await?; let tables = Table::load(&self.publication, &mut primary, self.query_parser_engine).await?; - self.tables.insert(number, tables); + // For data sync, split omni tables evenly between shards. + if data_sync { + omnisharded.extend( + tables + .iter() + .filter(|table| !table.is_sharded(&sharding_tables)) + .cloned() + .collect::>(), + ); + + self.tables.insert( + number, + tables + .into_iter() + .filter(|table| table.is_sharded(&sharding_tables)) + .collect(), + ); + } else { + // For replication, process changes from all shards. + self.tables.insert(number, tables); + } + } + + // Distribute omni tables rougly equally between all shards. + let mut shard_index = 0; + for table in omnisharded { + let shard = shard_index % self.cluster.shards().len(); + if let Some(shard) = self.tables.get_mut(&shard) { + shard.push(table); + } + shard_index += 1; } Ok(()) @@ -117,9 +150,7 @@ impl Publisher { let mut streams = vec![]; // Synchronize tables from publication. - if self.tables.is_empty() { - self.sync_tables().await?; - } + self.sync_tables(false).await?; // Create replication slots if we haven't already. if self.slots.is_empty() { @@ -239,12 +270,17 @@ impl Publisher { pub async fn data_sync(&mut self, dest: &Cluster) -> Result<(), Error> { // Create replication slots. self.create_slots().await?; + // Fetch schema. + self.sync_tables(true).await?; + let mut handles = vec![]; for (number, shard) in self.cluster.shards().iter().enumerate() { - let mut primary = shard.primary(&Request::default()).await?; - let tables = - Table::load(&self.publication, &mut primary, self.query_parser_engine).await?; + let tables = self + .tables + .get(&number) + .ok_or(Error::NoReplicationTables(number))? + .clone(); info!( "table sync starting for {} tables, shard={}", diff --git a/pgdog/src/backend/replication/logical/publisher/table.rs b/pgdog/src/backend/replication/logical/publisher/table.rs index f7236de72..8859c6867 100644 --- a/pgdog/src/backend/replication/logical/publisher/table.rs +++ b/pgdog/src/backend/replication/logical/publisher/table.rs @@ -11,8 +11,9 @@ use crate::backend::replication::publisher::progress::Progress; use crate::backend::replication::publisher::Lsn; use crate::backend::replication::status::TableCopy; -use crate::backend::{Cluster, Server}; +use crate::backend::{Cluster, Server, ShardedTables}; use crate::config::config; +use crate::frontend::router::parser::Column; use crate::net::replication::StatusUpdate; use crate::util::escape_identifier; @@ -184,6 +185,23 @@ impl Table { Ok(()) } + /// Check if this table is sharded. + pub fn is_sharded(&self, tables: &ShardedTables) -> bool { + for column in &self.columns { + let c = Column { + name: &column.name, + table: Some(&self.table.name), + schema: Some(&self.table.schema), + }; + + if tables.get_table(c).is_some() { + return true; + } + } + + false + } + pub async fn data_sync( &mut self, source: &Address, diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs index 29c0cb64b..34af7e8d8 100644 --- a/pgdog/src/backend/replication/logical/subscriber/stream.rs +++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs @@ -63,6 +63,7 @@ struct Statements { upsert: Statement, update: Statement, delete: Statement, + omni: bool, } #[derive(Default, Debug, Clone)] @@ -284,8 +285,15 @@ impl StreamSubscriber { if let Some(statements) = self.statements.get(&insert.oid) { // Convert TupleData into a Bind message. We can now insert that tuple // using a prepared statement. - let mut context = - StreamContext::new(&self.cluster, &insert.tuple_data, statements.insert.parse()); + let mut context = StreamContext::new( + &self.cluster, + &insert.tuple_data, + if statements.omni { + statements.upsert.parse() + } else { + statements.insert.parse() + }, + ); let bind = context.bind().clone(); let shard = context.shard()?; @@ -481,6 +489,7 @@ impl StreamSubscriber { upsert, update, delete, + omni: !table.is_sharded(&self.cluster.sharding_schema().tables), }, ); From 0682258deda4349c89985a14fe60db8517c8ded8 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Sun, 5 Apr 2026 13:03:02 -0700 Subject: [PATCH 2/4] fix --- integration/copy_data/dev.sh | 12 +++++ integration/copy_data/init.sql | 5 ++ integration/copy_data/pgdog.toml | 26 +++++++++++ integration/copy_data/setup.sql | 16 +++---- integration/copy_data/users.toml | 6 +++ .../logical/publisher/parallel_sync.rs | 28 +++++++---- .../logical/publisher/publisher_impl.rs | 46 +++++++++---------- .../replication/logical/publisher/table.rs | 6 +++ pgdog/src/frontend/router/parser/copy.rs | 12 ++++- 9 files changed, 113 insertions(+), 44 deletions(-) diff --git a/integration/copy_data/dev.sh b/integration/copy_data/dev.sh index d035c4dd4..31bcd4ed5 100644 --- a/integration/copy_data/dev.sh +++ b/integration/copy_data/dev.sh @@ -22,10 +22,20 @@ pushd ${SCRIPT_DIR} psql -f init.sql +# +# 0 -> 2 +# ${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog ${PGDOG_BIN} data-sync --sync-only --from-database source --to-database destination --publication pgdog --replication-slot copy_data ${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog --cutover +# +# 2 -> 2 +# +${PGDOG_BIN} schema-sync --from-database destination --to-database destination2 --publication pgdog +${PGDOG_BIN} data-sync --sync-only --from-database destination --to-database destination2 --publication pgdog --replication-slot copy_data +${PGDOG_BIN} schema-sync --from-database destination --to-database destination2 --publication pgdog --cutover + # Start replication in the background. ${PGDOG_BIN} data-sync --replicate-only --from-database source --to-database destination --publication pgdog & REPL_PID=$! @@ -63,4 +73,6 @@ if [ ${REPL_EXIT} -ne 0 ] && [ ${REPL_EXIT} -ne 130 ] && [ ${REPL_EXIT} -ne 143 exit ${REPL_EXIT} fi +psql -f init.sql + popd diff --git a/integration/copy_data/init.sql b/integration/copy_data/init.sql index 5e8dfb34f..771564768 100644 --- a/integration/copy_data/init.sql +++ b/integration/copy_data/init.sql @@ -4,5 +4,10 @@ DROP SCHEMA IF EXISTS copy_data CASCADE; DROP SCHEMA IF EXISTS copy_data CASCADE; \c pgdog DROP SCHEMA IF EXISTS copy_data CASCADE; +\c shard_0 +DROP SCHEMA IF EXISTS copy_data CASCADE; +\c shard_1 +DROP SCHEMA IF EXISTS copy_data CASCADE; +\c pgdog SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots; \i setup.sql diff --git a/integration/copy_data/pgdog.toml b/integration/copy_data/pgdog.toml index 36e856411..f4dd7233d 100644 --- a/integration/copy_data/pgdog.toml +++ b/integration/copy_data/pgdog.toml @@ -1,11 +1,17 @@ [general] resharding_copy_format = "binary" +# +# Source of data. +# [[databases]] name = "source" host = "127.0.0.1" database_name = "pgdog" +# +# Reshard 0 -> 2 +# [[databases]] name = "destination" host = "127.0.0.1" @@ -23,5 +29,25 @@ database = "destination" column = "tenant_id" data_type = "bigint" +# +# Reshard 2 -> 2 +# +[[databases]] +name = "destination2" +host = "127.0.0.1" +database_name = "shard_0" +shard = 0 + +[[databases]] +name = "destination2" +host = "127.0.0.1" +database_name = "shard_1" +shard = 1 + +[[sharded_tables]] +database = "destination2" +column = "tenant_id" +data_type = "bigint" + [admin] password = "pgdog" diff --git a/integration/copy_data/setup.sql b/integration/copy_data/setup.sql index 14358a665..5c3dd266c 100644 --- a/integration/copy_data/setup.sql +++ b/integration/copy_data/setup.sql @@ -47,22 +47,22 @@ CREATE TABLE copy_data.with_identity( -- Omni (non-sharded) tables: no tenant_id column. CREATE TABLE IF NOT EXISTS copy_data.countries ( - id SERIAL PRIMARY KEY, - code CHAR(2) NOT NULL UNIQUE, + id BIGSERIAL PRIMARY KEY, + code VARCHAR NOT NULL UNIQUE, name VARCHAR NOT NULL ); CREATE TABLE IF NOT EXISTS copy_data.currencies ( - id SERIAL PRIMARY KEY, - code CHAR(3) NOT NULL UNIQUE, + id BIGSERIAL PRIMARY KEY, + code VARCHAR NOT NULL UNIQUE, name VARCHAR NOT NULL, - symbol VARCHAR(5) + symbol VARCHAR ); CREATE TABLE IF NOT EXISTS copy_data.categories ( - id SERIAL PRIMARY KEY, + id BIGSERIAL PRIMARY KEY, name VARCHAR NOT NULL, - parent_id INT REFERENCES copy_data.categories(id) + parent_id INT ); DROP PUBLICATION IF EXISTS pgdog; @@ -165,7 +165,7 @@ FROM items_raw ir; INSERT INTO copy_data.log_actions (tenant_id, action) SELECT - CASE WHEN random() < 0.2 THEN NULL ELSE (floor(random() * 10000) + 1)::bigint END AS tenant_id, + (floor(random() * 10000) + 1)::bigint AS tenant_id, (ARRAY['login', 'logout', 'click', 'purchase', 'view', 'error'])[ floor(random() * 6 + 1)::int ] AS action diff --git a/integration/copy_data/users.toml b/integration/copy_data/users.toml index 67142d309..66009cf99 100644 --- a/integration/copy_data/users.toml +++ b/integration/copy_data/users.toml @@ -9,3 +9,9 @@ database = "destination" name = "pgdog" password = "pgdog" schema_admin = true + +[[users]] +database = "destination2" +name = "pgdog" +password = "pgdog" +schema_admin = true diff --git a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs index 418eb648d..90b5ee1ae 100644 --- a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs +++ b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs @@ -11,6 +11,7 @@ use tokio::{ mpsc::{unbounded_channel, UnboundedSender}, Semaphore, }, + task::JoinHandle, }; use tracing::info; @@ -32,7 +33,7 @@ struct ParallelSync { impl ParallelSync { // Run parallel sync. - pub fn run(mut self) { + pub fn run(mut self) -> JoinHandle> { spawn(async move { // Record copy in queue before waiting for permit. let tracker = TableCopy::new(&self.table.table.schema, &self.table.table.name); @@ -68,7 +69,7 @@ impl ParallelSync { .map_err(|_| Error::ParallelConnection)?; Ok::<(), Error>(()) - }); + }) } } @@ -115,16 +116,19 @@ impl ParallelSyncManager { let (tx, mut rx) = unbounded_channel(); let mut tables = vec![]; + let mut handles = vec![]; for table in self.tables { - ParallelSync { - table, - addr: replica.addr().clone(), - dest: self.dest.clone(), - tx: tx.clone(), - permit: self.permit.clone(), - } - .run(); + handles.push( + ParallelSync { + table, + addr: replica.addr().clone(), + dest: self.dest.clone(), + tx: tx.clone(), + permit: self.permit.clone(), + } + .run(), + ); } drop(tx); @@ -133,6 +137,10 @@ impl ParallelSyncManager { tables.push(table?); } + for handle in handles { + handle.await??; + } + Ok(tables) } } diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index e96fe7ae9..90f1feb99 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -70,9 +70,12 @@ impl Publisher { } /// Synchronize tables for all shards. - pub async fn sync_tables(&mut self, data_sync: bool) -> Result<(), Error> { - let sharding_tables = &self.cluster.sharding_schema().tables; - let mut omnisharded = HashSet::new(); + pub async fn sync_tables(&mut self, data_sync: bool, dest: &Cluster) -> Result<(), Error> { + let sharding_tables = dest.sharding_schema().tables; + + // Omnisharded tables are split evenly between shards + // during copy to avoid duplicate key errors. + let mut omnisharded = HashMap::new(); for (number, shard) in self.cluster.shards().iter().enumerate() { // Load tables from publication. @@ -82,21 +85,15 @@ impl Publisher { // For data sync, split omni tables evenly between shards. if data_sync { - omnisharded.extend( - tables - .iter() - .filter(|table| !table.is_sharded(&sharding_tables)) - .cloned() - .collect::>(), - ); - - self.tables.insert( - number, - tables - .into_iter() - .filter(|table| table.is_sharded(&sharding_tables)) - .collect(), - ); + for table in tables { + let omni = !table.is_sharded(&sharding_tables); + if omni { + omnisharded.insert(table.key(), table); + } else { + let entry = self.tables.entry(number).or_insert(vec![]); + entry.push(table); + } + } } else { // For replication, process changes from all shards. self.tables.insert(number, tables); @@ -105,10 +102,11 @@ impl Publisher { // Distribute omni tables rougly equally between all shards. let mut shard_index = 0; - for table in omnisharded { + for table in omnisharded.into_values() { let shard = shard_index % self.cluster.shards().len(); - if let Some(shard) = self.tables.get_mut(&shard) { - shard.push(table); + if let Some(tables) = self.tables.get_mut(&shard) { + tables.push(table); + } else { } shard_index += 1; } @@ -150,7 +148,7 @@ impl Publisher { let mut streams = vec![]; // Synchronize tables from publication. - self.sync_tables(false).await?; + self.sync_tables(false, dest).await?; // Create replication slots if we haven't already. if self.slots.is_empty() { @@ -271,7 +269,7 @@ impl Publisher { // Create replication slots. self.create_slots().await?; // Fetch schema. - self.sync_tables(true).await?; + self.sync_tables(true, dest).await?; let mut handles = vec![]; diff --git a/pgdog/src/backend/replication/logical/publisher/table.rs b/pgdog/src/backend/replication/logical/publisher/table.rs index 8859c6867..60937989f 100644 --- a/pgdog/src/backend/replication/logical/publisher/table.rs +++ b/pgdog/src/backend/replication/logical/publisher/table.rs @@ -66,6 +66,11 @@ impl Table { Ok(results) } + /// Key used for duplicate check. + pub(super) fn key(&self) -> (String, String) { + (self.table.schema.clone(), self.table.name.clone()) + } + /// Check that the table supports replication. pub fn valid(&self) -> Result<(), Error> { if !self.columns.iter().any(|c| c.identity) { @@ -256,6 +261,7 @@ impl Table { } copy_sub.copy_done().await?; + copy_sub.disconnect().await?; progress.done(); diff --git a/pgdog/src/frontend/router/parser/copy.rs b/pgdog/src/frontend/router/parser/copy.rs index f85362f37..348576b11 100644 --- a/pgdog/src/frontend/router/parser/copy.rs +++ b/pgdog/src/frontend/router/parser/copy.rs @@ -313,7 +313,12 @@ impl CopyParser { mod test { use pg_query::parse; - use crate::config::config; + use crate::{ + backend::server::test::test_server, + config::config, + frontend::router::parser::binary::header::Header, + net::{CopyDone, Protocol, Query, ToBytes}, + }; use super::*; @@ -553,7 +558,7 @@ mod test { _ => panic!("not a copy"), }; - let mut copy = CopyParser::new(©, &Cluster::default()).unwrap(); + let mut copy = CopyParser::new(©, &Cluster::new_test(&config())).unwrap(); assert!(copy.is_from); assert!(copy.headers); let mut data = b"PGCOPY".to_vec(); @@ -574,7 +579,10 @@ mod test { let sharded = copy.shard(&[header]).unwrap(); assert_eq!(sharded.len(), 3); assert_eq!(sharded[0].message().data(), &data[..19]); // Header is 19 bytes long. + assert_eq!(sharded[0].shard(), &Shard::All); assert_eq!(sharded[1].message().data().len(), 2 + 4 + 8 + 4 + 3); + assert!(matches!(sharded[1].shard(), &Shard::Direct(_))); assert_eq!(sharded[2].message().data(), (-1_i16).to_be_bytes()); + assert_eq!(sharded[2].shard(), &Shard::All) } } From cfffa10a2b8db5e8b4cf5874e448e0bba222e36c Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Sun, 5 Apr 2026 17:58:14 -0700 Subject: [PATCH 3/4] more --- integration/copy_data/dev.sh | 55 +++++++++++++++++++ .../src/backend/replication/logical/error.rs | 3 + .../replication/logical/subscriber/copy.rs | 10 +++- pgdog/src/frontend/router/parser/copy.rs | 7 +-- 4 files changed, 66 insertions(+), 9 deletions(-) diff --git a/integration/copy_data/dev.sh b/integration/copy_data/dev.sh index 31bcd4ed5..a4db4caeb 100644 --- a/integration/copy_data/dev.sh +++ b/integration/copy_data/dev.sh @@ -29,6 +29,34 @@ ${PGDOG_BIN} schema-sync --from-database source --to-database destination --publ ${PGDOG_BIN} data-sync --sync-only --from-database source --to-database destination --publication pgdog --replication-slot copy_data ${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog --cutover +# Check row counts: source (pgdog) vs destination (pgdog1 + pgdog2) +echo "Checking row counts: source -> destination..." +SHARDED_TABLES="copy_data.users copy_data.orders copy_data.order_items copy_data.log_actions copy_data.with_identity" +OMNI_TABLES="copy_data.countries copy_data.currencies copy_data.categories" + +for TABLE in ${SHARDED_TABLES}; do + SRC=$(psql -d pgdog -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST1=$(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST2=$(psql -d pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST=$((DST1 + DST2)) + if [ "${SRC}" -ne "${DST}" ]; then + echo "MISMATCH ${TABLE}: source=${SRC} destination=${DST} (shard0=${DST1} shard1=${DST2})" + exit 1 + fi + echo "OK ${TABLE}: ${SRC} rows" +done + +for TABLE in ${OMNI_TABLES}; do + SRC=$(psql -d pgdog -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST1=$(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST2=$(psql -d pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}") + if [ "${SRC}" -ne "${DST1}" ] || [ "${SRC}" -ne "${DST2}" ]; then + echo "MISMATCH ${TABLE}: source=${SRC} shard0=${DST1} shard1=${DST2} (expected ${SRC} on each shard)" + exit 1 + fi + echo "OK ${TABLE}: ${SRC} rows on each shard" +done + # # 2 -> 2 # @@ -36,6 +64,33 @@ ${PGDOG_BIN} schema-sync --from-database destination --to-database destination2 ${PGDOG_BIN} data-sync --sync-only --from-database destination --to-database destination2 --publication pgdog --replication-slot copy_data ${PGDOG_BIN} schema-sync --from-database destination --to-database destination2 --publication pgdog --cutover +# Check row counts: destination (pgdog1 + pgdog2) vs destination2 (shard_0 + shard_1) +echo "Checking row counts: destination -> destination2..." +for TABLE in ${SHARDED_TABLES}; do + SRC1=$(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") + SRC2=$(psql -d pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}") + SRC=$((SRC1 + SRC2)) + DST1=$(psql -d shard_0 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST2=$(psql -d shard_1 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST=$((DST1 + DST2)) + if [ "${SRC}" -ne "${DST}" ]; then + echo "MISMATCH ${TABLE}: source=${SRC} destination=${DST} (shard0=${DST1} shard1=${DST2})" + exit 1 + fi + echo "OK ${TABLE}: ${SRC} rows" +done + +for TABLE in ${OMNI_TABLES}; do + SRC=$(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST1=$(psql -d shard_0 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST2=$(psql -d shard_1 -tAc "SELECT COUNT(*) FROM ${TABLE}") + if [ "${SRC}" -ne "${DST1}" ] || [ "${SRC}" -ne "${DST2}" ]; then + echo "MISMATCH ${TABLE}: source=${SRC} shard0=${DST1} shard1=${DST2} (expected ${SRC} on each shard)" + exit 1 + fi + echo "OK ${TABLE}: ${SRC} rows on each shard" +done + # Start replication in the background. ${PGDOG_BIN} data-sync --replicate-only --from-database source --to-database destination --publication pgdog & REPL_PID=$! diff --git a/pgdog/src/backend/replication/logical/error.rs b/pgdog/src/backend/replication/logical/error.rs index da103b7f7..375a813f6 100644 --- a/pgdog/src/backend/replication/logical/error.rs +++ b/pgdog/src/backend/replication/logical/error.rs @@ -122,6 +122,9 @@ pub enum Error { #[error("task is not a replication task")] NotReplication, + + #[error("binary format mismatch (likely int -> bigint), use text copy instead: {0}")] + BinaryFormatMistmatch(ErrorResponse), } impl From for Error { diff --git a/pgdog/src/backend/replication/logical/subscriber/copy.rs b/pgdog/src/backend/replication/logical/subscriber/copy.rs index 5c22ee4da..6022828e8 100644 --- a/pgdog/src/backend/replication/logical/subscriber/copy.rs +++ b/pgdog/src/backend/replication/logical/subscriber/copy.rs @@ -142,9 +142,13 @@ impl CopySubscriber { let command_complete = server.read().await?; match command_complete.code() { 'E' => { - return Err(Error::PgError(Box::new(ErrorResponse::from_bytes( - command_complete.to_bytes()?, - )?))) + let error = ErrorResponse::from_bytes(command_complete.to_bytes()?)?; + if error.code == "08P01" && error.message == "insufficient data left in message" + { + return Err(Error::BinaryFormatMistmatch(error)); + } else { + return Err(Error::PgError(Box::new(error))); + } } 'C' => (), c => return Err(Error::OutOfSync(c)), diff --git a/pgdog/src/frontend/router/parser/copy.rs b/pgdog/src/frontend/router/parser/copy.rs index 348576b11..545220929 100644 --- a/pgdog/src/frontend/router/parser/copy.rs +++ b/pgdog/src/frontend/router/parser/copy.rs @@ -313,12 +313,7 @@ impl CopyParser { mod test { use pg_query::parse; - use crate::{ - backend::server::test::test_server, - config::config, - frontend::router::parser::binary::header::Header, - net::{CopyDone, Protocol, Query, ToBytes}, - }; + use crate::config::config; use super::*; From a9e05c99ff49734804c0a86a5e6b088d0a65d1d0 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Sun, 5 Apr 2026 18:03:21 -0700 Subject: [PATCH 4/4] save --- .../src/backend/replication/logical/publisher/publisher_impl.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index 90f1feb99..b994d29db 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -106,7 +106,6 @@ impl Publisher { let shard = shard_index % self.cluster.shards().len(); if let Some(tables) = self.tables.get_mut(&shard) { tables.push(table); - } else { } shard_index += 1; }