Skip to content

fix(cdk): fix SubstreamPartitionRouter updating cursor value when no records were read in partition#889

Open
darynaishchenko wants to merge 11 commits intomainfrom
daryna/fix-substream-partition-router
Open

fix(cdk): fix SubstreamPartitionRouter updating cursor value when no records were read in partition#889
darynaishchenko wants to merge 11 commits intomainfrom
daryna/fix-substream-partition-router

Conversation

@darynaishchenko
Copy link
Contributor

@darynaishchenko darynaishchenko commented Feb 2, 2026

Summary

This PR fixes an issue where SubstreamPartitionRouter would incorrectly update cursor values when no records were read in a partition. The fix adds defensive null-checking throughout stream_slices() to gracefully handle:

  1. Empty partition generators - iterate_with_last_flag now yields (None, True) sentinel when the input generator is empty, and stream_slices() breaks out of the partition loop when partition is None (preserving multi-parent behavior)
  2. Empty record iterators - When a partition has no records, cursor observation and slice emission are skipped (guarded by if parent_record is not None)
  3. KeyError during partition value extraction - Uses skip_slice flag instead of continue to ensure close_partition() and ensure_at_least_one_state_emitted() are always called, even when dpath.get fails

Updates since last revision

  • Changed if partition is None: return to if partition is None: break per review feedback. Using break ensures that if one parent stream has no partitions, we continue processing subsequent parent_stream_configs rather than exiting the entire method.
  • Added test_substream_partition_router_closes_all_partitions_even_when_no_records to verify that cursor.close_partition() is called for ALL partitions, including those with no records.
  • Fixed MyPy type error: Updated iterate_with_last_flag return type from tuple[T, bool] to tuple[T | None, bool] to properly reflect that the function yields (None, True) as a sentinel for empty generators.
  • Fixed expected state in test_substream_slicer_parent_state_update_with_cursor to include proper lookback_window: 1 and state values.
  • NEW: Implemented skip_slice flag pattern per CodeRabbit review feedback. Replaced continue with a flag-based approach when dpath.get raises KeyError, ensuring partition closure always runs even when slice emission is skipped. Added test_substream_partition_router_closes_partition_even_when_parent_key_missing to verify this behavior.

Review & Testing Checklist for Human

  • Verify skip_slice logic: The new skip_slice flag ensures close_partition() is called even when parent_key extraction fails with KeyError. Confirm this doesn't introduce regressions in normal slice emission.
  • Verify multi-parent behavior: The if partition is None: break exits only the current parent's partition loop. Test with a connector that has multiple parent_stream_configs where one parent has no partitions to confirm subsequent parents are still processed.
  • Test with real connector: Test with a connector using SubstreamPartitionRouter where parent records are missing the expected parent_key field.
  • Run full test suite: poetry run pytest unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py -v

Notes

Summary by CodeRabbit

  • New Features

    • Added an iterator utility that flags the final element and emits a None marker when the source is exhausted.
  • Bug Fixes

    • Prevented emitting slices for empty/None partitions and added early exit when parent partitions yield no records.
    • Guarded partition extraction and cursor/state updates so they run only when a valid parent record exists.
    • Ensured partition cursors are closed correctly even when partitions have no records.
  • Tests

    • Added unit tests covering the new iterator and edge-case partition behaviors.

@darynaishchenko darynaishchenko self-assigned this Feb 2, 2026
@github-actions
Copy link

github-actions bot commented Feb 2, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@daryna/fix-substream-partition-router#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch daryna/fix-substream-partition-router

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 2, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Changed SubstreamPartitionRouter to add iterate_with_last_flag (now yields (T | None, bool)), handle exhausted generators by yielding (None, True), and guard all parent_record-dependent processing (cursor observation, partition tracking, partition_value/extra_fields/lazy pointer extraction, and slice emission) behind non-None checks; outer loop now breaks early on None partitions.

Changes

Cohort / File(s) Summary
Substream partition router
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py
Added iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T | None, bool]]; when input is exhausted it yields (None, True); guarded processing that depends on parent_record (cursor/state observation, partition association, partition_value extraction via dpath, extra_fields, lazy_read_pointer), preserved KeyError handling inside guarded path, emit StreamSlice only for non-None parent_record, and break outer loop early if a partition is None.
Unit tests for edge cases
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py
Added tests importing iterate_with_last_flag; new tests cover (item, is_last) semantics for empty/single/multiple inputs and SubstreamPartitionRouter behaviors: no slice/no cursor update for partitions with no records, handling empty parent partitions (early exit), and ensuring close_partition is called for all partitions even when no records are emitted.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Would you like me to generate a small focused patch-level checklist for reviewers (key lines/behaviors to verify), wdyt?

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 71.43% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main fix: preventing cursor updates when no records are read in a partition, which aligns with the core behavioral changes in SubstreamPartitionRouter.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch daryna/fix-substream-partition-router

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1)

