Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,080 changes: 669 additions & 411 deletions Cargo.lock

Large diffs are not rendered by default.

42 changes: 30 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,16 @@ arbitrary = "1.3.2"
arc-swap = "1.8"
arcref = "0.2.0"
arrayref = "0.3.7"
arrow-arith = "57.1"
arrow-array = "57.1"
arrow-buffer = "57.1"
arrow-cast = "57.1"
arrow-data = "57.1"
arrow-ipc = "57.1"
arrow-ord = "57.1"
arrow-schema = "57.1"
arrow-select = "57.1"
arrow-string = "57.1"
arrow-arith = "58"
arrow-array = "58"
arrow-buffer = "58"
arrow-cast = "58"
arrow-data = "58"
arrow-ipc = "58"
arrow-ord = "58"
arrow-schema = "58"
arrow-select = "58"
arrow-string = "58"
async-fs = "2.2.0"
async-lock = "3.4"
async-stream = "0.3.6"
Expand Down Expand Up @@ -170,14 +170,14 @@ noodles-bgzf = "0.44.0"
noodles-vcf = "0.82.0"
num-traits = "0.2.19"
num_enum = { version = "0.7.3", default-features = false }
object_store = { version = "0.12.4", default-features = false }
object_store = { version = "0.13.1", default-features = false }
once_cell = "1.21"
oneshot = "0.1.13"
opentelemetry = "0.31.0"
opentelemetry-otlp = "0.31.0"
opentelemetry_sdk = "0.31.0"
parking_lot = { version = "0.12.3", features = ["nightly"] }
parquet = "57.1"
parquet = "58"
paste = "1.0.15"
pco = "1.0.1"
pin-project-lite = "0.2.15"
Expand Down Expand Up @@ -367,3 +367,21 @@ lto = false
[profile.bench_assert]
debug-assertions = true
inherits = "bench"

