Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a51ff11
Add data_freshness_sla and volume_threshold tests
joostboon Feb 18, 2026
9fc552e
fix: handle missing table in read_table when raise_if_empty=False (Bi…
joostboon Feb 18, 2026
9bfc0b7
Revert "fix: handle missing table in read_table when raise_if_empty=F…
joostboon Feb 18, 2026
9c888fa
Merge remote-tracking branch 'origin/master' into feature/volume-thre…
haritamar Mar 12, 2026
be7de95
Add integration tests for data_freshness_sla and volume_threshold (#965)
devin-ai-integration[bot] Mar 12, 2026
4bb34e6
Fix sqlfmt issues and Postgres round() bug in test macros
devin-ai-integration[bot] Mar 12, 2026
09de7d1
Merge remote-tracking branch 'origin/master' into feature/volume-thre…
devin-ai-integration[bot] Mar 12, 2026
6e75bc8
Fix volume_threshold integration tests to use complete buckets
devin-ai-integration[bot] Mar 12, 2026
0184a34
Fix cross-database compatibility in volume_threshold macro
devin-ai-integration[bot] Mar 12, 2026
c858993
Fix Dremio: rename 'prev' alias to 'prev_b' (reserved keyword)
devin-ai-integration[bot] Mar 12, 2026
da156c4
Fix Dremio: rename 'result' CTE to 'volume_result' (reserved keyword)
devin-ai-integration[bot] Mar 12, 2026
e83c89c
Fix fusion pytz issue and SQL Server/Fabric concat in data_freshness_sla
devin-ai-integration[bot] Mar 12, 2026
b47df2a
Fix fusion pytz.localize() producing incorrect results in calculate_s…
devin-ai-integration[bot] Mar 12, 2026
b4d1f39
Fix fusion: use naive UTC datetimes to avoid broken tz-aware operations
devin-ai-integration[bot] Mar 12, 2026
cad4331
Fix Fabric, BigQuery, and fusion failures in volume_threshold and dat…
joostboon Mar 13, 2026
c6873c9
Fix sqlfmt formatting in test_data_freshness_sla.sql
joostboon Mar 13, 2026
380a9fa
Fix Fabric: replace scalar subquery with JOIN in previous_bucket CTE
joostboon Mar 13, 2026
5c289d1
Fix sqlfmt formatting in test_volume_threshold.sql
joostboon Mar 13, 2026
75a0779
Fix freshness SLA deadline check and volume threshold Fabric compatib…
haritamar Mar 13, 2026
ccd5be4
Revert fusion path to use naive UTC datetimes instead of datetime.tim…
haritamar Mar 13, 2026
bba307d
Address CodeRabbit review feedback
haritamar Mar 13, 2026
a2f6944
Fix volume threshold assertions: dbt reports 'fail' not 'error'
haritamar Mar 13, 2026
33d17f7
Fix ClickHouse and DuckDB CI failures
haritamar Mar 13, 2026
23db0bf
Fix ClickHouse freshness SLA: rename string-cast alias to avoid shado…
haritamar Mar 13, 2026
b277efa
Fix DuckDB volume_threshold: replace LAG with ROW_NUMBER + self-join
haritamar Mar 14, 2026
ec3220f
Fix fusion: use pytz.utc instead of datetime.timezone.utc; fix brittl…
joostboon Mar 14, 2026
6c5968e
Fix Fabric, ClickHouse, and Vertica failures in volume_threshold and …
joostboon Mar 14, 2026
d809c8b
Fix Dremio: rename 'prev' alias to 'prev_b' (reserved keyword in Dremio)
devin-ai-integration[bot] Mar 14, 2026
8c5a3c2
Address CodeRabbit review comments: clarify DATA_FRESH semantics, han…
devin-ai-integration[bot] Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions integration_tests/tests/test_data_freshness_sla.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from datetime import datetime, timedelta, timezone

from data_generator import DATE_FORMAT
from dbt_project import DbtProject

# Fully-qualified dbt test name exercised by every test in this module.
TEST_NAME = "elementary.data_freshness_sla"
# Name of the seeded column holding each row's last-update timestamp.
TIMESTAMP_COLUMN = "updated_at"


def test_fresh_data_passes(test_id: str, dbt_project: DbtProject):
    """Data updated today should pass when the SLA deadline has already passed.

    Seeds two fresh rows (now and one hour ago) and runs the freshness SLA
    test with an 11:59pm UTC deadline; fresh data should yield "pass".
    """
    # Naive UTC "now": datetime.utcnow() is deprecated since Python 3.12;
    # this is the recommended drop-in replacement with identical output.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    data = [
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT)},
        {TIMESTAMP_COLUMN: (utc_now - timedelta(hours=1)).strftime(DATE_FORMAT)},
    ]
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "UTC",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_stale_data_fails(test_id: str, dbt_project: DbtProject):
    """Data only from previous days should fail when today's SLA deadline has passed."""
    # Naive UTC "now": datetime.utcnow() is deprecated since Python 3.12.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    # Two days old — clearly stale relative to any same-day deadline.
    stale_day = utc_now - timedelta(days=2)
    data = [
        {TIMESTAMP_COLUMN: stale_day.strftime(DATE_FORMAT)},
        {TIMESTAMP_COLUMN: (stale_day - timedelta(hours=5)).strftime(DATE_FORMAT)},
    ]
    # Use 12:01am UTC (= 00:01 UTC) so the deadline is always in the past when
    # CI runs (typically 07:00+ UTC). Etc/GMT-14 was ambiguous in some pytz
    # versions and caused Vertica to return wrong results.
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "fail"


