Skip to content

Commit 733a117

Browse files
feat!(catalog-managed): New copy_atomic StorageHandler method (delta-io#1400)
## What changes are proposed in this pull request? Adds a new required method: `copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()>` to `StorageHandler`. This PR also adds support for the default engine via the (dumb) way of GET/PUT. Note that I've elected to pursue the simple/correct thing here and we can attempt to optimize in the future (and can open a follow-up if others agree). ~This implementation proposes a slight departure from existing `Engine` APIs: instead of returning a `DeltaResult<()>` we return `Result<(), CopyError>` with CopyError defined as:~ <details> <summary>old pieces on CopyError omitted</summary> ```rust #[derive(thiserror::Error, Debug)] pub enum CopyError { #[error("Destination file already exists: {0}")] DestinationAlreadyExists(String), #[error(transparent)] Other(#[from] Box<dyn std::error::Error + Send + Sync>), } ``` It captures the only things we care about from the `copy` API perspective: either the destination already exists and we can return a nice error message to the user saying their commit has already been published (considering publishing is the main use case of this API for now) _or_ we just got back some other random error which we don't really care what it is, but rather just something we can surface to the user and fail the overall publish API. I've used this PR as an opportunity to introduce an Engine API more aligned with our pursuit of finer-grainer errors (especially for Engine trait) but happy to split out if we think it's better to just retain existing `DeltaResult` pattern. </details> ### Motivation This PR will be used for commit publishing - basically copying commits from staged commits to published commits. See delta-io#1377 for some context on future usage. ### This PR affects the following public APIs New required method in `StorageHandler` trait: `copy_atomic` ## How was this change tested? new UT for default engine impl
1 parent 09a82a3 commit 733a117

File tree

4 files changed

+69
-1
lines changed

4 files changed

+69
-1
lines changed

kernel/src/engine/default/filesystem.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use delta_kernel_derive::internal_api;
55
use futures::stream::StreamExt;
66
use itertools::Itertools;
77
use object_store::path::Path;
8-
use object_store::{DynObjectStore, ObjectStore};
8+
use object_store::{DynObjectStore, ObjectStore, PutMode};
99
use url::Url;
1010

1111
use super::UrlExt;
@@ -175,6 +175,31 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {
175175

176176
Ok(Box::new(receiver.into_iter()))
177177
}
178+
179+
fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()> {
180+
let src_path = Path::from_url_path(src.path())?;
181+
let dest_path = Path::from_url_path(dest.path())?;
182+
let dest_path_str = dest_path.to_string();
183+
let store = self.inner.clone();
184+
185+
// Read source file then write atomically with PutMode::Create. Note that a GET/PUT is not
186+
// necessarily atomic, but since the source file is immutable, we aren't exposed to the
187+
// possiblilty of source file changing while we do the PUT.
188+
self.task_executor.block_on(async move {
189+
let data = store.get(&src_path).await?.bytes().await?;
190+
191+
store
192+
.put_opts(&dest_path, data.into(), PutMode::Create.into())
193+
.await
194+
.map_err(|e| match e {
195+
object_store::Error::AlreadyExists { .. } => {
196+
Error::FileAlreadyExists(dest_path_str)
197+
}
198+
e => e.into(),
199+
})?;
200+
Ok(())
201+
})
202+
}
178203
}
179204

180205
#[cfg(test)]
@@ -300,4 +325,36 @@ mod tests {
300325
}
301326
assert_eq!(len, 10, "list_from should have returned 10 files");
302327
}
328+
329+
#[tokio::test]
330+
async fn test_copy() {
331+
let tmp = tempfile::tempdir().unwrap();
332+
let store = Arc::new(LocalFileSystem::new());
333+
let executor = Arc::new(TokioBackgroundExecutor::new());
334+
let handler = ObjectStoreStorageHandler::new(store.clone(), executor);
335+
336+
// basic
337+
let data = Bytes::from("test-data");
338+
let src_path = Path::from_absolute_path(tmp.path().join("src.txt")).unwrap();
339+
store.put(&src_path, data.clone().into()).await.unwrap();
340+
let src_url = Url::from_file_path(tmp.path().join("src.txt")).unwrap();
341+
let dest_url = Url::from_file_path(tmp.path().join("dest.txt")).unwrap();
342+
assert!(handler.copy_atomic(&src_url, &dest_url).is_ok());
343+
let dest_path = Path::from_absolute_path(tmp.path().join("dest.txt")).unwrap();
344+
assert_eq!(
345+
store.get(&dest_path).await.unwrap().bytes().await.unwrap(),
346+
data
347+
);
348+
349+
// copy to existing fails
350+
assert!(matches!(
351+
handler.copy_atomic(&src_url, &dest_url),
352+
Err(Error::FileAlreadyExists(_))
353+
));
354+
355+
// copy from non-existing fails
356+
let missing_url = Url::from_file_path(tmp.path().join("missing.txt")).unwrap();
357+
let new_dest_url = Url::from_file_path(tmp.path().join("new_dest.txt")).unwrap();
358+
assert!(handler.copy_atomic(&missing_url, &new_dest_url).is_err());
359+
}
303360
}

kernel/src/engine/sync/storage.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ impl StorageHandler for SyncStorageHandler {
7070
});
7171
Ok(Box::new(iter))
7272
}
73+
74+
fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> {
75+
unimplemented!("SyncStorageHandler does not implement copy");
76+
}
7377
}
7478

7579
#[cfg(test)]

kernel/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,10 @@ pub trait StorageHandler: AsAny {
534534
&self,
535535
files: Vec<FileSlice>,
536536
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>>;
537+
538+
/// Copy a file atomically from source to destination. If the destination file already exists,
539+
/// it must return Err(Error::FileAlreadyExists).
540+
fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()>;
537541
}
538542

539543
/// Provides JSON handling functionality to Delta Kernel.

kernel/src/listed_log_files.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,9 @@ mod list_log_files_with_log_tail_tests {
634634
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<bytes::Bytes>>>> {
635635
panic!("read_files used");
636636
}
637+
fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()> {
638+
panic!("copy used from {src} to {dest}");
639+
}
637640
}
638641

639642
// when log_tail covers the entire requested range, no filesystem listing should occur

0 commit comments

Comments
 (0)