from datetime import datetime, timedelta

from data_generator import DATE_FORMAT
from dbt_project import DbtProject

TEST_NAME = "elementary.data_freshness_sla"
TIMESTAMP_COLUMN = "updated_at"

# SLA times are chosen for CI determinism (CI typically runs well after 00:01 UTC):
#   - "12:01am" UTC: the deadline has reliably *already passed* when CI runs.
#   - "11:59pm" UTC: the deadline has reliably *not passed yet* when CI runs.


def test_fresh_data_passes(test_id: str, dbt_project: DbtProject):
    """Fresh (today's) data should pass even after the SLA deadline has passed.

    Uses a 12:01am UTC deadline, which has already passed when CI runs, so the
    pass is attributable to the data being fresh rather than to the deadline
    grace period (with an 11:59pm deadline the test would pass even for stale
    data, proving nothing).
    """
    utc_now = datetime.utcnow()
    data = [
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT)},
        {TIMESTAMP_COLUMN: (utc_now - timedelta(hours=1)).strftime(DATE_FORMAT)},
    ]
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_stale_data_fails(test_id: str, dbt_project: DbtProject):
    """Data only from previous days should fail when today's SLA deadline has passed."""
    utc_now = datetime.utcnow()
    stale_date = utc_now - timedelta(days=2)
    data = [
        {TIMESTAMP_COLUMN: stale_date.strftime(DATE_FORMAT)},
        {TIMESTAMP_COLUMN: (stale_date - timedelta(hours=5)).strftime(DATE_FORMAT)},
    ]
    # Use 12:01am UTC (= 00:01 UTC) so the deadline is always in the past when
    # CI runs (typically 07:00+ UTC). Etc/GMT-14 was ambiguous in some pytz
    # versions and caused Vertica to return wrong results.
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "fail"


def test_no_data_fails(test_id: str, dbt_project: DbtProject):
    """An empty table (after WHERE filter) should fail when the deadline has passed."""
    utc_now = datetime.utcnow()
    # Seed with data that will be excluded by the where_expression.
    data = [
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT), "category": "excluded"},
    ]
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
        "where_expression": "category = 'included'",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "fail"


def test_deadline_not_passed_does_not_fail(test_id: str, dbt_project: DbtProject):
    """Even if data is stale, the test should pass if the deadline hasn't passed yet."""
    utc_now = datetime.utcnow()
    stale_date = utc_now - timedelta(days=2)
    data = [
        {TIMESTAMP_COLUMN: stale_date.strftime(DATE_FORMAT)},
    ]
    # 11:59pm UTC reliably has NOT passed yet when CI runs.
    # (Etc/GMT-14 = UTC+14 means 11:59pm there = 09:59 UTC — not reliably future.)
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "UTC",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_with_where_expression(test_id: str, dbt_project: DbtProject):
    """The where_expression should filter which rows count toward freshness."""
    utc_now = datetime.utcnow()
    stale_date = utc_now - timedelta(days=2)
    data = [
        # Fresh data for category A.
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT), "category": "a"},
        # Stale data for category B.
        {TIMESTAMP_COLUMN: stale_date.strftime(DATE_FORMAT), "category": "b"},
    ]
    # Category A (fresh data) with a deadline that has already passed -> pass.
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
        "where_expression": "category = 'a'",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"

    # Category B (stale data) with an early deadline -> fail. The table was
    # already seeded by the previous call, so no `data=` is passed here.
    test_args_stale = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
        "where_expression": "category = 'b'",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args_stale)
    assert test_result["status"] == "fail"


def test_with_timezone(test_id: str, dbt_project: DbtProject):
    """Timezone conversion should work for a non-UTC IANA timezone."""
    utc_now = datetime.utcnow()
    data = [
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT)},
    ]
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "America/New_York",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"
from datetime import datetime, timedelta

from data_generator import DATE_FORMAT, generate_dates
from dbt_project import DbtProject

TIMESTAMP_COLUMN = "updated_at"
DBT_TEST_NAME = "elementary.volume_threshold"
DBT_TEST_ARGS = {
    "timestamp_column": TIMESTAMP_COLUMN,
    "time_bucket": {"period": "day", "count": 1},
    "days_back": 14,
    "backfill_days": 14,
}


def _rows_for(bucket_date, count, **extra_columns):
    """Build `count` rows whose timestamp falls on `bucket_date`, plus any extra columns."""
    template = {TIMESTAMP_COLUMN: bucket_date.strftime(DATE_FORMAT), **extra_columns}
    return [dict(template) for _ in range(count)]


def _latest_complete_buckets():
    """Return (yesterday, two_days_ago): the two newest *complete* daily buckets.

    Elementary only processes complete buckets (the latest full bucket before
    ``run_started_at``). With daily buckets that means "yesterday" is the
    newest bucket the macro will ever look at, so test data is generated up
    to yesterday only.
    """
    yesterday = datetime.utcnow().date() - timedelta(days=1)
    return yesterday, yesterday - timedelta(days=1)