def test_no_data_fails(test_id: str, dbt_project: DbtProject):
    """An empty table (after WHERE filter) should fail when deadline has passed."""
    # Naive UTC "now": datetime.utcnow() is deprecated since Python 3.12.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    # Seed one row that the where_expression below excludes, so the filtered
    # table is effectively empty.
    data = [
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT), "category": "excluded"},
    ]
    # 12:01am UTC is reliably in the past by the time CI runs.
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
        "where_expression": "category = 'included'",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "fail"


def test_deadline_not_passed_does_not_fail(test_id: str, dbt_project: DbtProject):
    """Even if data is stale, the test should pass if the deadline hasn't passed yet."""
    # Naive UTC "now": datetime.utcnow() is deprecated since Python 3.12.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    # Two days old — stale, but irrelevant while the deadline is still ahead.
    stale_day = utc_now - timedelta(days=2)
    data = [
        {TIMESTAMP_COLUMN: stale_day.strftime(DATE_FORMAT)},
    ]
    # Set the deadline to 11:59pm UTC so it reliably hasn't passed yet.
    # (Etc/GMT-14 = UTC+14 means 11:59pm there = 09:59 UTC — not reliably future)
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "UTC",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_with_where_expression(test_id: str, dbt_project: DbtProject):
    """The where_expression should filter which rows count toward freshness."""
    # Naive UTC "now": datetime.utcnow() is deprecated since Python 3.12.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    # Two days old — stale relative to any same-day deadline.
    stale_day = utc_now - timedelta(days=2)
    data = [
        # Fresh data for category A
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT), "category": "a"},
        # Stale data for category B
        {TIMESTAMP_COLUMN: stale_day.strftime(DATE_FORMAT), "category": "b"},
    ]
    # Test with category A (fresh data) -> should pass
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "UTC",
        "where_expression": "category = 'a'",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"

    # Test with category B (stale data) and early deadline -> should fail.
    # No `data` argument here: the table was already seeded by the run above.
    test_args_stale = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
        "where_expression": "category = 'b'",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args_stale)
    assert test_result["status"] == "fail"


def test_with_timezone(test_id: str, dbt_project: DbtProject):
    """Test that timezone conversion works correctly.

    Fresh data with an end-of-day deadline in a non-UTC zone should pass.
    """
    # Naive UTC "now": datetime.utcnow() is deprecated since Python 3.12.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    data = [
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT)},
    ]
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "America/New_York",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"
224 changes: 224 additions & 0 deletions integration_tests/tests/test_volume_threshold.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
from datetime import datetime, timedelta, timezone

