diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e3799c9..36f4a70 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,3 +45,13 @@ jobs: - run: helm lint charts/initium - run: helm template test-release charts/initium --set sampleDeployment.enabled=true --set 'initContainers[0].name=wait' --set 'initContainers[0].command[0]=wait-for' --set 'initContainers[0].args[0]=--target' --set 'initContainers[0].args[1]=tcp://localhost:5432' - run: helm unittest charts/initium + ci: + if: always() + needs: [lint, test, build, helm-lint] + runs-on: ubuntu-latest + steps: + - run: | + if [[ "${{ contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled') }}" == "true" ]]; then + echo "One or more jobs failed or were cancelled" + exit 1 + fi diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ae737f..1a67764 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- Reconcile mode for seed sets (`mode: reconcile`): declarative seeding where the spec is the source of truth. Changed rows are updated, new rows are inserted, and removed rows are deleted automatically. +- `--reconcile-all` CLI flag to override all seed sets to reconcile mode for a single run. +- `--dry-run` CLI flag to preview what changes reconciliation would make without modifying the database. +- Per-row tracking table (`initium_seed_rows`) for change detection and orphan deletion in reconcile mode. +- Content hash (`content_hash` column) on the seed tracking table for fast "anything changed?" checks before row-by-row comparison. +- Automatic migration of existing tracking tables: the `content_hash` column is added transparently on first run. Existing seed sets remain in `once` mode with no behavior change. + +### Changed +- Reconcile hash-skip now only applies to seed sets without `@ref:` expressions. 
Seed sets containing `@ref:` references always run row-level reconciliation to prevent stale foreign keys when upstream auto-generated IDs shift. +- Hash computation sorts tables by `(order, table_name)` instead of just `order` for deterministic hashing when multiple tables share the same order value. +- Dry-run mode treats `@ref:` expressions as literals to avoid failures when references haven't been populated yet (e.g., auto_id + refs within the same seed set). + +### Fixed +- `--reconcile-all` now rejects seed sets where any table is missing `unique_key`, preventing reconciliation from generating identical row keys and updating/deleting wrong rows. +- Reconcile mode validation now rejects empty/whitespace-only `unique_key` entries and reserved column names like `_ref`. +- Reconcile mode validation now checks that every row contains all `unique_key` columns, preventing incomplete row keys during reconciliation. +- MySQL row tracking table now uses a SHA-256 generated column (`row_key_hash`) for the primary key instead of a `row_key(255)` prefix, preventing key collisions for JSON keys exceeding 255 bytes. + ## [1.1.0] - 2026-02-26 ### Added diff --git a/docs/seeding.md b/docs/seeding.md index ed8f703..d8f0e1f 100644 --- a/docs/seeding.md +++ b/docs/seeding.md @@ -50,6 +50,7 @@ phases: seed_sets: # Optional. Seed sets to apply in this phase. - name: initial_data order: 1 # Optional. Controls execution order across seed sets. + mode: once # Optional. "once" (default) or "reconcile". tables: - table: config order: 1 # Optional. Controls execution order within a seed set. @@ -82,6 +83,7 @@ phases: | `phases[].wait_for[].timeout` | string | No | Per-object timeout override (e.g. 
`60s`, `2m`, `1m30s`) | | `phases[].seed_sets[].name` | string | Yes | Unique name for the seed set (used in tracking) | | `phases[].seed_sets[].order` | integer | No | Execution order (lower values first, default: 0) | +| `phases[].seed_sets[].mode` | string | No | Seed mode: `once` (default) or `reconcile` | | `phases[].seed_sets[].tables[].table` | string | Yes | Target database table name | | `phases[].seed_sets[].tables[].order` | integer | No | Execution order within the seed set (default: 0) | | `phases[].seed_sets[].tables[].unique_key` | string[] | No | Columns for duplicate detection | @@ -213,6 +215,55 @@ rows: password_hash: "{{ env.ADMIN_PASSWORD_HASH }}" ``` +### Reconcile Mode + +By default, seed sets are applied once and never modified (`mode: once`). Reconcile mode makes seeding declarative: the rendered spec becomes the source of truth, and initium reconciles the database to match it whenever the rendered spec changes. + +If the rendered spec has not changed since the last run (content hash match) and the seed set contains no `@ref:` expressions, initium treats the seed set as already reconciled and skips it. Out-of-band database changes are not corrected until a spec change triggers reconciliation again. + +Enable reconcile mode per seed set: + +```yaml +seed_sets: + - name: departments + mode: reconcile # "once" (default) or "reconcile" + tables: + - table: departments + unique_key: [name] # Required for reconcile mode + rows: + - name: Engineering + - name: Sales +``` + +Or override all seed sets for a single run: + +```bash +initium seed --spec /seeds/seed.yaml --reconcile-all +``` + +**How it works:** + +1. On each run, initium computes a content hash of the rendered seed set (after template/env expansion). +2. If the hash matches the stored hash and the seed set contains no `@ref:` expressions, the seed set is skipped (no-op). Seed sets that use `@ref:` always continue to step 3, because referenced IDs can change without changing the hash. +3. Otherwise, initium reconciles row by row: + - **New rows** (in spec but not in DB) are inserted. + - **Changed rows** (different values for same unique key) are updated. 
+ - **Removed rows** (in DB but not in spec) are deleted. + +**Requirements:** +- Every table in a reconciled seed set must have a `unique_key`. Without it, there is no way to identify which rows correspond to which spec entries. +- Environment variable changes trigger reconciliation (resolved values are compared, not raw templates). + +**Row tracking:** Initium creates a companion table (`{tracking_table}_rows`, e.g., `initium_seed_rows`) that stores the resolved values of each seeded row. This enables change detection and orphan deletion. + +**Dry-run mode:** Preview what reconciliation would do without modifying the database: + +```bash +initium seed --spec /seeds/seed.yaml --dry-run +``` + +This logs insert/update/delete counts per table without changing any seeded data. Tracking tables are still created or migrated if needed, because that setup runs before seed sets are processed. + ### Reset Mode Use `--reset` to delete all data from seeded tables and remove tracking entries before re-applying. Tables are deleted in reverse order to respect foreign key constraints: @@ -276,11 +327,13 @@ spec: ## CLI Reference -| Flag | Default | Description | -| --------- | ---------- | --------------------------------------- | -| `--spec` | (required) | Path to seed spec file (YAML or JSON) | -| `--reset` | `false` | Delete existing data and re-apply seeds | -| `--json` | `false` | Enable JSON log output | +| Flag | Default | Description | +| ------------------ | ---------- | --------------------------------------------------------- | +| `--spec` | (required) | Path to seed spec file (YAML or JSON) | +| `--reset` | `false` | Delete existing data and re-apply seeds | +| `--dry-run` | `false` | Preview changes without modifying the database | +| `--reconcile-all` | `false` | Override all seed sets to reconcile mode for this run | +| `--json` | `false` | Enable JSON log output | ## Failure Modes diff --git a/src/main.rs b/src/main.rs index f1872a0..5f8c782 100644 --- a/src/main.rs +++ b/src/main.rs @@ -145,6 +145,18 @@ enum Commands { help = "Reset mode: delete existing data before 
re-seeding" )] reset: bool, + #[arg( + long, + env = "INITIUM_DRY_RUN", + help = "Dry-run: show what would change without modifying the database" + )] + dry_run: bool, + #[arg( + long, + env = "INITIUM_RECONCILE_ALL", + help = "Override all seed sets to reconcile mode for this run" + )] + reconcile_all: bool, }, /// Render templates into config files @@ -313,7 +325,12 @@ fn main() { lock_file, args, } => cmd::migrate::run(&log, &args, &workdir, &lock_file), - Commands::Seed { spec, reset } => seed::run(&log, &spec, reset), + Commands::Seed { + spec, + reset, + dry_run, + reconcile_all, + } => seed::run(&log, &spec, reset, dry_run, reconcile_all), Commands::Render { template, output, diff --git a/src/seed/db.rs b/src/seed/db.rs index ff5e219..9059c7d 100644 --- a/src/seed/db.rs +++ b/src/seed/db.rs @@ -24,6 +24,87 @@ pub trait Database: Send { fn create_schema(&mut self, name: &str) -> Result<(), String>; fn object_exists(&mut self, obj_type: &str, name: &str) -> Result; fn driver_name(&self) -> &str; + + // --- Reconciliation support --- + + /// Add content_hash column to existing tracking table if missing. + fn migrate_tracking_table(&mut self, table_name: &str) -> Result<(), String>; + + /// Create the per-row tracking table ({tracking_table}_rows). + fn ensure_row_tracking_table(&mut self, table_name: &str) -> Result<(), String>; + + /// Get the stored content hash for a seed set. + fn get_seed_hash(&mut self, table_name: &str, seed_set: &str) + -> Result, String>; + + /// Update the tracking entry with a new hash (upsert). + fn update_seed_entry( + &mut self, + table_name: &str, + seed_set: &str, + hash: &str, + ) -> Result<(), String>; + + /// Store or update a tracked row in the row tracking table. + fn store_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + row_values: &str, + ) -> Result<(), String>; + + /// Get all tracked rows for a seed set + table. 
+ fn get_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + ) -> Result, String>; + + /// Delete a specific tracked row. + fn delete_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + ) -> Result<(), String>; + + /// Delete all tracked rows for a seed set. + fn delete_all_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + ) -> Result<(), String>; + + /// Update specific columns of a row identified by key columns. + fn update_row( + &mut self, + table: &str, + set_columns: &[String], + set_values: &[String], + where_columns: &[String], + where_values: &[String], + ) -> Result; + + /// Fetch specific column values from a row identified by key columns. + fn get_row_columns( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + fetch_columns: &[String], + ) -> Result>, String>; + + /// Delete a single row identified by key columns. + fn delete_row_by_key( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + ) -> Result; } #[cfg(feature = "sqlite")] @@ -236,6 +317,274 @@ impl Database for SqliteDb { fn driver_name(&self) -> &str { "sqlite" } + + fn migrate_tracking_table(&mut self, table_name: &str) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + // Check if content_hash column exists + let sql = format!("PRAGMA table_info(\"{}\")", safe); + let has_hash = self + .conn + .prepare(&sql) + .map_err(|e| format!("checking tracking table schema: {}", e))? + .query_map([], |row| { + let name: String = row.get(1)?; + Ok(name) + }) + .map_err(|e| format!("reading tracking table schema: {}", e))? 
+ .any(|r| r.map(|n| n == "content_hash").unwrap_or(false)); + + if !has_hash { + let alter = format!("ALTER TABLE \"{}\" ADD COLUMN content_hash TEXT", safe); + self.conn + .execute(&alter, []) + .map_err(|e| format!("migrating tracking table: {}", e))?; + } + Ok(()) + } + + fn ensure_row_tracking_table(&mut self, table_name: &str) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + let sql = format!( + "CREATE TABLE IF NOT EXISTS \"{}_rows\" ( + seed_set TEXT NOT NULL, + table_name TEXT NOT NULL, + row_key TEXT NOT NULL, + row_values TEXT NOT NULL, + applied_at TEXT NOT NULL DEFAULT (datetime('now')), + PRIMARY KEY (seed_set, table_name, row_key) + )", + safe + ); + self.conn + .execute(&sql, []) + .map_err(|e| format!("creating row tracking table: {}", e))?; + Ok(()) + } + + fn get_seed_hash( + &mut self, + table_name: &str, + seed_set: &str, + ) -> Result, String> { + let sql = format!( + "SELECT content_hash FROM \"{}\" WHERE seed_set = ?1", + sanitize_identifier(table_name) + ); + match self + .conn + .query_row(&sql, [seed_set], |row| row.get::<_, Option>(0)) + { + Ok(hash) => Ok(hash), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(format!("getting seed hash: {}", e)), + } + } + + fn update_seed_entry( + &mut self, + table_name: &str, + seed_set: &str, + hash: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + // Upsert: update hash if exists, insert if not + let sql = format!( + "INSERT INTO \"{}\" (seed_set, content_hash) VALUES (?1, ?2) \ + ON CONFLICT(seed_set) DO UPDATE SET content_hash = ?2, applied_at = datetime('now')", + safe + ); + self.conn + .execute(&sql, [seed_set, hash]) + .map_err(|e| format!("updating seed entry: {}", e))?; + Ok(()) + } + + fn store_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + row_values: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = 
format!( + "INSERT INTO \"{}_rows\" (seed_set, table_name, row_key, row_values) VALUES (?1, ?2, ?3, ?4) \ + ON CONFLICT(seed_set, table_name, row_key) DO UPDATE SET row_values = ?4, applied_at = datetime('now')", + safe + ); + self.conn + .execute(&sql, [seed_set, table_name, row_key, row_values]) + .map_err(|e| format!("storing tracked row: {}", e))?; + Ok(()) + } + + fn get_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + ) -> Result, String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "SELECT row_key, row_values FROM \"{}_rows\" WHERE seed_set = ?1 AND table_name = ?2", + safe + ); + let mut stmt = self + .conn + .prepare(&sql) + .map_err(|e| format!("preparing tracked rows query: {}", e))?; + let rows = stmt + .query_map([seed_set, table_name], |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)) + }) + .map_err(|e| format!("querying tracked rows: {}", e))? + .collect::, _>>() + .map_err(|e| format!("reading tracked rows: {}", e))?; + Ok(rows) + } + + fn delete_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "DELETE FROM \"{}_rows\" WHERE seed_set = ?1 AND table_name = ?2 AND row_key = ?3", + safe + ); + self.conn + .execute(&sql, [seed_set, table_name, row_key]) + .map_err(|e| format!("deleting tracked row: {}", e))?; + Ok(()) + } + + fn delete_all_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!("DELETE FROM \"{}_rows\" WHERE seed_set = ?1", safe); + self.conn + .execute(&sql, [seed_set]) + .map_err(|e| format!("deleting all tracked rows: {}", e))?; + Ok(()) + } + + fn update_row( + &mut self, + table: &str, + set_columns: &[String], + set_values: &[String], + where_columns: &[String], + where_values: 
&[String], + ) -> Result { + let set_clause: Vec = set_columns + .iter() + .enumerate() + .map(|(i, c)| format!("\"{}\" = ?{}", sanitize_identifier(c), i + 1)) + .collect(); + let where_clause: Vec = where_columns + .iter() + .enumerate() + .map(|(i, c)| { + format!( + "\"{}\" = ?{}", + sanitize_identifier(c), + set_values.len() + i + 1 + ) + }) + .collect(); + let sql = format!( + "UPDATE \"{}\" SET {} WHERE {}", + sanitize_identifier(table), + set_clause.join(", "), + where_clause.join(" AND ") + ); + let mut all_values: Vec<&dyn rusqlite::types::ToSql> = Vec::new(); + for v in set_values.iter().chain(where_values.iter()) { + all_values.push(v as &dyn rusqlite::types::ToSql); + } + let count = self + .conn + .execute(&sql, all_values.as_slice()) + .map_err(|e| format!("updating row in '{}': {}", table, e))?; + Ok(count as u64) + } + + fn get_row_columns( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + fetch_columns: &[String], + ) -> Result>, String> { + if fetch_columns.is_empty() { + return Ok(None); + } + let select_cols: Vec = fetch_columns + .iter() + .map(|c| format!("CAST(\"{}\" AS TEXT)", sanitize_identifier(c))) + .collect(); + let where_clause: Vec = key_columns + .iter() + .enumerate() + .map(|(i, c)| format!("\"{}\" = ?{}", sanitize_identifier(c), i + 1)) + .collect(); + let sql = format!( + "SELECT {} FROM \"{}\" WHERE {}", + select_cols.join(", "), + sanitize_identifier(table), + where_clause.join(" AND ") + ); + let params: Vec<&dyn rusqlite::types::ToSql> = key_values + .iter() + .map(|v| v as &dyn rusqlite::types::ToSql) + .collect(); + match self.conn.query_row(&sql, params.as_slice(), |row| { + let mut vals = Vec::new(); + for i in 0..fetch_columns.len() { + let v: Option = row.get(i)?; + vals.push(v.unwrap_or_default()); + } + Ok(vals) + }) { + Ok(vals) => Ok(Some(vals)), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(format!("getting row from '{}': {}", table, e)), + } + } + + fn 
delete_row_by_key( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + ) -> Result { + let where_clause: Vec = key_columns + .iter() + .enumerate() + .map(|(i, c)| format!("\"{}\" = ?{}", sanitize_identifier(c), i + 1)) + .collect(); + let sql = format!( + "DELETE FROM \"{}\" WHERE {}", + sanitize_identifier(table), + where_clause.join(" AND ") + ); + let params: Vec<&dyn rusqlite::types::ToSql> = key_values + .iter() + .map(|v| v as &dyn rusqlite::types::ToSql) + .collect(); + let count = self + .conn + .execute(&sql, params.as_slice()) + .map_err(|e| format!("deleting row from '{}': {}", table, e))?; + Ok(count as u64) + } } #[cfg(feature = "postgres")] @@ -477,6 +826,247 @@ impl Database for PostgresDb { fn driver_name(&self) -> &str { "postgres" } + + fn migrate_tracking_table(&mut self, table_name: &str) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + let sql = format!( + "DO $$ BEGIN \ + IF NOT EXISTS (SELECT 1 FROM information_schema.columns \ + WHERE table_name='{}' AND column_name='content_hash') THEN \ + ALTER TABLE \"{}\" ADD COLUMN content_hash TEXT; \ + END IF; \ + END $$", + safe, safe + ); + self.client + .execute(&sql, &[]) + .map_err(|e| format!("migrating tracking table: {}", e))?; + Ok(()) + } + + fn ensure_row_tracking_table(&mut self, table_name: &str) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + let sql = format!( + "CREATE TABLE IF NOT EXISTS \"{}_rows\" ( + seed_set TEXT NOT NULL, + table_name TEXT NOT NULL, + row_key TEXT NOT NULL, + row_values TEXT NOT NULL, + applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (seed_set, table_name, row_key) + )", + safe + ); + self.client + .execute(&sql, &[]) + .map_err(|e| format!("creating row tracking table: {}", e))?; + Ok(()) + } + + fn get_seed_hash( + &mut self, + table_name: &str, + seed_set: &str, + ) -> Result, String> { + let sql = format!( + "SELECT content_hash FROM \"{}\" WHERE seed_set = $1", + 
sanitize_identifier(table_name) + ); + let rows = self + .client + .query(&sql, &[&seed_set]) + .map_err(|e| format!("getting seed hash: {}", e))?; + if rows.is_empty() { + Ok(None) + } else { + Ok(rows[0].get(0)) + } + } + + fn update_seed_entry( + &mut self, + table_name: &str, + seed_set: &str, + hash: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + let sql = format!( + "INSERT INTO \"{}\" (seed_set, content_hash) VALUES ($1, $2) \ + ON CONFLICT(seed_set) DO UPDATE SET content_hash = $2, applied_at = NOW()", + safe + ); + self.client + .execute(&sql, &[&seed_set, &hash]) + .map_err(|e| format!("updating seed entry: {}", e))?; + Ok(()) + } + + fn store_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + row_values: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "INSERT INTO \"{}_rows\" (seed_set, table_name, row_key, row_values) VALUES ($1, $2, $3, $4) \ + ON CONFLICT(seed_set, table_name, row_key) DO UPDATE SET row_values = $4, applied_at = NOW()", + safe + ); + self.client + .execute(&sql, &[&seed_set, &table_name, &row_key, &row_values]) + .map_err(|e| format!("storing tracked row: {}", e))?; + Ok(()) + } + + fn get_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + ) -> Result, String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "SELECT row_key, row_values FROM \"{}_rows\" WHERE seed_set = $1 AND table_name = $2", + safe + ); + let rows = self + .client + .query(&sql, &[&seed_set, &table_name]) + .map_err(|e| format!("querying tracked rows: {}", e))?; + Ok(rows + .iter() + .map(|r| (r.get::<_, String>(0), r.get::<_, String>(1))) + .collect()) + } + + fn delete_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + 
let sql = format!( + "DELETE FROM \"{}_rows\" WHERE seed_set = $1 AND table_name = $2 AND row_key = $3", + safe + ); + self.client + .execute(&sql, &[&seed_set, &table_name, &row_key]) + .map_err(|e| format!("deleting tracked row: {}", e))?; + Ok(()) + } + + fn delete_all_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!("DELETE FROM \"{}_rows\" WHERE seed_set = $1", safe); + self.client + .execute(&sql, &[&seed_set]) + .map_err(|e| format!("deleting all tracked rows: {}", e))?; + Ok(()) + } + + fn update_row( + &mut self, + table: &str, + set_columns: &[String], + set_values: &[String], + where_columns: &[String], + where_values: &[String], + ) -> Result { + let set_clause: Vec = set_columns + .iter() + .zip(set_values.iter()) + .map(|(c, v)| format!("\"{}\" = {}", sanitize_identifier(c), escape_sql_value(v))) + .collect(); + let where_clause: Vec = where_columns + .iter() + .zip(where_values.iter()) + .map(|(c, v)| format!("\"{}\" = {}", sanitize_identifier(c), escape_sql_value(v))) + .collect(); + let sql = format!( + "UPDATE \"{}\" SET {} WHERE {}", + sanitize_identifier(table), + set_clause.join(", "), + where_clause.join(" AND ") + ); + let count = self + .client + .execute(&sql, &[]) + .map_err(|e| format!("updating row in '{}': {}", table, e))?; + Ok(count) + } + + fn get_row_columns( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + fetch_columns: &[String], + ) -> Result>, String> { + if fetch_columns.is_empty() { + return Ok(None); + } + let select_cols: Vec = fetch_columns + .iter() + .map(|c| format!("CAST(\"{}\" AS TEXT)", sanitize_identifier(c))) + .collect(); + let where_clause: Vec = key_columns + .iter() + .zip(key_values.iter()) + .map(|(c, v)| format!("\"{}\" = {}", sanitize_identifier(c), escape_sql_value(v))) + .collect(); + let sql = format!( + "SELECT {} FROM \"{}\" WHERE {}", + 
select_cols.join(", "), + sanitize_identifier(table), + where_clause.join(" AND ") + ); + let rows = self + .client + .query(&sql, &[]) + .map_err(|e| format!("getting row from '{}': {}", table, e))?; + if rows.is_empty() { + Ok(None) + } else { + let mut vals = Vec::new(); + for i in 0..fetch_columns.len() { + let v: Option = rows[0].get(i); + vals.push(v.unwrap_or_default()); + } + Ok(Some(vals)) + } + } + + fn delete_row_by_key( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + ) -> Result { + let where_clause: Vec = key_columns + .iter() + .zip(key_values.iter()) + .map(|(c, v)| format!("\"{}\" = {}", sanitize_identifier(c), escape_sql_value(v))) + .collect(); + let sql = format!( + "DELETE FROM \"{}\" WHERE {}", + sanitize_identifier(table), + where_clause.join(" AND ") + ); + let count = self + .client + .execute(&sql, &[]) + .map_err(|e| format!("deleting row from '{}': {}", table, e))?; + Ok(count) + } } #[cfg(feature = "mysql")] @@ -695,6 +1285,273 @@ impl Database for MysqlDb { fn driver_name(&self) -> &str { "mysql" } + + fn migrate_tracking_table(&mut self, table_name: &str) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + // MySQL: ALTER TABLE ADD COLUMN IF NOT EXISTS is not supported in older versions. + // Check information_schema first. 
+ use mysql::prelude::Queryable; + let check_sql = format!( + "SELECT COUNT(*) FROM information_schema.columns \ + WHERE table_schema = DATABASE() AND table_name = '{}' AND column_name = 'content_hash'", + safe + ); + let count: Option = self + .conn + .exec_first(&check_sql, ()) + .map_err(|e| format!("checking tracking table schema: {}", e))?; + if count.unwrap_or(0) == 0 { + let alter = format!("ALTER TABLE `{}` ADD COLUMN content_hash TEXT", safe); + self.conn + .query_drop(&alter) + .map_err(|e| format!("migrating tracking table: {}", e))?; + } + Ok(()) + } + + fn ensure_row_tracking_table(&mut self, table_name: &str) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + let sql = format!( + "CREATE TABLE IF NOT EXISTS `{}_rows` ( + seed_set VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + row_key TEXT NOT NULL, + row_values TEXT NOT NULL, + applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + row_key_hash BINARY(32) GENERATED ALWAYS AS (UNHEX(SHA2(row_key, 256))) STORED, + PRIMARY KEY (seed_set, table_name, row_key_hash) + )", + safe + ); + use mysql::prelude::Queryable; + self.conn + .query_drop(&sql) + .map_err(|e| format!("creating row tracking table: {}", e))?; + Ok(()) + } + + fn get_seed_hash( + &mut self, + table_name: &str, + seed_set: &str, + ) -> Result, String> { + let sql = format!( + "SELECT content_hash FROM `{}` WHERE seed_set = ?", + sanitize_identifier(table_name) + ); + use mysql::prelude::Queryable; + let result: Option> = self + .conn + .exec_first(&sql, (seed_set,)) + .map_err(|e| format!("getting seed hash: {}", e))?; + Ok(result.flatten()) + } + + fn update_seed_entry( + &mut self, + table_name: &str, + seed_set: &str, + hash: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + let sql = format!( + "INSERT INTO `{}` (seed_set, content_hash) VALUES (?, ?) 
\ + ON DUPLICATE KEY UPDATE content_hash = VALUES(content_hash), applied_at = CURRENT_TIMESTAMP", + safe + ); + use mysql::prelude::Queryable; + self.conn + .exec_drop(&sql, (seed_set, hash)) + .map_err(|e| format!("updating seed entry: {}", e))?; + Ok(()) + } + + fn store_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + row_values: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "INSERT INTO `{}_rows` (seed_set, table_name, row_key, row_values) VALUES (?, ?, ?, ?) \ + ON DUPLICATE KEY UPDATE row_values = VALUES(row_values), applied_at = CURRENT_TIMESTAMP", + safe + ); + use mysql::prelude::Queryable; + self.conn + .exec_drop(&sql, (seed_set, table_name, row_key, row_values)) + .map_err(|e| format!("storing tracked row: {}", e))?; + Ok(()) + } + + fn get_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + ) -> Result, String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "SELECT row_key, row_values FROM `{}_rows` WHERE seed_set = ? AND table_name = ?", + safe + ); + use mysql::prelude::Queryable; + let rows: Vec<(String, String)> = self + .conn + .exec(&sql, (seed_set, table_name)) + .map_err(|e| format!("querying tracked rows: {}", e))?; + Ok(rows) + } + + fn delete_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "DELETE FROM `{}_rows` WHERE seed_set = ? AND table_name = ? 
AND row_key = ?", + safe + ); + use mysql::prelude::Queryable; + self.conn + .exec_drop(&sql, (seed_set, table_name, row_key)) + .map_err(|e| format!("deleting tracked row: {}", e))?; + Ok(()) + } + + fn delete_all_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!("DELETE FROM `{}_rows` WHERE seed_set = ?", safe); + use mysql::prelude::Queryable; + self.conn + .exec_drop(&sql, (seed_set,)) + .map_err(|e| format!("deleting all tracked rows: {}", e))?; + Ok(()) + } + + fn update_row( + &mut self, + table: &str, + set_columns: &[String], + set_values: &[String], + where_columns: &[String], + where_values: &[String], + ) -> Result { + let set_clause: Vec = set_columns + .iter() + .map(|c| format!("`{}` = ?", sanitize_identifier(c))) + .collect(); + let where_clause: Vec = where_columns + .iter() + .map(|c| format!("`{}` = ?", sanitize_identifier(c))) + .collect(); + let sql = format!( + "UPDATE `{}` SET {} WHERE {}", + sanitize_identifier(table), + set_clause.join(", "), + where_clause.join(" AND ") + ); + use mysql::prelude::Queryable; + let params: Vec = set_values + .iter() + .chain(where_values.iter()) + .map(|v| mysql::Value::from(v.as_str())) + .collect(); + self.conn + .exec_drop(&sql, ¶ms) + .map_err(|e| format!("updating row in '{}': {}", table, e))?; + let affected: Option = self + .conn + .exec_first("SELECT ROW_COUNT()", ()) + .map_err(|e| format!("getting affected rows: {}", e))?; + Ok(affected.unwrap_or(0)) + } + + fn get_row_columns( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + fetch_columns: &[String], + ) -> Result>, String> { + if fetch_columns.is_empty() { + return Ok(None); + } + let select_cols: Vec = fetch_columns + .iter() + .map(|c| format!("CAST(`{}` AS CHAR)", sanitize_identifier(c))) + .collect(); + let where_clause: Vec = key_columns + .iter() + .map(|c| format!("`{}` = ?", 
sanitize_identifier(c))) + .collect(); + let sql = format!( + "SELECT {} FROM `{}` WHERE {}", + select_cols.join(", "), + sanitize_identifier(table), + where_clause.join(" AND ") + ); + use mysql::prelude::Queryable; + let params: Vec = key_values + .iter() + .map(|v| mysql::Value::from(v.as_str())) + .collect(); + let row: Option = self + .conn + .exec_first(&sql, ¶ms) + .map_err(|e| format!("getting row from '{}': {}", table, e))?; + match row { + Some(r) => { + let mut vals = Vec::new(); + for i in 0..fetch_columns.len() { + let v: Option = r.get(i); + vals.push(v.unwrap_or_default()); + } + Ok(Some(vals)) + } + None => Ok(None), + } + } + + fn delete_row_by_key( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + ) -> Result { + let where_clause: Vec = key_columns + .iter() + .map(|c| format!("`{}` = ?", sanitize_identifier(c))) + .collect(); + let sql = format!( + "DELETE FROM `{}` WHERE {}", + sanitize_identifier(table), + where_clause.join(" AND ") + ); + use mysql::prelude::Queryable; + let params: Vec = key_values + .iter() + .map(|v| mysql::Value::from(v.as_str())) + .collect(); + self.conn + .exec_drop(&sql, ¶ms) + .map_err(|e| format!("deleting row from '{}': {}", table, e))?; + let affected: Option = self + .conn + .exec_first("SELECT ROW_COUNT()", ()) + .map_err(|e| format!("getting affected rows: {}", e))?; + Ok(affected.unwrap_or(0)) + } } pub fn connect(driver: &str, url: &str) -> Result, String> { diff --git a/src/seed/executor.rs b/src/seed/executor.rs index 2594e9b..3481039 100644 --- a/src/seed/executor.rs +++ b/src/seed/executor.rs @@ -1,8 +1,9 @@ use crate::duration::{format_duration, parse_duration}; use crate::logging::Logger; use crate::seed::db::Database; +use crate::seed::hash::compute_seed_set_hash; use crate::seed::schema::{SeedPhase, SeedPlan, SeedSet, TableSeed, WaitForObject}; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::time::{Duration, Instant}; pub 
struct SeedExecutor<'a> { @@ -10,6 +11,8 @@ pub struct SeedExecutor<'a> { db: Box, tracking_table: String, reset: bool, + dry_run: bool, + reconcile_all: bool, refs: HashMap>, } @@ -25,13 +28,27 @@ impl<'a> SeedExecutor<'a> { db, tracking_table, reset, + dry_run: false, + reconcile_all: false, refs: HashMap::new(), } } + pub fn with_dry_run(mut self, dry_run: bool) -> Self { + self.dry_run = dry_run; + self + } + + pub fn with_reconcile_all(mut self, reconcile_all: bool) -> Self { + self.reconcile_all = reconcile_all; + self + } + pub fn execute(&mut self, plan: &SeedPlan) -> Result<(), String> { self.log.info("starting seed execution", &[]); self.db.ensure_tracking_table(&self.tracking_table)?; + self.db.migrate_tracking_table(&self.tracking_table)?; + self.db.ensure_row_tracking_table(&self.tracking_table)?; self.execute_phases(plan)?; @@ -149,6 +166,7 @@ impl<'a> SeedExecutor<'a> { fn reset_seed_set(&mut self, ss: &SeedSet) -> Result<(), String> { let name = &ss.name; + let tt = self.tracking_table.clone(); self.log .info("reset mode: clearing seed set data", &[("seed_set", name)]); let mut tables: Vec<&TableSeed> = ss.tables.iter().collect(); @@ -160,13 +178,36 @@ impl<'a> SeedExecutor<'a> { &[("table", &ts.table), ("count", &count.to_string())], ); } + self.db.delete_all_tracked_rows(&tt, name)?; self.db.remove_seed_mark(&self.tracking_table, name)?; Ok(()) } fn execute_seed_set(&mut self, ss: &SeedSet) -> Result<(), String> { let name = &ss.name; - self.log.info("processing seed set", &[("seed_set", name)]); + let is_reconcile = ss.is_reconcile() || self.reconcile_all; + self.log.info( + "processing seed set", + &[ + ("seed_set", name), + ("mode", if is_reconcile { "reconcile" } else { "once" }), + ], + ); + + if is_reconcile { + // Guard: reconcile requires unique_key on every table. + // Schema validation catches this for mode: reconcile, but --reconcile-all + // can force reconcile on mode: once seed sets that lack unique_key. 
+ for ts in &ss.tables { + if ts.unique_key.is_empty() { + return Err(format!( + "cannot reconcile seed set '{}': table '{}' has no unique_key (required for reconcile mode)", + name, ts.table + )); + } + } + return self.reconcile_seed_set(ss); + } if self.db.is_seed_applied(&self.tracking_table, name)? { self.log @@ -174,6 +215,14 @@ impl<'a> SeedExecutor<'a> { return Ok(()); } + if self.dry_run { + self.log.info( + "dry-run: seed set would be applied (new)", + &[("seed_set", name)], + ); + return Ok(()); + } + self.db.begin_transaction()?; let result = self.apply_seed_set_tables(ss); match result { @@ -314,6 +363,390 @@ impl<'a> SeedExecutor<'a> { .cloned() .ok_or_else(|| format!("column '{}' not found in reference '{}'", column, ref_name)) } + + // --- Reconciliation --- + + fn reconcile_seed_set(&mut self, ss: &SeedSet) -> Result<(), String> { + let name = &ss.name; + + // Compute hash of current spec (resolve env vars, keep @ref: as literals) + let current_hash = compute_seed_set_hash(ss, &|val| self.resolve_value(val))?; + + // Check stored hash for quick skip. + // Only skip if the seed set has no @ref: expressions, because + // compute_seed_set_hash treats @ref: values as literals. Resolved + // reference targets can change without affecting the hash (e.g., + // upstream auto_id row deleted/reinserted), and skipping could leave + // stale foreign keys. 
+ let stored_hash = self.db.get_seed_hash(&self.tracking_table, name)?; + let has_refs = ss.tables.iter().any(|ts| { + ts.rows.iter().any(|row| { + row.values() + .any(|v| v.as_str().map(|s| s.starts_with("@ref:")).unwrap_or(false)) + }) + }); + if !has_refs && stored_hash.as_deref() == Some(current_hash.as_str()) { + self.log.info( + "seed set unchanged (hash match), skipping", + &[("seed_set", name)], + ); + // Still need to populate refs for downstream seed sets + self.populate_refs_from_db(ss)?; + return Ok(()); + } + + if self.dry_run { + self.log.info( + "dry-run: seed set has changes, would reconcile", + &[("seed_set", name)], + ); + self.dry_run_reconcile_tables(ss)?; + return Ok(()); + } + + self.log.info("reconciling seed set", &[("seed_set", name)]); + + self.db.begin_transaction()?; + let result = self.reconcile_tables(ss, ¤t_hash); + match result { + Ok(()) => { + let tt = self.tracking_table.clone(); + self.db.update_seed_entry(&tt, name, ¤t_hash)?; + self.db.commit_transaction()?; + self.log + .info("seed set reconciled successfully", &[("seed_set", name)]); + Ok(()) + } + Err(e) => { + self.db.rollback_transaction()?; + Err(format!("reconciling seed set '{}' failed: {}", name, e)) + } + } + } + + fn reconcile_tables(&mut self, ss: &SeedSet, _hash: &str) -> Result<(), String> { + let mut tables: Vec<&TableSeed> = ss.tables.iter().collect(); + tables.sort_by_key(|t| t.order); + + for ts in &tables { + self.reconcile_table(ss, ts)?; + } + Ok(()) + } + + fn reconcile_table(&mut self, ss: &SeedSet, ts: &TableSeed) -> Result<(), String> { + let table = &ts.table; + let tt = self.tracking_table.clone(); + let ss_name = ss.name.clone(); + + self.log.info( + "reconciling table", + &[ + ("table", table.as_str()), + ("rows", &ts.rows.len().to_string()), + ], + ); + + // Get currently tracked rows for this seed_set + table + let tracked = self.db.get_tracked_rows(&tt, &ss_name, table)?; + let tracked_keys: HashSet = tracked.iter().map(|(k, _)| 
k.clone()).collect(); + let tracked_values: HashMap = tracked.into_iter().collect(); + + let mut seen_keys = HashSet::new(); + + for (idx, row) in ts.rows.iter().enumerate() { + let ref_name = row + .get("_ref") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + let mut columns = Vec::new(); + let mut values = Vec::new(); + let mut unique_columns = Vec::new(); + let mut unique_values = Vec::new(); + + for (key, val) in row { + if key == "_ref" { + continue; + } + let resolved = self.resolve_value(val)?; + columns.push(key.clone()); + values.push(resolved.clone()); + + if ts.unique_key.contains(key) { + unique_columns.push(key.clone()); + unique_values.push(resolved); + } + } + + // Build canonical row_key JSON (sorted by unique key column name) + let row_key = build_row_key(&ts.unique_key, &unique_columns, &unique_values); + // Build row_values JSON (all columns, sorted) + let row_values_json = build_row_values(&columns, &values); + + seen_keys.insert(row_key.clone()); + + let existing_values = tracked_values.get(&row_key); + + if let Some(stored_vals) = existing_values { + // Row exists in tracking — check if values changed + if stored_vals == &row_values_json { + // No change — populate refs if needed + self.populate_row_refs(ts, &ref_name, &columns, &values)?; + self.log.info( + "row unchanged, skipping", + &[("table", table.as_str()), ("row", &(idx + 1).to_string())], + ); + continue; + } + + // Values differ — UPDATE + let non_key_columns: Vec = columns + .iter() + .filter(|c| !ts.unique_key.contains(c)) + .cloned() + .collect(); + let non_key_values: Vec = columns + .iter() + .zip(values.iter()) + .filter(|(c, _)| !ts.unique_key.contains(c)) + .map(|(_, v)| v.clone()) + .collect(); + + if !non_key_columns.is_empty() { + self.db.update_row( + table, + &non_key_columns, + &non_key_values, + &unique_columns, + &unique_values, + )?; + } + + self.db + .store_tracked_row(&tt, &ss_name, table, &row_key, &row_values_json)?; + self.populate_row_refs(ts, 
&ref_name, &columns, &values)?; + self.log.info( + "updated row", + &[("table", table.as_str()), ("row", &(idx + 1).to_string())], + ); + } else { + // New row — INSERT + let auto_id_col = ts.auto_id.as_ref().map(|a| a.column.as_str()); + let generated_id = self.db.insert_row(table, &columns, &values, auto_id_col)?; + + if let Some(ref_key) = &ref_name { + let mut ref_map = HashMap::new(); + for (i, col) in columns.iter().enumerate() { + ref_map.insert(col.clone(), values[i].clone()); + } + if let (Some(ref auto_id), Some(id)) = (&ts.auto_id, generated_id) { + ref_map.insert(auto_id.column.clone(), id.to_string()); + } + self.refs.insert(ref_key.clone(), ref_map); + } + + self.db + .store_tracked_row(&tt, &ss_name, table, &row_key, &row_values_json)?; + self.log.info( + "inserted row", + &[("table", table.as_str()), ("row", &(idx + 1).to_string())], + ); + } + } + + // Delete orphaned rows (in tracking but not in current spec) + let orphaned_keys: Vec = tracked_keys.difference(&seen_keys).cloned().collect(); + + for orphan_key in &orphaned_keys { + // Parse the row_key JSON to get column names + values + let key_map: BTreeMap = serde_json::from_str(orphan_key) + .map_err(|e| format!("parsing orphan row key: {}", e))?; + let key_cols: Vec = key_map.keys().cloned().collect(); + let key_vals: Vec = key_map.values().cloned().collect(); + + self.db.delete_row_by_key(table, &key_cols, &key_vals)?; + self.db + .delete_tracked_row(&tt, &ss_name, table, orphan_key)?; + self.log.info( + "deleted orphaned row", + &[("table", table.as_str()), ("row_key", orphan_key)], + ); + } + + Ok(()) + } + + /// Populate refs from an existing (unchanged) row, fetching auto_id from DB if needed. 
+ fn populate_row_refs( + &mut self, + ts: &TableSeed, + ref_name: &Option, + columns: &[String], + values: &[String], + ) -> Result<(), String> { + if let Some(ref_key) = ref_name { + let mut ref_map = HashMap::new(); + for (i, col) in columns.iter().enumerate() { + ref_map.insert(col.clone(), values[i].clone()); + } + + // If there's an auto_id, fetch the actual ID from the DB + if let Some(ref auto_id) = ts.auto_id { + let unique_cols: Vec = ts.unique_key.clone(); + let unique_vals: Vec = ts + .unique_key + .iter() + .filter_map(|uk| { + columns + .iter() + .zip(values.iter()) + .find(|(c, _)| *c == uk) + .map(|(_, v)| v.clone()) + }) + .collect(); + if let Some(row_vals) = self.db.get_row_columns( + &ts.table, + &unique_cols, + &unique_vals, + std::slice::from_ref(&auto_id.column), + )? { + if let Some(id_val) = row_vals.first() { + ref_map.insert(auto_id.column.clone(), id_val.clone()); + } + } + } + self.refs.insert(ref_key.clone(), ref_map); + } + Ok(()) + } + + /// Populate refs for a skipped (hash-matched) seed set by reading from DB. + fn populate_refs_from_db(&mut self, ss: &SeedSet) -> Result<(), String> { + let mut tables: Vec<&TableSeed> = ss.tables.iter().collect(); + tables.sort_by_key(|t| t.order); + + for ts in &tables { + for row in &ts.rows { + let ref_name = row + .get("_ref") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + if ref_name.is_none() { + continue; + } + + let mut columns = Vec::new(); + let mut values = Vec::new(); + + for (key, val) in row { + if key == "_ref" { + continue; + } + let resolved = self.resolve_value(val)?; + columns.push(key.clone()); + values.push(resolved); + } + + self.populate_row_refs(ts, &ref_name, &columns, &values)?; + } + } + Ok(()) + } + + /// Resolve a value for dry-run: treats `@ref:` as literals to avoid failures + /// when refs haven't been populated (common with auto_id + refs in same seed set). 
/// Resolve a value for dry-run.
///
/// `@ref:` expressions are resolved when the reference is already known, so
/// the preview compares the same values a real run would store. When the
/// reference cannot be resolved (for example, the referenced row has not
/// been inserted yet and so has no generated id), the expression is kept as
/// a literal instead of failing. All other values go through `resolve_value`
/// unchanged.
fn resolve_value_dry_run(&self, val: &serde_yaml::Value) -> Result<String, String> {
    match val {
        serde_yaml::Value::String(s) if s.starts_with("@ref:") => {
            Ok(self.resolve_value(val).unwrap_or_else(|_| s.clone()))
        }
        _ => self.resolve_value(val),
    }
}

/// Dry-run: compute what reconciliation would do without modifying the DB.
///
/// For each table (ascending `order`) the spec rows are compared with the
/// per-row tracking table and one summary line with insert/update/delete
/// counts is logged. Refs of `_ref`-labelled rows are populated from the
/// database as the rows are visited, so later rows, later tables and
/// downstream seed sets can resolve references to rows that already exist.
fn dry_run_reconcile_tables(&mut self, ss: &SeedSet) -> Result<(), String> {
    let mut tables: Vec<&TableSeed> = ss.tables.iter().collect();
    tables.sort_by_key(|t| t.order);
    let tt = self.tracking_table.clone();
    let ss_name = ss.name.clone();

    for ts in &tables {
        // row_key -> row_values JSON as recorded by the previous real run.
        let tracked_values: HashMap<String, String> = self
            .db
            .get_tracked_rows(&tt, &ss_name, &ts.table)?
            .into_iter()
            .collect();

        let mut seen_keys: HashSet<String> = HashSet::new();
        let mut inserts = 0u64;
        let mut updates = 0u64;

        for row in &ts.rows {
            let mut unique_columns = Vec::new();
            let mut unique_values = Vec::new();
            let mut columns = Vec::new();
            let mut values = Vec::new();

            for (key, val) in row {
                if key == "_ref" {
                    continue;
                }
                let resolved = self.resolve_value_dry_run(val)?;
                columns.push(key.clone());
                values.push(resolved.clone());
                if ts.unique_key.contains(key) {
                    unique_columns.push(key.clone());
                    unique_values.push(resolved);
                }
            }

            let row_key = build_row_key(&ts.unique_key, &unique_columns, &unique_values);
            let row_values_json = build_row_values(&columns, &values);

            match tracked_values.get(&row_key) {
                Some(stored) if stored == &row_values_json => {}
                Some(_) => updates += 1,
                None => inserts += 1,
            }
            seen_keys.insert(row_key);

            // Read-only: registers the row's values (plus its generated id
            // when the row already exists) so subsequent `@ref:` expressions
            // can be resolved.
            let ref_name = row
                .get("_ref")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string());
            if ref_name.is_some() {
                self.populate_row_refs(ts, &ref_name, &columns, &values)?;
            }
        }

        let deletes = tracked_values
            .keys()
            .filter(|k| !seen_keys.contains(*k))
            .count() as u64;

        self.log.info(
            "dry-run: table reconciliation summary",
            &[
                ("table", ts.table.as_str()),
                ("inserts", &inserts.to_string()),
                ("updates", &updates.to_string()),
                ("deletes", &deletes.to_string()),
            ],
        );
    }

    Ok(())
}
}

/// Build a canonical JSON key from unique key columns (sorted by column name).
///
/// `columns` and `values` are parallel slices. Only columns listed in
/// `unique_key_spec` are included; a spec column that is absent from
/// `columns` (or has no matching value) is left out of the key.
fn build_row_key(unique_key_spec: &[String], columns: &[String], values: &[String]) -> String {
    let map: BTreeMap<String, String> = unique_key_spec
        .iter()
        .filter_map(|uk| {
            let idx = columns.iter().position(|c| c == uk)?;
            // `get` instead of indexing: mismatched slices must not panic.
            values.get(idx).map(|v| (uk.clone(), v.clone()))
        })
        .collect();
    // The BTreeMap gives a stable key order, so equal inputs always produce
    // the same string.
    serde_json::to_string(&map).unwrap_or_default()
}

/// Build a canonical JSON representation of all row values (sorted by column name).
///
/// `columns` and `values` are parallel slices; pairs beyond the shorter of
/// the two are ignored.
fn build_row_values(columns: &[String], values: &[String]) -> String {
    let map: BTreeMap<String, String> = columns
        .iter()
        .cloned()
        .zip(values.iter().cloned())
        .collect();
    serde_json::to_string(&map).unwrap_or_default()
}
db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + setup_db_with_tables(&sqlite); + + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: reconcile_idem + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let log = test_logger(); + + // First run + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false); + exec1.execute(&plan).unwrap(); + + // Second run — should skip (hash match) + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false); + exec2.execute(&plan).unwrap(); + + let db = SqliteDb::connect(db_path_str).unwrap(); + let count: i64 = db + .conn + .query_row("SELECT COUNT(*) FROM departments", [], |r| r.get(0)) + .unwrap(); + assert_eq!(count, 1); + } + + #[test] + fn test_reconcile_update_changed_row() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + sqlite + .conn + .execute_batch("CREATE TABLE config (key TEXT PRIMARY KEY, value TEXT);") + .unwrap(); + + let yaml1 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: config + mode: reconcile + tables: + - table: config + unique_key: [key] + rows: + - key: app_name + value: OldName +"#; + let plan1 = SeedPlan::from_yaml(yaml1).unwrap(); + let log = test_logger(); + + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false); + exec1.execute(&plan1).unwrap(); + + // Verify initial value + let db_check = SqliteDb::connect(db_path_str).unwrap(); + let val: String = db_check + 
.conn + .query_row("SELECT value FROM config WHERE key = 'app_name'", [], |r| { + r.get(0) + }) + .unwrap(); + assert_eq!(val, "OldName"); + + // Run with changed value + let yaml2 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: config + mode: reconcile + tables: + - table: config + unique_key: [key] + rows: + - key: app_name + value: NewName +"#; + let plan2 = SeedPlan::from_yaml(yaml2).unwrap(); + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false); + exec2.execute(&plan2).unwrap(); + + let db_final = SqliteDb::connect(db_path_str).unwrap(); + let val: String = db_final + .conn + .query_row("SELECT value FROM config WHERE key = 'app_name'", [], |r| { + r.get(0) + }) + .unwrap(); + assert_eq!(val, "NewName"); + } + + #[test] + fn test_reconcile_add_new_row() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + setup_db_with_tables(&sqlite); + + let yaml1 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: depts + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering +"#; + let plan1 = SeedPlan::from_yaml(yaml1).unwrap(); + let log = test_logger(); + + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false); + exec1.execute(&plan1).unwrap(); + + // Add a row + let yaml2 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: depts + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering + - name: Sales +"#; + let plan2 = SeedPlan::from_yaml(yaml2).unwrap(); + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut 
exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false); + exec2.execute(&plan2).unwrap(); + + let db = SqliteDb::connect(db_path_str).unwrap(); + let count: i64 = db + .conn + .query_row("SELECT COUNT(*) FROM departments", [], |r| r.get(0)) + .unwrap(); + assert_eq!(count, 2); + } + + #[test] + fn test_reconcile_delete_removed_row() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + setup_db_with_tables(&sqlite); + + let yaml1 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: depts + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering + - name: Sales +"#; + let plan1 = SeedPlan::from_yaml(yaml1).unwrap(); + let log = test_logger(); + + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false); + exec1.execute(&plan1).unwrap(); + + let db_check = SqliteDb::connect(db_path_str).unwrap(); + let count: i64 = db_check + .conn + .query_row("SELECT COUNT(*) FROM departments", [], |r| r.get(0)) + .unwrap(); + assert_eq!(count, 2); + + // Remove Sales + let yaml2 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: depts + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering +"#; + let plan2 = SeedPlan::from_yaml(yaml2).unwrap(); + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false); + exec2.execute(&plan2).unwrap(); + + let db_final = SqliteDb::connect(db_path_str).unwrap(); + let count: i64 = db_final + .conn + .query_row("SELECT COUNT(*) FROM departments", [], |r| r.get(0)) + .unwrap(); + assert_eq!(count, 1); + + let name: String = db_final + .conn 
/// Reconcile a seed set whose second table references the auto-generated id
/// of the first via `@ref:`, then run it again and check that nothing is
/// duplicated and the foreign key still points at the department.
#[test]
fn test_reconcile_with_auto_id_and_refs() {
    let dir = tempfile::TempDir::new().unwrap();
    let db_path = dir.path().join("test.db");
    let db_path_str = db_path.to_str().unwrap();

    let sqlite = SqliteDb::connect(db_path_str).unwrap();
    setup_db_with_tables(&sqlite);

    let yaml = r#"
database:
  driver: sqlite
  url: ":memory:"
phases:
  - name: phase1
    seed_sets:
      - name: org
        mode: reconcile
        tables:
          - table: departments
            order: 1
            unique_key: [name]
            auto_id:
              column: id
            rows:
              - _ref: dept_eng
                name: Engineering
          - table: employees
            order: 2
            unique_key: [email]
            rows:
              - name: Alice
                email: alice@example.com
                department_id: "@ref:dept_eng.id"
"#;
    let plan = SeedPlan::from_yaml(yaml).unwrap();
    let log = test_logger();

    // First apply
    let db1 = SqliteDb::connect(db_path_str).unwrap();
    let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false);
    exec1.execute(&plan).unwrap();