[patch.crates-io]
datafusion = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-catalog = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-common = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-common-runtime = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-datasource = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-execution = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-expr = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-functions = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-physical-expr-adapter = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-pruning = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-sqllogictest = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
tpchgen = { git = "https://github.com/AdamGS/tpchgen-rs.git", branch = "adamg/bump-arrow-match-df" }
tpchgen-arrow = { git = "https://github.com/AdamGS/tpchgen-rs.git", branch = "adamg/bump-arrow-match-df" }
3 changes: 2 additions & 1 deletion vortex-bench/src/random_access/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use itertools::Itertools;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::arrow_reader::ArrowReaderMetadata;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::file::metadata::PageIndexPolicy;
use stream::StreamExt;
use tokio::fs::File;
use vortex::array::Canonical;
Expand Down Expand Up @@ -100,7 +101,7 @@ impl ParquetRandomAccessor {
/// Open a Parquet file, parse the footer, and return a ready-to-use accessor.
pub async fn open(path: PathBuf, name: impl Into<String>) -> anyhow::Result<Self> {
let mut file = File::open(&path).await?;
let options = ArrowReaderOptions::new().with_page_index(true);
let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
let arrow_metadata = ArrowReaderMetadata::load_async(&mut file, options).await?;

let row_group_offsets = once(0)
Expand Down
1 change: 1 addition & 0 deletions vortex-cuda/src/pooled_read_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use object_store::GetOptions;
use object_store::GetRange;
use object_store::GetResultPayload;
use object_store::ObjectStore;
use object_store::ObjectStoreExt;
use object_store::path::Path as ObjectPath;
use vortex::array::buffer::BufferHandle;
use vortex::buffer::Alignment;
Expand Down
54 changes: 32 additions & 22 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::file_sink_config::FileSinkConfig;
use datafusion_datasource::sink::DataSinkExec;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_plan::ExecutionPlan;
Expand Down Expand Up @@ -251,16 +252,19 @@ impl FileFormat for VortexFormat {
let cache = file_metadata_cache.clone();

SpawnedTask::spawn(async move {
// Check if we have cached metadata for this file
if let Some(cached) = cache.get(&object)
&& let Some(cached_vortex) =
cached.as_any().downcast_ref::<CachedVortexMetadata>()
// Check if we have cached metadata for this file
if let Some(entry) = cache.get(&object.location)
&& entry.is_valid_for(&object)
&& let Some(cached_vortex) = entry
.file_metadata
.as_any()
.downcast_ref::<CachedVortexMetadata>()
{
let inferred_schema = cached_vortex.footer().dtype().to_arrow_schema()?;
return VortexResult::Ok((object.location, inferred_schema));
}

// Not cached or invalid - open the file
// Not cached or invalid - open the file
let reader = Arc::new(ObjectStoreReadAt::new(
store,
object.location.clone(),
Expand All @@ -276,7 +280,8 @@ impl FileFormat for VortexFormat {

// Cache the metadata
let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
cache.put(&object, cached_metadata);
let entry = CachedFileMetadataEntry::new(object.clone(), cached_metadata);
cache.put(&object.location, entry);

let inferred_schema = vxf.dtype().to_arrow_schema()?;
VortexResult::Ok((object.location, inferred_schema))
Expand Down Expand Up @@ -310,24 +315,28 @@ impl FileFormat for VortexFormat {
let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

SpawnedTask::spawn(async move {
// Try to get cached metadata first
let cached_metadata = file_metadata_cache.get(&object).and_then(|cached| {
cached
.as_any()
.downcast_ref::<CachedVortexMetadata>()
.map(|m| {
(
m.footer().dtype().clone(),
m.footer().statistics().cloned(),
m.footer().row_count(),
)
})
});
// Try to get cached metadata first
let cached_metadata = file_metadata_cache
.get(&object.location)
.filter(|entry| entry.is_valid_for(&object))
.and_then(|entry| {
entry
.file_metadata
.as_any()
.downcast_ref::<CachedVortexMetadata>()
.map(|m| {
(
m.footer().dtype().clone(),
m.footer().statistics().cloned(),
m.footer().row_count(),
)
})
});

let (dtype, file_stats, row_count) = match cached_metadata {
Some(metadata) => metadata,
None => {
// Not cached - open the file
// Not cached - open the file
let reader = Arc::new(ObjectStoreReadAt::new(
store,
object.location.clone(),
Expand All @@ -348,8 +357,9 @@ impl FileFormat for VortexFormat {
})?;

// Cache the metadata
let cached = Arc::new(CachedVortexMetadata::new(&vxf));
file_metadata_cache.put(&object, cached);
let file_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
let entry = CachedFileMetadataEntry::new(object.clone(), file_metadata);
file_metadata_cache.put(&object.location, entry);

(
vxf.dtype().clone(),
Expand Down
8 changes: 5 additions & 3 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@ impl FileOpener for VortexOpener {
.with_labels(labels);

if let Some(file_metadata_cache) = file_metadata_cache
&& let Some(file_metadata) = file_metadata_cache.get(&file.object_meta)
&& let Some(vortex_metadata) = file_metadata
&& let Some(entry) = file_metadata_cache.get(file.path())
&& entry.is_valid_for(&file.object_meta)
&& let Some(vortex_metadata) = entry
.file_metadata
.as_any()
.downcast_ref::<CachedVortexMetadata>()
{
Expand All @@ -212,7 +214,7 @@ impl FileOpener for VortexOpener {
let expr_adapter = expr_adapter_factory.create(
Arc::clone(&unified_file_schema),
Arc::clone(&this_file_schema),
);
)?;

let simplifier = PhysicalExprSimplifier::new(&this_file_schema);

Expand Down
1 change: 1 addition & 0 deletions vortex-io/src/object_store/read_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use object_store::GetOptions;
use object_store::GetRange;
use object_store::GetResultPayload;
use object_store::ObjectStore;
use object_store::ObjectStoreExt;
use object_store::path::Path as ObjectPath;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Alignment;
Expand Down
1 change: 1 addition & 0 deletions vortex-io/src/object_store/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::TryStreamExt;
use futures::stream::FuturesUnordered;
use object_store::MultipartUpload;
use object_store::ObjectStore;
use object_store::ObjectStoreExt;
use object_store::PutPayload;
use object_store::PutResult;
use object_store::path::Path;
Expand Down
1 change: 1 addition & 0 deletions vortex-jni/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use jni::objects::ReleaseMode;
use jni::sys::jlong;
use jni::sys::jobject;
use object_store::ObjectStore;
use object_store::ObjectStoreExt;
use object_store::path::Path;
use prost::Message;
use url::Url;
Expand Down
2 changes: 1 addition & 1 deletion vortex-python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ parking_lot = { workspace = true }
pyo3 = { workspace = true, features = ["abi3", "abi3-py311"] }
pyo3-bytes = { workspace = true }
pyo3-log = { workspace = true }
pyo3-object_store = { version = "0.7" }
pyo3-object_store = { version = "0.8" }
tokio = { workspace = true, features = ["fs", "rt-multi-thread"] }
url = { workspace = true }
vortex = { workspace = true, features = ["object_store", "tokio"] }
Expand Down
2 changes: 1 addition & 1 deletion vortex-sqllogictest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ datafusion = { workspace = true }
datafusion-sqllogictest = { workspace = true }
futures.workspace = true
indicatif.workspace = true
sqllogictest = "0.28"
sqllogictest = "0.29.1"
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
vortex = { workspace = true, features = ["tokio"] }
Expand Down
Loading