from data_generator import DATE_FORMAT, generate_dates
from dbt_project import DbtProject

# Name of the seeded column used to assign each row to a time bucket.
TIMESTAMP_COLUMN = "updated_at"
# Fully-qualified dbt test name exercised by every test in this module.
DBT_TEST_NAME = "elementary.volume_threshold"
# Shared defaults: daily buckets over a 14-day window. backfill_days matches
# days_back — presumably so all buckets are recomputed each run; confirm
# against the volume_threshold macro documentation.
DBT_TEST_ARGS = {
    "timestamp_column": TIMESTAMP_COLUMN,
    "time_bucket": {"period": "day", "count": 1},
    "days_back": 14,
    "backfill_days": 14,
}


def _generate_stable_data(rows_per_day=100, days_back=14):
    """Generate data with a consistent number of rows per day bucket.

    Note: Elementary only processes *complete* buckets (the latest full bucket
    before ``run_started_at``). With daily buckets that means "yesterday" is
    the newest bucket the macro will ever look at. We therefore generate data
    up to yesterday only so that all buckets are complete.

    :param rows_per_day: number of identical rows seeded per day bucket.
    :param days_back: number of daily buckets to generate, ending yesterday.
    :return: list of seed-row dicts keyed by TIMESTAMP_COLUMN.
    """
    # datetime.utcnow() is deprecated since Python 3.12; the aware UTC "now"
    # yields the same .date() value.
    yesterday = datetime.now(timezone.utc).date() - timedelta(days=1)
    return [
        {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)}
        for cur_date in generate_dates(base_date=yesterday, days_back=days_back)
        for _ in range(rows_per_day)
    ]


def test_stable_volume_passes(test_id: str, dbt_project: DbtProject):
    """Consistent row counts across buckets should pass."""
    seed_rows = _generate_stable_data(rows_per_day=100)
    result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=seed_rows)
    assert result["status"] == "pass"


def test_large_spike_fails(test_id: str, dbt_project: DbtProject):
    """A large spike in row count (>10% default error threshold) should fail."""
    # Only complete buckets are evaluated, so "yesterday" is the newest bucket
    # the macro will look at. (datetime.utcnow() is deprecated since 3.12.)
    yesterday = datetime.now(timezone.utc).date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: 100 rows each
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            data.extend(
                {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} for _ in range(100)
            )
    # Two days ago (previous bucket): 100 rows
    data.extend(
        {TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT)} for _ in range(100)
    )
    # Yesterday (current bucket — latest complete bucket): 150 rows (50% spike)
    data.extend(
        {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)} for _ in range(150)
    )

    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "fail"


def test_large_drop_fails(test_id: str, dbt_project: DbtProject):
    """A large drop in row count (>10% default error threshold) should fail."""
    # Only complete buckets are evaluated, so "yesterday" is the newest bucket
    # the macro will look at. (datetime.utcnow() is deprecated since 3.12.)
    yesterday = datetime.now(timezone.utc).date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: 100 rows each
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            data.extend(
                {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} for _ in range(100)
            )
    # Two days ago (previous bucket): 100 rows
    data.extend(
        {TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT)} for _ in range(100)
    )
    # Yesterday (current bucket — latest complete bucket): 50 rows (50% drop)
    data.extend(
        {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)} for _ in range(50)
    )

    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "fail"