42-49: ⚠️ Potential issue | 🔴 Critical

Type annotation mismatch causing pipeline failure.

The pipeline is flagging a type error here. Yielding (None, True) produces tuple[None, bool], but the return type expects tuple[T, bool]. Since the sentinel introduces None as a possible value, would updating the return type to Optional[T] work for you, wdyt?

🔧 Proposed fix
-def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
+def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:
🤖 Fix all issues with AI agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`:
- Around line 210-211: In stream_slices() in substream_partition_router.py the
check "if partition is None: return" prematurely exits the entire method when a
parent_stream.generate_partitions() yields the (None, True) sentinel; change
this to "break" (or otherwise exit the inner partition loop) so we stop
iterating that parent stream's partitions and continue to the next
parent_stream_config instead of returning from stream_slices(), leaving the rest
of the parent streams to be processed; locate the partition handling inside the
loop that consumes parent_stream.generate_partitions() and replace the return
with a break (or equivalent loop-control) so behavior matches the original
semantics.

@github-actions
Copy link

github-actions bot commented Feb 2, 2026

PyTest Results (Fast)

3 865 tests  +10   3 853 ✅ +10   6m 34s ⏱️ +7s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit 367f291. ± Comparison against base commit 15542de.

♻️ This comment has been updated with latest results.

@github-actions
Copy link

github-actions bot commented Feb 2, 2026

PyTest Results (Full)

3 868 tests  +10   3 856 ✅ +10   10m 47s ⏱️ -14s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit 367f291. ± Comparison against base commit 15542de.

♻️ This comment has been updated with latest results.

@darynaishchenko
Copy link
Contributor Author

darynaishchenko commented Feb 2, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/21601466980

devin-ai-integration bot and others added 2 commits February 3, 2026 15:38
…titionRouter null handling

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
…ulti-parent behavior

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (2)

42-49: ⚠️ Potential issue | 🟠 Major

Fix mypy type mismatch in iterate_with_last_flag.
Line 48 yields (None, True), but the signature advertises tuple[T, bool], which is tripping mypy. Could we widen the return type to Optional[T] to match the sentinel, wdyt?

🔧 Proposed fix
-def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
+def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:

186-187: ⚠️ Potential issue | 🟡 Minor

Update docstring to match empty-slice behavior.
Lines 186–187 still say an empty parent slice emits parent_record=None, but the code now skips emission. Could we adjust the docstring to reflect the new behavior, wdyt?

📝 Suggested doc tweak
-        If a parent slice contains no record, emit a slice with parent_record=None.
+        If a parent slice contains no record, emit no slice.

devin-ai-integration bot and others added 3 commits February 5, 2026 10:36
… when no records

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
… empty generators

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`:
- Around line 231-267: The try/except around dpath.get in
substream_partition_router should not use an early `continue` because that skips
the later partition closure logic; instead, introduce a boolean like `skip_slice
= True` when KeyError occurs (and only set `skip_slice = False` when extraction
succeeds), keep `extracted_extra_fields` in a safe default state, and let
execution fall through so the existing `if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)` / `if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()` blocks always run;
finally, guard the `yield StreamSlice(...)` (the emission that uses
`parent_record`, `partition_value`, `extracted_extra_fields`,
`parent_partition`, etc.) so it only occurs when `skip_slice` is False. This
ensures partition closure always executes while still skipping emission for
failed extractions.

Comment on lines 231 to 267
try:
partition_value = dpath.get(
record_data, # type: ignore [arg-type]
parent_field,
)
except KeyError:
# FIXME a log here would go a long way for debugging
continue

# Add extra fields
extracted_extra_fields = self._extract_extra_fields(
record_data, extra_fields
)

if parent_stream_config.lazy_read_pointer:
extracted_extra_fields = {
"child_response": self._extract_child_response(
record_data,
parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
),
**extracted_extra_fields,
}

if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)
if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()

