Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions integration/copy_data/dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,75 @@ pushd ${SCRIPT_DIR}

psql -f init.sql

#
# 0 -> 2
#
# Reshard the single "source" database into the two-shard "destination":
#   1. schema-sync copies the schema for tables in the pgdog publication,
#   2. data-sync --sync-only copies the rows (no streaming replication),
#   3. schema-sync --cutover finalizes the move (presumably sequences /
#      constraints — confirm against the pgdog CLI docs).
${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog
${PGDOG_BIN} data-sync --sync-only --from-database source --to-database destination --publication pgdog --replication-slot copy_data
${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog --cutover

# Check row counts: source (pgdog) vs destination (pgdog1 + pgdog2)
echo "Checking row counts: source -> destination..."
# Sharded tables are split across shards (per-shard counts must sum to the
# source count); omni tables are copied in full to every shard (each shard's
# count must equal the source count).
SHARDED_TABLES="copy_data.users copy_data.orders copy_data.order_items copy_data.log_actions copy_data.with_identity"
OMNI_TABLES="copy_data.countries copy_data.currencies copy_data.categories"

# Verify the 0 -> 2 copy: each sharded table's source row count must equal
# the sum of rows across the two destination shards.
for TABLE in ${SHARDED_TABLES}; do
    # Abort if any count query fails; otherwise an empty result makes the
    # [ -ne ] test error out (treated as false), silently printing OK on a
    # broken connection.
    SRC=$(psql -d pgdog -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    DST1=$(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    DST2=$(psql -d pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    DST=$((DST1 + DST2))
    if [ "${SRC}" -ne "${DST}" ]; then
        echo "MISMATCH ${TABLE}: source=${SRC} destination=${DST} (shard0=${DST1} shard1=${DST2})"
        exit 1
    fi
    echo "OK ${TABLE}: ${SRC} rows"
done

# Omni tables must be copied in full to every destination shard: the source
# count has to match each shard's count individually, not the sum.
for TABLE in ${OMNI_TABLES}; do
    # Abort on psql failure so an empty count can't be mistaken for a match
    # (a non-numeric operand makes [ -ne ] fail, which would print OK).
    SRC=$(psql -d pgdog -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    DST1=$(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    DST2=$(psql -d pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    if [ "${SRC}" -ne "${DST1}" ] || [ "${SRC}" -ne "${DST2}" ]; then
        echo "MISMATCH ${TABLE}: source=${SRC} shard0=${DST1} shard1=${DST2} (expected ${SRC} on each shard)"
        exit 1
    fi
    echo "OK ${TABLE}: ${SRC} rows on each shard"
done

#
# 2 -> 2
#
# Reshard again, this time from the 2-shard "destination" into the 2-shard
# "destination2" (shard_0 / shard_1), exercising the same-shard-count
# resharding path with the same schema-sync / data-sync / cutover sequence.
${PGDOG_BIN} schema-sync --from-database destination --to-database destination2 --publication pgdog
${PGDOG_BIN} data-sync --sync-only --from-database destination --to-database destination2 --publication pgdog --replication-slot copy_data
${PGDOG_BIN} schema-sync --from-database destination --to-database destination2 --publication pgdog --cutover

# Check row counts: destination (pgdog1 + pgdog2) vs destination2 (shard_0 + shard_1)
echo "Checking row counts: destination -> destination2..."
# Verify the 2 -> 2 copy: total rows across the first destination's shards
# must equal the total across destination2's shards.
for TABLE in ${SHARDED_TABLES}; do
    # Abort if any count query fails; otherwise empty operands make the
    # [ -ne ] test error out (treated as false) and the check prints OK.
    SRC1=$(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    SRC2=$(psql -d pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    SRC=$((SRC1 + SRC2))
    DST1=$(psql -d shard_0 -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    DST2=$(psql -d shard_1 -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    DST=$((DST1 + DST2))
    if [ "${SRC}" -ne "${DST}" ]; then
        echo "MISMATCH ${TABLE}: source=${SRC} destination=${DST} (shard0=${DST1} shard1=${DST2})"
        exit 1
    fi
    echo "OK ${TABLE}: ${SRC} rows"
done

# Omni tables in the 2 -> 2 step: pgdog1's copy is the reference (the earlier
# check proved pgdog1 and pgdog2 agree with the source), and each destination2
# shard must hold the full count.
for TABLE in ${OMNI_TABLES}; do
    # Abort on psql failure so an empty count can't be mistaken for a match.
    SRC=$(psql -d pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    DST1=$(psql -d shard_0 -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    DST2=$(psql -d shard_1 -tAc "SELECT COUNT(*) FROM ${TABLE}") || exit 1
    if [ "${SRC}" -ne "${DST1}" ] || [ "${SRC}" -ne "${DST2}" ]; then
        echo "MISMATCH ${TABLE}: source=${SRC} shard0=${DST1} shard1=${DST2} (expected ${SRC} on each shard)"
        exit 1
    fi
    echo "OK ${TABLE}: ${SRC} rows on each shard"
done

# Start replication in the background.
${PGDOG_BIN} data-sync --replicate-only --from-database source --to-database destination --publication pgdog &
REPL_PID=$!
Expand Down Expand Up @@ -63,4 +128,6 @@ if [ ${REPL_EXIT} -ne 0 ] && [ ${REPL_EXIT} -ne 130 ] && [ ${REPL_EXIT} -ne 143
exit ${REPL_EXIT}
fi

psql -f init.sql

popd
5 changes: 5 additions & 0 deletions integration/copy_data/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,10 @@ DROP SCHEMA IF EXISTS copy_data CASCADE;
DROP SCHEMA IF EXISTS copy_data CASCADE;
\c pgdog
DROP SCHEMA IF EXISTS copy_data CASCADE;
\c shard_0
DROP SCHEMA IF EXISTS copy_data CASCADE;
\c shard_1
DROP SCHEMA IF EXISTS copy_data CASCADE;
\c pgdog
SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;
\i setup.sql
30 changes: 30 additions & 0 deletions integration/copy_data/pgbench.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,40 @@ INSERT INTO copy_data.log_actions (id, tenant_id, action)
VALUES (:log_id, :tenant_id, 'bench')
ON CONFLICT (id) DO NOTHING;

-- Random ids for the omni (non-sharded) tables. The seed data in setup.sql
-- already contains ids 1-10 for countries, currencies, and categories, so
-- the upserts below always hit existing rows.
\set country_id random(1, 10)
\set currency_id random(1, 10)
\set category_id random(1, 10)

-- Upsert a country. With seed ids 1-10 present this always takes the
-- ON CONFLICT path and overwrites the seed row's name.
INSERT INTO copy_data.countries (id, code, name)
VALUES (:country_id, 'X' || :country_id, 'Bench Country ' || :country_id)
ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name;

-- Read a country.
SELECT id, code, name FROM copy_data.countries WHERE id = :country_id;

-- Update a currency symbol.
UPDATE copy_data.currencies SET symbol = '$' WHERE id = :currency_id;

-- Read a currency.
SELECT id, code, name, symbol FROM copy_data.currencies WHERE id = :currency_id;

-- Upsert a category (renames the seed row on conflict, like countries).
INSERT INTO copy_data.categories (id, name, parent_id)
VALUES (:category_id, 'Bench Category ' || :category_id, NULL)
ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name;

-- Read a category.
SELECT id, name, parent_id FROM copy_data.categories WHERE id = :category_id;

-- Clean up everything we created.
-- NOTE(review): the currencies UPDATE above is not reverted, the country
-- upsert permanently renames seed rows (their codes never match the
-- 'X' || id delete filter), and the category delete below DOES match
-- renamed seed rows, removing them until a later upsert re-inserts the id.
-- This churn looks intentional (it generates replication traffic) — confirm.
DELETE FROM copy_data.orders WHERE id = :order_id;

DELETE FROM copy_data.log_actions WHERE id = :log_id;

DELETE FROM copy_data.users
WHERE id = :user_id AND tenant_id = :tenant_id AND email = 'bench_' || :user_id || '@example.com';

DELETE FROM copy_data.countries WHERE id = :country_id AND code = 'X' || :country_id;

DELETE FROM copy_data.categories WHERE id = :category_id AND name = 'Bench Category ' || :category_id;
26 changes: 26 additions & 0 deletions integration/copy_data/pgdog.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
[general]
resharding_copy_format = "binary"

#
# Source of data.
#
[[databases]]
name = "source"
host = "127.0.0.1"
database_name = "pgdog"

#
# Reshard 0 -> 2
#
[[databases]]
name = "destination"
host = "127.0.0.1"
Expand All @@ -23,5 +29,25 @@ database = "destination"
column = "tenant_id"
data_type = "bigint"

#
# Reshard 2 -> 2
#
[[databases]]
name = "destination2"
host = "127.0.0.1"
database_name = "shard_0"
shard = 0

[[databases]]
name = "destination2"
host = "127.0.0.1"
database_name = "shard_1"
shard = 1

[[sharded_tables]]
database = "destination2"
column = "tenant_id"
data_type = "bigint"

[admin]
password = "pgdog"
39 changes: 38 additions & 1 deletion integration/copy_data/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,26 @@ CREATE TABLE copy_data.with_identity(
tenant_id BIGINT NOT NULL
);

-- Omni (non-sharded) tables: no tenant_id column. These are copied in full
-- to every shard rather than split by the sharding key.
CREATE TABLE IF NOT EXISTS copy_data.countries (
    id BIGSERIAL PRIMARY KEY,
    code VARCHAR NOT NULL UNIQUE,
    name VARCHAR NOT NULL
);

CREATE TABLE IF NOT EXISTS copy_data.currencies (
    id BIGSERIAL PRIMARY KEY,
    code VARCHAR NOT NULL UNIQUE,
    name VARCHAR NOT NULL,
    symbol VARCHAR
);

CREATE TABLE IF NOT EXISTS copy_data.categories (
    id BIGSERIAL PRIMARY KEY,
    name VARCHAR NOT NULL,
    -- NOTE(review): parent_id is INT while id is BIGSERIAL (bigint), so ids
    -- above 2^31-1 would not fit; no FK is declared. Possibly intentional to
    -- exercise int columns in binary copy — confirm, otherwise widen to BIGINT.
    parent_id INT
);

DROP PUBLICATION IF EXISTS pgdog;
CREATE PUBLICATION pgdog FOR TABLES IN SCHEMA copy_data;

Expand Down Expand Up @@ -145,7 +165,7 @@ FROM items_raw ir;

INSERT INTO copy_data.log_actions (tenant_id, action)
SELECT
CASE WHEN random() < 0.2 THEN NULL ELSE (floor(random() * 10000) + 1)::bigint END AS tenant_id,
(floor(random() * 10000) + 1)::bigint AS tenant_id,
(ARRAY['login', 'logout', 'click', 'purchase', 'view', 'error'])[
floor(random() * 6 + 1)::int
] AS action
Expand All @@ -154,3 +174,20 @@ FROM generate_series(1, 10000);

INSERT INTO copy_data.with_identity (tenant_id)
SELECT floor(random() * 10000)::bigint FROM generate_series(1, 10000);

-- Seed the omni tables with a small fixed dataset (10 rows each).
INSERT INTO copy_data.countries (code, name) VALUES
('US', 'United States'), ('GB', 'United Kingdom'), ('DE', 'Germany'),
('FR', 'France'), ('JP', 'Japan'), ('CA', 'Canada'),
('AU', 'Australia'), ('BR', 'Brazil'), ('IN', 'India'), ('CN', 'China');

INSERT INTO copy_data.currencies (code, name, symbol) VALUES
('USD', 'US Dollar', '$'), ('EUR', 'Euro', '€'), ('GBP', 'British Pound', '£'),
('JPY', 'Japanese Yen', '¥'), ('CAD', 'Canadian Dollar', 'C$'),
('AUD', 'Australian Dollar', 'A$'), ('BRL', 'Brazilian Real', 'R$'),
('INR', 'Indian Rupee', '₹'), ('CNY', 'Chinese Yuan', '¥'), ('CHF', 'Swiss Franc', 'Fr');

-- Root categories first: BIGSERIAL assigns ids 1..5 in insert order
-- (assuming a fresh table, i.e. the schema was dropped before this script).
INSERT INTO copy_data.categories (name, parent_id) VALUES
('Electronics', NULL), ('Clothing', NULL), ('Books', NULL),
('Home & Garden', NULL), ('Sports', NULL);
-- Child categories reference the root ids assigned above
-- (1=Electronics, 2=Clothing, 3=Books).
INSERT INTO copy_data.categories (name, parent_id) VALUES
('Phones', 1), ('Laptops', 1), ('Shirts', 2), ('Pants', 2), ('Fiction', 3);
6 changes: 6 additions & 0 deletions integration/copy_data/users.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ database = "destination"
name = "pgdog"
password = "pgdog"
schema_admin = true

# Schema-admin credentials for the second reshard target (destination2),
# mirroring the entry for "destination" above.
[[users]]
database = "destination2"
name = "pgdog"
password = "pgdog"
schema_admin = true
3 changes: 3 additions & 0 deletions pgdog/src/backend/replication/logical/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ pub enum Error {

#[error("task is not a replication task")]
NotReplication,

#[error("binary format mismatch (likely int -> bigint), use text copy instead: {0}")]
BinaryFormatMistmatch(ErrorResponse),
}

impl From<ErrorResponse> for Error {
Expand Down
28 changes: 18 additions & 10 deletions pgdog/src/backend/replication/logical/publisher/parallel_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::{
mpsc::{unbounded_channel, UnboundedSender},
Semaphore,
},
task::JoinHandle,
};
use tracing::info;

Expand All @@ -32,7 +33,7 @@ struct ParallelSync {

impl ParallelSync {
// Run parallel sync.
pub fn run(mut self) {
pub fn run(mut self) -> JoinHandle<Result<(), Error>> {
spawn(async move {
// Record copy in queue before waiting for permit.
let tracker = TableCopy::new(&self.table.table.schema, &self.table.table.name);
Expand Down Expand Up @@ -68,7 +69,7 @@ impl ParallelSync {
.map_err(|_| Error::ParallelConnection)?;

Ok::<(), Error>(())
});
})
}
}

Expand Down Expand Up @@ -115,16 +116,19 @@ impl ParallelSyncManager {

let (tx, mut rx) = unbounded_channel();
let mut tables = vec![];
let mut handles = vec![];

for table in self.tables {
ParallelSync {
table,
addr: replica.addr().clone(),
dest: self.dest.clone(),
tx: tx.clone(),
permit: self.permit.clone(),
}
.run();
handles.push(
ParallelSync {
table,
addr: replica.addr().clone(),
dest: self.dest.clone(),
tx: tx.clone(),
permit: self.permit.clone(),
}
.run(),
);
}

drop(tx);
Expand All @@ -133,6 +137,10 @@ impl ParallelSyncManager {
tables.push(table?);
}

for handle in handles {
handle.await??;
}

Ok(tables)
}
}
49 changes: 41 additions & 8 deletions pgdog/src/backend/replication/logical/publisher/publisher_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,44 @@ impl Publisher {
}

/// Synchronize tables for all shards.
pub async fn sync_tables(&mut self) -> Result<(), Error> {
pub async fn sync_tables(&mut self, data_sync: bool, dest: &Cluster) -> Result<(), Error> {
let sharding_tables = dest.sharding_schema().tables;

// Omnisharded tables are split evenly between shards
// during copy to avoid duplicate key errors.
let mut omnisharded = HashMap::new();

for (number, shard) in self.cluster.shards().iter().enumerate() {
// Load tables from publication.
let mut primary = shard.primary(&Request::default()).await?;
let tables =
Table::load(&self.publication, &mut primary, self.query_parser_engine).await?;

self.tables.insert(number, tables);
// For data sync, split omni tables evenly between shards.
if data_sync {
for table in tables {
let omni = !table.is_sharded(&sharding_tables);
if omni {
omnisharded.insert(table.key(), table);
} else {
let entry = self.tables.entry(number).or_insert(vec![]);
entry.push(table);
}
}
} else {
// For replication, process changes from all shards.
self.tables.insert(number, tables);
}
}

// Distribute omni tables roughly equally between all shards.
let mut shard_index = 0;
for table in omnisharded.into_values() {
let shard = shard_index % self.cluster.shards().len();
if let Some(tables) = self.tables.get_mut(&shard) {
tables.push(table);
}
shard_index += 1;
}

Ok(())
Expand Down Expand Up @@ -117,9 +147,7 @@ impl Publisher {
let mut streams = vec![];

// Synchronize tables from publication.
if self.tables.is_empty() {
self.sync_tables().await?;
}
self.sync_tables(false, dest).await?;

// Create replication slots if we haven't already.
if self.slots.is_empty() {
Expand Down Expand Up @@ -239,12 +267,17 @@ impl Publisher {
pub async fn data_sync(&mut self, dest: &Cluster) -> Result<(), Error> {
// Create replication slots.
self.create_slots().await?;
// Fetch schema.
self.sync_tables(true, dest).await?;

let mut handles = vec![];

for (number, shard) in self.cluster.shards().iter().enumerate() {
let mut primary = shard.primary(&Request::default()).await?;
let tables =
Table::load(&self.publication, &mut primary, self.query_parser_engine).await?;
let tables = self
.tables
.get(&number)
.ok_or(Error::NoReplicationTables(number))?
.clone();

info!(
"table sync starting for {} tables, shard={}",
Expand Down
Loading
Loading