diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d15a2473..5532529f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,13 +23,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **AAudio**: `supported_input_configs` and `supported_output_configs` now return an error for direction-mismatched devices (e.g. querying input configs on an output-only device) instead of silently returning an empty list. +- **ALSA**: Device disconnection now stops the stream with `StreamError::DeviceNotAvailable` instead of looping. +- **ALSA**: Polling errors trigger underrun recovery instead of looping. +- **ALSA**: Try to resume from hardware after a system suspend. - **ASIO**: `Device::driver`, `asio_streams`, and `current_callback_flag` are no longer `pub`. ### Fixed - Reintroduce `audio_thread_priority` feature. - Fix numeric overflows in calls to create `StreamInstant` in ASIO, CoreAudio and JACK. +- **ALSA**: Fix capture stream hanging or spinning on overruns. - **ALSA**: Fix spurious timestamp errors during stream startup. +- **ALSA**: Fix spurious timeout errors during polling. +- **ALSA**: Fix rare panics when dropping the stream is interrupted. - **ASIO**: Fix enumeration returning only the first device when using `collect`. - **Emscripten**: Fix build failure introduced by newer `wasm-bindgen` versions. diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index eb9211080..5011bcd68 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -265,23 +265,41 @@ struct TriggerReceiver(libc::c_int); impl TriggerSender { fn wakeup(&self) { let buf = 1u64; - let ret = unsafe { libc::write(self.0, &buf as *const u64 as *const _, 8) }; - assert_eq!(ret, 8); + loop { + let ret = unsafe { libc::write(self.0, &buf as *const u64 as *const _, 8) }; + if ret == 8 { + return; + } + // write() can be interrupted by a signal before writing any bytes; retry. + assert_eq!(ret, -1, "wakeup: unexpected return value {ret}"); + if std::io::Error::last_os_error().kind() != std::io::ErrorKind::Interrupted { + panic!("wakeup: {}", std::io::Error::last_os_error()); + } + } } } impl TriggerReceiver { fn clear_pipe(&self) { let mut out = 0u64; - let ret = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, 8) }; - assert_eq!(ret, 8); + loop { + let ret = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, 8) }; + if ret == 8 { + return; + } + // read() can be interrupted by a signal before reading any bytes; retry. + assert_eq!(ret, -1, "clear_pipe: unexpected return value {ret}"); + if std::io::Error::last_os_error().kind() != std::io::ErrorKind::Interrupted { + panic!("clear_pipe: {}", std::io::Error::last_os_error()); + } + } } } -fn trigger() -> (TriggerSender, TriggerReceiver) { +fn trigger() -> (TriggerSender, Arc) { let mut fds = [0, 0]; match unsafe { libc::pipe(fds.as_mut_ptr()) } { - 0 => (TriggerSender(fds[1]), TriggerReceiver(fds[0])), + 0 => (TriggerSender(fds[1]), Arc::new(TriggerReceiver(fds[0]))), _ => panic!("Could not create pipe"), } } @@ -391,7 +409,6 @@ impl Device { } // Pre-compute a period-sized buffer filled with silence values. - let period_frames = period_samples / conf.channels as usize; let period_bytes = period_samples * sample_format.sample_size(); let mut silence_template = vec![0u8; period_bytes].into_boxed_slice(); @@ -407,7 +424,6 @@ impl Device { num_descriptors, conf: conf.clone(), period_samples, - period_frames, silence_template, can_pause, creation_instant, @@ -692,7 +708,6 @@ struct StreamInner { // Cached values for performance in audio callback hot path period_samples: usize, - period_frames: usize, silence_template: Box<[u8]>, #[allow(dead_code)] @@ -728,6 +743,10 @@ pub struct Stream { /// Used to signal to stop processing. trigger: TriggerSender, + + /// Keeps the read end of the self-pipe alive for the lifetime of the Stream, so that + /// `trigger.wakeup()` never writes to a closed pipe, even if the worker exited early. + _rx: Arc, } // Compile-time assertion that Stream is Send and Sync @@ -788,7 +807,7 @@ impl StreamWorkerContext { } fn input_stream_worker( - rx: TriggerReceiver, + rx: Arc, stream: &StreamInner, data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static), error_callback: &mut (dyn FnMut(StreamError) + Send + 'static), @@ -798,28 +817,15 @@ fn input_stream_worker( let mut ctxt = StreamWorkerContext::new(&timeout, stream, &rx); loop { - let flow = - poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| { - error_callback(err.into()); - PollDescriptorsFlow::Continue - }); - - match flow { - PollDescriptorsFlow::Continue => { - continue; - } - PollDescriptorsFlow::XRun => { - error_callback(StreamError::BufferUnderrun); - if let Err(err) = stream.channel.prepare() { - error_callback(err.into()); - } - continue; - } - PollDescriptorsFlow::Return => return, - PollDescriptorsFlow::Ready { + if stream.dropping.load(Ordering::Acquire) { + return; + } + match poll_for_period(&rx, stream, &mut ctxt) { + Ok(Poll::Pending) => continue, + Ok(Poll::Ready { status, delay_frames, - } => { + }) => { if let Err(err) = process_input( stream, &mut ctxt.transfer_buffer, @@ -830,12 +836,31 @@ fn input_stream_worker( error_callback(err.into()); } } + Err(StreamError::BufferUnderrun) => { + error_callback(StreamError::BufferUnderrun); + + // Input streams don't have an automatic start threshold, so restart manually. + if let Err(err) = stream.channel.prepare() { + error_callback(err.into()); + } else if let Err(err) = stream.channel.start() { + error_callback(err.into()); + } + continue; + } + Err(StreamError::DeviceNotAvailable) => { + error_callback(StreamError::DeviceNotAvailable); + return; + } + Err(err) => { + error_callback(err); + continue; + } } } } fn output_stream_worker( - rx: TriggerReceiver, + rx: Arc, stream: &StreamInner, data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static), error_callback: &mut (dyn FnMut(StreamError) + Send + 'static), @@ -846,36 +871,39 @@ fn output_stream_worker( let mut ctxt = StreamWorkerContext::new(&timeout, stream, &rx); loop { - let flow = - poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| { - error_callback(err.into()); - PollDescriptorsFlow::Continue - }); - - match flow { - PollDescriptorsFlow::Continue => continue, - PollDescriptorsFlow::XRun => { - error_callback(StreamError::BufferUnderrun); - if let Err(err) = stream.channel.prepare() { - error_callback(err.into()); - } - continue; - } - PollDescriptorsFlow::Return => return, - PollDescriptorsFlow::Ready { + if stream.dropping.load(Ordering::Acquire) { + return; + } + match poll_for_period(&rx, stream, &mut ctxt) { + Ok(Poll::Pending) => continue, + Ok(Poll::Ready { status, delay_frames, - } => { + }) => { if let Err(err) = process_output( stream, &mut ctxt.transfer_buffer, status, delay_frames, data_callback, - error_callback, ) { + error_callback(err); + } + } + Err(StreamError::BufferUnderrun) => { + error_callback(StreamError::BufferUnderrun); + if let Err(err) = stream.channel.prepare() { error_callback(err.into()); } + continue; + } + Err(StreamError::DeviceNotAvailable) => { + error_callback(StreamError::DeviceNotAvailable); + return; + } + Err(err) => { + error_callback(err); + continue; } } } @@ -900,28 +928,20 @@ fn boost_current_thread_priority(buffer_size: BufferSize, sample_rate: SampleRat #[cfg(not(feature = "audio_thread_priority"))] fn boost_current_thread_priority(_: BufferSize, _: SampleRate) {} -enum PollDescriptorsFlow { - Continue, - Return, +enum Poll { + Pending, Ready { status: alsa::pcm::Status, delay_frames: usize, }, - XRun, } // This block is shared between both input and output stream worker functions. -fn poll_descriptors_and_prepare_buffer( +fn poll_for_period( rx: &TriggerReceiver, stream: &StreamInner, ctxt: &mut StreamWorkerContext, -) -> Result { - if stream.dropping.load(Ordering::Acquire) { - // The stream has been requested to be destroyed. - rx.clear_pipe(); - return Ok(PollDescriptorsFlow::Return); - } - +) -> Result { let StreamWorkerContext { ref mut descriptors, ref poll_timeout, @@ -930,35 +950,43 @@ fn poll_descriptors_and_prepare_buffer( let res = alsa::poll::poll(descriptors, *poll_timeout)?; if res == 0 { - let description = String::from("`alsa::poll()` spuriously returned"); - return Err(BackendSpecificError { description }); + // poll() returned 0: either a timeout or a spurious wakeup. Nothing to do. + return Ok(Poll::Pending); } if descriptors[0].revents != 0 { - // The stream has been requested to be destroyed. + // Self-pipe fired: the stream is being dropped. Clear the pipe and let the + // worker loop detect the dropping flag on the next iteration. rx.clear_pipe(); - return Ok(PollDescriptorsFlow::Return); + return Ok(Poll::Pending); } let revents = stream.channel.revents(&descriptors[1..])?; - if revents.contains(alsa::poll::Flags::ERR) { - let description = String::from("`alsa::poll()` returned POLLERR"); - return Err(BackendSpecificError { description }); + // No events: spurious wakeup, poll again. + if revents.is_empty() { + return Ok(Poll::Pending); } - - // Check if data is ready for processing (either input or output) - if !revents.contains(alsa::poll::Flags::IN) && !revents.contains(alsa::poll::Flags::OUT) { - // Nothing to process, poll again - return Ok(PollDescriptorsFlow::Continue); + // POLLHUP/POLLNVAL: the device has been disconnected. + if revents.intersects(alsa::poll::Flags::HUP | alsa::poll::Flags::NVAL) { + return Err(StreamError::DeviceNotAvailable); } + // POLLERR signals an xrun or suspend; avail() below returns EPIPE/ESTRPIPE accordingly. + // POLLIN/POLLOUT: data is ready, fall through to process it. let status = stream.channel.status()?; let avail_frames = match stream.channel.avail() { - Err(err) if err.errno() == libc::EPIPE => return Ok(PollDescriptorsFlow::XRun), + // Xrun: recover via prepare() (+ start() for capture, handled by the worker). + Err(err) if err.errno() == libc::EPIPE => return Err(StreamError::BufferUnderrun), + // Suspend: try hardware resume first; fall back to prepare() if unsupported. + Err(err) if err.errno() == libc::ESTRPIPE => match stream.channel.resume() { + Ok(()) => return Ok(Poll::Pending), + Err(e) if e.errno() == libc::EAGAIN => return Ok(Poll::Pending), + Err(e) if e.errno() == libc::ENOSYS => return Err(StreamError::BufferUnderrun), + Err(e) => return Err(e.into()), + }, res => res, }? as usize; let delay_frames = match status.get_delay() { - // Buffer underrun detected, but notification happens in XRun handler d if d < 0 => 0, d => d as usize, }; @@ -969,10 +997,10 @@ fn poll_descriptors_and_prepare_buffer( // Verify we have room for at least one full period before processing. // See: https://bugzilla.kernel.org/show_bug.cgi?id=202499 if available_samples < stream.period_samples { - return Ok(PollDescriptorsFlow::Continue); + return Ok(Poll::Pending); } - Ok(PollDescriptorsFlow::Ready { + Ok(Poll::Ready { status, delay_frames, }) @@ -1010,16 +1038,13 @@ fn process_input( } // Request data from the user's function and write it via ALSA. -// -// Returns `true` fn process_output( stream: &StreamInner, buffer: &mut [u8], status: alsa::pcm::Status, delay_frames: usize, data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static), - error_callback: &mut dyn FnMut(StreamError), -) -> Result<(), BackendSpecificError> { +) -> Result<(), StreamError> { // Buffer is always pre-filled with equilibrium, user overwrites what they want buffer.copy_from_slice(&stream.silence_template); { @@ -1044,35 +1069,15 @@ fn process_output( data_callback(&mut data, &info); } - loop { - match stream.channel.io_bytes().writei(buffer) { - Err(err) if err.errno() == libc::EPIPE => { - // ALSA underrun or overrun. - // See https://github.com/alsa-project/alsa-lib/blob/b154d9145f0e17b9650e4584ddfdf14580b4e0d7/src/pcm/pcm.c#L8767-L8770 - // Even if these recover successfully, they still may cause audible glitches. - - error_callback(StreamError::BufferUnderrun); - if let Err(recover_err) = stream.channel.try_recover(err, true) { - error_callback(recover_err.into()); - } - } - Err(err) => { - error_callback(err.into()); - continue; - } - Ok(result) if result != stream.period_frames => { - let description = format!( - "unexpected number of frames written: expected {}, \ - result {result} (this should never happen)", - stream.period_frames - ); - error_callback(BackendSpecificError { description }.into()); - continue; - } - _ => { - break; - } + // try_recover handles both xrun (EPIPE) and suspend (ESTRPIPE) during write. + if let Err(err) = stream.channel.io_bytes().writei(buffer) { + if matches!(err.errno(), libc::EPIPE | libc::ESTRPIPE) { + return match stream.channel.try_recover(err, true) { + Ok(()) => Err(StreamError::BufferUnderrun), + Err(recover_err) => Err(recover_err.into()), + }; } + return Err(err.into()); } Ok(()) } @@ -1150,13 +1155,13 @@ impl Stream { E: FnMut(StreamError) + Send + 'static, { let (tx, rx) = trigger(); - // Clone the handle for passing into worker thread. + let rx_thread = rx.clone(); let stream = inner.clone(); let thread = thread::Builder::new() .name("cpal_alsa_in".to_owned()) .spawn(move || { input_stream_worker( - rx, + rx_thread, &stream, &mut data_callback, &mut error_callback, @@ -1168,6 +1173,7 @@ impl Stream { thread: Some(thread), inner, trigger: tx, + _rx: rx, } } @@ -1182,13 +1188,13 @@ impl Stream { E: FnMut(StreamError) + Send + 'static, { let (tx, rx) = trigger(); - // Clone the handle for passing into worker thread. + let rx_thread = rx.clone(); let stream = inner.clone(); let thread = thread::Builder::new() .name("cpal_alsa_out".to_owned()) .spawn(move || { output_stream_worker( - rx, + rx_thread, &stream, &mut data_callback, &mut error_callback, @@ -1200,6 +1206,7 @@ impl Stream { thread: Some(thread), inner, trigger: tx, + _rx: rx, } } }