Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 32 additions & 34 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,43 @@ pub fn pipe<const SIZE_REQUIRED: bool>(s: usize) -> std::io::Result<(PipeReader,
///
/// At least one of `source` and `target` must be some sort of pipe.
/// To get around this requirement, consider splicing from your source into
/// a [`pipe`] and then from the pipe into your target (with `splice_exact`):
/// a [`pipe`] and then from the pipe into your target (with `drain_pipe`):
/// this is still very efficient.
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn splice(source: &impl AsFd, target: &impl AsFd, len: usize) -> rustix::io::Result<usize> {
rustix::pipe::splice(source, None, target, None, len, SpliceFlags::empty())
}

/// Try to splice `len` bytes from `source` into `target`.
/// splice `len` bytes from `pipe` into `dest`.
///
/// Note that this splice_exact does not provide bytes sent when it failed.
/// In the case failed relaying splice via pipe, all content of the pipe
/// should be drained by `read` to keep bytes sent accurate.
/// returns Ok(Ok(())) if splice succeed
/// returns Ok(Err()) if splice failed, but read/write fallback succeed
/// returns std::io::Result if read/write fallback failed too
///
/// Thus, ?.is_err() returns serious error at early stage and checks that splice failed
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn splice_exact(source: &impl AsFd, target: &impl AsFd, len: usize) -> rustix::io::Result<()> {
pub fn drain_pipe(
pipe: &PipeReader,
dest: &impl AsFd,
len: usize,
) -> std::io::Result<Result<(), ()>> {
let mut left = len;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let mut left = len;
let mut remaining = len;

while left > 0 {
let written = splice(source, target, left)?;
debug_assert_ne!(written, 0, "unexpected end of data");
left -= written;
if let Ok(s) = splice(pipe, dest, left) {
left -= s;
} else {
// read/write fallback
// use read_to_end to make pipe empty for the case write failed
debug_assert!(left <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
let mut drain = Vec::with_capacity(left);
pipe.take(left as u64).read_to_end(&mut drain)?;
RawWriter(&dest).write_all(&drain)?;
return Ok(Err(()));
}
}
Ok(())
Ok(Ok(()))
}

/// check that source is FUSE
Expand Down Expand Up @@ -129,17 +143,7 @@ pub fn splice_unbounded_broker(
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(Ok(())),
Ok(n) => {
if splice_exact(&pipe_rd, dest, n).is_err() {
// If the first splice manages to copy to the intermediate
// pipe, but the second splice to stdout fails for some reason
// we can recover by copying the data that we have from the
// intermediate pipe to stdout using unbuffered read/write. Then
// we tell the caller to fall back.
// use read_to_end to drain pipe for the case write failed
debug_assert!(n <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
let mut drain = Vec::with_capacity(n);
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
RawWriter(&dest).write_all(&drain)?;
if drain_pipe(pipe_rd, dest, n)?.is_err() {
return Ok(Err(()));
}
}
Expand Down Expand Up @@ -207,21 +211,15 @@ pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Res
match splice(&input, &broker_w, n as usize) {
Ok(0) => break might_fuse(&input),
Ok(s) => {
if splice_exact(&broker_r, &target, s).is_ok() {
n -= s as u64;
bytes_written += s as u64;
if n == 0 {
// avoid unnecessary splice for small input
break false;
}
} else {
debug_assert!(s <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
// use read_to_end to drain pipe at this fallback for the case write failed
let mut drain = Vec::with_capacity(s);
broker_r.take(s as u64).read_to_end(&mut drain)?;
RawWriter(&target).write_all(&drain)?;
n -= s as u64;
bytes_written += s as u64;
if drain_pipe(broker_r, &target, s)?.is_err() {
break true;
}
if n == 0 {
// avoid unnecessary splice for small input
break false;
}
}
_ => break true,
}
Expand Down
Loading