    // Verify the reference was resolved to the generated department id
    let db = SqliteDb::connect(db_path_str).unwrap();
    let dept_id: i64 = db
        .conn
        .query_row(
            "SELECT id FROM departments WHERE name = 'Engineering'",
            [],
            |r| r.get(0),
        )
        .unwrap();
    let emp_dept_id: i64 = db
        .conn
        .query_row(
            "SELECT department_id FROM employees WHERE email = 'alice@example.com'",
            [],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(dept_id, emp_dept_id);

    // Run again. The seed set contains an `@ref:` value, so the hash
    // shortcut is not taken; row-level reconciliation runs and must find
    // every row unchanged.
    let db2 = SqliteDb::connect(db_path_str).unwrap();
    let mut exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false);
    exec2.execute(&plan).unwrap();

    let db_final = SqliteDb::connect(db_path_str).unwrap();
    let emp_count: i64 = db_final
        .conn
        .query_row("SELECT COUNT(*) FROM employees", [], |r| r.get(0))
        .unwrap();
    assert_eq!(emp_count, 1);

    let dept_count: i64 = db_final
        .conn
        .query_row("SELECT COUNT(*) FROM departments", [], |r| r.get(0))
        .unwrap();
    assert_eq!(dept_count, 1, "second run should not duplicate departments");

