[python] support chunk shuffle for planning and 3-layer shuffle for pytorch Dataset#8064

Open: steFaiz wants to merge 5 commits into apache:master from steFaiz:chunk_shuffle_support

@steFaiz
Contributor

@steFaiz steFaiz commented Jun 1, 2026

Purpose

This PR closes #8010.
I tested on a data-evolution table with 100,000,000 records, several structured columns, and a blob column.
The results are below:

| Metric | Value |
| --- | --- |
| Plan | 49.78 s |
| Avg. read time per chunk | 1.199 s |
| Chunk size | 100 |
| Avg. chunk Arrow size | 41.14 MiB |
| Avg. files per chunk | 81 |
| Columns | length, image_name, conversations, width, height, image_count, dataset, image_bytes |

I tested reading the table directly on DFS. Planning is expensive: it generates 1 million DataSplits and shuffles them, and creating 1 million objects in Python is heavy. The same work would complete within a few hundred milliseconds in Java.
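As a quick sanity check on the numbers above, here is a small arithmetic sketch. It assumes that "chunk size 100" means 100 records per chunk and that each chunk becomes one DataSplit; neither is stated explicitly, but both are consistent with the 1 million splits mentioned.

```python
# Figures copied from the benchmark table; the interpretation is an assumption.
records = 100_000_000
chunk_size = 100            # assumed: records per chunk
plan_seconds = 49.78
avg_chunk_mib = 41.14
avg_chunk_read_seconds = 1.199

# Assumed: one DataSplit per chunk.
num_chunks = records // chunk_size                      # 1_000_000
plan_us_per_split = plan_seconds / num_chunks * 1e6     # about 49.78 microseconds
read_mib_per_second = avg_chunk_mib / avg_chunk_read_seconds  # about 34.31 MiB/s

print(num_chunks, round(plan_us_per_split, 2), round(read_mib_per_second, 2))
```

Under these assumptions, planning costs roughly 50 microseconds per split, which is in line with per-object overhead in Python dominating the plan time.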

As a next step, I'll try to add shuffle and buffered shuffle for the PyTorch Paimon Dataset.

Tests

See Unit Tests

@JingsongLi
Contributor

JingsongLi commented Jun 1, 2026

Hi @steFaiz Thanks for the contribution! Maybe just implementing shuffle in Pytorch Paimon Dataset in this PR? And document it.

Contributor

@JingsongLi JingsongLi left a comment

I found two issues worth addressing:

  1. paimon-python/pypaimon/read/scanner/chunk_shuffle_split_generator.py:358

The DE chunk-shuffle path silently drops files whose row_id_range() is None. _split_by_row_id_with_range() first builds list_ranges from only non-null ranges, then skips every file whose range is None. With normal DE planning, such metadata fails fast; with with_chunk_shuffle, the same table can return incomplete or even empty splits.

A minimal shape is a data-evolution table whose files have no first_row_id (for example, data-evolution.enabled=true without row tracking / legacy metadata). Normal new_scan().plan() raises, while new_scan().with_chunk_shuffle(...).plan() produces []. Please validate this invariant and raise instead of silently continuing, or explicitly fall back to the regular DE split generator.

  2. paimon-python/pypaimon/read/table_scan.py:162

The new API is implemented as with_chunk_shuffle(seed, chunk_size), but the discussion PR it closes documents usage as with_chunk_shuffle(chunk_size, seed). Since positional calls are natural here, users can silently swap the values and get a tiny chunk_size or unexpected shuffle seed. The current tests all use keyword arguments, so they do not catch this. Please align the order with the documented API, or make the parameters keyword-only to prevent accidental misuse.
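To illustrate the keyword-only option suggested in the second point, here is a minimal sketch. `ScanSketch` is a hypothetical stand-in, not the actual pypaimon `TableScan`; only the method name and the two parameter names come from this PR.

```python
class ScanSketch:
    """Hypothetical stand-in for a table scan; not the pypaimon class."""

    def __init__(self) -> None:
        self.seed = None
        self.chunk_size = None

    def with_chunk_shuffle(self, *, seed: int, chunk_size: int) -> "ScanSketch":
        # The bare `*` makes both parameters keyword-only, so a positional
        # call cannot silently swap seed and chunk_size.
        if isinstance(chunk_size, bool) or not isinstance(chunk_size, int) or chunk_size <= 0:
            raise ValueError("chunk_size must be a positive int")
        self.seed = seed
        self.chunk_size = chunk_size
        return self


scan = ScanSketch().with_chunk_shuffle(seed=42, chunk_size=1000)

try:
    ScanSketch().with_chunk_shuffle(42, 1000)  # positional call is rejected
except TypeError as exc:
    print("rejected:", exc)
```

With this shape, the argument order stops mattering, so the mismatch between the documented and implemented order can no longer cause silent misuse.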

I verified the added test file locally:

PYTHONPATH=. python -m pytest -q pypaimon/tests/scanner/chunk_shuffle_split_generator_test.py
# 36 passed

@steFaiz
Contributor Author

steFaiz commented Jun 2, 2026

> Hi @steFaiz Thanks for the contribution! Maybe just implementing shuffle in Pytorch Paimon Dataset in this PR? And document it.

Of course! I'll look into the existing shuffle and buffered shuffle approaches in PyTorch and implement them in this PR (and address your comments as well).

@steFaiz steFaiz changed the title [python] support chunk shuffle for append table [wip][python] support chunk shuffle for append table Jun 2, 2026
@steFaiz
Contributor Author

steFaiz commented Jun 2, 2026

I introduced a 3-layer shuffle in the PyTorch integration:

  1. Chunk shuffle at the file-metadata layer. This relies on a random-access-optimized data format.
  2. Interleaving of several chunks.
  3. A shuffle buffer.

The usage is simple:

from torch.utils.data import DataLoader

seed = 42

# do chunk-shuffle in planning. This is optional
table_scan = read_builder.new_scan().with_chunk_shuffle(
    seed=seed,
    chunk_size=1000,
)
table_read = read_builder.new_read()
splits = table_scan.plan().splits()

dataset = table_read.to_torch(
    splits,
    streaming=True,
    shuffle=True,
    seed=seed,
    # buffer shuffle
    buffer_size=1000,
    # interleave splits
    max_buffer_input_splits=10,
)

dataloader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=2,
    shuffle=False,
)

I referred to the HuggingFace IterableDataset: https://github.com/huggingface/datasets/blob/main/src/datasets/iterable_dataset.py
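For readers who want to see how the three layers compose, here is a plain-Python sketch over integer row ids. It is an illustration only, not the code in this PR: all function names are hypothetical, rows are modeled as integers, and `max_open` plays the role that `max_buffer_input_splits` appears to play in the usage above.

```python
import random
from typing import Iterable, Iterator, List, Sequence


def chunk_shuffle(num_rows: int, chunk_size: int, seed: int) -> List[range]:
    """Layer 1: cut the row space into fixed-size chunks and shuffle chunk order."""
    chunks = [range(start, min(start + chunk_size, num_rows))
              for start in range(0, num_rows, chunk_size)]
    random.Random(seed).shuffle(chunks)
    return chunks


def interleave(chunks: Sequence[range], max_open: int) -> Iterator[int]:
    """Layer 2: read round-robin from up to `max_open` chunks at a time."""
    pending = iter(chunks)
    active = [iter(c) for _, c in zip(range(max_open), pending)]
    while active:
        still_open = []
        for it in active:
            try:
                row = next(it)
            except StopIteration:
                # This chunk is drained; open the next pending chunk, if any.
                nxt = next(pending, None)
                if nxt is not None:
                    still_open.append(iter(nxt))
                continue
            yield row
            still_open.append(it)
        active = still_open


def buffer_shuffle(rows: Iterable[int], buffer_size: int, seed: int) -> Iterator[int]:
    """Layer 3: fill a buffer, then emit a random slot and refill it."""
    rng = random.Random(seed)
    buf: List[int] = []
    for row in rows:
        if len(buf) < buffer_size:
            buf.append(row)
            continue
        idx = rng.randrange(buffer_size)
        yield buf[idx]
        buf[idx] = row
    rng.shuffle(buf)  # drain whatever is left at the end
    yield from buf


def three_layer_shuffle(num_rows: int, chunk_size: int, max_open: int,
                        buffer_size: int, seed: int) -> Iterator[int]:
    chunks = chunk_shuffle(num_rows, chunk_size, seed)
    return buffer_shuffle(interleave(chunks, max_open), buffer_size, seed)


out = list(three_layer_shuffle(num_rows=100, chunk_size=10, max_open=3,
                               buffer_size=16, seed=42))
print(out[:10])
```

Each layer trades randomness against I/O locality: chunks keep reads contiguous, interleaving mixes rows from several chunks, and the buffer breaks up the remaining local order.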

@steFaiz steFaiz changed the title [wip][python] support chunk shuffle for append table [python] support chunk shuffle for planning and 3-layer shuffle for pytorch Dataset Jun 2, 2026
raise ValueError("%s must be a positive int" % name)
return value

def set_epoch(self, epoch: int) -> "TorchShuffledIterDataset":
Contributor

set_epoch() does not work once the dataset is already held by persistent DataLoader workers. With DataLoader(..., num_workers>0, persistent_workers=True), the worker processes keep their own Dataset instances alive across epochs, so calling dataset.set_epoch(1) in the parent process only updates the parent copy. The workers still use the old self.epoch here when building the buffer-shuffle RNG, which makes the shuffle order repeat even though the docs say callers can set the epoch before iterating the DataLoader for the next epoch. Could we either propagate epoch through worker-visible shared state, or document/reject persistent workers and require rebuilding the DataLoader? It would be good to add a test for this mode as well.

Contributor Author

Thanks! I looked into how the DataLoader mechanism works:
[image]

The main process creates multiple worker processes through Linux fork(); they share the same memory, but under copy-on-write (COW) protection.
If persistent_workers is true, workers are reused across epochs. If the main process changes the epoch, the workers won't see it because of COW.

Following the HuggingFace Dataset, I now use torch.Tensor.share_memory_(), which stores the shared data in a special file so that changes are visible to all processes.
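The idea of keeping the epoch in worker-visible shared state can be sketched without torch. The example below swaps in the standard library's `multiprocessing.Value` for `torch.Tensor.share_memory_()`; `SharedEpochShuffler` and the `seed + epoch` derivation are hypothetical and only illustrate the principle, not the implementation in this PR.

```python
import multiprocessing as mp
import random
from typing import Iterable, List


class SharedEpochShuffler:
    """Hypothetical sketch: the epoch lives in shared memory, not in a plain attribute."""

    def __init__(self, items: Iterable[int], seed: int) -> None:
        self.items = list(items)
        self.seed = seed
        # A 64-bit signed integer allocated in shared memory. Processes forked
        # after this point see later writes, unlike a plain int attribute,
        # which each worker would copy on write.
        self._epoch = mp.Value("q", 0)

    def set_epoch(self, epoch: int) -> None:
        with self._epoch.get_lock():
            self._epoch.value = epoch

    def order(self) -> List[int]:
        # Assumed derivation: a fresh RNG per epoch from seed + epoch.
        rng = random.Random(self.seed + self._epoch.value)
        out = list(self.items)
        rng.shuffle(out)
        return out


shuffler = SharedEpochShuffler(range(100), seed=42)
epoch0 = shuffler.order()
shuffler.set_epoch(1)
epoch1 = shuffler.order()
print(epoch0[:5], epoch1[:5])
```

Because the value is read at iteration time rather than captured at worker start, a persistent worker that calls `order()` in a later epoch picks up the new epoch.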

@JingsongLi JingsongLi closed this Jun 3, 2026
@JingsongLi JingsongLi reopened this Jun 3, 2026
@JingsongLi
Contributor

I rechecked the current head (b887962e). The API order and the persistent-worker set_epoch() concern look addressed now, but the DE chunk-shuffle row-id issue from the earlier review is still reproducible.

DataEvolutionChunkShuffleSplitGenerator._split_by_row_id_with_range() still filters out files whose row_id_range() is None and returns no chunks when all files are missing first_row_id. Since data-evolution.enabled and row-tracking.enabled are independent table options, a table can have data-evolution.enabled=true without row tracking. In that case the normal DE scan fails during planning, but the chunk-shuffle path silently returns an empty plan and reading it yields 0 rows.

Minimal shape I verified locally:

schema = Schema.from_pyarrow_schema(
    pa.schema([('id', pa.int32()), ('v', pa.string())]),
    options={'data-evolution.enabled': 'true'},
)
# write 3 rows...
rb = table.new_read_builder()
rb.new_scan().plan()  # AttributeError: 'NoneType' object has no attribute 'from_'
rb.new_scan().with_chunk_shuffle(seed=1, chunk_size=2).plan().splits()  # []
rb.new_read().to_arrow([]).num_rows  # 0

Could we make the chunk-shuffle path fail fast with a clear error when self.data_evolution is true but row tracking / first_row_id is unavailable, or otherwise fall back to the regular DE generator behavior? Silently returning an empty plan is dangerous because it looks like a valid empty table.
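A fail-fast check along these lines could look like the sketch below. `FileMetaSketch` and `row_id_ranges` are hypothetical names for illustration; the real generator works on pypaimon file metadata and its `row_id_range()` method, whose exact shape is not shown in this thread.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence, Tuple


@dataclass
class FileMetaSketch:
    """Hypothetical stand-in for a data file's metadata."""
    file_name: str
    first_row_id: Optional[int]
    row_count: int


def row_id_ranges(files: Sequence[FileMetaSketch]) -> List[Tuple[int, int]]:
    """Return inclusive (start, end) row-id ranges, or raise if any are missing."""
    missing = [f.file_name for f in files if f.first_row_id is None]
    if missing:
        # Raise instead of filtering, so an invalid table cannot look like
        # a valid empty plan.
        raise ValueError(
            "chunk shuffle on a data-evolution table requires first_row_id; "
            "missing for: %s" % ", ".join(missing)
        )
    return [(f.first_row_id, f.first_row_id + f.row_count - 1) for f in files]


print(row_id_ranges([FileMetaSketch("a.parquet", 0, 3),
                     FileMetaSketch("b.parquet", 3, 2)]))
```

The key point is that validation covers every file before any range is used, so a single file without a row id stops planning with a clear message.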

I also reran:

PYTHONPATH=. python -m pytest -q pypaimon/tests/scanner/chunk_shuffle_split_generator_test.py
# 36 passed

@steFaiz
Contributor Author

steFaiz commented Jun 3, 2026

@JingsongLi Thanks for your review! I thought it was illegal to set data-evolution.enabled = true while row-tracking.enabled = false; this is enforced by SchemaValidation in Java.

I've added a fail-fast path for when first_row_id is None.
