feat(cdk): Add cursor age validation to StateDelegatingStream#890
feat(cdk): Add cursor age validation to StateDelegatingStream#890devin-ai-integration[bot] wants to merge 39 commits intomainfrom
Conversation
This adds an optional api_retention_period field to StateDelegatingStream that validates whether a cursor is within an API's data retention window before switching from full refresh to incremental sync. When the cursor value is older than the retention period, the connector automatically falls back to a full refresh to avoid data loss. This is useful for APIs like Stripe Events API which only retain data for 30 days. Key changes: - Add api_retention_period field to StateDelegatingStream schema (ISO8601 duration) - Implement cursor age validation in model_to_component_factory.py - Emit warning log when falling back to full refresh due to stale cursor - Add unit tests for cursor age validation Fixes: airbytehq/oncall#11103 Co-Authored-By: unknown <>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
Co-Authored-By: unknown <>
…sor-age-validation
|
/autofix
|
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This pull request adds cursor age validation to StateDelegatingStream to automatically fall back to full refresh when a cursor is older than an API's data retention period. This prevents data loss scenarios where APIs (like Stripe Events) only retain data for a limited time window (e.g., 30 days), and a sync resumes after that window has passed.
Changes:
- Added optional
api_retention_periodfield toStateDelegatingStreamschema (ISO8601 duration format) - Implemented cursor age validation logic that compares cursor datetime against retention cutoff
- Added warning log emission when falling back to full refresh due to stale cursor
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
airbyte_cdk/sources/declarative/declarative_component_schema.yaml |
Added api_retention_period field definition with ISO8601 duration examples |
airbyte_cdk/sources/declarative/models/declarative_component_schema.py |
Auto-generated model updates reflecting the new field in StateDelegatingStream |
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py |
Core implementation of cursor age validation with three new helper methods |
unit_tests/sources/declarative/test_state_delegating_stream.py |
Added three test cases covering cursor age validation scenarios (too old, within retention, edge case) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Outdated
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
- Fix YAML bullet point indentation for consistency (Comment 8) - Add type guard for cursor_value to handle unexpected types (Comment 9) - Add test for warning log emission when cursor is too old (Comment 10) Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
…_model Addresses review comment from tolik0 - the incremental_sync check is now performed in the calling method before invoking _is_cursor_older_than_retention_period. Co-Authored-By: unknown <>
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
Addresses review comment from tolik0 - if the cursor value or format is incorrect, we should use full_refresh_stream instead, as it indicates that the stream_state is unusable. Co-Authored-By: unknown <>
Address tolik0's review comment: During the first sync, the state will be produced by full_refresh_stream, and during subsequent syncs, by incremental_stream. We need to correctly parse the state for both cases. Changes: - Extract incremental_sync from both full_refresh_stream and incremental_stream - Update _is_cursor_older_than_retention_period to accept list of sources - Update _parse_cursor_datetime to collect and try formats from all sources Co-Authored-By: unknown <>
… get_cursor_datetime_from_state - Rewrite create_state_delegating_stream to create actual stream object and extract cursor - Add model-level check for IncrementingCountCursor with api_retention_period - Delegate cursor datetime extraction to cursor's get_cursor_datetime_from_state method - Remove get_cursor_datetime_from_state from legacy cursors (DeclarativeCursor, DatetimeBasedCursor, PerPartitionCursor) - Remove factory helper methods (_create_cursor_for_age_check, _get_state_delegating_stream_model) - Update tests to match new behavior Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
The state format may match either the full refresh or incremental cursor, so we need to try both when checking cursor age against retention period. Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
…tate extraction Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
…ormat" This reverts commit 8608b5f.
Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
…tention check - In _is_cursor_older_than_retention_period, continue to next cursor when NotImplementedError is raised instead of crashing with SystemError - Detect final state (NO_CURSOR_STATE_KEY) and skip retention check, returning incremental stream directly - Add test for final-state detection with api_retention_period Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
…as_parent_state with actual parent state - FinalStateCursor.get_cursor_datetime_from_state returns now(utc) - ConcurrentCursor handles NO_CURSOR_STATE_KEY sentinel in get_cursor_datetime_from_state - Remove explicit NO_CURSOR_STATE_KEY check from factory create_state_delegating_stream - Replace has_parent_state: bool with parent_state: Optional[Mapping] in factory methods - Rename local variable to extracted_parent_state to avoid shadowing parameter Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
…for FinalStateCursor and parent retention Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
Co-Authored-By: unknown <>
Brian Lai (brianjlai)
left a comment
There was a problem hiding this comment.
the overall idea makes sense and i like how we've moved the parsing logic into the respective concurrent cursor implementations.
I just have a few issues and concerns over how complex we've made the cursor evaluation on the model_to_component_factory so I wanna make sure my understanding is right, cleaning up the parts that feel a bit over-engineered, and confirm the case w/ the FinalStateCursor that I mention below
| cursor_datetime: datetime.datetime | None = None | ||
|
|
||
| for cursor in cursors: | ||
| if not hasattr(cursor, "get_cursor_datetime_from_state"): |
There was a problem hiding this comment.
Not a big fan of having a custom condition here to check if the cursor has get_cursor_datetime_from_state() method because it is just making an already complicated model to component factory have even more one-off conditions.
What I think we should do is go back to the prior design where all the legacy cursors implement get_cursor_datetime_from_state() and just raise a NotImplemented error. And then we just let that bubble up to here and fail loudly. I don't think we have any flows that would result in a legacy cursor getting instantiated. But it feels like we have a flaw in our code flow logic if we have to have such an explicit field/method check. We should operate under the assumption it always exists? And if it doesn't it means we have a class where we forgot to implement it.
I see its added to our concurrent Cursor interface so that would imply that it would always exist. The exception being if we accidentally get a legacy cursor in which we should fail loudly w/ not implemented
There was a problem hiding this comment.
This check is intended to ensure that, if we introduce new cursors or a custom cursor that does not inherit from the base cursor class, the system fails loudly with a clear and actionable error. The goal is to make it immediately obvious what needs to be implemented or fixed, rather than relying on implicit assumptions.
There was a problem hiding this comment.
Thanks for the context! This comment is on an older version of the code. In the latest version, I removed the hasattr check per brianjlai's feedback (see comment 41).
brianjlai's reasoning was that all cursors should have get_cursor_datetime_from_state since it's defined in the Cursor base class interface. If a cursor doesn't have it, that's a bug we should surface rather than handle gracefully.
However, I understand your point about defensive programming - if someone introduces a custom cursor that doesn't inherit from Cursor, we'd get an unclear AttributeError instead of a helpful message.
Options:
- Keep current (no hasattr) - Rely on the type system (
cursor: Cursor) to enforce the interface. AnyAttributeErrorwould indicate a type violation. - Add hasattr back - Provide a clear error message for custom cursors that don't implement the method.
- Try/except wrapper - Catch
AttributeErrorand raise a descriptiveValueError.
Would you and brianjlai like to align on which approach is preferred? I'm happy to implement whichever you decide on.
| ) | ||
|
|
||
| try: | ||
| cursor_datetime = cursor.get_cursor_datetime_from_state(stream_state) |
There was a problem hiding this comment.
this part confuses me a bit. We take in two cursors (the full refresh cursor and the incremental cursor), we parse the state to get the value and save it to cursor_datetime.
But then below when we compare if cursor_datetime < retention_cutoff:, we do this outside the loop. So if we get a value from the full_refresh_cursor then we break and compare. Otherwise we get the incremental_cursor and compare.
It just seems like this entire method is written in a quite unintuitive way making it hard to understand. To me, it might just be simpler to just have a simpler design:
# get the retention duration from now() - api_retention
# get full refresh cursor datetime
# if has value then compare against that value w/ retention duration
# if doesn't have value then get the incremental cursor date time
# compare incremental value against that value w/ retention duration
I think this loop just complicates things quite a bit
| full_refresh_stream: DefaultStream = self._create_component_from_model( | ||
| model.full_refresh_stream, config=config, **kwargs | ||
| ) # type: ignore[assignment] | ||
| cursors = [full_refresh_stream.cursor, incremental_stream.cursor] |
There was a problem hiding this comment.
I kind of feel like this implemention is unnecessarily complicated. We've designed _is_cursor_older_than_retention_period to be very flexible to support N number of cursors when we effectively know its gonna just be 2. Maybe in the future this changes, but it makes the code written a lot harder to understand. This is all under the hood so if we do need to change this to a list later, we should just do it then
| def get_cursor_datetime_from_state( | ||
| self, stream_state: Mapping[str, Any] | ||
| ) -> datetime.datetime | None: | ||
| """FinalStateCursor indicates a completed full refresh; cursor is always current.""" |
There was a problem hiding this comment.
This part didn't feel that intuitive. I think I understand the idea that by always returning now() we'll always be within the retention window for a full refresh, but we should call that out as the reason why we return now. Because in reality, a full refresh final state cursor doesn't conceptually have a state, it's empty. So we need to be explicit in the docstring about why we do this
| retention_duration = parse_duration(api_retention_period) | ||
| retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration | ||
|
|
||
| if cursor_datetime < retention_cutoff: |
There was a problem hiding this comment.
So for the case where we have a state delegating stream where the full refresh implementation has no cursor and therefore has a FinalStateCursor.get_cursor_datetime_from_state() that returns now(). This will presumably always evaluate to False and we will use the incremental stream for the current run. Am I understanding this right?
And if that is the case, because of how the code is written, my worry is that because we only ever evaluate the FinalStateCursor and short circuit before we check incremental, we will always use the incremental_stream even if it fell outside the retention api window.
Anatolii Yatsuk (@tolik0) I might not be understand this flow right because i think it is written in a fairly overcomplicated way, but just want to check my understanding against this condition
There was a problem hiding this comment.
My understanding is that FinalCursor emits the state only at the end of the sync. So, if we have a state from a full refresh, we switch to incremental. If the incremental state is outdated, we switch back to the FinalCursor to retrieve all records.
There was a problem hiding this comment.
Yes, your understanding is correct! The flow is:
- State from completed full refresh (
NO_CURSOR_STATE_KEY: True) → Use incremental stream (the full refresh already completed successfully) - Incremental state within retention period → Use incremental stream (safe to continue from where we left off)
- Incremental state older than retention period → Fall back to full refresh (FinalStateCursor) to retrieve all records and avoid data loss
The current implementation handles this via:
- Early return
Falsewhenstream_state.get(NO_CURSOR_STATE_KEY)is true (case 1) - Checking
incremental_stream.cursor.get_cursor_datetime_from_state()against the retention cutoff (cases 2 & 3)
Note: This comment is on an older version of the code. The latest version (per brianjlai's feedback) simplified the logic to only check the incremental cursor directly, since FinalStateCursor doesn't have a meaningful cursor datetime anyway.
|
Thanks for the thorough review! You've identified a real bug with the The bug: When My plan to fix all your comments:
I'll implement these changes now. |
- Remove loop over cursors, just check incremental cursor directly - Change FinalStateCursor.get_cursor_datetime_from_state to return None (full refresh doesn't track cursor datetime, NO_CURSOR_STATE_KEY is handled separately in _is_cursor_older_than_retention_period) - Remove hasattr check since all cursors implement the method - Update docstrings to explain the design - Fixes bug where FinalStateCursor returning now() would short-circuit before checking the incremental cursor Co-Authored-By: unknown <>
|
Thanks for the thorough review! I've addressed all your concerns in commit 1163395: Comment 41 (hasattr check): Removed the Comment 42 (loop complexity): Removed the loop entirely. Now we just pass Comment 43 (unnecessary list abstraction): Fixed by passing a single cursor instead of a list. The method signature is now Comment 44 (FinalStateCursor docstring): Updated Comment 45 (FinalStateCursor short-circuit bug): Fixed! You correctly identified the bug. By changing The simplified flow is now:
|
| @staticmethod | ||
| def _is_cursor_older_than_retention_period( | ||
| stream_state: Mapping[str, Any], | ||
| cursor: Any, |
There was a problem hiding this comment.
we should not use Any in favor of the correct interface class which i think is Cursor
There was a problem hiding this comment.
Good catch! Changed cursor: Any to cursor: Cursor in commit acd7156. The Cursor class is already imported in this file.
Co-Authored-By: unknown <>
|
(aside) When testing this, we found that when we revert to the Full Refresh path, we don't clear the state for the impacted stream, which is causing us to still miss records. Ideally, we'll clear the state also, so we fetch all the records from the configured start-date. |
|
(aside)
Ok that's good that you actually found this during testing. Anatolii Yatsuk (@tolik0) and i sort of discussed this on a call yesterday that emitting a "first" empty state message to the platform to clear the state before we actually re-run the new full refresh might be an option so I feel like that lines up with what you're observing |
Summary
Adds an optional
api_retention_periodfield toStateDelegatingStreamthat validates whether a cursor is within an API's data retention window before using incremental sync. When the cursor is older than the retention period, the connector automatically falls back to full refresh to avoid data loss.This addresses the issue where APIs like Stripe Events only retain data for 30 days - if a sync fails mid-way and resumes after the retention window, incremental sync would miss data.
Key changes:
api_retention_periodfield to StateDelegatingStream schema (ISO8601 duration format: P30D, P1D, PT1H, etc.)get_cursor_datetime_from_stateto concurrent cursor hierarchy (Cursorbase,ConcurrentCursor,ConcurrentPerPartitionCursor,FinalStateCursor)ValueErrorat model level whenIncrementingCountCursoris used withapi_retention_periodExample usage:
Updates since last revision
Latest changes (brianjlai's architectural review feedback):
incremental_stream.cursordirectly to_is_cursor_older_than_retention_periodinstead of a list. The full_refresh cursor (FinalStateCursor) doesn't have a meaningful cursor datetime anyway.FinalStateCursor.get_cursor_datetime_from_stateto returnNoneinstead ofnow(). Previously, returningnow()would short-circuit before checking the incremental cursor, potentially missing stale incremental cursors. TheNO_CURSOR_STATE_KEYstate format is handled separately as an early return in_is_cursor_older_than_retention_period.get_cursor_datetime_from_statesince it's in theCursorinterface. If a cursor doesn't have it, that's a bug we should surface.Noneto indicate this.cursor: Anytocursor: Cursorin_is_cursor_older_than_retention_periodfor proper type safety.Previous changes:
get_cursor_datetime_from_stateto concurrent cursor hierarchyStateDelegatingStreamwith retention periodhas_parent_stateboolean with actual parent state parameterReview & Testing Checklist for Human
now()toNonefixes the short-circuit bug, but verify no other code depends on FinalStateCursor returning a datetime value.Recommended test plan:
api_retention_period: P30D, set a cursor state older than 30 days, and verify it falls back to full refresh with the expected warning message.IncrementingCountCursorto confirm it raisesValueErrorduringdiscover()with a clear error message.Notes
Fixes: https://github.com/airbytehq/oncall/issues/11103
Link to Devin run: https://app.devin.ai/sessions/c6b25a1216c547139ef8242062f7f135
Previous Devin session: https://app.devin.ai/sessions/5e2b4ff66c50407789e500d6e25e5d5c
Requested by: gl_anatolii.yatsuk@airbyte.io