Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 32 additions & 23 deletions rust/lance-namespace-impls/src/dir/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,13 @@ impl DatasetConsistencyWrapper {

/// Get a mutable reference to the dataset.
/// Always reloads to ensure strong consistency.
///
/// Acquires the write lock before reloading so that tokio's write-fairness
/// prevents reader starvation of the writer.
pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
self.reload().await?;
Ok(DatasetWriteGuard {
guard: self.0.write().await,
})
let mut write_guard = self.0.write().await;
Self::reload_under_write_lock(&mut write_guard).await?;
Ok(DatasetWriteGuard { guard: write_guard })
}

/// Provide a known latest version of the dataset.
Expand All @@ -166,50 +168,57 @@ impl DatasetConsistencyWrapper {
}
}

/// Reload the dataset to the latest version.
/// Reload the dataset to the latest version (for the read path).
///
/// Takes a read lock first to check if a reload is needed, then upgrades
/// to a write lock only if necessary.
async fn reload(&self) -> Result<()> {
// First check if we need to reload (with read lock)
let read_guard = self.0.read().await;
let dataset_uri = read_guard.uri().to_string();
let current_version = read_guard.version().version;
log::debug!(
"Reload starting for uri={}, current_version={}",
dataset_uri,
current_version
);
let latest_version = read_guard.latest_version_id().await.map_err(|e| {
log::debug!("Reload starting for uri={dataset_uri}, current_version={current_version}",);
let latest_version = read_guard.latest_version_id().await.map_err(|err| {
lance_core::Error::from(NamespaceError::Internal {
message: format!("Failed to get latest version: {}", e),
message: format!("Failed to get latest version: {err}"),
})
})?;
log::debug!(
"Reload got latest_version={} for uri={}, current_version={}",
latest_version,
dataset_uri,
current_version
"Reload got latest_version={latest_version} for uri={dataset_uri}, current_version={current_version}",
);
drop(read_guard);

// If already up-to-date, return early
if latest_version == current_version {
log::debug!("Already up-to-date for uri={}", dataset_uri);
log::debug!("Already up-to-date for uri={dataset_uri}");
return Ok(());
}

// Need to reload, acquire write lock
let mut write_guard = self.0.write().await;
Self::reload_under_write_lock(&mut write_guard).await
}

/// Reload the dataset while already holding the write lock.
async fn reload_under_write_lock(
dataset: &mut tokio::sync::RwLockWriteGuard<'_, Dataset>,
) -> Result<()> {
let dataset_uri = dataset.uri().to_string();
let current_version = dataset.version().version;
log::debug!(
"Reload (under write lock) for uri={dataset_uri}, current_version={current_version}",
);

// Double-check after acquiring write lock (someone else might have reloaded)
let latest_version = write_guard.latest_version_id().await.map_err(|e| {
let latest_version = dataset.latest_version_id().await.map_err(|err| {
lance_core::Error::from(NamespaceError::Internal {
message: format!("Failed to get latest version: {}", e),
message: format!("Failed to get latest version: {err}"),
})
})?;

if latest_version != write_guard.version().version {
write_guard.checkout_latest().await.map_err(|e| {
if latest_version != current_version {
dataset.checkout_latest().await.map_err(|err| {
lance_core::Error::from(NamespaceError::Internal {
message: format!("Failed to checkout latest: {}", e),
message: format!("Failed to checkout latest: {err}"),
})
})?;
}
Expand Down
Loading