fix(cdk): fix SubstreamPartitionRouter updating cursor value when no records were read in partition#889
fix(cdk): fix SubstreamPartitionRouter updating cursor value when no records were read in partition#889darynaishchenko wants to merge 11 commits intomainfrom
Conversation
…re read in partition
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@daryna/fix-substream-partition-router#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch daryna/fix-substream-partition-routerPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughChanged SubstreamPartitionRouter to add Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Would you like me to generate a small focused patch-level checklist for reviewers (key lines/behaviors to verify), wdyt? 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1)
42-49:⚠️ Potential issue | 🔴 CriticalType annotation mismatch causing pipeline failure.
The pipeline is flagging a type error here. Yielding
(None, True)producestuple[None, bool], but the return type expectstuple[T, bool]. Since the sentinel introducesNoneas a possible value, would updating the return type toOptional[T]work for you, wdyt?🔧 Proposed fix
-def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]: +def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:
🤖 Fix all issues with AI agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`:
- Around line 210-211: In stream_slices() in substream_partition_router.py the
check "if partition is None: return" prematurely exits the entire method when a
parent_stream.generate_partitions() yields the (None, True) sentinel; change
this to "break" (or otherwise exit the inner partition loop) so we stop
iterating that parent stream's partitions and continue to the next
parent_stream_config instead of returning from stream_slices(), leaving the rest
of the parent streams to be processed; locate the partition handling inside the
loop that consumes parent_stream.generate_partitions() and replace the return
with a break (or equivalent loop-control) so behavior matches the original
semantics.
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py
Outdated
Show resolved
Hide resolved
|
/prerelease
|
…titionRouter null handling Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
…ulti-parent behavior Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (2)
42-49:⚠️ Potential issue | 🟠 MajorFix mypy type mismatch in
iterate_with_last_flag.
Line 48 yields(None, True), but the signature advertisestuple[T, bool], which is tripping mypy. Could we widen the return type toOptional[T]to match the sentinel, wdyt?🔧 Proposed fix
-def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]: +def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:
186-187:⚠️ Potential issue | 🟡 MinorUpdate docstring to match empty-slice behavior.
Lines 186–187 still say an empty parent slice emitsparent_record=None, but the code now skips emission. Could we adjust the docstring to reflect the new behavior, wdyt?📝 Suggested doc tweak
- If a parent slice contains no record, emit a slice with parent_record=None. + If a parent slice contains no record, emit no slice.
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py
Show resolved
Hide resolved
… when no records Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
… empty generators Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`:
- Around line 231-267: The try/except around dpath.get in
substream_partition_router should not use an early `continue` because that skips
the later partition closure logic; instead, introduce a boolean like `skip_slice
= True` when KeyError occurs (and only set `skip_slice = False` when extraction
succeeds), keep `extracted_extra_fields` in a safe default state, and let
execution fall through so the existing `if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)` / `if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()` blocks always run;
finally, guard the `yield StreamSlice(...)` (the emission that uses
`parent_record`, `partition_value`, `extracted_extra_fields`,
`parent_partition`, etc.) so it only occurs when `skip_slice` is False. This
ensures partition closure always executes while still skipping emission for
failed extractions.
| try: | ||
| partition_value = dpath.get( | ||
| record_data, # type: ignore [arg-type] | ||
| parent_field, | ||
| ) | ||
| except KeyError: | ||
| # FIXME a log here would go a long way for debugging | ||
| continue | ||
|
|
||
| # Add extra fields | ||
| extracted_extra_fields = self._extract_extra_fields( | ||
| record_data, extra_fields | ||
| ) | ||
|
|
||
| if parent_stream_config.lazy_read_pointer: | ||
| extracted_extra_fields = { | ||
| "child_response": self._extract_child_response( | ||
| record_data, | ||
| parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config | ||
| ), | ||
| **extracted_extra_fields, | ||
| } | ||
|
|
||
| if is_last_record_in_slice: | ||
| parent_stream.cursor.close_partition(partition) | ||
| if is_last_slice: | ||
| parent_stream.cursor.ensure_at_least_one_state_emitted() | ||
|
|
||
| yield StreamSlice( | ||
| partition={ | ||
| partition_field: partition_value, | ||
| "parent_slice": parent_partition or {}, | ||
| }, | ||
| cursor_slice={}, | ||
| extra_fields=extracted_extra_fields, | ||
| ) | ||
| if parent_record is not None: | ||
| yield StreamSlice( | ||
| partition={ | ||
| partition_field: partition_value, | ||
| "parent_slice": parent_partition or {}, | ||
| }, | ||
| cursor_slice={}, | ||
| extra_fields=extracted_extra_fields, | ||
| ) |
There was a problem hiding this comment.
Avoid skipping partition closure when partition-value extraction fails.
The early continue in the extraction-failure path skips close_partition() / ensure_at_least_one_state_emitted() when that record is last, leaving the cursor partition open. Could we switch to a skip_slice flag so closure always runs and only slice emission is skipped, wdyt?
🔧 Possible fix
for parent_record, is_last_record_in_slice in iterate_with_last_flag(
partition.read()
):
- if parent_record is not None:
+ skip_slice = True
+ if parent_record is not None:
# In the previous CDK implementation, state management was done internally by the stream.
# However, this could cause issues when doing availability check for example as the availability
# check would progress the state so state management was moved outside of the read method.
# Hence, we need to call the cursor here.
# Note that we call observe and close_partition before emitting the associated record as the
# ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
# record was consumed.
parent_stream.cursor.observe(parent_record)
parent_partition = (
parent_record.associated_slice.partition
if parent_record.associated_slice
else {}
)
record_data = parent_record.data
try:
partition_value = dpath.get(
record_data, # type: ignore [arg-type]
parent_field,
)
except KeyError:
# FIXME a log here would go a long way for debugging
- continue
-
- # Add extra fields
- extracted_extra_fields = self._extract_extra_fields(
- record_data, extra_fields
- )
-
- if parent_stream_config.lazy_read_pointer:
- extracted_extra_fields = {
- "child_response": self._extract_child_response(
- record_data,
- parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
- ),
- **extracted_extra_fields,
- }
+ skip_slice = True
+ else:
+ skip_slice = False
+
+ # Add extra fields
+ extracted_extra_fields = self._extract_extra_fields(
+ record_data, extra_fields
+ )
+
+ if parent_stream_config.lazy_read_pointer:
+ extracted_extra_fields = {
+ "child_response": self._extract_child_response(
+ record_data,
+ parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
+ ),
+ **extracted_extra_fields,
+ }
if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)
if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()
- if parent_record is not None:
+ if not skip_slice:
yield StreamSlice(
partition={
partition_field: partition_value,
"parent_slice": parent_partition or {},
},
cursor_slice={},
extra_fields=extracted_extra_fields,
)🤖 Prompt for AI Agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`
around lines 231 - 267, The try/except around dpath.get in
substream_partition_router should not use an early `continue` because that skips
the later partition closure logic; instead, introduce a boolean like `skip_slice
= True` when KeyError occurs (and only set `skip_slice = False` when extraction
succeeds), keep `extracted_extra_fields` in a safe default state, and let
execution fall through so the existing `if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)` / `if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()` blocks always run;
finally, guard the `yield StreamSlice(...)` (the emission that uses
`parent_record`, `partition_value`, `extracted_extra_fields`,
`parent_partition`, etc.) so it only occurs when `skip_slice` is False. This
ensures partition closure always executes while still skipping emission for
failed extractions.
…_update_with_cursor Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
…Error - Replace 'continue' with 'skip_slice' flag when dpath.get fails with KeyError - This ensures close_partition() and ensure_at_least_one_state_emitted() are always called, even when partition value extraction fails - Add test for partition closure when parent_key is missing from record Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
- Renamed skip_slice to emit_slice and inverted the logic - Now emit_slice=False by default, set to True when extraction succeeds - Makes the code more intuitive: we emit slices when conditions are met Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
… exists, False on KeyError - emit_slice = parent_record is not None (default True when we have a record) - Set emit_slice = False only when KeyError occurs during dpath.get - This makes the logic clearer: we want to emit slices by default, skip only on error Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Summary
This PR fixes an issue where
SubstreamPartitionRouterwould incorrectly update cursor values when no records were read in a partition. The fix adds defensive null-checking throughoutstream_slices()to gracefully handle:iterate_with_last_flagnow yields(None, True)sentinel when the input generator is empty, andstream_slices()breaks out of the partition loop when partition is None (preserving multi-parent behavior)if parent_record is not None)skip_sliceflag instead ofcontinueto ensureclose_partition()andensure_at_least_one_state_emitted()are always called, even whendpath.getfailsUpdates since last revision
if partition is None: returntoif partition is None: breakper review feedback. Usingbreakensures that if one parent stream has no partitions, we continue processing subsequentparent_stream_configsrather than exiting the entire method.test_substream_partition_router_closes_all_partitions_even_when_no_recordsto verify thatcursor.close_partition()is called for ALL partitions, including those with no records.iterate_with_last_flagreturn type fromtuple[T, bool]totuple[T | None, bool]to properly reflect that the function yields(None, True)as a sentinel for empty generators.test_substream_slicer_parent_state_update_with_cursorto include properlookback_window: 1andstatevalues.skip_sliceflag pattern per CodeRabbit review feedback. Replacedcontinuewith a flag-based approach whendpath.getraisesKeyError, ensuring partition closure always runs even when slice emission is skipped. Addedtest_substream_partition_router_closes_partition_even_when_parent_key_missingto verify this behavior.Review & Testing Checklist for Human
skip_sliceflag ensuresclose_partition()is called even whenparent_keyextraction fails with KeyError. Confirm this doesn't introduce regressions in normal slice emission.if partition is None: breakexits only the current parent's partition loop. Test with a connector that has multipleparent_stream_configswhere one parent has no partitions to confirm subsequent parents are still processed.SubstreamPartitionRouterwhere parent records are missing the expectedparent_keyfield.poetry run pytest unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py -vNotes
Summary by CodeRabbit
New Features
Bug Fixes
Tests