From 70bc7d0a3801516cf677ed82271d836ea643abf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 12 Mar 2026 22:53:28 +0100 Subject: [PATCH 1/4] Remove file prefetching from FileStream Simplify the FileStream state machine by removing the mechanism that opens the next file in parallel while scanning the current one. Files are now opened sequentially (Scan -> Idle -> Open) instead of prefetching. Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_stream.rs | 110 ++--------------------- 1 file changed, 7 insertions(+), 103 deletions(-) diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index b75e66849b7a1..35937cf28bb4b 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -22,7 +22,6 @@ //! compliant with the `SendableRecordBatchStream` trait. use std::collections::VecDeque; -use std::mem; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -98,10 +97,6 @@ impl FileStream { self } - /// Begin opening the next file in parallel while decoding the current file in FileStream. - /// - /// Since file opening is mostly IO (and may involve a - /// bunch of sequential IO), it can be parallelized with decoding. 
fn start_next_file(&mut self) -> Option> { let part_file = self.file_iter.pop_front()?; Some(self.file_opener.open(part_file)) @@ -125,35 +120,10 @@ impl FileStream { FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) { Ok(reader) => { self.file_stream_metrics.files_opened.add(1); - // include time needed to start opening in `start_next_file` self.file_stream_metrics.time_opening.stop(); - let next = { - let scanning_total_metric = self - .file_stream_metrics - .time_scanning_total - .metrics - .clone(); - let _timer = scanning_total_metric.timer(); - self.start_next_file().transpose() - }; self.file_stream_metrics.time_scanning_until_data.start(); self.file_stream_metrics.time_scanning_total.start(); - - match next { - Ok(Some(next_future)) => { - self.state = FileStreamState::Scan { - reader, - next: Some(NextOpen::Pending(next_future)), - }; - } - Ok(None) => { - self.state = FileStreamState::Scan { reader, next: None }; - } - Err(e) => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e))); - } - } + self.state = FileStreamState::Scan { reader }; } Err(e) => { self.file_stream_metrics.file_open_errors.add(1); @@ -170,14 +140,7 @@ impl FileStream { } } }, - FileStreamState::Scan { reader, next } => { - // We need to poll the next `FileOpenFuture` here to drive it forward - if let Some(next_open_future) = next - && let NextOpen::Pending(f) = next_open_future - && let Poll::Ready(reader) = f.as_mut().poll(cx) - { - *next_open_future = NextOpen::Ready(reader); - } + FileStreamState::Scan { reader } => { match ready!(reader.poll_next_unpin(cx)) { Some(Ok(batch)) => { self.file_stream_metrics.time_scanning_until_data.stop(); @@ -189,12 +152,9 @@ impl FileStream { batch } else { let batch = batch.slice(0, *remain); - // Count this file, the prefetched next file - // (if any), and all remaining files we will - // never open. 
- let done = 1 - + self.file_iter.len() - + usize::from(next.is_some()); + // Count this file and all remaining files + // we will never open. + let done = 1 + self.file_iter.len(); self.file_stream_metrics .files_processed .add(done); @@ -214,29 +174,9 @@ impl FileStream { self.file_stream_metrics.time_scanning_total.stop(); match self.on_error { - // If `OnError::Skip` we skip the file as soon as we hit the first error OnError::Skip => { self.file_stream_metrics.files_processed.add(1); - match mem::take(next) { - Some(future) => { - self.file_stream_metrics.time_opening.start(); - - match future { - NextOpen::Pending(future) => { - self.state = - FileStreamState::Open { future } - } - NextOpen::Ready(reader) => { - self.state = FileStreamState::Open { - future: Box::pin( - std::future::ready(reader), - ), - } - } - } - } - None => return Poll::Ready(None), - } + self.state = FileStreamState::Idle; } OnError::Fail => { self.state = FileStreamState::Error; @@ -248,26 +188,7 @@ impl FileStream { self.file_stream_metrics.files_processed.add(1); self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); - - match mem::take(next) { - Some(future) => { - self.file_stream_metrics.time_opening.start(); - - match future { - NextOpen::Pending(future) => { - self.state = FileStreamState::Open { future } - } - NextOpen::Ready(reader) => { - self.state = FileStreamState::Open { - future: Box::pin(std::future::ready( - reader, - )), - } - } - } - } - None => return Poll::Ready(None), - } + self.state = FileStreamState::Idle; } } } @@ -323,14 +244,6 @@ pub trait FileOpener: Unpin + Send + Sync { fn open(&self, partitioned_file: PartitionedFile) -> Result; } -/// Represents the state of the next `FileOpenFuture`. 
Since we need to poll -/// this future while scanning the current file, we need to store the result if it -/// is ready -pub enum NextOpen { - Pending(FileOpenFuture), - Ready(Result>>), -} - pub enum FileStreamState { /// The idle state, no file is currently being read Idle, @@ -345,10 +258,6 @@ pub enum FileStreamState { Scan { /// The reader instance reader: BoxStream<'static, Result>, - /// A [`FileOpenFuture`] for the next file to be processed. - /// This allows the next file to be opened in parallel while the - /// current file is read. - next: Option, }, /// Encountered an error Error, @@ -388,11 +297,6 @@ pub struct FileStreamMetrics { /// /// Time between when [`FileOpener::open`] is called and when the /// [`FileStream`] receives a stream for reading. - /// - /// If there are multiple files being scanned, the stream - /// will open the next file in the background while scanning the - /// current file. This metric will only capture time spent opening - /// while not also scanning. 
/// [`FileStream`]: pub time_opening: StartableTime, /// Wall clock time elapsed for file scanning + first record batch of decompression + decoding From 38fe60a77465d316eae2392f0711bd0726ae0358 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 12 Mar 2026 23:02:11 +0100 Subject: [PATCH 2/4] More coalesce --- .../core/tests/datasource/object_store_access.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/datasource/object_store_access.rs b/datafusion/core/tests/datasource/object_store_access.rs index 30654c687f8d2..58c91de6a61de 100644 --- a/datafusion/core/tests/datasource/object_store_access.rs +++ b/datafusion/core/tests/datasource/object_store_access.rs @@ -478,8 +478,8 @@ async fn query_single_parquet_file() { RequestCountingObjectStore() Total Requests: 3 - GET (opts) path=parquet_table.parquet head=true - - GET (ranges) path=parquet_table.parquet ranges=4-534,534-1064 - - GET (ranges) path=parquet_table.parquet ranges=1064-1594,1594-2124 + - GET (ranges) path=parquet_table.parquet ranges=4-1064 + - GET (ranges) path=parquet_table.parquet ranges=1064-2124 " ); } @@ -502,7 +502,7 @@ async fn query_single_parquet_file_with_single_predicate() { RequestCountingObjectStore() Total Requests: 2 - GET (opts) path=parquet_table.parquet head=true - - GET (ranges) path=parquet_table.parquet ranges=1064-1481,1481-1594,1594-2011,2011-2124 + - GET (ranges) path=parquet_table.parquet ranges=1064-2124 " ); } @@ -526,8 +526,8 @@ async fn query_single_parquet_file_multi_row_groups_multiple_predicates() { RequestCountingObjectStore() Total Requests: 3 - GET (opts) path=parquet_table.parquet head=true - - GET (ranges) path=parquet_table.parquet ranges=4-421,421-534,534-951,951-1064 - - GET (ranges) path=parquet_table.parquet ranges=1064-1481,1481-1594,1594-2011,2011-2124 + - GET (ranges) path=parquet_table.parquet ranges=4-1064 + - GET (ranges) path=parquet_table.parquet ranges=1064-2124 " ); } From 
e2f0106488a1609baa3857591bbc31bcac182d7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 12 Mar 2026 23:03:18 +0100 Subject: [PATCH 3/4] Revert "More coalesce" This reverts commit 38fe60a77465d316eae2392f0711bd0726ae0358. --- .../core/tests/datasource/object_store_access.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/datasource/object_store_access.rs b/datafusion/core/tests/datasource/object_store_access.rs index 58c91de6a61de..30654c687f8d2 100644 --- a/datafusion/core/tests/datasource/object_store_access.rs +++ b/datafusion/core/tests/datasource/object_store_access.rs @@ -478,8 +478,8 @@ async fn query_single_parquet_file() { RequestCountingObjectStore() Total Requests: 3 - GET (opts) path=parquet_table.parquet head=true - - GET (ranges) path=parquet_table.parquet ranges=4-1064 - - GET (ranges) path=parquet_table.parquet ranges=1064-2124 + - GET (ranges) path=parquet_table.parquet ranges=4-534,534-1064 + - GET (ranges) path=parquet_table.parquet ranges=1064-1594,1594-2124 " ); } @@ -502,7 +502,7 @@ async fn query_single_parquet_file_with_single_predicate() { RequestCountingObjectStore() Total Requests: 2 - GET (opts) path=parquet_table.parquet head=true - - GET (ranges) path=parquet_table.parquet ranges=1064-2124 + - GET (ranges) path=parquet_table.parquet ranges=1064-1481,1481-1594,1594-2011,2011-2124 " ); } @@ -526,8 +526,8 @@ async fn query_single_parquet_file_multi_row_groups_multiple_predicates() { RequestCountingObjectStore() Total Requests: 3 - GET (opts) path=parquet_table.parquet head=true - - GET (ranges) path=parquet_table.parquet ranges=4-1064 - - GET (ranges) path=parquet_table.parquet ranges=1064-2124 + - GET (ranges) path=parquet_table.parquet ranges=4-421,421-534,534-951,951-1064 + - GET (ranges) path=parquet_table.parquet ranges=1064-1481,1481-1594,1594-2011,2011-2124 " ); } From ba2863c610af5439441d98fb764be391ccddd3dd Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 12 Mar 2026 23:20:59 +0100 Subject: [PATCH 4/4] Start time_opening metric only after a file open has begun --- datafusion/datasource/src/file_stream.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 35937cf28bb4b..76279ab9ffa19 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -105,18 +105,17 @@ impl FileStream { fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { loop { match &mut self.state { - FileStreamState::Idle => { - self.file_stream_metrics.time_opening.start(); - - match self.start_next_file().transpose() { - Ok(Some(future)) => self.state = FileStreamState::Open { future }, - Ok(None) => return Poll::Ready(None), - Err(e) => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e))); - } + FileStreamState::Idle => match self.start_next_file().transpose() { + Ok(Some(future)) => { + self.file_stream_metrics.time_opening.start(); + self.state = FileStreamState::Open { future }; } - } + Ok(None) => return Poll::Ready(None), + Err(e) => { + self.state = FileStreamState::Error; + return Poll::Ready(Some(Err(e))); + } + }, FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) { Ok(reader) => { self.file_stream_metrics.files_opened.add(1);