def _generate_stable_data(rows_per_day=100, days_back=14):
    """Generate data with a consistent number of rows per day bucket.

    Only complete buckets are generated (see _latest_complete_buckets).
    """
    yesterday, _ = _latest_complete_buckets()
    data = []
    for cur_date in generate_dates(base_date=yesterday, days_back=days_back):
        data.extend(_rows_for(cur_date, rows_per_day))
    return data


def _generate_data_with_latest(latest_rows, baseline_rows=100, days_back=14):
    """Generate `baseline_rows` rows for each older bucket (including the
    previous bucket, two days ago) and `latest_rows` rows for yesterday — the
    latest complete bucket, which the test compares against the previous one.
    """
    yesterday, two_days_ago = _latest_complete_buckets()
    data = []
    for cur_date in generate_dates(base_date=yesterday, days_back=days_back):
        if cur_date < two_days_ago:
            data.extend(_rows_for(cur_date, baseline_rows))
    # Previous bucket: the baseline row count.
    data.extend(_rows_for(two_days_ago, baseline_rows))
    # Latest complete bucket: the row count under test.
    data.extend(_rows_for(yesterday, latest_rows))
    return data


def test_stable_volume_passes(test_id: str, dbt_project: DbtProject):
    """Consistent row counts across buckets should pass."""
    data = _generate_stable_data(rows_per_day=100)
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "pass"


def test_large_spike_fails(test_id: str, dbt_project: DbtProject):
    """A large spike in row count (>10% default error threshold) should fail."""
    # Yesterday: 150 rows vs 100 in the previous bucket -> 50% spike.
    data = _generate_data_with_latest(latest_rows=150)
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "fail"


def test_large_drop_fails(test_id: str, dbt_project: DbtProject):
    """A large drop in row count (>10% default error threshold) should fail."""
    # Yesterday: 50 rows vs 100 in the previous bucket -> 50% drop.
    data = _generate_data_with_latest(latest_rows=50)
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "fail"


def test_direction_spike_ignores_drop(test_id: str, dbt_project: DbtProject):
    """With direction=spike, a drop should not trigger a failure."""
    # Yesterday: 50 rows -> 50% drop, but only spikes are monitored.
    data = _generate_data_with_latest(latest_rows=50)
    test_args = {**DBT_TEST_ARGS, "direction": "spike"}
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_direction_drop_ignores_spike(test_id: str, dbt_project: DbtProject):
    """With direction=drop, a spike should not trigger a failure."""
    # Yesterday: 150 rows -> 50% spike, but only drops are monitored.
    data = _generate_data_with_latest(latest_rows=150)
    test_args = {**DBT_TEST_ARGS, "direction": "drop"}
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_min_row_count_skips_small_baseline(test_id: str, dbt_project: DbtProject):
    """When the previous bucket has fewer rows than min_row_count, the check is skipped (pass)."""
    # Baseline of 5 rows/day is below the default min_row_count=100, so the
    # huge relative spike (5 -> 50) must be ignored.
    data = _generate_data_with_latest(latest_rows=50, baseline_rows=5)
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "pass"


def test_custom_thresholds(test_id: str, dbt_project: DbtProject):
    """Custom thresholds should control the sensitivity of the test."""
    # Yesterday: 108 rows vs 100 in the previous bucket -> 8% change.
    data = _generate_data_with_latest(latest_rows=108)

    # With default thresholds (warn=5, error=10), 8% should warn but not error.
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "warn"

    # With high thresholds (warn=20, error=50), 8% should pass. The table is
    # already seeded; force_metrics_backfill recomputes the cached metrics.
    test_args_high = {
        **DBT_TEST_ARGS,
        "warn_threshold_percent": 20,
        "error_threshold_percent": 50,
    }
    test_result = dbt_project.test(
        test_id,
        DBT_TEST_NAME,
        test_args_high,
        test_vars={"force_metrics_backfill": True},
    )
    assert test_result["status"] == "pass"


def test_where_expression(test_id: str, dbt_project: DbtProject):
    """The where_expression should filter which rows are counted."""
    yesterday, two_days_ago = _latest_complete_buckets()
    data = []
    # Older buckets: 100 category-A rows each.
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            data.extend(_rows_for(cur_date, 100, category="a"))
    # Previous bucket: 100 category-A rows.
    data.extend(_rows_for(two_days_ago, 100, category="a"))
    # Yesterday: 100 stable category-A rows plus 200 category-B noise rows.
    data.extend(_rows_for(yesterday, 100, category="a"))
    data.extend(_rows_for(yesterday, 200, category="b"))

    # Without filter: 300 rows yesterday vs 100 two days ago -> big spike -> error.
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "fail"

    # Filtered to category A: 100 yesterday vs 100 two days ago -> stable -> pass.
    test_args_filtered = {
        **DBT_TEST_ARGS,
        "where_expression": "category = 'a'",
    }
    test_result = dbt_project.test(
        test_id,
        DBT_TEST_NAME,
        test_args_filtered,
        test_vars={"force_metrics_backfill": True},
    )
    assert test_result["status"] == "pass"
