From 24871641768e72db37922194d0f804cfa0c62f0d Mon Sep 17 00:00:00 2001 From: mikkeldamsgaard Date: Wed, 11 Mar 2026 23:48:09 +0100 Subject: [PATCH] feat: add reconcile mode for declarative database seeding Add reconcile mode (`mode: reconcile`) for seed sets, making seeding declarative: the rendered spec becomes the source of truth, and initium reconciles the database to match it whenever the spec changes. New rows are inserted, changed rows are updated, and removed rows are deleted. Key changes: - Per-row tracking table (`initium_seed_rows`) for change detection - Content hash on seed tracking table for fast skip optimization - `--reconcile-all` CLI flag to override all seed sets to reconcile mode - `--dry-run` CLI flag to preview changes without modifying the database - Schema validation: reconcile requires unique_key on every table, validates rows contain all key columns, rejects reserved keys - Runtime guard: --reconcile-all rejects tables missing unique_key - Hash-skip disabled for seed sets with @ref: expressions to prevent stale foreign keys when upstream auto-generated IDs shift - Dry-run treats @ref: as literals to avoid resolution failures - MySQL row tracking uses SHA-256 generated column for PK (no collisions) - CI summary job (`ci`) for branch ruleset status check - Backward compatible: existing seed sets default to mode: once Co-Authored-By: Claude Opus 4.6 --- .github/workflows/ci.yml | 10 + CHANGELOG.md | 19 + docs/seeding.md | 63 ++- src/main.rs | 19 +- src/seed/db.rs | 857 ++++++++++++++++++++++++++++ src/seed/executor.rs | 1151 +++++++++++++++++++++++++++++++++++++- src/seed/hash.rs | 255 +++++++++ src/seed/mod.rs | 13 +- src/seed/schema.rs | 119 ++++ 9 files changed, 2496 insertions(+), 10 deletions(-) create mode 100644 src/seed/hash.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e3799c9..36f4a70 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,3 +45,13 @@ jobs: - run: helm lint 
charts/initium - run: helm template test-release charts/initium --set sampleDeployment.enabled=true --set 'initContainers[0].name=wait' --set 'initContainers[0].command[0]=wait-for' --set 'initContainers[0].args[0]=--target' --set 'initContainers[0].args[1]=tcp://localhost:5432' - run: helm unittest charts/initium + ci: + if: always() + needs: [lint, test, build, helm-lint] + runs-on: ubuntu-latest + steps: + - run: | + if [[ "${{ contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled') }}" == "true" ]]; then + echo "One or more jobs failed or were cancelled" + exit 1 + fi diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ae737f..1a67764 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- Reconcile mode for seed sets (`mode: reconcile`): declarative seeding where the spec is the source of truth. Changed rows are updated, new rows are inserted, and removed rows are deleted automatically. +- `--reconcile-all` CLI flag to override all seed sets to reconcile mode for a single run. +- `--dry-run` CLI flag to preview what changes reconciliation would make without modifying the database. +- Per-row tracking table (`initium_seed_rows`) for change detection and orphan deletion in reconcile mode. +- Content hash (`content_hash` column) on the seed tracking table for fast "anything changed?" checks before row-by-row comparison. +- Automatic migration of existing tracking tables: the `content_hash` column is added transparently on first run. Existing seed sets remain in `once` mode with no behavior change. + +### Changed +- Reconcile hash-skip now only applies to seed sets without `@ref:` expressions. Seed sets containing `@ref:` references always run row-level reconciliation to prevent stale foreign keys when upstream auto-generated IDs shift. 
+- Hash computation sorts tables by `(order, table_name)` instead of just `order` for deterministic hashing when multiple tables share the same order value. +- Dry-run mode treats `@ref:` expressions as literals to avoid failures when references haven't been populated yet (e.g., auto_id + refs within the same seed set). + +### Fixed +- `--reconcile-all` now rejects seed sets where any table is missing `unique_key`, preventing reconciliation from generating identical row keys and updating/deleting wrong rows. +- Reconcile mode validation now rejects empty/whitespace-only `unique_key` entries and reserved column names like `_ref`. +- Reconcile mode validation now checks that every row contains all `unique_key` columns, preventing incomplete row keys during reconciliation. +- MySQL row tracking table now uses SHA-256 generated column (`row_key_hash`) for the primary key instead of `row_key(255)` prefix, preventing key collisions for JSON keys exceeding 255 bytes. + ## [1.1.0] - 2026-02-26 ### Added diff --git a/docs/seeding.md b/docs/seeding.md index ed8f703..d8f0e1f 100644 --- a/docs/seeding.md +++ b/docs/seeding.md @@ -50,6 +50,7 @@ phases: seed_sets: # Optional. Seed sets to apply in this phase. - name: initial_data order: 1 # Optional. Controls execution order across seed sets. + mode: once # Optional. "once" (default) or "reconcile". tables: - table: config order: 1 # Optional. Controls execution order within a seed set. @@ -82,6 +83,7 @@ phases: | `phases[].wait_for[].timeout` | string | No | Per-object timeout override (e.g. 
`60s`, `2m`, `1m30s`) | | `phases[].seed_sets[].name` | string | Yes | Unique name for the seed set (used in tracking) | | `phases[].seed_sets[].order` | integer | No | Execution order (lower values first, default: 0) | +| `phases[].seed_sets[].mode` | string | No | Seed mode: `once` (default) or `reconcile` | | `phases[].seed_sets[].tables[].table` | string | Yes | Target database table name | | `phases[].seed_sets[].tables[].order` | integer | No | Execution order within the seed set (default: 0) | | `phases[].seed_sets[].tables[].unique_key` | string[] | No | Columns for duplicate detection | @@ -213,6 +215,55 @@ rows: password_hash: "{{ env.ADMIN_PASSWORD_HASH }}" ``` +### Reconcile Mode + +By default, seed sets are applied once and never modified (`mode: once`). Reconcile mode makes seeding declarative: the rendered spec becomes the source of truth, and initium reconciles the database to match it whenever the rendered spec changes. + +If the rendered spec has not changed since the last run (content hash match) and the seed set contains no `@ref:` expressions, initium treats the seed set as already reconciled and skips it. Seed sets that contain `@ref:` expressions are always reconciled row by row, because referenced auto-generated IDs can change without changing the hash. Out-of-band database changes are not corrected until a spec change triggers reconciliation again. + +Enable reconcile mode per seed set: + +```yaml +seed_sets: + - name: departments + mode: reconcile # "once" (default) or "reconcile" + tables: + - table: departments + unique_key: [name] # Required for reconcile mode + rows: + - name: Engineering + - name: Sales +``` + +Or override all seed sets for a single run: + +```bash +initium seed --spec /seeds/seed.yaml --reconcile-all +``` + +**How it works:** + +1. On each run, initium computes a content hash of the rendered seed set (after template/env expansion). +2. If the hash matches the stored hash and the seed set contains no `@ref:` expressions, the seed set is skipped (no-op). +3. Otherwise (the hash differs, or the seed set contains `@ref:` expressions), initium reconciles row by row: + - **New rows** (in spec but not in DB) are inserted. + - **Changed rows** (different values for same unique key) are updated. 
+ - **Removed rows** (in DB but not in spec) are deleted. + +**Requirements:** +- Every table in a reconciled seed set must have a `unique_key`. Without it, there is no way to identify which rows correspond to which spec entries. +- Environment variable changes trigger reconciliation (resolved values are compared, not raw templates). + +**Row tracking:** Initium creates a companion table (`{tracking_table}_rows`, e.g., `initium_seed_rows`) that stores the resolved values of each seeded row. This enables change detection and orphan deletion. + +**Dry-run mode:** Preview what reconciliation would do without modifying the database: + +```bash +initium seed --spec /seeds/seed.yaml --dry-run +``` + +This logs insert/update/delete counts per table without executing any changes. + ### Reset Mode Use `--reset` to delete all data from seeded tables and remove tracking entries before re-applying. Tables are deleted in reverse order to respect foreign key constraints: @@ -276,11 +327,13 @@ spec: ## CLI Reference -| Flag | Default | Description | -| --------- | ---------- | --------------------------------------- | -| `--spec` | (required) | Path to seed spec file (YAML or JSON) | -| `--reset` | `false` | Delete existing data and re-apply seeds | -| `--json` | `false` | Enable JSON log output | +| Flag | Default | Description | +| ------------------ | ---------- | --------------------------------------------------------- | +| `--spec` | (required) | Path to seed spec file (YAML or JSON) | +| `--reset` | `false` | Delete existing data and re-apply seeds | +| `--dry-run` | `false` | Preview changes without modifying the database | +| `--reconcile-all` | `false` | Override all seed sets to reconcile mode for this run | +| `--json` | `false` | Enable JSON log output | ## Failure Modes diff --git a/src/main.rs b/src/main.rs index f1872a0..5f8c782 100644 --- a/src/main.rs +++ b/src/main.rs @@ -145,6 +145,18 @@ enum Commands { help = "Reset mode: delete existing data before 
re-seeding" )] reset: bool, + #[arg( + long, + env = "INITIUM_DRY_RUN", + help = "Dry-run: show what would change without modifying the database" + )] + dry_run: bool, + #[arg( + long, + env = "INITIUM_RECONCILE_ALL", + help = "Override all seed sets to reconcile mode for this run" + )] + reconcile_all: bool, }, /// Render templates into config files @@ -313,7 +325,12 @@ fn main() { lock_file, args, } => cmd::migrate::run(&log, &args, &workdir, &lock_file), - Commands::Seed { spec, reset } => seed::run(&log, &spec, reset), + Commands::Seed { + spec, + reset, + dry_run, + reconcile_all, + } => seed::run(&log, &spec, reset, dry_run, reconcile_all), Commands::Render { template, output, diff --git a/src/seed/db.rs b/src/seed/db.rs index ff5e219..9059c7d 100644 --- a/src/seed/db.rs +++ b/src/seed/db.rs @@ -24,6 +24,87 @@ pub trait Database: Send { fn create_schema(&mut self, name: &str) -> Result<(), String>; fn object_exists(&mut self, obj_type: &str, name: &str) -> Result; fn driver_name(&self) -> &str; + + // --- Reconciliation support --- + + /// Add content_hash column to existing tracking table if missing. + fn migrate_tracking_table(&mut self, table_name: &str) -> Result<(), String>; + + /// Create the per-row tracking table ({tracking_table}_rows). + fn ensure_row_tracking_table(&mut self, table_name: &str) -> Result<(), String>; + + /// Get the stored content hash for a seed set. + fn get_seed_hash(&mut self, table_name: &str, seed_set: &str) + -> Result, String>; + + /// Update the tracking entry with a new hash (upsert). + fn update_seed_entry( + &mut self, + table_name: &str, + seed_set: &str, + hash: &str, + ) -> Result<(), String>; + + /// Store or update a tracked row in the row tracking table. + fn store_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + row_values: &str, + ) -> Result<(), String>; + + /// Get all tracked rows for a seed set + table. 
+ fn get_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + ) -> Result, String>; + + /// Delete a specific tracked row. + fn delete_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + ) -> Result<(), String>; + + /// Delete all tracked rows for a seed set. + fn delete_all_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + ) -> Result<(), String>; + + /// Update specific columns of a row identified by key columns. + fn update_row( + &mut self, + table: &str, + set_columns: &[String], + set_values: &[String], + where_columns: &[String], + where_values: &[String], + ) -> Result; + + /// Fetch specific column values from a row identified by key columns. + fn get_row_columns( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + fetch_columns: &[String], + ) -> Result>, String>; + + /// Delete a single row identified by key columns. + fn delete_row_by_key( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + ) -> Result; } #[cfg(feature = "sqlite")] @@ -236,6 +317,274 @@ impl Database for SqliteDb { fn driver_name(&self) -> &str { "sqlite" } + + fn migrate_tracking_table(&mut self, table_name: &str) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + // Check if content_hash column exists + let sql = format!("PRAGMA table_info(\"{}\")", safe); + let has_hash = self + .conn + .prepare(&sql) + .map_err(|e| format!("checking tracking table schema: {}", e))? + .query_map([], |row| { + let name: String = row.get(1)?; + Ok(name) + }) + .map_err(|e| format!("reading tracking table schema: {}", e))? 
+ .any(|r| r.map(|n| n == "content_hash").unwrap_or(false)); + + if !has_hash { + let alter = format!("ALTER TABLE \"{}\" ADD COLUMN content_hash TEXT", safe); + self.conn + .execute(&alter, []) + .map_err(|e| format!("migrating tracking table: {}", e))?; + } + Ok(()) + } + + fn ensure_row_tracking_table(&mut self, table_name: &str) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + let sql = format!( + "CREATE TABLE IF NOT EXISTS \"{}_rows\" ( + seed_set TEXT NOT NULL, + table_name TEXT NOT NULL, + row_key TEXT NOT NULL, + row_values TEXT NOT NULL, + applied_at TEXT NOT NULL DEFAULT (datetime('now')), + PRIMARY KEY (seed_set, table_name, row_key) + )", + safe + ); + self.conn + .execute(&sql, []) + .map_err(|e| format!("creating row tracking table: {}", e))?; + Ok(()) + } + + fn get_seed_hash( + &mut self, + table_name: &str, + seed_set: &str, + ) -> Result, String> { + let sql = format!( + "SELECT content_hash FROM \"{}\" WHERE seed_set = ?1", + sanitize_identifier(table_name) + ); + match self + .conn + .query_row(&sql, [seed_set], |row| row.get::<_, Option>(0)) + { + Ok(hash) => Ok(hash), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(format!("getting seed hash: {}", e)), + } + } + + fn update_seed_entry( + &mut self, + table_name: &str, + seed_set: &str, + hash: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + // Upsert: update hash if exists, insert if not + let sql = format!( + "INSERT INTO \"{}\" (seed_set, content_hash) VALUES (?1, ?2) \ + ON CONFLICT(seed_set) DO UPDATE SET content_hash = ?2, applied_at = datetime('now')", + safe + ); + self.conn + .execute(&sql, [seed_set, hash]) + .map_err(|e| format!("updating seed entry: {}", e))?; + Ok(()) + } + + fn store_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + row_values: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = 
format!( + "INSERT INTO \"{}_rows\" (seed_set, table_name, row_key, row_values) VALUES (?1, ?2, ?3, ?4) \ + ON CONFLICT(seed_set, table_name, row_key) DO UPDATE SET row_values = ?4, applied_at = datetime('now')", + safe + ); + self.conn + .execute(&sql, [seed_set, table_name, row_key, row_values]) + .map_err(|e| format!("storing tracked row: {}", e))?; + Ok(()) + } + + fn get_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + ) -> Result, String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "SELECT row_key, row_values FROM \"{}_rows\" WHERE seed_set = ?1 AND table_name = ?2", + safe + ); + let mut stmt = self + .conn + .prepare(&sql) + .map_err(|e| format!("preparing tracked rows query: {}", e))?; + let rows = stmt + .query_map([seed_set, table_name], |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)) + }) + .map_err(|e| format!("querying tracked rows: {}", e))? + .collect::, _>>() + .map_err(|e| format!("reading tracked rows: {}", e))?; + Ok(rows) + } + + fn delete_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "DELETE FROM \"{}_rows\" WHERE seed_set = ?1 AND table_name = ?2 AND row_key = ?3", + safe + ); + self.conn + .execute(&sql, [seed_set, table_name, row_key]) + .map_err(|e| format!("deleting tracked row: {}", e))?; + Ok(()) + } + + fn delete_all_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!("DELETE FROM \"{}_rows\" WHERE seed_set = ?1", safe); + self.conn + .execute(&sql, [seed_set]) + .map_err(|e| format!("deleting all tracked rows: {}", e))?; + Ok(()) + } + + fn update_row( + &mut self, + table: &str, + set_columns: &[String], + set_values: &[String], + where_columns: &[String], + where_values: 
&[String], + ) -> Result { + let set_clause: Vec = set_columns + .iter() + .enumerate() + .map(|(i, c)| format!("\"{}\" = ?{}", sanitize_identifier(c), i + 1)) + .collect(); + let where_clause: Vec = where_columns + .iter() + .enumerate() + .map(|(i, c)| { + format!( + "\"{}\" = ?{}", + sanitize_identifier(c), + set_values.len() + i + 1 + ) + }) + .collect(); + let sql = format!( + "UPDATE \"{}\" SET {} WHERE {}", + sanitize_identifier(table), + set_clause.join(", "), + where_clause.join(" AND ") + ); + let mut all_values: Vec<&dyn rusqlite::types::ToSql> = Vec::new(); + for v in set_values.iter().chain(where_values.iter()) { + all_values.push(v as &dyn rusqlite::types::ToSql); + } + let count = self + .conn + .execute(&sql, all_values.as_slice()) + .map_err(|e| format!("updating row in '{}': {}", table, e))?; + Ok(count as u64) + } + + fn get_row_columns( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + fetch_columns: &[String], + ) -> Result>, String> { + if fetch_columns.is_empty() { + return Ok(None); + } + let select_cols: Vec = fetch_columns + .iter() + .map(|c| format!("CAST(\"{}\" AS TEXT)", sanitize_identifier(c))) + .collect(); + let where_clause: Vec = key_columns + .iter() + .enumerate() + .map(|(i, c)| format!("\"{}\" = ?{}", sanitize_identifier(c), i + 1)) + .collect(); + let sql = format!( + "SELECT {} FROM \"{}\" WHERE {}", + select_cols.join(", "), + sanitize_identifier(table), + where_clause.join(" AND ") + ); + let params: Vec<&dyn rusqlite::types::ToSql> = key_values + .iter() + .map(|v| v as &dyn rusqlite::types::ToSql) + .collect(); + match self.conn.query_row(&sql, params.as_slice(), |row| { + let mut vals = Vec::new(); + for i in 0..fetch_columns.len() { + let v: Option = row.get(i)?; + vals.push(v.unwrap_or_default()); + } + Ok(vals) + }) { + Ok(vals) => Ok(Some(vals)), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(format!("getting row from '{}': {}", table, e)), + } + } + + fn 
delete_row_by_key( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + ) -> Result { + let where_clause: Vec = key_columns + .iter() + .enumerate() + .map(|(i, c)| format!("\"{}\" = ?{}", sanitize_identifier(c), i + 1)) + .collect(); + let sql = format!( + "DELETE FROM \"{}\" WHERE {}", + sanitize_identifier(table), + where_clause.join(" AND ") + ); + let params: Vec<&dyn rusqlite::types::ToSql> = key_values + .iter() + .map(|v| v as &dyn rusqlite::types::ToSql) + .collect(); + let count = self + .conn + .execute(&sql, params.as_slice()) + .map_err(|e| format!("deleting row from '{}': {}", table, e))?; + Ok(count as u64) + } } #[cfg(feature = "postgres")] @@ -477,6 +826,247 @@ impl Database for PostgresDb { fn driver_name(&self) -> &str { "postgres" } + + fn migrate_tracking_table(&mut self, table_name: &str) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + let sql = format!( + "DO $$ BEGIN \ + IF NOT EXISTS (SELECT 1 FROM information_schema.columns \ + WHERE table_name='{}' AND column_name='content_hash') THEN \ + ALTER TABLE \"{}\" ADD COLUMN content_hash TEXT; \ + END IF; \ + END $$", + safe, safe + ); + self.client + .execute(&sql, &[]) + .map_err(|e| format!("migrating tracking table: {}", e))?; + Ok(()) + } + + fn ensure_row_tracking_table(&mut self, table_name: &str) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + let sql = format!( + "CREATE TABLE IF NOT EXISTS \"{}_rows\" ( + seed_set TEXT NOT NULL, + table_name TEXT NOT NULL, + row_key TEXT NOT NULL, + row_values TEXT NOT NULL, + applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (seed_set, table_name, row_key) + )", + safe + ); + self.client + .execute(&sql, &[]) + .map_err(|e| format!("creating row tracking table: {}", e))?; + Ok(()) + } + + fn get_seed_hash( + &mut self, + table_name: &str, + seed_set: &str, + ) -> Result, String> { + let sql = format!( + "SELECT content_hash FROM \"{}\" WHERE seed_set = $1", + 
sanitize_identifier(table_name) + ); + let rows = self + .client + .query(&sql, &[&seed_set]) + .map_err(|e| format!("getting seed hash: {}", e))?; + if rows.is_empty() { + Ok(None) + } else { + Ok(rows[0].get(0)) + } + } + + fn update_seed_entry( + &mut self, + table_name: &str, + seed_set: &str, + hash: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + let sql = format!( + "INSERT INTO \"{}\" (seed_set, content_hash) VALUES ($1, $2) \ + ON CONFLICT(seed_set) DO UPDATE SET content_hash = $2, applied_at = NOW()", + safe + ); + self.client + .execute(&sql, &[&seed_set, &hash]) + .map_err(|e| format!("updating seed entry: {}", e))?; + Ok(()) + } + + fn store_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + row_values: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "INSERT INTO \"{}_rows\" (seed_set, table_name, row_key, row_values) VALUES ($1, $2, $3, $4) \ + ON CONFLICT(seed_set, table_name, row_key) DO UPDATE SET row_values = $4, applied_at = NOW()", + safe + ); + self.client + .execute(&sql, &[&seed_set, &table_name, &row_key, &row_values]) + .map_err(|e| format!("storing tracked row: {}", e))?; + Ok(()) + } + + fn get_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + ) -> Result, String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "SELECT row_key, row_values FROM \"{}_rows\" WHERE seed_set = $1 AND table_name = $2", + safe + ); + let rows = self + .client + .query(&sql, &[&seed_set, &table_name]) + .map_err(|e| format!("querying tracked rows: {}", e))?; + Ok(rows + .iter() + .map(|r| (r.get::<_, String>(0), r.get::<_, String>(1))) + .collect()) + } + + fn delete_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + 
let sql = format!( + "DELETE FROM \"{}_rows\" WHERE seed_set = $1 AND table_name = $2 AND row_key = $3", + safe + ); + self.client + .execute(&sql, &[&seed_set, &table_name, &row_key]) + .map_err(|e| format!("deleting tracked row: {}", e))?; + Ok(()) + } + + fn delete_all_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!("DELETE FROM \"{}_rows\" WHERE seed_set = $1", safe); + self.client + .execute(&sql, &[&seed_set]) + .map_err(|e| format!("deleting all tracked rows: {}", e))?; + Ok(()) + } + + fn update_row( + &mut self, + table: &str, + set_columns: &[String], + set_values: &[String], + where_columns: &[String], + where_values: &[String], + ) -> Result { + let set_clause: Vec = set_columns + .iter() + .zip(set_values.iter()) + .map(|(c, v)| format!("\"{}\" = {}", sanitize_identifier(c), escape_sql_value(v))) + .collect(); + let where_clause: Vec = where_columns + .iter() + .zip(where_values.iter()) + .map(|(c, v)| format!("\"{}\" = {}", sanitize_identifier(c), escape_sql_value(v))) + .collect(); + let sql = format!( + "UPDATE \"{}\" SET {} WHERE {}", + sanitize_identifier(table), + set_clause.join(", "), + where_clause.join(" AND ") + ); + let count = self + .client + .execute(&sql, &[]) + .map_err(|e| format!("updating row in '{}': {}", table, e))?; + Ok(count) + } + + fn get_row_columns( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + fetch_columns: &[String], + ) -> Result>, String> { + if fetch_columns.is_empty() { + return Ok(None); + } + let select_cols: Vec = fetch_columns + .iter() + .map(|c| format!("CAST(\"{}\" AS TEXT)", sanitize_identifier(c))) + .collect(); + let where_clause: Vec = key_columns + .iter() + .zip(key_values.iter()) + .map(|(c, v)| format!("\"{}\" = {}", sanitize_identifier(c), escape_sql_value(v))) + .collect(); + let sql = format!( + "SELECT {} FROM \"{}\" WHERE {}", + 
select_cols.join(", "), + sanitize_identifier(table), + where_clause.join(" AND ") + ); + let rows = self + .client + .query(&sql, &[]) + .map_err(|e| format!("getting row from '{}': {}", table, e))?; + if rows.is_empty() { + Ok(None) + } else { + let mut vals = Vec::new(); + for i in 0..fetch_columns.len() { + let v: Option = rows[0].get(i); + vals.push(v.unwrap_or_default()); + } + Ok(Some(vals)) + } + } + + fn delete_row_by_key( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + ) -> Result { + let where_clause: Vec = key_columns + .iter() + .zip(key_values.iter()) + .map(|(c, v)| format!("\"{}\" = {}", sanitize_identifier(c), escape_sql_value(v))) + .collect(); + let sql = format!( + "DELETE FROM \"{}\" WHERE {}", + sanitize_identifier(table), + where_clause.join(" AND ") + ); + let count = self + .client + .execute(&sql, &[]) + .map_err(|e| format!("deleting row from '{}': {}", table, e))?; + Ok(count) + } } #[cfg(feature = "mysql")] @@ -695,6 +1285,273 @@ impl Database for MysqlDb { fn driver_name(&self) -> &str { "mysql" } + + fn migrate_tracking_table(&mut self, table_name: &str) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + // MySQL: ALTER TABLE ADD COLUMN IF NOT EXISTS is not supported in older versions. + // Check information_schema first. 
+ use mysql::prelude::Queryable; + let check_sql = format!( + "SELECT COUNT(*) FROM information_schema.columns \ + WHERE table_schema = DATABASE() AND table_name = '{}' AND column_name = 'content_hash'", + safe + ); + let count: Option = self + .conn + .exec_first(&check_sql, ()) + .map_err(|e| format!("checking tracking table schema: {}", e))?; + if count.unwrap_or(0) == 0 { + let alter = format!("ALTER TABLE `{}` ADD COLUMN content_hash TEXT", safe); + self.conn + .query_drop(&alter) + .map_err(|e| format!("migrating tracking table: {}", e))?; + } + Ok(()) + } + + fn ensure_row_tracking_table(&mut self, table_name: &str) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + let sql = format!( + "CREATE TABLE IF NOT EXISTS `{}_rows` ( + seed_set VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + row_key TEXT NOT NULL, + row_values TEXT NOT NULL, + applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + row_key_hash BINARY(32) GENERATED ALWAYS AS (UNHEX(SHA2(row_key, 256))) STORED, + PRIMARY KEY (seed_set, table_name, row_key_hash) + )", + safe + ); + use mysql::prelude::Queryable; + self.conn + .query_drop(&sql) + .map_err(|e| format!("creating row tracking table: {}", e))?; + Ok(()) + } + + fn get_seed_hash( + &mut self, + table_name: &str, + seed_set: &str, + ) -> Result, String> { + let sql = format!( + "SELECT content_hash FROM `{}` WHERE seed_set = ?", + sanitize_identifier(table_name) + ); + use mysql::prelude::Queryable; + let result: Option> = self + .conn + .exec_first(&sql, (seed_set,)) + .map_err(|e| format!("getting seed hash: {}", e))?; + Ok(result.flatten()) + } + + fn update_seed_entry( + &mut self, + table_name: &str, + seed_set: &str, + hash: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(table_name); + let sql = format!( + "INSERT INTO `{}` (seed_set, content_hash) VALUES (?, ?) 
\ + ON DUPLICATE KEY UPDATE content_hash = VALUES(content_hash), applied_at = CURRENT_TIMESTAMP", + safe + ); + use mysql::prelude::Queryable; + self.conn + .exec_drop(&sql, (seed_set, hash)) + .map_err(|e| format!("updating seed entry: {}", e))?; + Ok(()) + } + + fn store_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + row_values: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "INSERT INTO `{}_rows` (seed_set, table_name, row_key, row_values) VALUES (?, ?, ?, ?) \ + ON DUPLICATE KEY UPDATE row_values = VALUES(row_values), applied_at = CURRENT_TIMESTAMP", + safe + ); + use mysql::prelude::Queryable; + self.conn + .exec_drop(&sql, (seed_set, table_name, row_key, row_values)) + .map_err(|e| format!("storing tracked row: {}", e))?; + Ok(()) + } + + fn get_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + ) -> Result, String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "SELECT row_key, row_values FROM `{}_rows` WHERE seed_set = ? AND table_name = ?", + safe + ); + use mysql::prelude::Queryable; + let rows: Vec<(String, String)> = self + .conn + .exec(&sql, (seed_set, table_name)) + .map_err(|e| format!("querying tracked rows: {}", e))?; + Ok(rows) + } + + fn delete_tracked_row( + &mut self, + tracking_table: &str, + seed_set: &str, + table_name: &str, + row_key: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!( + "DELETE FROM `{}_rows` WHERE seed_set = ? AND table_name = ? 
AND row_key = ?", + safe + ); + use mysql::prelude::Queryable; + self.conn + .exec_drop(&sql, (seed_set, table_name, row_key)) + .map_err(|e| format!("deleting tracked row: {}", e))?; + Ok(()) + } + + fn delete_all_tracked_rows( + &mut self, + tracking_table: &str, + seed_set: &str, + ) -> Result<(), String> { + let safe = sanitize_identifier(tracking_table); + let sql = format!("DELETE FROM `{}_rows` WHERE seed_set = ?", safe); + use mysql::prelude::Queryable; + self.conn + .exec_drop(&sql, (seed_set,)) + .map_err(|e| format!("deleting all tracked rows: {}", e))?; + Ok(()) + } + + fn update_row( + &mut self, + table: &str, + set_columns: &[String], + set_values: &[String], + where_columns: &[String], + where_values: &[String], + ) -> Result { + let set_clause: Vec = set_columns + .iter() + .map(|c| format!("`{}` = ?", sanitize_identifier(c))) + .collect(); + let where_clause: Vec = where_columns + .iter() + .map(|c| format!("`{}` = ?", sanitize_identifier(c))) + .collect(); + let sql = format!( + "UPDATE `{}` SET {} WHERE {}", + sanitize_identifier(table), + set_clause.join(", "), + where_clause.join(" AND ") + ); + use mysql::prelude::Queryable; + let params: Vec = set_values + .iter() + .chain(where_values.iter()) + .map(|v| mysql::Value::from(v.as_str())) + .collect(); + self.conn + .exec_drop(&sql, ¶ms) + .map_err(|e| format!("updating row in '{}': {}", table, e))?; + let affected: Option = self + .conn + .exec_first("SELECT ROW_COUNT()", ()) + .map_err(|e| format!("getting affected rows: {}", e))?; + Ok(affected.unwrap_or(0)) + } + + fn get_row_columns( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + fetch_columns: &[String], + ) -> Result>, String> { + if fetch_columns.is_empty() { + return Ok(None); + } + let select_cols: Vec = fetch_columns + .iter() + .map(|c| format!("CAST(`{}` AS CHAR)", sanitize_identifier(c))) + .collect(); + let where_clause: Vec = key_columns + .iter() + .map(|c| format!("`{}` = ?", 
sanitize_identifier(c))) + .collect(); + let sql = format!( + "SELECT {} FROM `{}` WHERE {}", + select_cols.join(", "), + sanitize_identifier(table), + where_clause.join(" AND ") + ); + use mysql::prelude::Queryable; + let params: Vec = key_values + .iter() + .map(|v| mysql::Value::from(v.as_str())) + .collect(); + let row: Option = self + .conn + .exec_first(&sql, ¶ms) + .map_err(|e| format!("getting row from '{}': {}", table, e))?; + match row { + Some(r) => { + let mut vals = Vec::new(); + for i in 0..fetch_columns.len() { + let v: Option = r.get(i); + vals.push(v.unwrap_or_default()); + } + Ok(Some(vals)) + } + None => Ok(None), + } + } + + fn delete_row_by_key( + &mut self, + table: &str, + key_columns: &[String], + key_values: &[String], + ) -> Result { + let where_clause: Vec = key_columns + .iter() + .map(|c| format!("`{}` = ?", sanitize_identifier(c))) + .collect(); + let sql = format!( + "DELETE FROM `{}` WHERE {}", + sanitize_identifier(table), + where_clause.join(" AND ") + ); + use mysql::prelude::Queryable; + let params: Vec = key_values + .iter() + .map(|v| mysql::Value::from(v.as_str())) + .collect(); + self.conn + .exec_drop(&sql, ¶ms) + .map_err(|e| format!("deleting row from '{}': {}", table, e))?; + let affected: Option = self + .conn + .exec_first("SELECT ROW_COUNT()", ()) + .map_err(|e| format!("getting affected rows: {}", e))?; + Ok(affected.unwrap_or(0)) + } } pub fn connect(driver: &str, url: &str) -> Result, String> { diff --git a/src/seed/executor.rs b/src/seed/executor.rs index 2594e9b..3481039 100644 --- a/src/seed/executor.rs +++ b/src/seed/executor.rs @@ -1,8 +1,9 @@ use crate::duration::{format_duration, parse_duration}; use crate::logging::Logger; use crate::seed::db::Database; +use crate::seed::hash::compute_seed_set_hash; use crate::seed::schema::{SeedPhase, SeedPlan, SeedSet, TableSeed, WaitForObject}; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::time::{Duration, Instant}; pub 
struct SeedExecutor<'a> { @@ -10,6 +11,8 @@ pub struct SeedExecutor<'a> { db: Box, tracking_table: String, reset: bool, + dry_run: bool, + reconcile_all: bool, refs: HashMap>, } @@ -25,13 +28,27 @@ impl<'a> SeedExecutor<'a> { db, tracking_table, reset, + dry_run: false, + reconcile_all: false, refs: HashMap::new(), } } + pub fn with_dry_run(mut self, dry_run: bool) -> Self { + self.dry_run = dry_run; + self + } + + pub fn with_reconcile_all(mut self, reconcile_all: bool) -> Self { + self.reconcile_all = reconcile_all; + self + } + pub fn execute(&mut self, plan: &SeedPlan) -> Result<(), String> { self.log.info("starting seed execution", &[]); self.db.ensure_tracking_table(&self.tracking_table)?; + self.db.migrate_tracking_table(&self.tracking_table)?; + self.db.ensure_row_tracking_table(&self.tracking_table)?; self.execute_phases(plan)?; @@ -149,6 +166,7 @@ impl<'a> SeedExecutor<'a> { fn reset_seed_set(&mut self, ss: &SeedSet) -> Result<(), String> { let name = &ss.name; + let tt = self.tracking_table.clone(); self.log .info("reset mode: clearing seed set data", &[("seed_set", name)]); let mut tables: Vec<&TableSeed> = ss.tables.iter().collect(); @@ -160,13 +178,36 @@ impl<'a> SeedExecutor<'a> { &[("table", &ts.table), ("count", &count.to_string())], ); } + self.db.delete_all_tracked_rows(&tt, name)?; self.db.remove_seed_mark(&self.tracking_table, name)?; Ok(()) } fn execute_seed_set(&mut self, ss: &SeedSet) -> Result<(), String> { let name = &ss.name; - self.log.info("processing seed set", &[("seed_set", name)]); + let is_reconcile = ss.is_reconcile() || self.reconcile_all; + self.log.info( + "processing seed set", + &[ + ("seed_set", name), + ("mode", if is_reconcile { "reconcile" } else { "once" }), + ], + ); + + if is_reconcile { + // Guard: reconcile requires unique_key on every table. + // Schema validation catches this for mode: reconcile, but --reconcile-all + // can force reconcile on mode: once seed sets that lack unique_key. 
+ for ts in &ss.tables { + if ts.unique_key.is_empty() { + return Err(format!( + "cannot reconcile seed set '{}': table '{}' has no unique_key (required for reconcile mode)", + name, ts.table + )); + } + } + return self.reconcile_seed_set(ss); + } if self.db.is_seed_applied(&self.tracking_table, name)? { self.log @@ -174,6 +215,14 @@ impl<'a> SeedExecutor<'a> { return Ok(()); } + if self.dry_run { + self.log.info( + "dry-run: seed set would be applied (new)", + &[("seed_set", name)], + ); + return Ok(()); + } + self.db.begin_transaction()?; let result = self.apply_seed_set_tables(ss); match result { @@ -314,6 +363,390 @@ impl<'a> SeedExecutor<'a> { .cloned() .ok_or_else(|| format!("column '{}' not found in reference '{}'", column, ref_name)) } + + // --- Reconciliation --- + + fn reconcile_seed_set(&mut self, ss: &SeedSet) -> Result<(), String> { + let name = &ss.name; + + // Compute hash of current spec (resolve env vars, keep @ref: as literals) + let current_hash = compute_seed_set_hash(ss, &|val| self.resolve_value(val))?; + + // Check stored hash for quick skip. + // Only skip if the seed set has no @ref: expressions, because + // compute_seed_set_hash treats @ref: values as literals. Resolved + // reference targets can change without affecting the hash (e.g., + // upstream auto_id row deleted/reinserted), and skipping could leave + // stale foreign keys. 
+ let stored_hash = self.db.get_seed_hash(&self.tracking_table, name)?; + let has_refs = ss.tables.iter().any(|ts| { + ts.rows.iter().any(|row| { + row.values() + .any(|v| v.as_str().map(|s| s.starts_with("@ref:")).unwrap_or(false)) + }) + }); + if !has_refs && stored_hash.as_deref() == Some(current_hash.as_str()) { + self.log.info( + "seed set unchanged (hash match), skipping", + &[("seed_set", name)], + ); + // Still need to populate refs for downstream seed sets + self.populate_refs_from_db(ss)?; + return Ok(()); + } + + if self.dry_run { + self.log.info( + "dry-run: seed set has changes, would reconcile", + &[("seed_set", name)], + ); + self.dry_run_reconcile_tables(ss)?; + return Ok(()); + } + + self.log.info("reconciling seed set", &[("seed_set", name)]); + + self.db.begin_transaction()?; + let result = self.reconcile_tables(ss, ¤t_hash); + match result { + Ok(()) => { + let tt = self.tracking_table.clone(); + self.db.update_seed_entry(&tt, name, ¤t_hash)?; + self.db.commit_transaction()?; + self.log + .info("seed set reconciled successfully", &[("seed_set", name)]); + Ok(()) + } + Err(e) => { + self.db.rollback_transaction()?; + Err(format!("reconciling seed set '{}' failed: {}", name, e)) + } + } + } + + fn reconcile_tables(&mut self, ss: &SeedSet, _hash: &str) -> Result<(), String> { + let mut tables: Vec<&TableSeed> = ss.tables.iter().collect(); + tables.sort_by_key(|t| t.order); + + for ts in &tables { + self.reconcile_table(ss, ts)?; + } + Ok(()) + } + + fn reconcile_table(&mut self, ss: &SeedSet, ts: &TableSeed) -> Result<(), String> { + let table = &ts.table; + let tt = self.tracking_table.clone(); + let ss_name = ss.name.clone(); + + self.log.info( + "reconciling table", + &[ + ("table", table.as_str()), + ("rows", &ts.rows.len().to_string()), + ], + ); + + // Get currently tracked rows for this seed_set + table + let tracked = self.db.get_tracked_rows(&tt, &ss_name, table)?; + let tracked_keys: HashSet = tracked.iter().map(|(k, _)| 
k.clone()).collect(); + let tracked_values: HashMap = tracked.into_iter().collect(); + + let mut seen_keys = HashSet::new(); + + for (idx, row) in ts.rows.iter().enumerate() { + let ref_name = row + .get("_ref") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + let mut columns = Vec::new(); + let mut values = Vec::new(); + let mut unique_columns = Vec::new(); + let mut unique_values = Vec::new(); + + for (key, val) in row { + if key == "_ref" { + continue; + } + let resolved = self.resolve_value(val)?; + columns.push(key.clone()); + values.push(resolved.clone()); + + if ts.unique_key.contains(key) { + unique_columns.push(key.clone()); + unique_values.push(resolved); + } + } + + // Build canonical row_key JSON (sorted by unique key column name) + let row_key = build_row_key(&ts.unique_key, &unique_columns, &unique_values); + // Build row_values JSON (all columns, sorted) + let row_values_json = build_row_values(&columns, &values); + + seen_keys.insert(row_key.clone()); + + let existing_values = tracked_values.get(&row_key); + + if let Some(stored_vals) = existing_values { + // Row exists in tracking — check if values changed + if stored_vals == &row_values_json { + // No change — populate refs if needed + self.populate_row_refs(ts, &ref_name, &columns, &values)?; + self.log.info( + "row unchanged, skipping", + &[("table", table.as_str()), ("row", &(idx + 1).to_string())], + ); + continue; + } + + // Values differ — UPDATE + let non_key_columns: Vec = columns + .iter() + .filter(|c| !ts.unique_key.contains(c)) + .cloned() + .collect(); + let non_key_values: Vec = columns + .iter() + .zip(values.iter()) + .filter(|(c, _)| !ts.unique_key.contains(c)) + .map(|(_, v)| v.clone()) + .collect(); + + if !non_key_columns.is_empty() { + self.db.update_row( + table, + &non_key_columns, + &non_key_values, + &unique_columns, + &unique_values, + )?; + } + + self.db + .store_tracked_row(&tt, &ss_name, table, &row_key, &row_values_json)?; + self.populate_row_refs(ts, 
&ref_name, &columns, &values)?; + self.log.info( + "updated row", + &[("table", table.as_str()), ("row", &(idx + 1).to_string())], + ); + } else { + // New row — INSERT + let auto_id_col = ts.auto_id.as_ref().map(|a| a.column.as_str()); + let generated_id = self.db.insert_row(table, &columns, &values, auto_id_col)?; + + if let Some(ref_key) = &ref_name { + let mut ref_map = HashMap::new(); + for (i, col) in columns.iter().enumerate() { + ref_map.insert(col.clone(), values[i].clone()); + } + if let (Some(ref auto_id), Some(id)) = (&ts.auto_id, generated_id) { + ref_map.insert(auto_id.column.clone(), id.to_string()); + } + self.refs.insert(ref_key.clone(), ref_map); + } + + self.db + .store_tracked_row(&tt, &ss_name, table, &row_key, &row_values_json)?; + self.log.info( + "inserted row", + &[("table", table.as_str()), ("row", &(idx + 1).to_string())], + ); + } + } + + // Delete orphaned rows (in tracking but not in current spec) + let orphaned_keys: Vec = tracked_keys.difference(&seen_keys).cloned().collect(); + + for orphan_key in &orphaned_keys { + // Parse the row_key JSON to get column names + values + let key_map: BTreeMap = serde_json::from_str(orphan_key) + .map_err(|e| format!("parsing orphan row key: {}", e))?; + let key_cols: Vec = key_map.keys().cloned().collect(); + let key_vals: Vec = key_map.values().cloned().collect(); + + self.db.delete_row_by_key(table, &key_cols, &key_vals)?; + self.db + .delete_tracked_row(&tt, &ss_name, table, orphan_key)?; + self.log.info( + "deleted orphaned row", + &[("table", table.as_str()), ("row_key", orphan_key)], + ); + } + + Ok(()) + } + + /// Populate refs from an existing (unchanged) row, fetching auto_id from DB if needed. 
+    ///
+    /// No-op when `ref_name` is `None`. Otherwise records every column/value
+    /// pair of the row under that ref name and, if the table declares an
+    /// `auto_id`, adds the ID column as read back from the database.
+    ///
+    /// # Errors
+    /// Fails if the ID lookup fails, or if the row lacks one of the table's
+    /// `unique_key` columns (the lookup would otherwise bind too few values).
+    fn populate_row_refs(
+        &mut self,
+        ts: &TableSeed,
+        ref_name: &Option<String>,
+        columns: &[String],
+        values: &[String],
+    ) -> Result<(), String> {
+        let ref_key = match ref_name {
+            Some(key) => key,
+            None => return Ok(()),
+        };
+
+        let mut ref_map: HashMap<String, String> = columns
+            .iter()
+            .cloned()
+            .zip(values.iter().cloned())
+            .collect();
+
+        // An auto-generated ID is not part of the spec, so read it back from
+        // the existing row, located by its unique key.
+        if let Some(ref auto_id) = ts.auto_id {
+            let unique_vals = ts
+                .unique_key
+                .iter()
+                .map(|uk| {
+                    ref_map.get(uk).cloned().ok_or_else(|| {
+                        format!(
+                            "cannot populate reference '{}': row in table '{}' has no value for unique_key column '{}'",
+                            ref_key, ts.table, uk
+                        )
+                    })
+                })
+                .collect::<Result<Vec<String>, String>>()?;
+            let fetched = self.db.get_row_columns(
+                &ts.table,
+                &ts.unique_key,
+                &unique_vals,
+                std::slice::from_ref(&auto_id.column),
+            )?;
+            // A row that is absent from the table leaves the ID out of the ref.
+            if let Some(id_val) = fetched.and_then(|row| row.into_iter().next()) {
+                ref_map.insert(auto_id.column.clone(), id_val);
+            }
+        }
+        self.refs.insert(ref_key.clone(), ref_map);
+        Ok(())
+    }
+
+    /// Populate refs for a skipped (hash-matched) seed set by reading from DB.
+    ///
+    /// Walks tables in ascending `order`; rows without a `_ref` label are
+    /// ignored. Row values are resolved with `resolve_value`, whose errors
+    /// are propagated.
+    fn populate_refs_from_db(&mut self, ss: &SeedSet) -> Result<(), String> {
+        let mut tables: Vec<&TableSeed> = ss.tables.iter().collect();
+        tables.sort_by_key(|t| t.order);
+
+        for ts in &tables {
+            for row in &ts.rows {
+                let ref_name = row
+                    .get("_ref")
+                    .and_then(|v| v.as_str())
+                    .map(|s| s.to_string());
+
+                if ref_name.is_none() {
+                    continue;
+                }
+
+                let mut columns = Vec::new();
+                let mut values = Vec::new();
+
+                for (key, val) in row {
+                    // `_ref` is the row's label, not a table column.
+                    if key == "_ref" {
+                        continue;
+                    }
+                    let resolved = self.resolve_value(val)?;
+                    columns.push(key.clone());
+                    values.push(resolved);
+                }
+
+                self.populate_row_refs(ts, &ref_name, &columns, &values)?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Resolve a value for dry-run: treats `@ref:` as literals to avoid failures
+    /// when refs haven't been populated (common with auto_id + refs in same seed set).
+ fn resolve_value_dry_run(&self, val: &serde_yaml::Value) -> Result { + match val { + serde_yaml::Value::String(s) if s.starts_with("@ref:") => Ok(s.clone()), + _ => self.resolve_value(val), + } + } + + /// Dry-run: compute what reconciliation would do without modifying the DB. + fn dry_run_reconcile_tables(&mut self, ss: &SeedSet) -> Result<(), String> { + let mut tables: Vec<&TableSeed> = ss.tables.iter().collect(); + tables.sort_by_key(|t| t.order); + let tt = self.tracking_table.clone(); + let ss_name = ss.name.clone(); + + for ts in &tables { + let tracked = self.db.get_tracked_rows(&tt, &ss_name, &ts.table)?; + let tracked_keys: HashSet = tracked.iter().map(|(k, _)| k.clone()).collect(); + let tracked_values: HashMap = tracked.into_iter().collect(); + + let mut seen_keys = HashSet::new(); + let mut inserts = 0u64; + let mut updates = 0u64; + + for row in &ts.rows { + let mut unique_columns = Vec::new(); + let mut unique_values = Vec::new(); + let mut columns = Vec::new(); + let mut values = Vec::new(); + + for (key, val) in row { + if key == "_ref" { + continue; + } + let resolved = self.resolve_value_dry_run(val)?; + columns.push(key.clone()); + values.push(resolved.clone()); + if ts.unique_key.contains(key) { + unique_columns.push(key.clone()); + unique_values.push(resolved); + } + } + + let row_key = build_row_key(&ts.unique_key, &unique_columns, &unique_values); + let row_values_json = build_row_values(&columns, &values); + seen_keys.insert(row_key.clone()); + + match tracked_values.get(&row_key) { + Some(stored) if stored == &row_values_json => {} + Some(_) => updates += 1, + None => inserts += 1, + } + } + + let deletes = tracked_keys.difference(&seen_keys).count() as u64; + + self.log.info( + "dry-run: table reconciliation summary", + &[ + ("table", ts.table.as_str()), + ("inserts", &inserts.to_string()), + ("updates", &updates.to_string()), + ("deletes", &deletes.to_string()), + ], + ); + } + + // Populate refs from DB for downstream dry-run accuracy 
+ self.populate_refs_from_db(ss)?; + Ok(()) + } +} + +/// Build a canonical JSON key from unique key columns (sorted by column name). +fn build_row_key(unique_key_spec: &[String], columns: &[String], values: &[String]) -> String { + let mut map = BTreeMap::new(); + for uk in unique_key_spec { + if let Some(idx) = columns.iter().position(|c| c == uk) { + map.insert(uk.clone(), values[idx].clone()); + } + } + serde_json::to_string(&map).unwrap_or_default() +} + +/// Build a canonical JSON representation of all row values (sorted by column name). +fn build_row_values(columns: &[String], values: &[String]) -> String { + let mut map = BTreeMap::new(); + for (i, col) in columns.iter().enumerate() { + map.insert(col.clone(), values[i].clone()); + } + serde_json::to_string(&map).unwrap_or_default() } #[cfg(test)] @@ -1278,4 +1711,718 @@ phases: err ); } + + // --- Reconciliation tests --- + + #[test] + fn test_reconcile_initial_apply() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + setup_db_with_tables(&sqlite); + + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: reconcile_test + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering + - name: Sales +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let log = test_logger(); + let mut executor = SeedExecutor::new(&log, Box::new(sqlite), "initium_seed".into(), false); + executor.execute(&plan).unwrap(); + + let db = SqliteDb::connect(db_path_str).unwrap(); + let count: i64 = db + .conn + .query_row("SELECT COUNT(*) FROM departments", [], |r| r.get(0)) + .unwrap(); + assert_eq!(count, 2); + } + + #[test] + fn test_reconcile_skip_unchanged() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = 
db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + setup_db_with_tables(&sqlite); + + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: reconcile_idem + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let log = test_logger(); + + // First run + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false); + exec1.execute(&plan).unwrap(); + + // Second run — should skip (hash match) + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false); + exec2.execute(&plan).unwrap(); + + let db = SqliteDb::connect(db_path_str).unwrap(); + let count: i64 = db + .conn + .query_row("SELECT COUNT(*) FROM departments", [], |r| r.get(0)) + .unwrap(); + assert_eq!(count, 1); + } + + #[test] + fn test_reconcile_update_changed_row() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + sqlite + .conn + .execute_batch("CREATE TABLE config (key TEXT PRIMARY KEY, value TEXT);") + .unwrap(); + + let yaml1 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: config + mode: reconcile + tables: + - table: config + unique_key: [key] + rows: + - key: app_name + value: OldName +"#; + let plan1 = SeedPlan::from_yaml(yaml1).unwrap(); + let log = test_logger(); + + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false); + exec1.execute(&plan1).unwrap(); + + // Verify initial value + let db_check = SqliteDb::connect(db_path_str).unwrap(); + let val: String = db_check + 
.conn + .query_row("SELECT value FROM config WHERE key = 'app_name'", [], |r| { + r.get(0) + }) + .unwrap(); + assert_eq!(val, "OldName"); + + // Run with changed value + let yaml2 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: config + mode: reconcile + tables: + - table: config + unique_key: [key] + rows: + - key: app_name + value: NewName +"#; + let plan2 = SeedPlan::from_yaml(yaml2).unwrap(); + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false); + exec2.execute(&plan2).unwrap(); + + let db_final = SqliteDb::connect(db_path_str).unwrap(); + let val: String = db_final + .conn + .query_row("SELECT value FROM config WHERE key = 'app_name'", [], |r| { + r.get(0) + }) + .unwrap(); + assert_eq!(val, "NewName"); + } + + #[test] + fn test_reconcile_add_new_row() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + setup_db_with_tables(&sqlite); + + let yaml1 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: depts + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering +"#; + let plan1 = SeedPlan::from_yaml(yaml1).unwrap(); + let log = test_logger(); + + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false); + exec1.execute(&plan1).unwrap(); + + // Add a row + let yaml2 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: depts + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering + - name: Sales +"#; + let plan2 = SeedPlan::from_yaml(yaml2).unwrap(); + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut 
exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false); + exec2.execute(&plan2).unwrap(); + + let db = SqliteDb::connect(db_path_str).unwrap(); + let count: i64 = db + .conn + .query_row("SELECT COUNT(*) FROM departments", [], |r| r.get(0)) + .unwrap(); + assert_eq!(count, 2); + } + + #[test] + fn test_reconcile_delete_removed_row() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + setup_db_with_tables(&sqlite); + + let yaml1 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: depts + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering + - name: Sales +"#; + let plan1 = SeedPlan::from_yaml(yaml1).unwrap(); + let log = test_logger(); + + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false); + exec1.execute(&plan1).unwrap(); + + let db_check = SqliteDb::connect(db_path_str).unwrap(); + let count: i64 = db_check + .conn + .query_row("SELECT COUNT(*) FROM departments", [], |r| r.get(0)) + .unwrap(); + assert_eq!(count, 2); + + // Remove Sales + let yaml2 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: depts + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering +"#; + let plan2 = SeedPlan::from_yaml(yaml2).unwrap(); + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false); + exec2.execute(&plan2).unwrap(); + + let db_final = SqliteDb::connect(db_path_str).unwrap(); + let count: i64 = db_final + .conn + .query_row("SELECT COUNT(*) FROM departments", [], |r| r.get(0)) + .unwrap(); + assert_eq!(count, 1); + + let name: String = db_final + .conn 
+ .query_row("SELECT name FROM departments", [], |r| r.get(0)) + .unwrap(); + assert_eq!(name, "Engineering"); + } + + #[test] + fn test_reconcile_with_auto_id_and_refs() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + setup_db_with_tables(&sqlite); + + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: org + mode: reconcile + tables: + - table: departments + order: 1 + unique_key: [name] + auto_id: + column: id + rows: + - _ref: dept_eng + name: Engineering + - table: employees + order: 2 + unique_key: [email] + rows: + - name: Alice + email: alice@example.com + department_id: "@ref:dept_eng.id" +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let log = test_logger(); + + // First apply + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false); + exec1.execute(&plan).unwrap(); + + // Verify + let db = SqliteDb::connect(db_path_str).unwrap(); + let dept_id: i64 = db + .conn + .query_row( + "SELECT id FROM departments WHERE name = 'Engineering'", + [], + |r| r.get(0), + ) + .unwrap(); + let emp_dept_id: i64 = db + .conn + .query_row( + "SELECT department_id FROM employees WHERE email = 'alice@example.com'", + [], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(dept_id, emp_dept_id); + + // Run again — should be a no-op (hash match) + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false); + exec2.execute(&plan).unwrap(); + + let db_final = SqliteDb::connect(db_path_str).unwrap(); + let count: i64 = db_final + .conn + .query_row("SELECT COUNT(*) FROM employees", [], |r| r.get(0)) + .unwrap(); + assert_eq!(count, 1); + } + + #[test] + fn test_reconcile_mode_requires_unique_key() { + let yaml = 
r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: bad + mode: reconcile + tables: + - table: departments + rows: + - name: Engineering +"#; + let result = SeedPlan::from_yaml(yaml); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.contains("unique_key"), + "error should mention unique_key: {}", + err + ); + } + + #[test] + fn test_reconcile_all_flag_overrides_mode() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + sqlite + .conn + .execute_batch("CREATE TABLE config (key TEXT PRIMARY KEY, value TEXT);") + .unwrap(); + + // mode: once, but we use reconcile_all + let yaml1 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: cfg + tables: + - table: config + unique_key: [key] + rows: + - key: app + value: v1 +"#; + let plan1 = SeedPlan::from_yaml(yaml1).unwrap(); + let log = test_logger(); + + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false) + .with_reconcile_all(true); + exec1.execute(&plan1).unwrap(); + + // Change value and run again with reconcile_all + let yaml2 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: cfg + tables: + - table: config + unique_key: [key] + rows: + - key: app + value: v2 +"#; + let plan2 = SeedPlan::from_yaml(yaml2).unwrap(); + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false) + .with_reconcile_all(true); + exec2.execute(&plan2).unwrap(); + + let db = SqliteDb::connect(db_path_str).unwrap(); + let val: String = db + .conn + .query_row("SELECT value FROM config WHERE key = 'app'", [], |r| { + r.get(0) + }) + .unwrap(); + assert_eq!(val, "v2"); 
+ } + + #[test] + fn test_dry_run_no_changes() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + setup_db_with_tables(&sqlite); + + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: dry + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let log = test_logger(); + + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec = + SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false).with_dry_run(true); + exec.execute(&plan).unwrap(); + + // Should not have inserted anything + let db = SqliteDb::connect(db_path_str).unwrap(); + let count: i64 = db + .conn + .query_row("SELECT COUNT(*) FROM departments", [], |r| r.get(0)) + .unwrap(); + assert_eq!(count, 0, "dry-run should not modify the database"); + } + + #[test] + fn test_reconcile_cross_seed_set_refs() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + setup_db_with_tables(&sqlite); + + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + order: 1 + seed_sets: + - name: depts + mode: reconcile + order: 1 + tables: + - table: departments + unique_key: [name] + auto_id: + column: id + rows: + - _ref: dept_eng + name: Engineering + - name: phase2 + order: 2 + seed_sets: + - name: emps + mode: reconcile + order: 1 + tables: + - table: employees + unique_key: [email] + rows: + - name: Alice + email: alice@example.com + department_id: "@ref:dept_eng.id" +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let log = test_logger(); + + // First run + let db1 = SqliteDb::connect(db_path_str).unwrap(); + let 
mut exec1 = SeedExecutor::new(&log, Box::new(db1), "initium_seed".into(), false); + exec1.execute(&plan).unwrap(); + + let db = SqliteDb::connect(db_path_str).unwrap(); + let dept_id: i64 = db + .conn + .query_row("SELECT id FROM departments", [], |r| r.get(0)) + .unwrap(); + let emp_dept_id: i64 = db + .conn + .query_row("SELECT department_id FROM employees", [], |r| r.get(0)) + .unwrap(); + assert_eq!( + dept_id, emp_dept_id, + "cross-phase reconcile refs should work" + ); + + // Second run — both should skip (hash match), refs should still resolve + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec2 = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false); + exec2.execute(&plan).unwrap(); + + let db_final = SqliteDb::connect(db_path_str).unwrap(); + let emp_count: i64 = db_final + .conn + .query_row("SELECT COUNT(*) FROM employees", [], |r| r.get(0)) + .unwrap(); + assert_eq!(emp_count, 1, "second run should not duplicate employees"); + } + + #[test] + fn test_reconcile_tracking_table_migration() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + // Create old-style tracking table (no content_hash column) + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + sqlite + .conn + .execute_batch( + "CREATE TABLE initium_seed ( + seed_set TEXT PRIMARY KEY, + applied_at TEXT NOT NULL DEFAULT (datetime('now')) + ); + CREATE TABLE departments (id INTEGER PRIMARY KEY, name TEXT UNIQUE);", + ) + .unwrap(); + + // Insert a legacy tracking entry + sqlite + .conn + .execute( + "INSERT INTO initium_seed (seed_set) VALUES ('legacy_set')", + [], + ) + .unwrap(); + + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: new_set + mode: reconcile + tables: + - table: departments + unique_key: [name] + rows: + - name: Engineering +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let log = 
test_logger(); + + let db2 = SqliteDb::connect(db_path_str).unwrap(); + let mut exec = SeedExecutor::new(&log, Box::new(db2), "initium_seed".into(), false); + exec.execute(&plan).unwrap(); + + // Verify migration worked: content_hash column exists + let db = SqliteDb::connect(db_path_str).unwrap(); + let has_hash: bool = db + .conn + .prepare("PRAGMA table_info(initium_seed)") + .unwrap() + .query_map([], |row| row.get::<_, String>(1)) + .unwrap() + .any(|r| r.map(|n| n == "content_hash").unwrap_or(false)); + assert!( + has_hash, + "tracking table should have content_hash column after migration" + ); + + // Legacy entry should still be there + let legacy: i64 = db + .conn + .query_row( + "SELECT COUNT(*) FROM initium_seed WHERE seed_set = 'legacy_set'", + [], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(legacy, 1, "legacy entry should be preserved"); + } + + #[test] + fn test_invalid_seed_mode() { + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: bad_mode + mode: invalid + tables: + - table: t + rows: + - a: b +"#; + let result = SeedPlan::from_yaml(yaml); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("invalid mode")); + } + + #[test] + fn test_reconcile_all_rejects_missing_unique_key() { + let dir = tempfile::TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let db_path_str = db_path.to_str().unwrap(); + + let sqlite = SqliteDb::connect(db_path_str).unwrap(); + setup_db_with_tables(&sqlite); + + // mode: once with no unique_key + reconcile_all should error + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: no_uk + tables: + - table: departments + rows: + - name: Engineering +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let log = test_logger(); + let mut exec = SeedExecutor::new(&log, Box::new(sqlite), "initium_seed".into(), false) + .with_reconcile_all(true); + let result = 
exec.execute(&plan); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("no unique_key")); + } } diff --git a/src/seed/hash.rs b/src/seed/hash.rs new file mode 100644 index 0000000..eedd7d9 --- /dev/null +++ b/src/seed/hash.rs @@ -0,0 +1,255 @@ +use crate::seed::schema::SeedSet; +use sha2::{Digest, Sha256}; +use std::collections::BTreeMap; + +/// Compute a deterministic SHA-256 hash of a seed set's content. +/// +/// Values are resolved using the provided resolver function (expanding env vars +/// and templates), except `@ref:` expressions which are kept as literals to +/// avoid cascading false positives when auto-generated IDs shift. +/// +/// The `_ref` key is excluded from the hash (it is a structural label, not data). +pub fn compute_seed_set_hash( + ss: &SeedSet, + resolver: &dyn Fn(&serde_yaml::Value) -> Result, +) -> Result { + let mut hasher = Sha256::new(); + + let mut tables: Vec<_> = ss.tables.iter().collect(); + tables.sort_by(|a, b| a.order.cmp(&b.order).then_with(|| a.table.cmp(&b.table))); + + for ts in &tables { + hasher.update(ts.table.as_bytes()); + hasher.update(b"\n"); + + // Include unique_key in hash so changing it triggers reconciliation + let uk_json = serde_json::to_string(&ts.unique_key) + .map_err(|e| format!("serializing unique_key: {}", e))?; + hasher.update(uk_json.as_bytes()); + hasher.update(b"\n"); + + // Include auto_id config + let auto_id_str = match &ts.auto_id { + Some(a) => format!("{}:{}", a.column, a.id_type), + None => String::new(), + }; + hasher.update(auto_id_str.as_bytes()); + hasher.update(b"\n"); + + for row in &ts.rows { + // Sort keys for determinism (HashMap iteration order is random) + let sorted: BTreeMap<_, _> = row.iter().collect(); + for (key, val) in &sorted { + if key.as_str() == "_ref" { + continue; + } + hasher.update(key.as_bytes()); + hasher.update(b"="); + + // Keep @ref: literals as-is; resolve everything else + let val_str = match val.as_str() { + Some(s) if s.starts_with("@ref:") => 
s.to_string(), + _ => resolver(val)?, + }; + hasher.update(val_str.as_bytes()); + hasher.update(b"\x00"); + } + hasher.update(b"\n"); + } + } + + let hash = hasher.finalize(); + Ok(hex_encode(&hash)) +} + +fn hex_encode(bytes: &[u8]) -> String { + use std::fmt::Write; + let mut s = String::with_capacity(bytes.len() * 2); + for b in bytes { + let _ = write!(s, "{:02x}", b); + } + s +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::seed::schema::SeedPlan; + + fn identity_resolver(val: &serde_yaml::Value) -> Result<String, String> { + match val { + serde_yaml::Value::String(s) => Ok(s.clone()), + serde_yaml::Value::Number(n) => Ok(n.to_string()), + serde_yaml::Value::Bool(b) => Ok(b.to_string()), + serde_yaml::Value::Null => Ok(String::new()), + _ => Ok(format!("{:?}", val)), + } + } + + #[test] + fn test_deterministic_hash() { + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: phase1 + seed_sets: + - name: test + mode: reconcile + tables: + - table: users + unique_key: [email] + rows: + - email: alice@example.com + name: Alice +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let ss = &plan.phases[0].seed_sets[0]; + let h1 = compute_seed_set_hash(ss, &identity_resolver).unwrap(); + let h2 = compute_seed_set_hash(ss, &identity_resolver).unwrap(); + assert_eq!(h1, h2); + assert_eq!(h1.len(), 64); // SHA-256 hex + } + + #[test] + fn test_hash_changes_on_value_change() { + let yaml1 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: p + seed_sets: + - name: s + mode: reconcile + tables: + - table: t + unique_key: [k] + rows: + - k: a + v: "1" +"#; + let yaml2 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: p + seed_sets: + - name: s + mode: reconcile + tables: + - table: t + unique_key: [k] + rows: + - k: a + v: "2" +"#; + let plan1 = SeedPlan::from_yaml(yaml1).unwrap(); + let plan2 = SeedPlan::from_yaml(yaml2).unwrap(); + let h1 = compute_seed_set_hash(&plan1.phases[0].seed_sets[0],
&identity_resolver).unwrap(); + let h2 = compute_seed_set_hash(&plan2.phases[0].seed_sets[0], &identity_resolver).unwrap(); + assert_ne!(h1, h2); + } + + #[test] + fn test_hash_stable_with_ref_expressions() { + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: p + seed_sets: + - name: s + mode: reconcile + tables: + - table: t + unique_key: [name] + rows: + - name: Alice + dept_id: "@ref:dept_eng.id" +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let ss = &plan.phases[0].seed_sets[0]; + let h1 = compute_seed_set_hash(ss, &identity_resolver).unwrap(); + let h2 = compute_seed_set_hash(ss, &identity_resolver).unwrap(); + assert_eq!(h1, h2); + } + + #[test] + fn test_hash_changes_on_env_resolution() { + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: p + seed_sets: + - name: s + mode: reconcile + tables: + - table: t + unique_key: [k] + rows: + - k: a + v: some_value +"#; + let plan = SeedPlan::from_yaml(yaml).unwrap(); + let ss = &plan.phases[0].seed_sets[0]; + + let h1 = compute_seed_set_hash(ss, &identity_resolver).unwrap(); + + // Simulate different env resolution + let different_resolver = |val: &serde_yaml::Value| -> Result<String, String> { + match val { + serde_yaml::Value::String(s) if s == "some_value" => Ok("different_value".into()), + _ => identity_resolver(val), + } + }; + + let h2 = compute_seed_set_hash(ss, &different_resolver).unwrap(); + assert_ne!(h1, h2); + } + + #[test] + fn test_hash_changes_on_row_added() { + let yaml1 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: p + seed_sets: + - name: s + mode: reconcile + tables: + - table: t + unique_key: [k] + rows: + - k: a +"#; + let yaml2 = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: p + seed_sets: + - name: s + mode: reconcile + tables: + - table: t + unique_key: [k] + rows: + - k: a + - k: b +"#; + let plan1 = SeedPlan::from_yaml(yaml1).unwrap(); + let plan2 = SeedPlan::from_yaml(yaml2).unwrap(); +
let h1 = compute_seed_set_hash(&plan1.phases[0].seed_sets[0], &identity_resolver).unwrap(); + let h2 = compute_seed_set_hash(&plan2.phases[0].seed_sets[0], &identity_resolver).unwrap(); + assert_ne!(h1, h2); + } +} diff --git a/src/seed/mod.rs b/src/seed/mod.rs index 6618aaf..b8643d4 100644 --- a/src/seed/mod.rs +++ b/src/seed/mod.rs @@ -1,5 +1,6 @@ pub mod db; pub mod executor; +pub mod hash; pub mod schema; use crate::logging::Logger; @@ -19,7 +20,13 @@ fn render_template(content: &str) -> Result<String, String> { .map_err(|e| format!("rendering seed template: {}", e)) } -pub fn run(log: &Logger, spec_file: &str, reset: bool) -> Result<(), String> { +pub fn run( + log: &Logger, + spec_file: &str, + reset: bool, + dry_run: bool, + reconcile_all: bool, +) -> Result<(), String> { let content = std::fs::read_to_string(spec_file) .map_err(|e| format!("reading seed spec '{}': {}", spec_file, e))?; @@ -38,7 +45,9 @@ pub fn run(log: &Logger, spec_file: &str, reset: bool) -> Result<(), String> { log.info("connecting to database", &[("driver", driver.as_str())]); let db = db::connect(&driver, &db_url)?; - let mut exec = executor::SeedExecutor::new(log, db, tracking_table, reset); + let mut exec = executor::SeedExecutor::new(log, db, tracking_table, reset) + .with_dry_run(dry_run) + .with_reconcile_all(reconcile_all); exec.execute(&plan) } diff --git a/src/seed/schema.rs b/src/seed/schema.rs index 0991c18..3b64f0a 100644 --- a/src/seed/schema.rs +++ b/src/seed/schema.rs @@ -98,9 +98,21 @@ pub struct SeedSet { pub name: String, #[serde(default)] pub order: i32, + #[serde(default = "default_seed_mode")] + pub mode: String, pub tables: Vec<TableSeed>, } +fn default_seed_mode() -> String { + "once".into() +} + +impl SeedSet { + pub fn is_reconcile(&self) -> bool { + self.mode == "reconcile" + } +} + #[derive(Debug, Deserialize, Clone)] pub struct TableSeed { pub table: String, @@ -198,6 +210,15 @@ impl SeedPlan { if ss.name.is_empty() { return Err("seed_set name must not be empty".into()); } + let
valid_modes = ["once", "reconcile"]; + if !valid_modes.contains(&ss.mode.as_str()) { + return Err(format!( + "seed_set '{}' has invalid mode '{}' (supported: {})", + ss.name, + ss.mode, + valid_modes.join(", ") + )); + } if ss.tables.is_empty() { return Err(format!( "seed_set '{}' must contain at least one table", @@ -211,6 +232,41 @@ impl SeedPlan { ss.name )); } + if ss.is_reconcile() && ts.unique_key.is_empty() { + return Err(format!( + "table '{}' in seed_set '{}' must have unique_key when mode is 'reconcile'", + ts.table, ss.name + )); + } + if ss.is_reconcile() { + if ts.unique_key.iter().any(|k| k.trim().is_empty()) { + return Err(format!( + "table '{}' in seed_set '{}' has empty or whitespace-only entries in unique_key when mode is 'reconcile'", + ts.table, ss.name + )); + } + let reserved_keys = ["_ref"]; + if let Some(reserved) = ts + .unique_key + .iter() + .find(|k| reserved_keys.contains(&k.as_str())) + { + return Err(format!( + "table '{}' in seed_set '{}' uses reserved column '{}' in unique_key when mode is 'reconcile'", + ts.table, ss.name, reserved + )); + } + for (row_idx, row) in ts.rows.iter().enumerate() { + for uk in &ts.unique_key { + if !row.contains_key(uk) { + return Err(format!( + "table '{}' in seed_set '{}': row {} is missing unique_key column '{}'", + ts.table, ss.name, row_idx + 1, uk + )); + } + } + } + } } Ok(()) } @@ -619,4 +675,67 @@ phases: let plan = SeedPlan::from_yaml(yaml).unwrap(); assert!(plan.phases[0].seed_sets.is_empty()); } + + #[test] + fn test_reconcile_rejects_empty_unique_key_entry() { + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: p + seed_sets: + - name: s + mode: reconcile + tables: + - table: t + unique_key: ["", "k"] + rows: + - k: a +"#; + let err = SeedPlan::from_yaml(yaml).unwrap_err(); + assert!(err.contains("empty or whitespace-only")); + } + + #[test] + fn test_reconcile_rejects_reserved_unique_key() { + let yaml = r#" +database: + driver: sqlite + url: ":memory:" 
+phases: + - name: p + seed_sets: + - name: s + mode: reconcile + tables: + - table: t + unique_key: [_ref] + rows: + - _ref: r1 +"#; + let err = SeedPlan::from_yaml(yaml).unwrap_err(); + assert!(err.contains("reserved column '_ref'")); + } + + #[test] + fn test_reconcile_rejects_row_missing_unique_key_column() { + let yaml = r#" +database: + driver: sqlite + url: ":memory:" +phases: + - name: p + seed_sets: + - name: s + mode: reconcile + tables: + - table: t + unique_key: [email] + rows: + - name: Alice +"#; + let err = SeedPlan::from_yaml(yaml).unwrap_err(); + assert!(err.contains("missing unique_key column 'email'")); + } }