datafusion/datasource/src/morsel.rs
#[derive(Debug)]
pub struct Morsel {
    /// The original [`PartitionedFile`] that this morsel belongs to
    file: Arc<PartitionedFile>,
Do we still want PartitionedFile here? In my PR I figured out it is better to just initialize the config at file level and morselize from there, but perhaps it could be different in other cases?
That is a good question -- I will see what it would look like without a partition file
I was thinking that some non-trivial amount of the information from the partitioned file was needed (like the schema and partition columns). However, now that I type that out, maybe not.
What I am planning to do next in this PR (after I finish up some other things for work) is
I am also trying to figure out what types of queues would belong in the FileStream (queues of MorselFutures, perhaps 🤔). Or maybe I need to make a more explicit state machine 🤔
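To make the "more explicit state machine" idea concrete, here is a minimal sketch. None of this is code from the PR; the names (`FileStreamState`, `next_morsel`) are made up for illustration, and `M` stands in for a morsel type. It just tracks whether the stream is waiting on a morselize future or draining morsels already produced for the current file.

```rust
// Hypothetical sketch, not code from this PR: an explicit state machine for
// FileStream instead of ad-hoc queues of futures.

#[allow(dead_code)]
#[derive(Debug)]
enum FileStreamState<M> {
    /// Waiting for the next file's morselize future to resolve
    Morselizing,
    /// Draining morsels already produced for the current file
    Scanning { pending: Vec<M> },
    /// No more files and no more morsels
    Done,
}

impl<M> FileStreamState<M> {
    /// Pop the next morsel, falling back to `Morselizing` once drained.
    fn next_morsel(&mut self) -> Option<M> {
        if let FileStreamState::Scanning { pending } = self {
            let morsel = pending.pop();
            if pending.is_empty() {
                *self = FileStreamState::Morselizing;
            }
            morsel
        } else {
            None
        }
    }
}
```

A real version would hold futures in the `Morselizing` variant; the point is only that the transitions become explicit and testable.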
datafusion/datasource/src/morsel.rs
pub struct Morsel {
    /// The original [`PartitionedFile`] that this morsel belongs to
    file: Arc<PartitionedFile>,
    /// File format specific information that describes the morsel, such as byte range, row group, etc.
In my PR it also shares ParquetMetadata at this level - even without caching, that saves doing the metadata IO/decode multiple times over when splitting a single file.
Currently the caching mechanism seems not really optimal when splitting a single file in PartitionedFile, as there is no sharing - each partition will just try and start reading it.
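The sharing described above could be sketched roughly like this. All names here (`FileMetadata`, `FileMetadataCache`, `get_or_load`) are hypothetical, not from the PR; `FileMetadata` stands in for something like parquet's `ParquetMetaData`. Each morsel of a file would hold an `Arc<FileMetadataCache>`, so the metadata IO/decode runs at most once per file no matter how many morsels or partitions touch it.

```rust
use std::sync::OnceLock;

// Hypothetical sketch of per-file metadata sharing across morsels.
// `FileMetadata` stands in for the decoded file-level metadata.

#[derive(Debug, PartialEq)]
struct FileMetadata {
    num_row_groups: usize,
}

#[derive(Default)]
struct FileMetadataCache {
    cell: OnceLock<FileMetadata>,
}

impl FileMetadataCache {
    /// Runs `load` (the IO/decode) on the first call only; every later
    /// call returns the already-cached value.
    fn get_or_load(&self, load: impl FnOnce() -> FileMetadata) -> &FileMetadata {
        self.cell.get_or_init(load)
    }
}
```

Wrapping this in an `Arc` and handing a clone to every morsel of the file is the sharing mechanism; the second reader never re-runs the load closure.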
datafusion/datasource/src/morsel.rs
/// 1. The API is `async` for file formats that require I/O operations to
/// determine the morsels, such as reading metadata from the file.
/// 2.
fn morselize(&self, file: Arc<PartitionedFile>) -> MorselOpenFuture;
In some cases we might also want decoding/further processing to happen across a number of files. E.g. with ClickBench query 6, even when having 1 morsel = 1 file, I still see a small slowdown from having 100 files as 100 morsels:
│ QQuery 6 │ 5.77 / 6.12 ±0.35 / 6.72 ms │ 6.63 / 7.95 ±1.29 / 9.75 ms │ 1.30x slower │
Query 6 is
SELECT MIN("EventDate"), MAX("EventDate") FROM hits
So it is just scanning a super small and very well compressible (sorted) column.
Hmm... I suppose it was from doing some stuff over and over for morsels (in this case twice instead of once).
When also caching all of the file level stuff, we seem to be doing better here:
│ QQuery 6 │ 6.51 ms │ 7.33 ms │ 1.13x slower │
Though for a really small query it still seems that slightly more eager loading/prefetching of the ParquetMetadata might help in the uncached case (below is min / avg ± std / max from here)
│ QQuery 6 │ 5.50 / 5.90 ±0.31 / 6.26 ms │ 6.65 / 8.86 ±2.00 / 12.13 ms │ 1.50x slower │
datafusion/datasource/src/morsel.rs
/// This is typically an `async` function that opens the file, and returns a
/// stream of Morsels that can be processed independently. The stream may yield
/// an error if the file cannot be opened or read.
pub type MorselOpenFuture = BoxFuture<'static, Result<BoxStream<'static, Morsel>>>;
One part that seems clear to me now based on some benchmarking is that we will want to split morsels into smaller ones if we're running out of them (i.e. no more files to morselize and number of morsels is < threads).
One approach that seems useful on the benchmarks is n_threads * 2 pieces or 1MB pieces (whichever leads to larger morsels) to increase parallelism when the number of morsels is low, while not increasing the overhead too much.
I think this still should allow for that, but at least we want to know:
- how many files yet to read
- how many morsels there are left to read
This might change a bit once we start splitting IO and CPU - I guess at that point a smaller morsel size might even start paying off as well (we avoid the extra syscalls, while decoding in even smaller chunks is perhaps better for cache).
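The sizing rule described above (aim for n_threads * 2 pieces, but never below 1 MB, whichever gives the larger morsel) could be sketched as a tiny helper. `target_morsel_bytes` and `MIN_MORSEL_BYTES` are made-up names for illustration, not from the PR.

```rust
// Hypothetical sketch of the re-splitting heuristic: split remaining work
// into n_threads * 2 pieces or 1 MB pieces, whichever yields larger morsels.

const MIN_MORSEL_BYTES: u64 = 1024 * 1024; // 1 MB floor

/// Bytes per morsel when re-splitting `remaining_bytes` across `n_threads`.
fn target_morsel_bytes(remaining_bytes: u64, n_threads: u64) -> u64 {
    // even split into n_threads * 2 pieces, rounded up
    let even_split = remaining_bytes.div_ceil(n_threads * 2);
    // but never go below the 1 MB floor
    even_split.max(MIN_MORSEL_BYTES)
}
```

So 64 MB of remaining work on 8 threads splits into 4 MB morsels, while 4 MB of remaining work hits the 1 MB floor rather than producing 256 KB fragments.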
smaller ones if we're running out of them (i.e. no more files to morselize and number of morsels is < threads).
Is this the same thing as just using smaller morsels to begin with (rather than reading entire row groups, for example, pare down the plan to just read 2MB chunk "morsels")?
I realize we would have to work out how to avoid blowing out the IOs in this case
With the current approach, using smaller morsels to begin with makes some queries slow down because of the extra overhead of small morsels. I think that is mainly because the queries that scan a lot of data are already well parallelizable (a large number of pretty balanced row groups on large columns).
Though it could be due to the current approach, which just does the same IO over and over, and it shows for these queries.
Yeah, specifically maybe loading the page index
I was playing around with some other ideas this afternoon and I am somewhat worried about overthinking the API. I will try to push some updates tomorrow morning
Update here is I think I have a reasonable API sketched out and I am 50% of the way through integrating it into FileStream. Once I get it more fully integrated into FileStream I'll let you know when it is ready for another look.
show benchmark queue |
add50da to 3ed647c
Update here is that I am pretty happy with how this API now looks in theory and how it integrates into the existing FileStream. The next thing I will do is try to implement the parquet opener in terms of the native morsel API and see how that goes, and if I can replicate the improvements seen by Daniel in his PR.
let load_future = async move {
    let stream = file_opener
        // open the file to get a stream
        .open(file)?
This (currently) does a bunch of IO/CPU work before creating the stream
I removed this file opener "prefetching" here #20916 and get identical results (on local storage).
Perhaps we should also have a benchmark that either simulates high get latencies or uses s3 calls to see if it has any impact.
Yeah, the prefetching I think is especially important for object stores (I think @thinkharderdev actually implemented it)
In general I have been thinking about how to best test this (especially the timing of CPU/IO requests). I like the idea of having an object store that simulates high latencies (aka looks like S3) on local disks...
Let's create an issue; it seems like it should be pretty easy to add something for this :)
We may already have one 🤔 https://docs.rs/object_store/0.13.1/object_store/throttle/struct.ThrottledStore.html
Should probably add some randomization to it so it will simulate ranges like 10-250ms or something.
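A sketch of what the randomization could look like: a tiny deterministic PRNG (a 64-bit LCG, for reproducible tests) that yields a latency in the 10-250 ms range, which a throttled test store would sleep for before serving each `get`. `Jitter` and `next_latency` are made-up names, not part of `object_store`; a real version would wrap an `ObjectStore` implementation.

```rust
use std::time::Duration;

// Hypothetical sketch: deterministic per-request latency jitter in
// [10 ms, 250 ms) for simulating object-store latencies on local disk.

struct Jitter {
    state: u64,
}

impl Jitter {
    fn new(seed: u64) -> Self {
        // scramble the seed so small seeds still produce varied output
        Jitter {
            state: seed.wrapping_mul(0x2545F4914F6CDD1D).wrapping_add(1),
        }
    }

    /// Next simulated latency, roughly uniform in [10 ms, 250 ms).
    fn next_latency(&mut self) -> Duration {
        // LCG step (constants from Knuth's MMIX)
        self.state = self
            .state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        // use the high bits, which have better statistical quality
        let ms = 10 + (self.state >> 33) % 240;
        Duration::from_millis(ms)
    }
}
```

Being seedable means a latency-sensitive benchmark run can be replayed exactly, which plain thread-RNG randomization would not allow.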
/// to get the next morsels. Best practice is to run this in a task on a
/// separate IO runtime to ensure that CPU work is not blocked by IO work,
/// but this is not strictly required by the API.
io_future: Option<BoxFuture<'static, Result<()>>>,
I was thinking for IO / CPU separation we should be globally aware of outstanding requests so it also allows bounding the IO (on a local level)
Something like a shared understanding across all the FileStreams of outstanding requests... It sounds somewhat like MemoryManager or something similar -- maybe attached to the RuntimeEnv 🤔
It is a good idea -- I haven't quite worked out how the different FileStreams will collaborate (e.g. for work stealing)
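A minimal sketch of bounding outstanding IO globally, assuming something like a counter attached to the RuntimeEnv and shared by all FileStreams. All names here (`IoPermits`, `IoPermit`, `try_acquire`) are hypothetical; in practice an async semaphore (e.g. tokio's) would likely be used instead, but the shape is the same: acquire a permit before issuing IO, release it on drop.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical sketch: a shared cap on outstanding IO requests.

struct IoPermits {
    available: AtomicUsize,
}

impl IoPermits {
    fn new(max_outstanding: usize) -> Arc<Self> {
        Arc::new(IoPermits {
            available: AtomicUsize::new(max_outstanding),
        })
    }

    /// Returns a guard if an IO slot is free, or None if at the limit.
    fn try_acquire(pool: &Arc<IoPermits>) -> Option<IoPermit> {
        let mut cur = pool.available.load(Ordering::Acquire);
        loop {
            if cur == 0 {
                return None;
            }
            match pool.available.compare_exchange(
                cur,
                cur - 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    return Some(IoPermit {
                        pool: Arc::clone(pool),
                    })
                }
                Err(actual) => cur = actual,
            }
        }
    }
}

/// RAII guard: dropping it returns the permit to the shared pool.
struct IoPermit {
    pool: Arc<IoPermits>,
}

impl Drop for IoPermit {
    fn drop(&mut self) {
        self.pool.available.fetch_add(1, Ordering::AcqRel);
    }
}
```

Because the pool is an `Arc`, it can hang off a RuntimeEnv-like object and be cloned into every FileStream, giving the "globally aware" bound without any per-stream coordination logic.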
}

impl Morselizer for FileOpenerMorselizer {
    fn morselize(&self, file: PartitionedFile) -> Result<Vec<Arc<dyn MorselPlanner>>> {
We might also want to be able to have morsels span multiple files (or otherwise somehow make reading small row groups across files efficient) - for locality / instruction cache / allocations it is more efficient to read across a number of files until we have batch_size rows, instead of decoding many small record batches that are combined later in the plan.
OTOH this also means more eager loading, so I guess it's something to carefully tune.
In my mind the FileOpenerMorselizer is basically a backwards compatibility shim -- to get the best performance we'll have to implement specialized Morselizers for each file format.
/// without requiring the scan to eagerly buffer an unbounded amount of work.
///
/// TODO make this a config option
fn max_buffered_morsels() -> usize {
In my PR it doesn't do this:
- Any worker/partition can "morselize" a file (and then put them on the morsel queue)
- If morsels are running out, any worker will be able to split morsels and put them back on the queue
The work stealing isn't "stealing" from other partitions, but just picking the first item from a global queue (I think we don't have to have global / local queues - just a single global queue for files / morsels).
I was thinking the less cross-core communication the better (so better to have per-thread work).
I do think the number of outstanding IOs makes more sense as some sort of global / per-session setting... but I am still thinking work stealing is a better strategy. I'll see what I can come up with.
Hmmm, it makes it more complex though (we would also need to implement some work-stealing strategy...) with little benefit; since the morsels themselves don't hold much data, there won't be a lot of contention/cross-communication anyway...
Also I think it might be beneficial in certain cases (e.g. topk pruning) to execute the morsels in a predefined global order instead of per-partition.
Good points. I am hoping that we'll soon be in a position to test these strategies (I think we can express it all in FileStream, and the basic morselizing API is the same)
I tend to agree with @Dandandan, a global morsel queue seems like it would be pretty lightweight in terms of cross-core communication as long as the morsels are reasonably sized.
I guess if morsels start getting small (e.g. ~batch size?) then using
https://docs.rs/crossbeam-deque/latest/crossbeam_deque/ or Rayon for the CPU-bound work starts making sense.
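As a rough illustration of the single-global-queue plus split-on-low idea discussed above: all workers pop from one shared queue, and when the queue runs low a worker splits the morsel it popped and pushes the other half back for someone else. Every name here (`MorselQueue`, `ByteRange`, the `low_water` threshold) is hypothetical, and real morsels would carry format-specific information rather than just a byte range.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical sketch of a single global morsel queue with split-on-low.

#[derive(Debug, Clone, Copy, PartialEq)]
struct ByteRange {
    start: u64,
    end: u64,
}

struct MorselQueue {
    queue: Mutex<VecDeque<ByteRange>>,
}

impl MorselQueue {
    fn new(morsels: Vec<ByteRange>) -> Self {
        MorselQueue {
            queue: Mutex::new(morsels.into()),
        }
    }

    /// Pop the next morsel. If fewer than `low_water` morsels remain,
    /// split it: take the first half and push the rest back so other
    /// workers have something to pick up.
    fn pop(&self, low_water: usize) -> Option<ByteRange> {
        let mut q = self.queue.lock().unwrap();
        let m = q.pop_front()?;
        if q.len() < low_water && m.end - m.start > 1 {
            let mid = (m.start + m.end) / 2;
            q.push_back(ByteRange { start: mid, end: m.end });
            return Some(ByteRange { start: m.start, end: mid });
        }
        Some(m)
    }
}
```

Since workers only touch the queue briefly and morsels are small descriptors, the single `Mutex` should see little contention; a sharper implementation could swap it for a lock-free deque (e.g. crossbeam-deque) if the morsels ever shrink toward batch size.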
Which issue does this PR close?
Rationale for this change
I am working on reviewing / helping with #20481 and I am trying to work out what an API for morsels can look like. See thoughts here #20529 (comment)
I am therefore sketching out what a morsel API can look like, along with
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?