Merged
27 changes: 26 additions & 1 deletion AGENTS.md
@@ -21,9 +21,34 @@ test C/C++ compilation: `cargo test --test compile_and_run_test -- --ignored`
- Arrow C Data Interface for zero-copy data exchange.
- `panic = "abort"` in release to prevent unwinding across FFI.

## Coding Standards

### Cross-Language Bindings
- Keep C/C++ bindings as thin wrappers — centralize validation and logic in the Rust core.
- Keep parameter names consistent across all bindings (Rust, C, C++) — rename everywhere or nowhere.
- Never break public API signatures — deprecate with `#[deprecated]` and add a new method.
- Replace mutually exclusive boolean flags with a single enum/mode parameter.
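
As an illustration of the last rule, a pair of flags such as `append` and `overwrite` can be folded into one enum so that contradictory combinations cannot be expressed. This is a minimal sketch; `WriteMode`, its variants, and the integer codes are hypothetical names chosen for the example, not part of this repository's API.

```rust
/// Hypothetical write mode replacing two mutually exclusive flags
/// (`append: bool`, `overwrite: bool`). Names are illustrative only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteMode {
    Create,
    Append,
    Overwrite,
}

impl WriteMode {
    /// Map an integer code coming from the C side to a mode.
    /// Unknown codes are rejected instead of falling back to a default.
    pub fn from_code(code: i32) -> Result<Self, String> {
        match code {
            0 => Ok(Self::Create),
            1 => Ok(Self::Append),
            2 => Ok(Self::Overwrite),
            other => Err(format!(
                "invalid write mode code: got {other}, expected 0 (create), 1 (append), or 2 (overwrite)"
            )),
        }
    }
}
```

With a single mode parameter, the C and C++ bindings only need to forward one integer, and the Rust core remains the only place that validates it.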

### Naming
- Name variables after what the value *is* (e.g., `partition_id` not `mask`).
- Drop redundant prefixes when the struct/module already implies the domain.
- Use `indices` (not `indexes`) consistently in all APIs and docs.

### Error Handling
- Validate inputs and reject invalid values with descriptive errors at API boundaries — never silently clamp or adjust.
- Include full context in error messages: variable names, values, sizes, types.
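
A small sketch of both rules together, assuming a hypothetical `batch_size` parameter with a caller-supplied upper bound. The function rejects out-of-range values rather than clamping them, and the error names the parameter, the value received, its type, and the accepted range. None of these names come from the repository.

```rust
use std::fmt;

/// Hypothetical error type carrying the full context of a rejected input.
#[derive(Debug, PartialEq, Eq)]
pub struct InvalidBatchSize {
    name: &'static str,
    value: usize,
    max: usize,
}

impl fmt::Display for InvalidBatchSize {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "invalid {}: got {} (usize), expected a value in 1..={}",
            self.name, self.value, self.max
        )
    }
}

/// Validate at the API boundary: return the value unchanged when it is in
/// range, otherwise return an error. The value is never adjusted silently.
pub fn validate_batch_size(
    batch_size: usize,
    max_batch_size: usize,
) -> Result<usize, InvalidBatchSize> {
    if batch_size == 0 || batch_size > max_batch_size {
        return Err(InvalidBatchSize {
            name: "batch_size",
            value: batch_size,
            max: max_batch_size,
        });
    }
    Ok(batch_size)
}
```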

### Testing
- All bugfixes and features must have corresponding tests.
- Cover NULL/empty edge cases.
- Include multi-fragment scenarios for dataset operations.

### Dependencies
- Prefer the standard library or existing workspace dependencies before adding new external crates.
- Keep `Cargo.lock` changes intentional; revert unrelated dependency bumps.

## Adding New APIs
1. Add `extern "C"` function in `src/`.
2. Add declaration to `include/lance.h`.
3. Add C++ wrapper to `include/lance.hpp`.
4. Add test in `tests/c_api_test.rs`.
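
Step 1 can be sketched with the standard library alone. The function below is a hypothetical example (`example_uri_len` does not exist in this repository); it follows the same conventions as the exports in this change: NULL checks at the boundary and a return value of 0 on success or -1 on error. A real export would also carry `#[unsafe(no_mangle)]` and use the crate's `ffi_try!` macro, both omitted here to keep the sketch self-contained.

```rust
use std::ffi::{c_char, CStr};

/// Hypothetical export: writes the byte length of `uri` into `out_len`.
/// Returns 0 on success, -1 on error (NULL pointer or invalid UTF-8).
///
/// # Safety
/// `uri` must be NULL or point to a NUL-terminated string, and `out_len`
/// must be NULL or point to writable memory for one `usize`.
pub unsafe extern "C" fn example_uri_len(uri: *const c_char, out_len: *mut usize) -> i32 {
    // Reject NULL pointers before touching them.
    if uri.is_null() || out_len.is_null() {
        return -1;
    }
    // SAFETY: `uri` is non-NULL and the caller guarantees NUL termination.
    let parsed = unsafe { CStr::from_ptr(uri) }.to_str();
    match parsed {
        Ok(s) => {
            // SAFETY: `out_len` was checked for NULL above.
            unsafe { *out_len = s.len() };
            0
        }
        Err(_) => -1,
    }
}
```

Steps 2 and 3 would then declare the same signature in `include/lance.h` and wrap it in `include/lance.hpp`, keeping the parameter names `uri` and `out_len` identical in all three places.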

2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -33,6 +33,8 @@ snafu = "0.9"
[dev-dependencies]
lance = "3.0.1"
lance-datagen = "3.0.1"
lance-file = "3.0.1"
lance-table = "3.0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
arrow-array = "57.0.0"
arrow-schema = "57.0.0"
29 changes: 29 additions & 0 deletions include/lance.h
@@ -281,6 +281,35 @@ int32_t lance_batch_to_arrow(
/** Free a batch handle. */
void lance_batch_free(LanceBatch* batch);

/* ─── Fragment writer ─── */

/**
* Write an Arrow record batch stream to fragment files at `uri`.
*
* Designed for embedded / robotics C++ pipelines: write Lance fragment files
* locally with minimal overhead. A separate Rust finalizer process later
* reconstructs Fragment metadata from the file footers and commits them
* into a dataset on a remote data lake via CommitBuilder.
*
* The data is written but NOT committed — no dataset manifest is created or
* updated. The written .lance files under <uri>/data/ contain full metadata
* in their footers (schema with field IDs, row counts, format version).
*
* @param uri Directory URI for fragment files (file://, s3://, etc.)
* @param schema Required Arrow schema. The stream schema must match
* or the call fails with LANCE_ERR_INVALID_ARGUMENT.
* @param stream Arrow C Data Interface stream; consumed by this call —
* do not use the stream after returning.
* @param storage_opts NULL-terminated key-value pairs ["k","v",NULL], or NULL.
* @return 0 on success, -1 on error
*/
int32_t lance_write_fragments(
    const char* uri,
    const struct ArrowSchema* schema,
    struct ArrowArrayStream* stream,
    const char* const* storage_opts
);

#ifdef __cplusplus
} /* extern "C" */
#endif
38 changes: 38 additions & 0 deletions include/lance.hpp
@@ -274,4 +274,42 @@ class Batch {

} // namespace lance

// ─── Fragment writer (free functions) ────────────────────────────────────────

namespace lance {

/**
* Write an Arrow record batch stream to fragment files at `uri`.
*
* Data files are written under `<uri>/data/`. A Rust finalizer reconstructs
* Fragment metadata from the file footers and commits via CommitBuilder.
* No dynamic memory is returned to the caller.
*
* @param uri Directory URI (file://, s3://, etc.)
* @param schema Required Arrow schema — stream schema must match.
* @param stream ArrowArrayStream to consume. Must not be used after this call.
* @param storage_opts Key-value storage options, or empty for defaults.
* @throws lance::Error on failure.
*/
inline void write_fragments(
    const std::string& uri,
    const ArrowSchema* schema,
    ArrowArrayStream* stream,
    const std::vector<std::pair<std::string, std::string>>& storage_opts = {})
{
    std::vector<const char*> kv;
    for (auto& [k, v] : storage_opts) {
        kv.push_back(k.c_str());
        kv.push_back(v.c_str());
    }
    kv.push_back(nullptr);

    const char* const* opts_ptr = storage_opts.empty() ? nullptr : kv.data();
    if (lance_write_fragments(uri.c_str(), schema, stream, opts_ptr) != 0) {
        check_error();
    }
}

} // namespace lance

#endif /* LANCE_HPP */
140 changes: 140 additions & 0 deletions src/fragment_writer.rs
@@ -0,0 +1,140 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Fragment writer C API: write Arrow data to local fragment files without committing.
//!
//! Designed for embedded / robotics C++ pipelines where sensor data is ingested
//! at high frequency on edge devices. The C++ process writes Lance fragment files
//! locally with minimal overhead (no manifest, no coordination). A separate Rust
//! finalizer process later reads the file footers, reconstructs fragment metadata,
//! and commits them into a dataset on a remote data lake (S3, GCS, etc.).
//!
//! # Two-process workflow
//!
//! **1. Writer process (C/C++ on edge device):**
//! ```c
//! // Stream sensor batches into local fragment files.
//! int32_t rc = lance_write_fragments(
//!     "file:///data/staging/robot.lance", &schema, &stream, NULL);
//! ```
//!
//! **2. Finalizer process (Rust, runs periodically or on sync):**
//! ```text
//! // Scan data/*.lance files, reconstruct Fragment metadata from file footers,
//! // then commit via CommitBuilder to publish to the data lake.
//! ```

use std::ffi::c_char;
use std::sync::Arc;

use arrow::ffi::FFI_ArrowSchema;
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use arrow::record_batch::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use lance::dataset::{InsertBuilder, WriteParams};
use lance_core::Result;
use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};

use crate::error::ffi_try;
use crate::helpers;
use crate::runtime::block_on;

/// Write an Arrow record batch stream to fragment files at `uri`.
///
/// The data is written but **not committed** — no dataset manifest is created
/// or updated. The written `.lance` files under `<uri>/data/` contain full
/// metadata in their footers (schema with field IDs, row counts, format version).
/// A Rust finalizer can reconstruct `Fragment` metadata by reading these footers
/// and commit via `CommitBuilder`.
///
/// - `uri`: Directory URI where fragment files are written (`file://`, `s3://`, etc.)
/// - `schema`: Required Arrow schema. The stream's schema must match; the call
/// fails fast with `LANCE_ERR_INVALID_ARGUMENT` on mismatch.
/// - `stream`: Arrow C Data Interface stream consumed by this call. The caller
/// must not use the stream after this function returns.
/// - `storage_opts`: NULL-terminated key-value pairs `["key","val",NULL]`, or NULL.
///
/// Returns 0 on success, -1 on error.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn lance_write_fragments(
    uri: *const c_char,
    schema: *const FFI_ArrowSchema,
    stream: *mut FFI_ArrowArrayStream,
    storage_opts: *const *const c_char,
) -> i32 {
    ffi_try!(
        unsafe { write_fragments_inner(uri, schema, stream, storage_opts) },
        neg
    )
}

unsafe fn write_fragments_inner(
    uri: *const c_char,
    schema: *const FFI_ArrowSchema,
    stream: *mut FFI_ArrowArrayStream,
    storage_opts: *const *const c_char,
) -> Result<i32> {
    if uri.is_null() || schema.is_null() || stream.is_null() {
        return Err(lance_core::Error::InvalidInput {
            source: "uri, schema, and stream must not be NULL".into(),
            location: snafu::location!(),
        });
    }

    let uri_str = unsafe { helpers::parse_c_string(uri)? }.ok_or_else(|| {
        lance_core::Error::InvalidInput {
            source: "uri must not be empty".into(),
            location: snafu::location!(),
        }
    })?;

    // Import the caller-provided schema from the Arrow C Data Interface.
    let expected_schema = ArrowSchema::try_from(unsafe { &*schema }).map_err(|e| {
        lance_core::Error::InvalidInput {
            source: format!("invalid schema: {e}").into(),
            location: snafu::location!(),
        }
    })?;

    let opts = unsafe { helpers::parse_storage_options(storage_opts)? };

    // Consume the C stream into an Arrow RecordBatch reader.
    let reader = unsafe { ArrowArrayStreamReader::from_raw(stream) }.map_err(|e| {
        lance_core::Error::InvalidInput {
            source: e.to_string().into(),
            location: snafu::location!(),
        }
    })?;

    // Fail fast: compare the stream schema against the caller-provided schema.
    let stream_schema = reader.schema();
    if stream_schema.fields() != expected_schema.fields() {
        return Err(lance_core::Error::InvalidInput {
            source: format!(
                "stream schema does not match the provided schema.\n expected: {expected_schema}\n got: {stream_schema}"
            )
            .into(),
            location: snafu::location!(),
        });
    }

    let mut params = WriteParams::default();
    if !opts.is_empty() {
        params.store_params = Some(ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                opts,
            ))),
            ..ObjectStoreParams::default()
        });
    }

    // Write fragment data files. The Transaction result is discarded:
    // the finalizer reconstructs Fragment metadata from the file footers.
    let _transaction = block_on(
        InsertBuilder::new(uri_str)
            .with_params(&params)
            .execute_uncommitted_stream(reader),
    )?;

    Ok(0)
}
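
The `storage_opts` convention documented above (a NULL-terminated array of alternating keys and values, or NULL for no options) could be parsed roughly as follows. `helpers::parse_storage_options` is not shown in this diff, so the function below is a hypothetical, std-only stand-in rather than the crate's actual helper; its name, return type, and error messages are assumptions.

```rust
use std::collections::HashMap;
use std::ffi::{c_char, CStr};

/// Hypothetical parser for `["key","val",NULL]` arrays.
/// A NULL `opts` pointer yields an empty map.
///
/// # Safety
/// `opts` must be NULL or point to an array of C-string pointers that is
/// terminated by a NULL entry.
pub unsafe fn parse_kv_pairs(
    opts: *const *const c_char,
) -> Result<HashMap<String, String>, String> {
    let mut map = HashMap::new();
    if opts.is_null() {
        return Ok(map);
    }
    let mut i = 0usize;
    loop {
        // SAFETY: the caller guarantees the array is NULL-terminated, and we
        // stop at the first NULL entry, so this read stays in bounds.
        let key_ptr = unsafe { *opts.add(i) };
        if key_ptr.is_null() {
            return Ok(map);
        }
        // SAFETY: the key entry was non-NULL, so the next slot exists
        // (it is either the value or the terminator).
        let val_ptr = unsafe { *opts.add(i + 1) };
        if val_ptr.is_null() {
            return Err(format!("storage_opts: key at index {i} has no value"));
        }
        // SAFETY: both pointers are non-NULL, NUL-terminated C strings.
        let key = unsafe { CStr::from_ptr(key_ptr) }
            .to_str()
            .map_err(|e| format!("storage_opts[{i}]: key is not valid UTF-8: {e}"))?;
        let val = unsafe { CStr::from_ptr(val_ptr) }
            .to_str()
            .map_err(|e| format!("storage_opts[{}]: value is not valid UTF-8: {e}", i + 1))?;
        map.insert(key.to_owned(), val.to_owned());
        i += 2;
    }
}
```

Rejecting a key without a value, instead of ignoring it, follows the error-handling rule in AGENTS.md: invalid input is reported with its index rather than silently dropped.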
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -19,6 +19,7 @@ mod async_dispatcher;
mod batch;
mod dataset;
mod error;
mod fragment_writer;
mod helpers;
pub mod runtime;
mod scanner;
@@ -29,4 +30,5 @@ pub use dataset::*;
pub use error::{
LanceErrorCode, lance_free_string, lance_last_error_code, lance_last_error_message,
};
pub use fragment_writer::*;
pub use scanner::*;