perf(rollout): pack loss_masks as np.int8 at the ray.put boundary #2006

Open
Chasing1020 wants to merge 1 commit into THUDM:main from Chasing1020:compress-loss-mask
@Chasing1020

Address TODO at slime/ray/rollout.py:688. Per-sample loss masks are converted to np.int8 ndarrays in _convert_samples_to_train_data, right before train_data crosses into the Ray plasma store.

  • Sample.loss_mask upstream still produces/mutates list[int]; this preserves the existing list contract relied on by sglang_rollout's '+= [1]*n' increment, sglang_streaming_rollout's 'base + [1]*n' splice, trajectory.write_segment_to_sample's 'list(segment.loss_mask)' copy, and sft_rollout's '[-response_length:]' slice.
  • Downstream actor._get_rollout_data already does torch.tensor(t, dtype=torch.int, device=cuda) per sample, which accepts ndarray transparently.
rollout_data["loss_masks"] = [
    torch.tensor(t, dtype=torch.int, device=torch.cuda.current_device())
    for t in rollout_data["loss_masks"]
]  # list[np.ndarray(int8)] -> list[torch.Tensor(int32)]

Effects:

  • ~35x lower trainer-process Python heap after deserialization (1 B/token vs ~36 B/token in CPython list[int])
  • ~2x smaller Ray plasma payload (pickle already stores each list element in ~2 B)
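
A rough way to sanity-check the per-token footprint is sketched below. It uses only `sys.getsizeof` and `ndarray.nbytes`; the mask length `n` is an arbitrary assumption, and the byte figures assume a typical 64-bit CPython build. CPython shares the small-int objects `0` and `1`, so the list container itself costs about 8 B per element; the ~36 B/token figure corresponds to also counting one int object per element.

```python
import sys

import numpy as np

n = 32_768  # assumed mask length, chosen only for illustration

list_mask = [1] * n                      # list[int], as produced upstream
array_mask = np.ones(n, dtype=np.int8)   # packed form stored in train_data

# Bytes per token for the list container (pointer slots plus list header).
list_slot_bytes = sys.getsizeof(list_mask) / n
# Size of a single small int object; shared by every slot holding 0 or 1.
int_object_bytes = sys.getsizeof(1)
# Bytes per token for the packed ndarray data buffer.
array_bytes = array_mask.nbytes / n

print(f"list slots : {list_slot_bytes:.2f} B/token")
print(f"int object : {int_object_bytes} B each")
print(f"np.int8    : {array_bytes:.2f} B/token")
```

Counting list slots alone, the packed array is roughly 8x smaller; the larger ratio applies only if every element is charged its own int object.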

Address TODO at slime/ray/rollout.py:688. Per-sample loss masks are
converted to np.int8 ndarrays in _convert_samples_to_train_data, right
before train_data crosses into the Ray plasma store. This is purely a
storage-format change at the boundary:

- Sample.loss_mask upstream still produces/mutates list[int]; this
  preserves the existing list contract relied on by sglang_rollout's
  '+= [1]*n' increment, sglang_streaming_rollout's 'base + [1]*n' splice,
  trajectory.write_segment_to_sample's 'list(segment.loss_mask)' copy,
  and sft_rollout's '[-response_length:]' slice.
- Downstream actor._get_rollout_data already does
  torch.tensor(t, dtype=torch.int, device=cuda) per sample, which accepts
  ndarray transparently.
- log_rollout_data has loss_masks in its explicit skip list, and
  log_multi_turn_data uses v.shape[0] which works on ndarray.
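
A minimal sketch of why the list contract is kept upstream and the conversion happens only at the boundary: the same operators mean different things on `list` and `ndarray`. The values here are made up for illustration and do not come from the slime code base.

```python
import numpy as np

response_length = 4

# Upstream: Sample.loss_mask stays a plain list[int].
loss_mask = [0, 1]
loss_mask += [1] * 2                  # list semantics: extends to 4 entries
tail = loss_mask[-response_length:]   # slicing yields a new list
copied = list(loss_mask)              # shallow copy, as in write_segment_to_sample

# Boundary: pack once, right before the data is handed to Ray.
packed = np.asarray(loss_mask, dtype=np.int8)

# ndarray semantics differ: "+" is element-wise addition, not concatenation,
# so an "+= [1]*n" style update on an ndarray would corrupt the mask.
added = packed + [1] * 4

print(loss_mask, packed.dtype, packed.shape[0], added.tolist())
```

`packed.shape[0]` is the length accessor that the commit message says `log_multi_turn_data` relies on.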

Effects:
- ~35x lower trainer-process Python heap after deserialization
  (1 B/token vs ~36 B/token in CPython list[int])
- ~2x smaller Ray plasma payload (pickle's BININT1 opcode already
  compacts list[int] to ~2 B/token; the win is on the deserialized side)
- ~180x faster H2D prep at actor.py:223 (torch.tensor accepts the ndarray
  buffer directly instead of iterating the list element-by-element);
  measured 1549 us -> 8.5 us for a 32K-element mask
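
The payload claim can be approximated with the standard `pickle` module, as sketched below. This is an assumption-laden stand-in: Ray uses its own cloudpickle-based serializer with out-of-band buffers, so absolute sizes in the plasma store will differ, and the mask length `n` is arbitrary.

```python
import pickle

import numpy as np

n = 32_768  # assumed mask length, matching the 32K-element example above

list_mask = [1] * n
array_mask = np.asarray(list_mask, dtype=np.int8)

# Each 0/1 element of the list is written as a 2-byte BININT1 opcode.
list_payload = len(pickle.dumps(list_mask, protocol=5))
# The ndarray is written as one contiguous byte buffer plus a small header.
array_payload = len(pickle.dumps(array_mask, protocol=5))

print(f"list[int] : {list_payload / n:.2f} B/token")
print(f"np.int8   : {array_payload / n:.2f} B/token")

# Round trip to confirm the packed form carries the same values.
restored = pickle.loads(pickle.dumps(array_mask, protocol=5))
```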

Also removes the side-effect on sample.loss_mask in the default-mask
and remove_sample branches: the function no longer mutates the input
sample, only its derived ndarray copy lives in train_data.
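
The non-mutating behaviour described above can be sketched in isolation. `Sample` here is a hypothetical stand-in with only the three fields the mask logic reads, and `pack_loss_mask` is an illustrative helper name, not a function in slime.

```python
from dataclasses import dataclass
from typing import List, Optional

import numpy as np


@dataclass
class Sample:  # hypothetical stand-in for slime's Sample
    response_length: int
    loss_mask: Optional[List[int]] = None
    remove_sample: bool = False


def pack_loss_mask(sample: Sample) -> np.ndarray:
    """Return an np.int8 mask for `sample` without modifying the sample."""
    if sample.remove_sample:
        return np.zeros(sample.response_length, dtype=np.int8)
    if sample.loss_mask is None:
        return np.ones(sample.response_length, dtype=np.int8)
    assert len(sample.loss_mask) == sample.response_length, (
        f"loss mask length {len(sample.loss_mask)} "
        f"!= response length {sample.response_length}"
    )
    return np.asarray(sample.loss_mask, dtype=np.int8)


samples = [
    Sample(response_length=3),                                        # default mask
    Sample(response_length=3, loss_mask=[1, 0, 1], remove_sample=True),
    Sample(response_length=2, loss_mask=[0, 1]),
]
loss_masks = [pack_loss_mask(s) for s in samples]
print([m.tolist() for m in loss_masks])
```

After the call, `samples[0].loss_mask` is still `None` and `samples[1].loss_mask` is still `[1, 0, 1]`; only the derived arrays carry the default and zeroed masks.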
Copilot AI review requested due to automatic review settings June 2, 2026 07:46

Copilot AI left a comment

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

This PR reduces serialization overhead for loss_mask data during Ray rollouts by converting masks to compact np.int8 arrays as part of training-data preparation.

Changes:

  • Replaced per-sample loss_mask list construction with np.int8 arrays (np.zeros/np.ones/np.asarray).
  • Consolidated remove_sample handling into the new mask construction path.
  • Updated comments to reflect the new packing strategy.

Comment thread slime/ray/rollout.py
Comment on lines +695 to +697
    assert (
        len(sample.loss_mask) == sample.response_length
    ), f"loss mask length {len(sample.loss_mask)} != response length {sample.response_length}"
Comment thread slime/ray/rollout.py
Comment on lines 688 to 700
  loss_masks = []
  for sample in samples:
-     # always instantiate loss_mask if not provided
-     if sample.loss_mask is None:
-         sample.loss_mask = [1] * sample.response_length
-
-     assert (
-         len(sample.loss_mask) == sample.response_length
-     ), f"loss mask length {len(sample.loss_mask)} != response length {sample.response_length}"
      if sample.remove_sample:
-         sample.loss_mask = [0] * sample.response_length
-     loss_masks.append(sample.loss_mask)
+         mask = np.zeros(sample.response_length, dtype=np.int8)
+     elif sample.loss_mask is None:
+         mask = np.ones(sample.response_length, dtype=np.int8)
+     else:
+         assert (
+             len(sample.loss_mask) == sample.response_length
+         ), f"loss mask length {len(sample.loss_mask)} != response length {sample.response_length}"
+         mask = np.asarray(sample.loss_mask, dtype=np.int8)
+     loss_masks.append(mask)
  train_data["loss_masks"] = loss_masks