def test_direction_spike_ignores_drop(test_id: str, dbt_project: DbtProject):
    """With direction=spike, a drop should not trigger a failure."""
    # datetime.utcnow() is deprecated since Python 3.12.
    yesterday = datetime.now(timezone.utc).date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: 100 rows each
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            data.extend(
                {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} for _ in range(100)
            )
    # Two days ago: 100 rows
    data.extend(
        {TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT)} for _ in range(100)
    )
    # Yesterday: 50 rows (50% drop) — should be ignored with direction=spike
    data.extend(
        {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)} for _ in range(50)
    )

    test_args = {**DBT_TEST_ARGS, "direction": "spike"}
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_direction_drop_ignores_spike(test_id: str, dbt_project: DbtProject):
    """With direction=drop, a spike should not trigger a failure."""
    # datetime.utcnow() is deprecated since Python 3.12.
    yesterday = datetime.now(timezone.utc).date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: 100 rows each
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            data.extend(
                {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} for _ in range(100)
            )
    # Two days ago: 100 rows
    data.extend(
        {TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT)} for _ in range(100)
    )
    # Yesterday: 150 rows (50% spike) — should be ignored with direction=drop
    data.extend(
        {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)} for _ in range(150)
    )

    test_args = {**DBT_TEST_ARGS, "direction": "drop"}
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_min_row_count_skips_small_baseline(test_id: str, dbt_project: DbtProject):
    """When previous bucket has fewer rows than min_row_count, check is skipped (pass)."""
    # datetime.utcnow() is deprecated since Python 3.12.
    yesterday = datetime.now(timezone.utc).date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: only 5 rows each (below default min_row_count=100)
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            data.extend(
                {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} for _ in range(5)
            )
    # Two days ago: 5 rows
    data.extend(
        {TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT)} for _ in range(5)
    )
    # Yesterday: 50 rows (huge spike but baseline is too small)
    data.extend(
        {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)} for _ in range(50)
    )

    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "pass"


def test_custom_thresholds(test_id: str, dbt_project: DbtProject):
    """Custom thresholds should control the sensitivity of the test."""
    # datetime.utcnow() is deprecated since Python 3.12.
    yesterday = datetime.now(timezone.utc).date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: 100 rows each
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            data.extend(
                {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} for _ in range(100)
            )
    # Two days ago: 100 rows
    data.extend(
        {TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT)} for _ in range(100)
    )
    # Yesterday: 108 rows (8% change)
    data.extend(
        {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)} for _ in range(108)
    )

    # With default thresholds (warn=5, error=10), 8% should warn but not error
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "warn"

    # With high thresholds (warn=20, error=50), 8% should pass. No `data` is
    # passed — the table was already seeded above; force_metrics_backfill
    # presumably forces metrics to be recomputed for the second run (confirm
    # against the elementary docs).
    test_args_high = {
        **DBT_TEST_ARGS,
        "warn_threshold_percent": 20,
        "error_threshold_percent": 50,
    }
    test_result = dbt_project.test(
        test_id,
        DBT_TEST_NAME,
        test_args_high,
        test_vars={"force_metrics_backfill": True},
    )
    assert test_result["status"] == "pass"


def test_where_expression(test_id: str, dbt_project: DbtProject):
    """The where_expression should filter which rows are counted."""
    # datetime.utcnow() is deprecated since Python 3.12.
    yesterday = datetime.now(timezone.utc).date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)

    def rows(day, count, category):
        # Build `count` seed rows for `day`, tagged with `category`.
        return [
            {TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT), "category": category}
            for _ in range(count)
        ]

    data = []
    # Older days: 100 rows of category A each
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            data.extend(rows(cur_date, 100, "a"))
    # Two days ago: 100 rows of category A
    data.extend(rows(two_days_ago, 100, "a"))
    # Yesterday: 100 rows of category A (stable) + 200 rows of category B (noise)
    data.extend(rows(yesterday, 100, "a"))
    data.extend(rows(yesterday, 200, "b"))

    # Without filter: total yesterday = 300 vs 100 two days ago -> big spike -> fail
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "fail"

    # With filter on category A: 100 yesterday vs 100 two days ago -> stable -> pass.
    # No `data` is passed — the table was already seeded above;
    # force_metrics_backfill presumably forces metrics to be recomputed for
    # the second run (confirm against the elementary docs).
    test_args_filtered = {
        **DBT_TEST_ARGS,
        "where_expression": "category = 'a'",
    }
    test_result = dbt_project.test(
        test_id,
        DBT_TEST_NAME,
        test_args_filtered,
        test_vars={"force_metrics_backfill": True},
    )
    assert test_result["status"] == "pass"
Loading
Loading