feat(shuffleV2): support multi-CN via serve-all-buckets mode #24940

Open
aunjgr wants to merge 1 commit into matrixorigin:main from aunjgr:feat/multi-cn-shuffle-v2

@aunjgr aunjgr commented Jun 11, 2026

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #24097

What this PR does / why we need it:

Shuffle v2 (the in-process ShufflePoolV2) was previously used only on single-CN plans because its Call() method served only the current worker's bucket. On multi-CN source scopes where Mcpu=1 (maxHolders == 1), the single worker wrote rows to all N buckets but drained only bucket 0, so data destined for remote CNs was stranded in the pool.

Fix: When maxHolders == 1, Call() uses getAnyFullBatch() / getAnyLastBatch() to serve batches from all buckets. This lets the downstream Dispatch operator route each batch to the correct target CN based on ShuffleIDX.
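The serving rule can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the code in this PR: the `pool`, `batch`, `take`, `next`, `drain`, and `fill` names are hypothetical, rows are plain integers, and a modulo stands in for the real hash. It shows why a lone holder has to scan every bucket, while one of several holders can keep serving only its own.

```go
package main

import "fmt"

// batch is a toy stand-in for a columnar batch. shuffleIdx plays the role
// of the ShuffleIDX tag that Dispatch reads downstream.
type batch struct {
	shuffleIdx int
	rows       []int
}

// pool is a hypothetical, simplified model of a shuffle pool with one row
// buffer per bucket. It is not the real ShufflePoolV2.
type pool struct {
	maxHolders int
	batchSize  int
	buckets    [][]int
}

func newPool(nBuckets, maxHolders, batchSize int) *pool {
	return &pool{
		maxHolders: maxHolders,
		batchSize:  batchSize,
		buckets:    make([][]int, nBuckets),
	}
}

// put assigns a row to a bucket. Modulo stands in for the real hash.
func (p *pool) put(row int) {
	idx := row % len(p.buckets)
	p.buckets[idx] = append(p.buckets[idx], row)
}

// take pops up to batchSize rows from one bucket. With onlyFull set it
// returns nil unless the bucket holds at least one full batch.
func (p *pool) take(idx int, onlyFull bool) *batch {
	n := len(p.buckets[idx])
	if n == 0 || (onlyFull && n < p.batchSize) {
		return nil
	}
	if n > p.batchSize {
		n = p.batchSize
	}
	rows := append([]int(nil), p.buckets[idx][:n]...)
	p.buckets[idx] = p.buckets[idx][n:]
	return &batch{shuffleIdx: idx, rows: rows}
}

// next is the serving decision: a lone holder scans every bucket, while
// one of several holders serves only the bucket it owns.
func (p *pool) next(owned int, onlyFull bool) *batch {
	if p.maxHolders != 1 {
		return p.take(owned, onlyFull)
	}
	for idx := range p.buckets {
		if b := p.take(idx, onlyFull); b != nil {
			return b
		}
	}
	return nil
}

// drain serves full batches first, then the leftover partial batches, and
// returns the number of rows served.
func drain(p *pool, owned int) int {
	total := 0
	for _, onlyFull := range []bool{true, false} {
		for b := p.next(owned, onlyFull); b != nil; b = p.next(owned, onlyFull) {
			total += len(b.rows)
		}
	}
	return total
}

// fill loads rows 0..n-1 into the pool.
func fill(p *pool, n int) *pool {
	for r := 0; r < n; r++ {
		p.put(r)
	}
	return p
}

func main() {
	single := fill(newPool(3, 1, 2), 10)
	fmt.Println("single holder served:", drain(single, 0))

	multi := fill(newPool(3, 3, 2), 10)
	fmt.Println("holder 0 of 3 served:", drain(multi, 0))
}
```

In this toy model the single holder drains all 10 rows, whereas holder 0 of 3 serves only the 4 rows in its own bucket and relies on the other holders for the rest. With only one holder and owner-only serving, the remaining rows would have no one to drain them, which is the stranding described above.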

Compile changes: The multi-CN constructor paths (constructShuffleOperatorForJoin and constructShuffleArgForGroup) now create v2 shuffle operators instead of v1. Single-CN paths (compileShuffleJoinV2 / compileShuffleGroupV2) are unchanged.

remoterun.go: Added vm.ShuffleV2 serialization/deserialization.

How scattering/gathering works:

Source CN (Mcpu=1):
  [child ops] -> [v2 Shuffle] -> [Dispatch]
                    |                 |
                    | set ShuffleIDX  | routes bat to register[ShuffleIDX]
                    | serve all N     |
                    | buckets         +- LocalRegs  -> PipelineSpool (same CN)
                    |                 +- RemoteRegs -> MoRPC (remote CN)

Target CN: MergeReceiver -> [HashJoin / Group]
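The routing step in the diagram can be sketched as a lookup indexed by the batch's shuffle index. This is a hedged illustration, not MatrixOne's Dispatch operator: `register`, `route`, and the `addr` strings are hypothetical, and the local and remote transports (PipelineSpool, MoRPC) are reduced to appending to a slice.

```go
package main

import (
	"errors"
	"fmt"
)

// batch carries the bucket tag set by the shuffle operator.
type batch struct {
	shuffleIdx int
	rowCount   int
}

// register is a hypothetical destination slot, one per bucket.
// remote marks whether the target lives on another CN.
type register struct {
	remote   bool
	addr     string // illustrative label only
	received []batch
}

var errBadShuffleIdx = errors.New("shuffle index out of range")

// route delivers a batch to the register selected by its shuffle index.
// A real dispatcher would hand local batches to an in-process spool and
// remote ones to an RPC stream; here both just record the batch.
func route(regs []*register, b batch) error {
	if b.shuffleIdx < 0 || b.shuffleIdx >= len(regs) {
		return errBadShuffleIdx
	}
	dst := regs[b.shuffleIdx]
	dst.received = append(dst.received, b)
	return nil
}

func main() {
	regs := []*register{
		{remote: false, addr: "local"},
		{remote: true, addr: "cn-1 (hypothetical)"},
		{remote: true, addr: "cn-2 (hypothetical)"},
	}
	for _, b := range []batch{{0, 100}, {2, 50}, {1, 75}, {2, 25}} {
		if err := route(regs, b); err != nil {
			fmt.Println("route error:", err)
		}
	}
	for i, r := range regs {
		fmt.Printf("register %d (%s, remote=%v): %d batches\n",
			i, r.addr, r.remote, len(r.received))
	}
}
```

The point of the sketch is that routing needs nothing beyond the tag on each batch, which is why the source-side operator must emit batches for every bucket when it is the only worker.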

🤖 Generated with Claude Code

When maxHolders == 1 (single worker on multi-CN source scope),
Call() serves full batches from all buckets instead of only
CurrentShuffleIdx. This lets dispatch route data to the correct
target CNs.

Also swap multi-CN compile constructors to use v2 operators:
- constructShuffleOperatorForJoin: v1 -> v2
- constructShuffleArgForGroup: v1 -> v2

Add vm.ShuffleV2 serialization/deserialization in remoterun.go.
@aunjgr aunjgr requested a review from ouyuanning as a code owner June 11, 2026 10:41
@qodo-code-review

Qodo reviews are paused for this user.


Labels

kind/enhancement kind/feature size/S Denotes a PR that changes [10,99] lines

2 participants