yield StreamSlice(
partition={
partition_field: partition_value,
"parent_slice": parent_partition or {},
},
cursor_slice={},
extra_fields=extracted_extra_fields,
)
if parent_record is not None:
yield StreamSlice(
partition={
partition_field: partition_value,
"parent_slice": parent_partition or {},
},
cursor_slice={},
extra_fields=extracted_extra_fields,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid skipping partition closure when partition-value extraction fails.

The early continue in the extraction-failure path skips close_partition() / ensure_at_least_one_state_emitted() when that record is last, leaving the cursor partition open. Could we switch to a skip_slice flag so closure always runs and only slice emission is skipped, wdyt?

🔧 Possible fix
                     for parent_record, is_last_record_in_slice in iterate_with_last_flag(
                         partition.read()
                     ):
-                        if parent_record is not None:
+                        skip_slice = True
+                        if parent_record is not None:
                             # In the previous CDK implementation, state management was done internally by the stream.
                             # However, this could cause issues when doing availability check for example as the availability
                             # check would progress the state so state management was moved outside of the read method.
                             # Hence, we need to call the cursor here.
                             # Note that we call observe and close_partition before emitting the associated record as the
                             # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
                             # record was consumed.
                             parent_stream.cursor.observe(parent_record)
                             parent_partition = (
                                 parent_record.associated_slice.partition
                                 if parent_record.associated_slice
                                 else {}
                             )
                             record_data = parent_record.data
 
                             try:
                                 partition_value = dpath.get(
                                     record_data,  # type: ignore [arg-type]
                                     parent_field,
                                 )
                             except KeyError:
                                 # FIXME a log here would go a long way for debugging
-                                continue
-
-                            # Add extra fields
-                            extracted_extra_fields = self._extract_extra_fields(
-                                record_data, extra_fields
-                            )
-
-                            if parent_stream_config.lazy_read_pointer:
-                                extracted_extra_fields = {
-                                    "child_response": self._extract_child_response(
-                                        record_data,
-                                        parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
-                                    ),
-                                    **extracted_extra_fields,
-                                }
+                                skip_slice = True
+                            else:
+                                skip_slice = False
+
+                                # Add extra fields
+                                extracted_extra_fields = self._extract_extra_fields(
+                                    record_data, extra_fields
+                                )
+
+                                if parent_stream_config.lazy_read_pointer:
+                                    extracted_extra_fields = {
+                                        "child_response": self._extract_child_response(
+                                            record_data,
+                                            parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
+                                        ),
+                                        **extracted_extra_fields,
+                                    }
 
                         if is_last_record_in_slice:
                             parent_stream.cursor.close_partition(partition)
                             if is_last_slice:
                                 parent_stream.cursor.ensure_at_least_one_state_emitted()
 
-                        if parent_record is not None:
+                        if not skip_slice:
                             yield StreamSlice(
                                 partition={
                                     partition_field: partition_value,
                                     "parent_slice": parent_partition or {},
                                 },
                                 cursor_slice={},
                                 extra_fields=extracted_extra_fields,
                             )
🤖 Prompt for AI Agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`
around lines 231 - 267, The try/except around dpath.get in
substream_partition_router should not use an early `continue` because that skips
the later partition closure logic; instead, introduce a boolean like `skip_slice
= True` when KeyError occurs (and only set `skip_slice = False` when extraction
succeeds), keep `extracted_extra_fields` in a safe default state, and let
execution fall through so the existing `if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)` / `if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()` blocks always run;
finally, guard the `yield StreamSlice(...)` (the emission that uses
`parent_record`, `partition_value`, `extracted_extra_fields`,
`parent_partition`, etc.) so it only occurs when `skip_slice` is False. This
ensures partition closure always executes while still skipping emission for
failed extractions.

…_update_with_cursor

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
devin-ai-integration bot and others added 3 commits February 10, 2026 10:34
…Error

- Replace 'continue' with 'skip_slice' flag when dpath.get fails with KeyError
- This ensures close_partition() and ensure_at_least_one_state_emitted() are
  always called, even when partition value extraction fails
- Add test for partition closure when parent_key is missing from record

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
- Renamed skip_slice to emit_slice and inverted the logic
- Now emit_slice=False by default, set to True when extraction succeeds
- Makes the code more intuitive: we emit slices when conditions are met

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
… exists, False on KeyError

- emit_slice = parent_record is not None (default True when we have a record)
- Set emit_slice = False only when KeyError occurs during dpath.get
- This makes the logic clearer: we want to emit slices by default, skip only on error

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants