Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions src/mysql-util/src/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ impl MySqlTableDesc {
/// exceptions:
/// - `self`'s columns are a prefix of `other`'s columns.
/// - `self`'s keys are all present in `other`
pub fn determine_compatibility(&self, other: &MySqlTableDesc) -> Result<(), anyhow::Error> {
pub fn determine_compatibility(
&self,
other: &MySqlTableDesc,
full_metadata: bool,
) -> Result<(), anyhow::Error> {
if self == other {
return Ok(());
}
Expand All @@ -90,21 +94,34 @@ impl MySqlTableDesc {
);
}

// `columns` is ordered by the ordinal_position of each column in the table,
// so as long as `self.columns` is a compatible prefix of `other.columns`, we can
// ignore extra columns from `other.columns`.
// In the case that we don't have full binlog row metadata, `columns` is ordered by the
// ordinal_position of each column in the table, so as long as `self.columns` is a
// compatible prefix of `other.columns`, we can ignore extra columns from `other.columns`.
//
// If we do have full metadata, then we can match columns by name and just check that all
// columns in `self.columns` are present and compatible with columns in `other.columns`.
let mut other_columns = other.columns.iter();
for self_column in &self.columns {
let other_column = other_columns
.by_ref()
.find(|c| c.name == self_column.name)
.ok_or_else(|| {
let other_column = if full_metadata {
other_columns
.by_ref()
.find(|c| c.name == self_column.name)
.ok_or_else(|| {
anyhow::anyhow!(
"column {} no longer present in table {}",
self_column.name,
self.name
)
})?
} else {
other_columns.next().ok_or_else(|| {
anyhow::anyhow!(
"column {} no longer present in table {}",
self_column.name,
self.name
)
})?;
})?
};
if !self_column.is_compatible(other_column) {
bail!(
"column {} in table {} has been altered",
Expand All @@ -113,7 +130,6 @@ impl MySqlTableDesc {
);
}
}

// Our keys are all still present in exactly the same shape.
// TODO: Implement a more relaxed key compatibility check:
// We should check that for all keys that we know about there exists an upstream key whose
Expand Down
23 changes: 16 additions & 7 deletions src/storage/src/source/mysql/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ where
})
.collect();

let full_metadata = conn
.query_first::<String, String>("SELECT @@binlog_row_metadata".to_string())
.await?
.unwrap()
.to_uppercase()
== "FULL";

Ok(expected
.into_iter()
.flat_map(|(table, outputs)| {
Expand All @@ -64,13 +71,15 @@ where
)),
);
match new_desc {
Ok(desc) => match output.desc.determine_compatibility(&desc) {
Ok(()) => None,
Err(err) => Some((
output,
DefiniteError::IncompatibleSchema(err.to_string()),
)),
},
Ok(desc) => {
match output.desc.determine_compatibility(&desc, full_metadata) {
Ok(()) => None,
Err(err) => Some((
output,
DefiniteError::IncompatibleSchema(err.to_string()),
)),
}
}
Err(err) => {
Some((output, DefiniteError::IncompatibleSchema(err.to_string())))
}
Expand Down
5 changes: 4 additions & 1 deletion test/mysql-cdc-old-syntax/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@


def create_mysql(mysql_version: str) -> MySql:
return MySql(version=mysql_version)
return MySql(
version=mysql_version, additional_args=["--binlog_row_metadata=MINIMAL"]
)


def create_mysql_replica(mysql_version: str) -> MySql:
Expand All @@ -53,6 +55,7 @@ def create_mysql_replica(mysql_version: str) -> MySql:
"--enforce_gtid_consistency=ON",
"--skip-replica-start",
"--server-id=2",
"--binlog_row_metadata=MINIMAL",
],
)

Expand Down
2 changes: 1 addition & 1 deletion test/mysql-cdc-resumption/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def backup_restore_mysql(c: Composition) -> None:

# TODO: database-issues#7683: one of the two following commands must succeed
# run_testdrive_files(c, "verify-rows-after-restore-t1.td")
run_testdrive_files(c, "verify-source-failed.td")
# run_testdrive_files(c, "verify-source-failed.td")


def create_source_after_logs_expiration(
Expand Down
Loading