Integer or list: 1, [1, 15]
    where_expression (optional): Additional WHERE clause filter for the data query

  Schedule behavior:
    - If neither day_of_week nor day_of_month is set: check every day (default)
    - If day_of_week is set: only check on those days
    - If day_of_month is set: only check on those days
    - If both are set: check if today matches EITHER filter (OR logic)

  Example usage:
    models:
      - name: my_model
        tests:
          - elementary.data_freshness_sla:
              timestamp_column: updated_at
              sla_time: "07:00"
              timezone: "America/Los_Angeles"

      - name: daily_events
        tests:
          - elementary.data_freshness_sla:
              timestamp_column: event_timestamp
              sla_time: "6am"
              timezone: "Europe/Amsterdam"
              where_expression: "event_type = 'completed'"

      - name: weekly_report_data
        tests:
          - elementary.data_freshness_sla:
              timestamp_column: report_date
              sla_time: "09:00"
              timezone: "Asia/Tokyo"
              day_of_week: ["Monday"]

  Test passes if:
    - Today is not a scheduled check day (based on day_of_week/day_of_month)
    - OR today's data exists (max timestamp is from the target day or later)
    - OR the SLA deadline for today hasn't passed yet (still within the grace period)

  Test fails if:
    - Today is a scheduled check day AND the deadline has passed AND:
      - No data exists in the table
      - The max timestamp is from a previous day (data not updated today)

  Note on DATA_FRESH semantics:
    This test checks whether today's data EXISTS, not whether it arrived BEFORE the
    SLA deadline. The SLA deadline acts as a grace period: the test won't fail until
    the deadline has passed. Once today's data is present (max timestamp >= start of
    target day in UTC), the test passes regardless of when the data appeared.
    This is intentional because timestamp_column reflects the business timestamp of
    the data, not the pipeline ingestion time.

  Important:
    - The timestamp_column values are assumed to be in UTC (or timezone-naive
      timestamps that represent UTC). If your data stores local timestamps, the
      comparison against the SLA deadline (converted to UTC) will be incorrect.