    let emp_dept_id_after: i64 = db_final
        .conn
        .query_row(
            "SELECT department_id FROM employees WHERE email = 'alice@example.com'",
            [],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(
        emp_dept_id_after, dept_id,
        "second run should keep the foreign key pointing at the department"
    );
}
r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: bad + mode: reconcile + tables: + - table: departments + rows: + - name: Engineering +"#; + let result = SeedPlan::from_yaml(yaml); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.contains("unique_key"), + "error should mention unique_key: {}", + err + ); + } + + #[test] + fn test_reconcile_all_flag_overrides_mode() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + sqlite + .conn + .execute_batch("CREATE TABLE config (key TEXT PRIMARY KEY, value TEXT);") + .unwrap(); + + // mode: once, but we use reconcile_all + let yaml1 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: cfg + tables: + - table: config + unique_key: [key] + rows: + - key: app + value: v1 +"#; + let plan1 = SeedPlan::from_yaml(yaml1).unwrap(); + let log = test_logger(); + + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false) + .with_reconcile_all(true); + exec1.execute(&plan1).unwrap(); + + // Change value and run again with reconcile_all + let yaml2 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: cfg + tables: + - table: config + unique_key: [key] + rows: + - key: app + value: v2 +"#; + let plan2 = SeedPlan::from_yaml(yaml2).unwrap(); + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false) + .with_reconcile_all(true); + exec2.execute(&plan2).unwrap(); + + let db = SqliteDb::connect(db_path_str).unwrap(); + let val: String = db + .conn + .query_row("SELECT value FROM config WHERE key = 'app'", [], |r| { + r.get(0) + }) + .unwrap(); + assert_eq!(val, "v2"); 
/// A reference created by a seed set in one phase must be usable by a
/// reconcile-mode seed set in a later phase, on the first run and on reruns.
#[test]
fn test_reconcile_cross_seed_set_refs() {
    let dir = tempfile::TempDir::new().unwrap();
    let db_path = dir.path().join("test.db");
    let db_path_str = db_path.to_str().unwrap();

    let sqlite = SqliteDb::connect(db_path_str).unwrap();
    setup_db_with_tables(&sqlite);

    let yaml = r#"
database:
  driver: sqlite
  url: ":memory:"
phases:
  - name: phase1
    order: 1
    seed_sets:
      - name: depts
        mode: reconcile
        order: 1
        tables:
          - table: departments
            unique_key: [name]
            auto_id:
              column: id
            rows:
              - _ref: dept_eng
                name: Engineering
  - name: phase2
    order: 2
    seed_sets:
      - name: emps
        mode: reconcile
        order: 1
        tables:
          - table: employees
            unique_key: [email]
            rows:
              - name: Alice
                email: alice@example.com
                department_id: "@ref:dept_eng.id"
"#;
    let plan = SeedPlan::from_yaml(yaml).unwrap();
    let log = test_logger();

    // First run
    let db1 = SqliteDb::connect(db_path_str).unwrap();
    let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false);
    exec1.execute(&plan).unwrap();

    let db = SqliteDb::connect(db_path_str).unwrap();
    let dept_id: i64 = db
        .conn
        .query_row("SELECT id FROM departments", [], |r| r.get(0))
        .unwrap();
    let emp_dept_id: i64 = db
        .conn
        .query_row("SELECT department_id FROM employees", [], |r| r.get(0))
        .unwrap();
    assert_eq!(
        dept_id, emp_dept_id,
        "cross-phase reconcile refs should work"
    );

    // Second run. `depts` has no `@ref:` values, so it is skipped on hash
    // match and its refs are re-populated from the database. `emps` contains
    // an `@ref:` value, so it always goes through row-level reconciliation,
    // which must resolve the ref and find the row unchanged.
    let db2 = SqliteDb::connect(db_path_str).unwrap();
    let mut exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false);
    exec2.execute(&plan).unwrap();

    let db_final = SqliteDb::connect(db_path_str).unwrap();
    let emp_count: i64 = db_final
        .conn
        .query_row("SELECT COUNT(*) FROM employees", [], |r| r.get(0))
        .unwrap();
    assert_eq!(emp_count, 1, "second run should not duplicate employees");

    let emp_dept_id_after: i64 = db_final
        .conn
        .query_row("SELECT department_id FROM employees", [], |r| r.get(0))
        .unwrap();
    assert_eq!(
        emp_dept_id_after, dept_id,
        "second run should keep the cross-phase reference intact"
    );
}
test_logger(); + + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false); + exec.execute(&plan).unwrap(); + + // Verify migration worked: content_hash column exists + let db = SqliteDb::connect(db_path_str).unwrap(); + let has_hash: bool = db + .conn + .prepare("PRAGMA table_info(initium_seed)") + .unwrap() + .query_map([], |row| row.get::<_, String>(1)) + .unwrap() + .any(|r| r.map(|n| n == "content_hash").unwrap_or(false)); + assert!( + has_hash, + "tracking table should have content_hash column after migration" + ); + + // Legacy entry should still be there + let legacy: i64 = db + .conn + .query_row( + "SELECT COUNT(*) FROM initium_seed WHERE seed_set = 'legacy_set'", + [], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(legacy, 1, "legacy entry should be preserved"); + } + + #[test] + fn test_invalid_seed_mode() { + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: bad_mode + mode: invalid + tables: + - table: t + rows: + - a: b +"#; + let result = SeedPlan::from_yaml(yaml); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("invalid mode")); + } + + #[test] + fn test_reconcile_all_rejects_missing_unique_key() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + setup_db_with_tables(&sqlite); + + // mode: once with no unique_key + reconcile_all should error + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: no_uk + tables: + - table: departments + rows: + - name: Engineering +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let log = test_logger(); + let mut exec = SeedExecutor::new(&log, Box::new(sqlite), "initium_seed".into(), false) + .with_reconcile_all(true); + let result = 
exec.execute(&plan); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("no unique_key")); + } } diff --git a/src/seed/hash.rs b/src/seed/hash.rs new file mode 100644 index 0000000..eedd7d9 --- /dev/null +++ b/src/seed/hash.rs @@ -0,0 +1,255 @@ +use crate::seed::schema::SeedSet; +use sha2::{Digest, Sha256}; +use std::collections::BTreeMap; + +/// Compute a deterministic SHA-256 hash of a seed set's content. +/// +/// Values are resolved using the provided resolver function (expanding env vars +/// and templates), except `@ref:` expressions which are kept as literals to +/// avoid cascading false positives when auto-generated IDs shift. +/// +/// The `_ref` key is excluded from the hash (it is a structural label, not data). +pub fn compute_seed_set_hash( + ss: &SeedSet, + resolver: &dyn Fn(&serde_yaml::Value) -> Result, +) -> Result { + let mut hasher = Sha256::new(); + + let mut tables: Vec<_> = ss.tables.iter().collect(); + tables.sort_by(|a, b| a.order.cmp(&b.order).then_with(|| a.table.cmp(&b.table))); + + for ts in &tables { + hasher.update(ts.table.as_bytes()); + hasher.update(b"\n"); + + // Include unique_key in hash so changing it triggers reconciliation + let uk_json = serde_json::to_string(&ts.unique_key) + .map_err(|e| format!("serializing unique_key: {}", e))?; + hasher.update(uk_json.as_bytes()); + hasher.update(b"\n"); + + // Include auto_id config + let auto_id_str = match &ts.auto_id { + Some(a) => format!("{}:{}", a.column, a.id_type), + None => String::new(), + }; + hasher.update(auto_id_str.as_bytes()); + hasher.update(b"\n"); + + for row in &ts.rows { + // Sort keys for determinism (HashMap iteration order is random) + let sorted: BTreeMap<_, _> = row.iter().collect(); + for (key, val) in &sorted { + if key.as_str() == "_ref" { + continue; + } + hasher.update(key.as_bytes()); + hasher.update(b"="); + + // Keep @ref: literals as-is; resolve everything else + let val_str = match val.as_str() { + Some(s) if s.starts_with("@ref:") => 
s.to_string(), + _ => resolver(val)?, + }; + hasher.update(val_str.as_bytes()); + hasher.update(b"\x00"); + } + hasher.update(b"\n"); + } + } + + let hash = hasher.finalize(); + Ok(hex_encode(&hash)) +} + +fn hex_encode(bytes: &[u8]) -> String { + use std::fmt::Write; + let mut s = String::with_capacity(bytes.len() * 2); + for b in bytes { + let _ = write!(s, "{:02x}", b); + } + s +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::seed::schema::SeedPlan; + + fn identity_resolver(val: &serde_yaml::Value) -> Result { + match val { + serde_yaml::Value::String(s) => Ok(s.clone()), + serde_yaml::Value::Number(n) => Ok(n.to_string()), + serde_yaml::Value::Bool(b) => Ok(b.to_string()), + serde_yaml::Value::Null => Ok(String::new()), + _ => Ok(format!("{:?}", val)), + } + } + + #[test] + fn test_deterministic_hash() { + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: test + mode: reconcile + tables: + - table: users + unique_key: [email] + rows: + - email: alice@example.com + name: Alice +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let ss = &plan.phases[0].seed_sets[0]; + let h1 = compute_seed_set_hash(ss, &identity_resolver).unwrap(); + let h2 = compute_seed_set_hash(ss, &identity_resolver).unwrap(); + assert_eq!(h1, h2); + assert_eq!(h1.len(), 64); // SHA-256 hex + } + + #[test] + fn test_hash_changes_on_value_change() { + let yaml1 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: p + seed_sets: + - name: s + mode: reconcile + tables: + - table: t + unique_key: [k] + rows: + - k: a + v: "1" +"#; + let yaml2 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: p + seed_sets: + - name: s + mode: reconcile + tables: + - table: t + unique_key: [k] + rows: + - k: a + v: "2" +"#; + let plan1 = SeedPlan::from_yaml(yaml1).unwrap(); + let plan2 = SeedPlan::from_yaml(yaml2).unwrap(); + let h1 = compute_seed_set_hash(&plan1.phases[0].seed_sets[0], 
/// Hashing the same seed set twice yields the same digest when a row holds an `@ref:` value.
#[test]
fn test_hash_stable_with_ref_expressions() {
    const SPEC: &str = r#"
database:
  driver: sqlite
  url: ":memory:"
phases:
  - name: p
    seed_sets:
      - name: s
        mode: reconcile
        tables:
          - table: t
            unique_key: [name]
            rows:
              - name: Alice
                dept_id: "@ref:dept_eng.id"
"#;
    let plan = SeedPlan::from_yaml(SPEC).unwrap();
    let seed_set = &plan.phases[0].seed_sets[0];

    let first = compute_seed_set_hash(seed_set, &identity_resolver).unwrap();
    let second = compute_seed_set_hash(seed_set, &identity_resolver).unwrap();

    assert_eq!(first, second);
}

/// The digest follows the resolved value: a resolver that maps the same literal to different
/// text must produce a different hash for an otherwise identical seed set.
#[test]
fn test_hash_changes_on_env_resolution() {
    /// Stands in for a different environment: rewrites `some_value`, defers everything else.
    fn swapping_resolver(val: &serde_yaml::Value) -> Result<String, String> {
        if let serde_yaml::Value::String(text) = val {
            if text == "some_value" {
                return Ok("different_value".into());
            }
        }
        identity_resolver(val)
    }

    const SPEC: &str = r#"
database:
  driver: sqlite
  url: ":memory:"
phases:
  - name: p
    seed_sets:
      - name: s
        mode: reconcile
        tables:
          - table: t
            unique_key: [k]
            rows:
              - k: a
                v: some_value
"#;
    let plan = SeedPlan::from_yaml(SPEC).unwrap();
    let seed_set = &plan.phases[0].seed_sets[0];

    let baseline = compute_seed_set_hash(seed_set, &identity_resolver).unwrap();
    let swapped = compute_seed_set_hash(seed_set, &swapping_resolver).unwrap();

    assert_ne!(baseline, swapped);
}
let h1 = compute_seed_set_hash(&plan1.phases[0].seed_sets[0], &identity_resolver).unwrap(); + let h2 = compute_seed_set_hash(&plan2.phases[0].seed_sets[0], &identity_resolver).unwrap(); + assert_ne!(h1, h2); + } +} diff --git a/src/seed/mod.rs b/src/seed/mod.rs index 6618aaf..b8643d4 100644 --- a/src/seed/mod.rs +++ b/src/seed/mod.rs @@ -1,5 +1,6 @@ pub mod db; pub mod executor; +pub mod hash; pub mod schema; use crate::logging::Logger; @@ -19,7 +20,13 @@ fn render_template(content: &str) -> Result { .map_err(|e| format!("rendering seed template: {}", e)) } -pub fn run(log: &Logger, spec_file: &str, reset: bool) -> Result<(), String> { +pub fn run( + log: &Logger, + spec_file: &str, + reset: bool, + dry_run: bool, + reconcile_all: bool, +) -> Result<(), String> { let content = std::fs::read_to_string(spec_file) .map_err(|e| format!("reading seed spec '{}': {}", spec_file, e))?; @@ -38,7 +45,9 @@ pub fn run(log: &Logger, spec_file: &str, reset: bool) -> Result<(), String> { log.info("connecting to database", &[("driver", driver.as_str())]); let db = db::connect(&driver, &db_url)?; - let mut exec = executor::SeedExecutor::new(log, db, tracking_table, reset); + let mut exec = executor::SeedExecutor::new(log, db, tracking_table, reset) + .with_dry_run(dry_run) + .with_reconcile_all(reconcile_all); exec.execute(&plan) } diff --git a/src/seed/schema.rs b/src/seed/schema.rs index 0991c18..3b64f0a 100644 --- a/src/seed/schema.rs +++ b/src/seed/schema.rs @@ -98,9 +98,21 @@ pub struct SeedSet { pub name: String, #[serde(default)] pub order: i32, + #[serde(default = "default_seed_mode")] + pub mode: String, pub tables: Vec, } +fn default_seed_mode() -> String { + "once".into() +} + +impl SeedSet { + pub fn is_reconcile(&self) -> bool { + self.mode == "reconcile" + } +} + #[derive(Debug, Deserialize, Clone)] pub struct TableSeed { pub table: String, @@ -198,6 +210,15 @@ impl SeedPlan { if ss.name.is_empty() { return Err("seed_set name must not be empty".into()); } + let 
/// Reconcile mode refuses a `unique_key` list that contains an empty entry.
#[test]
fn test_reconcile_rejects_empty_unique_key_entry() {
    const SPEC: &str = r#"
database:
  driver: sqlite
  url: ":memory:"
phases:
  - name: p
    seed_sets:
      - name: s
        mode: reconcile
        tables:
          - table: t
            unique_key: ["", "k"]
            rows:
              - k: a
"#;
    let outcome = SeedPlan::from_yaml(SPEC);
    assert!(outcome.unwrap_err().contains("empty or whitespace-only"));
}

/// Reconcile mode refuses the reserved `_ref` column as a `unique_key` entry.
#[test]
fn test_reconcile_rejects_reserved_unique_key() {
    const SPEC: &str = r#"
database:
  driver: sqlite
  url: ":memory:"
phases:
  - name: p
    seed_sets:
      - name: s
        mode: reconcile
        tables:
          - table: t
            unique_key: [_ref]
            rows:
              - _ref: r1
"#;
    let outcome = SeedPlan::from_yaml(SPEC);
    assert!(outcome.unwrap_err().contains("reserved column '_ref'"));
}

/// Reconcile mode refuses a row that lacks one of the `unique_key` columns.
#[test]
fn test_reconcile_rejects_row_missing_unique_key_column() {
    const SPEC: &str = r#"
database:
  driver: sqlite
  url: ":memory:"
phases:
  - name: p
    seed_sets:
      - name: s
        mode: reconcile
        tables:
          - table: t
            unique_key: [email]
            rows:
              - name: Alice
"#;
    let outcome = SeedPlan::from_yaml(SPEC);
    assert!(outcome
        .unwrap_err()
        .contains("missing unique_key column 'email'"));
}