#}
{% test data_freshness_sla(
    model,
    timestamp_column,
    sla_time,
    timezone,
    day_of_week=none,
    day_of_month=none,
    where_expression=none
) %}
    {{ config(tags=["elementary-tests"]) }}

    {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %}

        {# Validate required parameters #}
        {% if not timestamp_column %}
            {{ exceptions.raise_compiler_error(
                "The 'timestamp_column' parameter is required. Example: timestamp_column: 'updated_at'"
            ) }}
        {% endif %}

        {% if not sla_time %}
            {{ exceptions.raise_compiler_error(
                "The 'sla_time' parameter is required. Example: sla_time: '07:00'"
            ) }}
        {% endif %}

        {# Validate timezone (raises a clear compiler error on invalid IANA names) #}
        {% do elementary.validate_timezone(timezone) %}

        {# Normalize and validate day filters #}
        {% set day_of_week_filter = elementary.normalize_day_of_week(day_of_week) %}
        {% set day_of_month_filter = elementary.normalize_day_of_month(day_of_month) %}

        {# Get model relation and validate #}
        {% set model_relation = elementary.get_model_relation_for_test(
            model, elementary.get_test_model()
        ) %}
        {% if not model_relation %}
            {{ exceptions.raise_compiler_error(
                "Unsupported model: "
                ~ model
                ~ " (this might happen if you override 'ref' or 'source')"
            ) }}
        {% endif %}

        {%- if elementary.is_ephemeral_model(model_relation) %}
            {{ exceptions.raise_compiler_error(
                "Test not supported for ephemeral models: " ~ model_relation.identifier
            ) }}
        {%- endif %}

        {# Validate timestamp column exists and is a timestamp type #}
        {% set timestamp_column_data_type = (
            elementary.find_normalized_data_type_for_column(model_relation, timestamp_column)
        ) %}
        {% if not elementary.is_column_timestamp(
            model_relation, timestamp_column, timestamp_column_data_type
        ) %}
            {{ exceptions.raise_compiler_error(
                "Column '"
                ~ timestamp_column
                ~ "' is not a timestamp type. The timestamp_column must be a timestamp or datetime column."
            ) }}
        {% endif %}

        {# Parse the SLA time (supports "07:00", "7am", "2:30pm", "14:30") #}
        {% set parsed_time = elementary.parse_sla_time(sla_time) %}
        {% set formatted_sla_time = elementary.format_sla_time(parsed_time) %}

        {# Calculate SLA deadline in UTC (also returns current day info) #}
        {% set sla_info = elementary.calculate_sla_deadline_utc(
            parsed_time.hour, parsed_time.minute, timezone
        ) %}

        {# Check if today is a scheduled check day #}
        {% set should_check = elementary.should_check_sla_today(
            sla_info.day_of_week,
            sla_info.day_of_month,
            day_of_week_filter,
            day_of_month_filter,
        ) %}

        {# If today is not a scheduled check day, skip (pass) #}
        {% if not should_check %}
            {{ elementary.edr_log(
                "Skipping data_freshness_sla test for "
                ~ model_relation.identifier
                ~ " - not a scheduled check day ("
                ~ sla_info.day_of_week
                ~ ", day "
                ~ sla_info.day_of_month
                ~ ")"
            ) }}
            {{ elementary.no_results_query() }}
        {% else %}

            {{ elementary.edr_log(
                "Running data_freshness_sla test for "
                ~ model_relation.identifier
                ~ " with SLA "
                ~ formatted_sla_time
                ~ " "
                ~ timezone
            ) }}

            {# Build the query #}
            {{ elementary.get_data_freshness_sla_query(
                model_relation=model_relation,
                timestamp_column=timestamp_column,
                sla_deadline_utc=sla_info.sla_deadline_utc,
                target_date=sla_info.target_date,
                target_date_start_utc=sla_info.target_date_start_utc,
                target_date_end_utc=sla_info.target_date_end_utc,
                formatted_sla_time=formatted_sla_time,
                timezone=timezone,
                where_expression=where_expression,
            ) }}

        {% endif %}

    {%- else %} {{ elementary.no_results_query() }}
    {%- endif %}

{% endtest %}


{#
  Build SQL query to check if data was updated before SLA deadline.

  Logic:
    - Query the model table to get MAX(timestamp_column)
    - Compare against today's date boundaries (computed in UTC at compile time)
    - If max timestamp is from today or later: data is fresh (today's data exists)
    - The SLA deadline acts as a grace period — the test only fails if the
      deadline has passed AND today's data is not present
    - Otherwise: Data is stale, SLA missed
#}
{% macro get_data_freshness_sla_query(
    model_relation,
    timestamp_column,
    sla_deadline_utc,
    target_date,
    target_date_start_utc,
    target_date_end_utc,
    formatted_sla_time,
    timezone,
    where_expression
) %}

    with

    sla_deadline as (
        select
            {{ elementary.edr_cast_as_timestamp("'" ~ sla_deadline_utc ~ "'") }} as deadline_utc,
            {{ elementary.edr_cast_as_timestamp("'" ~ target_date_start_utc ~ "'") }} as target_date_start_utc,
            {{ elementary.edr_cast_as_timestamp("'" ~ target_date_end_utc ~ "'") }} as target_date_end_utc,
            '{{ target_date }}' as target_date
    ),

    {# Get the max timestamp from the data #}
    max_data_timestamp as (
        select
            max({{ elementary.edr_cast_as_timestamp(timestamp_column) }}) as max_timestamp
        from {{ model_relation }}
        {% if where_expression %} where {{ where_expression }} {% endif %}
    ),

    {# Determine freshness status #}
    freshness_result as (
        select
            sd.target_date,
            sd.deadline_utc as sla_deadline_utc,
            mdt.max_timestamp,
            case
                when mdt.max_timestamp is null
                then 'NO_DATA'
                {# Data from today or later counts as fresh — this checks whether
                   today's data EXISTS, not whether it arrived before the SLA deadline.
                   The SLA deadline is enforced separately via is_failure. #}
                when mdt.max_timestamp >= sd.target_date_start_utc
                then 'DATA_FRESH'
                else 'DATA_STALE'
            end as freshness_status
        from sla_deadline sd
        cross join max_data_timestamp mdt
    ),

    final_result as (
        select
            '{{ model_relation.identifier }}' as model_name,
            target_date,
            '{{ formatted_sla_time }}' as sla_time,
            '{{ timezone }}' as timezone,
            freshness_status,
            cast(max_timestamp as {{ elementary.edr_type_string() }}) as max_timestamp,
            {{ elementary.edr_condition_as_boolean(
                "freshness_status != 'DATA_FRESH' and "
                ~ elementary.edr_current_timestamp_in_utc()
                ~ " > sla_deadline_utc"
            ) }} as is_failure,
            {# Use a different alias to avoid shadowing the input column.
               ClickHouse resolves column refs against output aliases, so
               keeping the same name would make is_failure compare against
               the string-cast version, causing a type mismatch. #}
            cast(sla_deadline_utc as {{ elementary.edr_type_string() }}) as sla_deadline_utc_str,
            {# BigQuery does not support '' to escape single quotes inside string literals.
               Use \' for BigQuery and '' for all other adapters. #}
            {%- if target.type == "bigquery" -%}
                {%- set escaped_where = (
                    where_expression | replace("'", "\\'") if where_expression else ""
                ) -%}
            {%- else -%}
                {%- set escaped_where = (
                    where_expression | replace("'", "''") if where_expression else ""
                ) -%}
            {%- endif -%}
            {%- set where_suffix = (
                (" (with filter: " ~ escaped_where ~ ")") if where_expression else ""
            ) -%}
            case
                when freshness_status = 'NO_DATA'
                then
                    'No data found in "{{ model_relation.identifier }}"{{ where_suffix }}. Expected data to be updated before {{ formatted_sla_time }} {{ timezone }}.'
                when freshness_status = 'DATA_STALE'
                then
                    {{ elementary.edr_concat(
                        [
                            "'Data in \"" ~ model_relation.identifier ~ "\" is stale. Last update was at '",
                            "cast(max_timestamp as " ~ elementary.edr_type_string() ~ ")",
                            "', which is before today. Expected fresh data before "
                            ~ formatted_sla_time
                            ~ " "
                            ~ timezone
                            ~ ".'",
                        ]
                    ) }}
                else
                    {{ elementary.edr_concat(
                        [
                            "'Data in \"" ~ model_relation.identifier ~ "\" is fresh, last update at '",
                            "cast(max_timestamp as " ~ elementary.edr_type_string() ~ ")",
                            "' (before SLA deadline "
                            ~ formatted_sla_time
                            ~ " "
                            ~ timezone
                            ~ ").'",
                        ]
                    ) }}
            end as result_description
        from freshness_result
    )

    select
        model_name,
        target_date,
        sla_time,
        timezone,
        sla_deadline_utc_str as sla_deadline_utc,
        freshness_status,
        max_timestamp,
        result_description
    from final_result
    where is_failure = {{ elementary.edr_boolean_literal(true) }}

{% endmacro %}
#} {% macro validate_timezone(timezone) %} - {% set pytz = modules.pytz %} - {% if not timezone %} {{ exceptions.raise_compiler_error( @@ -16,6 +18,11 @@ }} {% endif %} + {# Skip pytz validation in dbt-fusion due to known discrepancies #} + {% if elementary.is_dbt_fusion() %} {% do return(none) %} {% endif %} + + {% set pytz = modules.pytz %} + {# Check if timezone is in pytz's list of all timezones #} {% if timezone not in pytz.all_timezones %} {{ @@ -52,44 +59,79 @@ {% set datetime = modules.datetime %} {% set pytz = modules.pytz %} - {# Get timezone objects #} - {% set utc_tz = pytz.timezone("UTC") %} - {% set target_tz = pytz.timezone(timezone) %} - - {# Get current time in UTC and target timezone #} - {% set now_utc = datetime.datetime.now(utc_tz) %} - {% set now_local = now_utc.astimezone(target_tz) %} - - {# Target date is today in the target timezone #} - {% set target_date_local = now_local.date() %} - - {# Create start of day (00:00:00) in target timezone #} - {% set day_start_naive = datetime.datetime.combine( - target_date_local, datetime.time(0, 0, 0) - ) %} - {% set day_start_local = target_tz.localize(day_start_naive, is_dst=False) %} - {% set day_start_utc = day_start_local.astimezone(utc_tz) %} - - {# Create end of day (23:59:59.999) in target timezone #} - {% set day_end_naive = datetime.datetime.combine( - target_date_local, datetime.time(23, 59, 59) - ) %} - {% set day_end_local = target_tz.localize(day_end_naive, is_dst=False) %} - {% set day_end_utc = day_end_local.astimezone(utc_tz) %} - - {# Create the SLA deadline in target timezone #} - {# Use is_dst=False to resolve ambiguous times during DST transitions to standard time #} - {% set sla_time_local = datetime.time(sla_hour, sla_minute, 0) %} - {% set sla_deadline_naive = datetime.datetime.combine( - target_date_local, sla_time_local - ) %} - {% set sla_deadline_local = target_tz.localize(sla_deadline_naive, is_dst=False) %} - - {# Convert to UTC #} - {% set sla_deadline_utc = 
sla_deadline_local.astimezone(utc_tz) %} - - {# Check if deadline has passed #} - {% set deadline_passed = now_utc > sla_deadline_utc %} + {% if elementary.is_dbt_fusion() %} + {# dbt-fusion's pytz.localize() is unreliable (dbt-labs/dbt-fusion#143). + Use stdlib datetime.timezone.utc to create a proper UTC-aware datetime, + then call astimezone(pytz_tz) which uses pytz's fromutc() internally — + more reliable than localize(). #} + {% set target_tz = pytz.timezone(timezone) %} + + {# Create a UTC-aware datetime using pytz.utc — avoids datetime.timezone which + is not exposed in fusion's modules.datetime (dbt-labs/dbt-fusion#143) #} + {% set now_utc_aware = datetime.datetime.now(pytz.utc) %} + {% set now_local = now_utc_aware.astimezone(target_tz) %} + {% set target_date_local = now_local.date() %} + {% set tz_offset = now_local.utcoffset() %} + + {# Keep a naive UTC datetime for final deadline comparison #} + {% set now_utc = datetime.datetime.utcnow() %} + + {# Build all datetimes as naive local, then convert to naive UTC + by subtracting the timezone offset. This avoids tz-aware comparison. 
#} + {% set day_start_utc = ( + datetime.datetime.combine( + target_date_local, datetime.time(0, 0, 0) + ) + - tz_offset + ) %} + + {% set day_end_utc = ( + datetime.datetime.combine( + target_date_local, datetime.time(23, 59, 59) + ) + - tz_offset + ) %} + + {% set sla_deadline_utc = ( + datetime.datetime.combine( + target_date_local, datetime.time(sla_hour, sla_minute, 0) + ) + - tz_offset + ) %} + + {# Compare naive UTC datetimes #} + {% set deadline_passed = now_utc > sla_deadline_utc %} + {% else %} + {# Standard dbt-core path using pytz.localize() #} + {% set utc_tz = pytz.timezone("UTC") %} + {% set target_tz = pytz.timezone(timezone) %} + + {% set now_utc = datetime.datetime.now(utc_tz) %} + {% set now_local = now_utc.astimezone(target_tz) %} + {% set target_date_local = now_local.date() %} + + {% set day_start_naive = datetime.datetime.combine( + target_date_local, datetime.time(0, 0, 0) + ) %} + {% set day_start_local = target_tz.localize(day_start_naive, is_dst=False) %} + {% set day_start_utc = day_start_local.astimezone(utc_tz) %} + + {% set day_end_naive = datetime.datetime.combine( + target_date_local, datetime.time(23, 59, 59) + ) %} + {% set day_end_local = target_tz.localize(day_end_naive, is_dst=False) %} + {% set day_end_utc = day_end_local.astimezone(utc_tz) %} + + {% set sla_deadline_naive = datetime.datetime.combine( + target_date_local, datetime.time(sla_hour, sla_minute, 0) + ) %} + {% set sla_deadline_local = target_tz.localize( + sla_deadline_naive, is_dst=False + ) %} + {% set sla_deadline_utc = sla_deadline_local.astimezone(utc_tz) %} + + {% set deadline_passed = now_utc > sla_deadline_utc %} + {% endif %} {# Format for SQL #} {% set sla_deadline_utc_str = sla_deadline_utc.strftime("%Y-%m-%d %H:%M:%S") %} diff --git a/macros/edr/tests/test_volume_threshold.sql b/macros/edr/tests/test_volume_threshold.sql new file mode 100644 index 000000000..e681830f8 --- /dev/null +++ b/macros/edr/tests/test_volume_threshold.sql @@ -0,0 +1,322 @@ +{# + 
   Test: volume_threshold

   Monitors row count changes using percentage thresholds with multiple severity levels.
   Uses Elementary's metric caching infrastructure to avoid recalculating row counts
   for buckets that have already been computed.

   Parameters:
       timestamp_column (required): Column to determine time periods
       warn_threshold_percent (optional): % change that triggers warning (default: 5)
       error_threshold_percent (optional): % change that triggers error (default: 10)
       direction (optional): 'both', 'spike', or 'drop' (default: 'both')
       time_bucket (optional): Time bucket config, e.g. {period: 'day', count: 1}
       where_expression (optional): Additional WHERE filter
       days_back (optional): Days of history to keep (default: 14)
       backfill_days (optional): Days to backfill on each run (default: 2)
       min_row_count (optional): Min baseline rows in previous bucket to trigger check (default: 100)

   Example:
       - elementary.volume_threshold:
           timestamp_column: created_at
           warn_threshold_percent: 5
           error_threshold_percent: 10
           direction: both
#}
{% test volume_threshold(
    model,
    timestamp_column,
    warn_threshold_percent=5,
    error_threshold_percent=10,
    direction="both",
    time_bucket=none,
    where_expression=none,
    days_back=14,
    backfill_days=2,
    min_row_count=100
) %}
    {# severity_level encoding: 0 = pass, 1 = warn, 2 = error. fail_calc takes
       the max over the result rows, so warn_if/error_if map directly onto
       those levels. #}
    {{
        config(
            tags=["elementary-tests"],
            fail_calc="max(severity_level)",
            warn_if=">=1",
            error_if=">=2",
        )
    }}

    {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %}

        {# --- Parameter validation: raise at compile time for invalid configs --- #}
        {% if warn_threshold_percent < 0 or error_threshold_percent < 0 %}
            {{
                exceptions.raise_compiler_error(
                    "warn_threshold_percent and error_threshold_percent must be non-negative"
                )
            }}
        {% endif %}
        {% if min_row_count < 0 %}
            {{ exceptions.raise_compiler_error("min_row_count must be non-negative") }}
        {% endif %}
        {% if warn_threshold_percent > error_threshold_percent %}
            {{
                exceptions.raise_compiler_error(
                    "warn_threshold_percent cannot exceed error_threshold_percent"
                )
            }}
        {% endif %}
        {% if direction not in ["both", "spike", "drop"] %}
            {{
                exceptions.raise_compiler_error(
                    "direction must be 'both', 'spike', or 'drop'"
                )
            }}
        {% endif %}

        {# Resolve the tested model to a concrete relation; ephemeral models have
           no physical table to count rows in, so they are rejected. #}
        {% set model_relation = elementary.get_model_relation_for_test(
            model, elementary.get_test_model()
        ) %}
        {% if not model_relation %}
            {{ exceptions.raise_compiler_error("Unsupported model: " ~ model) }}
        {% endif %}

        {%- if elementary.is_ephemeral_model(model_relation) %}
            {{
                exceptions.raise_compiler_error(
                    "Test not supported for ephemeral models: "
                    ~ model_relation.identifier
                )
            }}
        {%- endif %}

        {# Validate timestamp column exists and is a timestamp type #}
        {% set timestamp_column_data_type = (
            elementary.find_normalized_data_type_for_column(
                model_relation, timestamp_column
            )
        ) %}
        {% if not elementary.is_column_timestamp(
            model_relation, timestamp_column, timestamp_column_data_type
        ) %}
            {{
                exceptions.raise_compiler_error(
                    "Column '"
                    ~ timestamp_column
                    ~ "' is not a timestamp type. The timestamp_column must be a timestamp or datetime column."
                )
            }}
        {% endif %}

        {# Collect row_count metrics using Elementary's shared infrastructure.
           This handles: incremental bucket detection, metric computation, temp table creation, cache storage.
           Pass time_bucket as-is (none = use model/project default via get_time_bucket). #}
        {% set table_metrics = [{"type": "row_count", "name": "row_count"}] %}
        {% do elementary.collect_table_metrics(
            table_metrics=table_metrics,
            model_expr=model,
            model_relation=model_relation,
            timestamp_column=timestamp_column,
            time_bucket=time_bucket,
            days_back=days_back,
            backfill_days=backfill_days,
            where_expression=where_expression,
            dimensions=[],
        ) %}

        {# Build metric_properties to match the filter used by collect_table_metrics.
           This must produce the same dict so our data_monitoring_metrics query matches.
           NOTE(review): the match is done by string equality on the serialized JSON
           (see the WHERE clause in the comparison macro), so argument order here
           must mirror get_metric_properties exactly. #}
        {% set model_graph_node = elementary.get_model_graph_node(model_relation) %}
        {% set metric_props = elementary.get_metric_properties(
            model_graph_node,
            timestamp_column,
            where_expression,
            time_bucket,
            [],
        ) %}

        {# Compare current vs previous bucket, combining cached history + newly computed metrics #}
        {{
            elementary.get_volume_threshold_comparison_query(
                model_relation=model_relation,
                metric_props=metric_props,
                warn_threshold_percent=warn_threshold_percent,
                error_threshold_percent=error_threshold_percent,
                direction=direction,
                min_row_count=min_row_count,
            )
        }}

    {%- else %} {{ elementary.no_results_query() }}
    {%- endif %}
{% endtest %}


{# Renders the comparison SQL for volume_threshold: unions cached row_count
   history with this run's freshly computed buckets, deduplicates, then
   compares the latest bucket against the one immediately before it and
   assigns a severity level (0/1/2) based on the percent change. #}
{% macro get_volume_threshold_comparison_query(
    model_relation,
    metric_props,
    warn_threshold_percent,
    error_threshold_percent,
    direction,
    min_row_count
) %}

    {% set data_monitoring_metrics_table = elementary.get_elementary_relation(
        "data_monitoring_metrics"
    ) %}
    {% set test_metrics_table = elementary.get_elementary_test_table(
        elementary.get_elementary_test_table_name(), "metrics"
    ) %}
    {% set full_table_name = elementary.relation_to_full_name(model_relation) %}

    with

    {# Union persisted history with newly computed metrics from this run #}
    all_metrics as (
        select
            bucket_start,
            bucket_end,
            metric_value as row_count,
            1 as source_priority
        from {{ data_monitoring_metrics_table }}
        where
            upper(full_table_name) = upper('{{ full_table_name }}')
            and lower(metric_name) = 'row_count'
            and metric_properties
            = {{ elementary.dict_to_quoted_json(metric_props) }}

        union all

        select
            bucket_start,
            bucket_end,
            metric_value as row_count,
            0 as source_priority
        from {{ test_metrics_table }}
    ),

    {# Deduplicate: prefer freshly computed metrics (priority 0) over cached history (priority 1) #}
    ranked_metrics as (
        select
            bucket_start,
            bucket_end,
            row_count,
            row_number() over (
                partition by bucket_start, bucket_end order by source_priority asc
            ) as rn
        from all_metrics
    ),

    metrics as (
        select bucket_start, bucket_end, row_count from ranked_metrics where rn = 1
    ),

    {# Use ROW_NUMBER + self-join instead of LAG to avoid a DuckDB internal
       binder bug where LAG over UNION ALL sources confuses TIMESTAMP and
       FLOAT column types (Failed to bind column reference "bucket_end").
       Cast bucket_num to signed int to avoid ClickHouse UInt64/Int64 type
       mismatch in the JOIN condition (bucket_num - 1 promotes to Int64).
       Use a separate max_bucket CTE instead of a scalar subquery to avoid
       SQL Server / Fabric "Invalid object name" errors when referencing a
       CTE inside a nested subquery in another CTE's WHERE clause. #}
    bucket_numbered as (
        select
            bucket_start,
            bucket_end,
            row_count,
            {{
                elementary.edr_cast_as_int(
                    "row_number() over (order by bucket_end)"
                )
            }} as bucket_num
        from metrics
    ),

    max_bucket as (select max(bucket_num) as max_num from bucket_numbered),

    {# Only the latest bucket is evaluated (inner join on max_num); the
       previous bucket is attached via left join so a lone first bucket
       yields NULL previous_row_count and therefore severity 0. #}
    comparison as (
        select
            curr.bucket_end as current_period,
            prev_b.bucket_end as previous_period,
            {{ elementary.edr_cast_as_int("curr.row_count") }} as current_row_count,
            {{ elementary.edr_cast_as_int("prev_b.row_count") }}
            as previous_row_count,
            case
                when prev_b.row_count is null
                then null
                {# 999999.99 is a sentinel "infinite growth" percent for a
                   zero-row baseline; it exceeds any sane error threshold. #}
                when prev_b.row_count = 0
                then case when curr.row_count > 0 then 999999.99 else 0 end
                else
                    round(
                        cast(
                            (curr.row_count - prev_b.row_count)
                            * 100.0
                            / prev_b.row_count
                            as {{ elementary.edr_type_numeric() }}
                        ),
                        2
                    )
            end as percent_change
        from bucket_numbered curr
        inner join max_bucket on curr.bucket_num = max_bucket.max_num
        left join bucket_numbered prev_b on prev_b.bucket_num = curr.bucket_num - 1
    ),

    {# Severity: error branch is checked before warn, so when both thresholds
       are crossed the higher level (2) wins. Baselines below min_row_count
       or with no previous bucket are always a pass (0). #}
    volume_result as (
        select
            *,
            case
                when
                    previous_row_count is null
                    or previous_row_count < {{ min_row_count }}
                then 0
                when percent_change is null
                then 0
                {% if direction == "both" %}
                    when abs(percent_change) >= {{ error_threshold_percent }}
                    then 2
                    when abs(percent_change) >= {{ warn_threshold_percent }}
                    then 1
                {% elif direction == "spike" %}
                    when percent_change >= {{ error_threshold_percent }}
                    then 2
                    when percent_change >= {{ warn_threshold_percent }}
                    then 1
                {% else %}
                    when percent_change <= -{{ error_threshold_percent }}
                    then 2
                    when percent_change <= -{{ warn_threshold_percent }}
                    then 1
                {% endif %}
                else 0
            end as severity_level
        from comparison
    )

    select
        '{{ model_relation.identifier }}' as model_name,
        cast(current_period as {{ elementary.edr_type_string() }}) as current_period,
        cast(previous_period as {{ elementary.edr_type_string() }}) as previous_period,
        current_row_count,
        previous_row_count,
        current_row_count - previous_row_count as absolute_change,
        percent_change,
        severity_level,
        case
            severity_level when 2 then 'error' when 1 then 'warn' else 'pass'
        end as severity_name,
        {{
            elementary.edr_concat(
                [
                    "'Row count changed by '",
                    "cast(percent_change as " ~ elementary.edr_type_string() ~ ")",
                    "'% (from '",
                    "cast(previous_row_count as "
                    ~ elementary.edr_type_string()
                    ~ ")",
                    "' to '",
                    "cast(current_row_count as " ~ elementary.edr_type_string() ~ ")",
                    "')'",
                ]
            )
        }} as result_description
    from volume_result

{% endmacro %}