Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,191 changes: 740 additions & 451 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ members = [
"src/daft-cli",
"src/daft-text"
]
exclude = [
"examples/hello"
]

[workspace.dependencies]
arrow = "57.1.0"
Expand Down Expand Up @@ -267,9 +270,11 @@ daft-catalog = {path = "src/daft-catalog"}
daft-context = {path = "src/daft-context"}
daft-core = {path = "src/daft-core"}
daft-dsl = {path = "src/daft-dsl"}
daft-ext-abi = {path = "src/daft-ext-abi", default-features = false}
daft-ext-core = {path = "src/daft-ext-core", default-features = false}
daft-ext-internal = {path = "src/daft-ext-internal", default-features = false}
daft-ext-abi = {path = "src/daft-ext-abi"}
daft-ext-core = {path = "src/daft-ext-core"}
daft-ext-internal = {path = "src/daft-ext-internal"}
daft-ext-macros = {path = "src/daft-ext-macros"}
daft-ext = {path = "src/daft-ext"}
daft-file = {path = "src/daft-file"}
daft-functions = {path = "src/daft-functions"}
daft-functions-binary = {path = "src/daft-functions-binary"}
Expand Down
108 changes: 73 additions & 35 deletions docs/extensions/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
> Please see the [prompt](#prompt) if you want help generating an extension.

This document is a guide for authoring Daft native extensions in Rust.
Daft supports native Rust extensions by leveraging a stable C ABI and Arrow FFI. Today we support authoring native
Daft supports native Rust extensions by leveraging a stable C ABI based on the
[Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html).
Extensions are **not coupled** to any particular Arrow library version. The ABI boundary uses
plain C structs (`ArrowSchema`, `ArrowArray`) so your extension can use any arrow-rs version
(or even a different Arrow implementation entirely). Today we support authoring native
scalar functions, but are actively working on additional native extension features.

## Example
Expand Down Expand Up @@ -93,16 +97,18 @@ crate-type = ["cdylib"]

[dependencies]
daft-ext = <version>
arrow-array = "57.1.0"
arrow-schema = "57.1.0"
daft-ext-abi = { version = <version>, features = ["arrow-58"] }
arrow = { version = "58", features = ["ffi"] }
```

!!! tip "Arrow types"
!!! tip "Arrow version freedom"

Use `arrow-array` builders and downcasting directly for working with data.
The `daft-ext` prelude re-exports common types like `ArrayRef` and `Field`.
Import `arrow_array::Array` for the `len()` and `is_null()` methods, and
`arrow_array::cast::AsArray` for downcasting (e.g., `as_string`).
The `daft-ext` ABI uses C Data Interface types — your extension is **not** pinned to
Daft's arrow-rs version. Enable a feature flag on `daft-ext-abi` matching your arrow-rs
version (`arrow-56`, `arrow-57`, or `arrow-58`) to get safe `.into()` conversions
between arrow-rs FFI types and the ABI types. For unsupported versions, use the
`from_owned`/`into_owned`/`from_raw`/`as_raw` escape hatches on `ArrowArray`
and `ArrowSchema`.

Then update the pyproject to use `setuptools-rust` as the build system.

Expand Down Expand Up @@ -164,14 +170,15 @@ cat src/lib.rs
```

```rust
use std::ffi::CStr;
use std::sync::Arc;
use std::{ffi::CStr, sync::Arc};

use arrow_array::{Array, ArrayRef};
use arrow_array::builder::StringBuilder;
use arrow_array::cast::AsArray;
use arrow_schema::{DataType, Field};
use arrow::{
array::{Array, builder::StringBuilder, cast::AsArray},
datatypes::{DataType, Field, Schema},
ffi::FFI_ArrowSchema,
};
use daft_ext::prelude::*;
use daft_ext_abi::{ArrowData, ArrowSchema};

// ── Module ──────────────────────────────────────────────────────────

Expand All @@ -181,7 +188,6 @@ use daft_ext::prelude::*;
struct HelloExtension;

impl DaftExtension for HelloExtension {

/// This is the extension install hook for defining functions in the session.
/// Called once when the extension is loaded into a session. Register each function here.
fn install(session: &mut dyn DaftSession) {
Expand All @@ -202,25 +208,42 @@ impl DaftScalarFunction for Greet {
}

/// Type checking.
/// Given the input `Field` schemas, validate types and return the output `Field`.
fn return_field(&self, args: &[Field]) -> DaftResult<Field> {
/// Receives input fields as C Data Interface `ArrowSchema` types.
/// Use `.as_raw()` / `.into()` to convert between arrow-rs and ABI types.
fn return_field(&self, args: &[ArrowSchema]) -> DaftResult<ArrowSchema> {
if args.len() != 1 {
return Err(DaftError::TypeError(
format!("greet: expected 1 argument, got {}", args.len()),
));
return Err(DaftError::TypeError(format!(
"greet: expected 1 argument, got {}",
args.len()
)));
}
if *args[0].data_type() != DataType::Utf8 && *args[0].data_type() != DataType::LargeUtf8 {
return Err(DaftError::TypeError(
format!("greet: expected string argument, got {:?}", args[0].data_type()),
));
let ffi_schema: &FFI_ArrowSchema = unsafe { args[0].as_raw() };
let field = Field::try_from(ffi_schema)
.map_err(|e| DaftError::TypeError(e.to_string()))?;
let dt = field.data_type();
if *dt != DataType::Utf8 && *dt != DataType::LargeUtf8 {
return Err(DaftError::TypeError(format!(
"greet: expected string argument, got {:?}",
dt
)));
}
Ok(Field::new("greet", DataType::Utf8, true))
let out_schema = Schema::new(vec![Field::new("greet", DataType::Utf8, true)]);
let ffi = FFI_ArrowSchema::try_from(&out_schema)
.map_err(|e| DaftError::TypeError(e.to_string()))?;
Ok(ffi.into())
}

/// Evaluation. Receives Arrow arrays, returns an Arrow array. Operates on entire columns at once.
/// Evaluation. Receives columns as C Data Interface `ArrowData` types.
/// Use `.into()` to convert to/from arrow-rs FFI types.
/// All data flows through Arrow arrays — no per-row Python overhead.
fn call(&self, args: &[ArrayRef]) -> DaftResult<ArrayRef> {
let names = args[0].as_string::<i64>();
fn call(&self, args: &[ArrowData]) -> DaftResult<ArrowData> {
let data = unsafe { ArrowData::take_arg(args, 0) };
let ffi_array: arrow::ffi::FFI_ArrowArray = data.array.into();
let ffi_schema: arrow::ffi::FFI_ArrowSchema = data.schema.into();
let arrow_data = unsafe { arrow::ffi::from_ffi(ffi_array, &ffi_schema) }
.map_err(|e| DaftError::RuntimeError(e.to_string()))?;
let input = arrow::array::make_array(arrow_data);
let names = input.as_string::<i64>();
let mut builder = StringBuilder::with_capacity(names.len(), names.len() * 16);
for i in 0..names.len() {
if names.is_null(i) {
Expand All @@ -229,11 +252,25 @@ impl DaftScalarFunction for Greet {
builder.append_value(format!("Hello, {}!", names.value(i)));
}
}
Ok(Arc::new(builder.finish()))
let output = builder.finish();
let (out_arr, out_sch) = arrow::ffi::to_ffi(&output.to_data())
.map_err(|e| DaftError::RuntimeError(e.to_string()))?;
Ok(ArrowData {
array: out_arr.into(),
schema: out_sch.into(),
})
}
}
```

!!! tip "ABI pattern"

The `DaftScalarFunction` trait uses C Data Interface types (`ArrowSchema`, `ArrowData`)
at the ABI boundary. Enable a `daft-ext-abi` feature flag (`arrow-56`, `arrow-57`, or
`arrow-58`) matching your arrow-rs version to get `.into()` conversions. Use `.as_raw()`
for zero-copy borrows. This decoupling means your extension is not tied to Daft's
arrow-rs version.

!!! tip "String types"

Daft uses `LargeUtf8` (i64 offsets) for strings internally. When downcasting string arrays,
Expand Down Expand Up @@ -380,17 +417,18 @@ Follow the Daft extension authoring guide at docs/extensions/index.md. Here is a

## Rust conventions

- Use `daft_ext::prelude::*` for all imports.
- Import `arrow_array::Array` for `len()`/`is_null()` and `arrow_array::cast::AsArray` for downcasting.
- Use `daft_ext::prelude::*` for all imports (provides `ArrowSchema`, `ArrowData`, errors, traits).
- Add `daft-ext-abi` with a feature flag matching your arrow version (`arrow-56`, `arrow-57`, or `arrow-58`) for `.into()` conversions.
- Import `arrow::array::Array` for `len()`/`is_null()` and `arrow::array::cast::AsArray` for downcasting.
- Daft uses `LargeUtf8` (i64 offsets) for strings — downcast with `as_string::<i64>()`, never `i32`.
- Apply `#[daft_extension]` to a struct implementing `DaftExtension`.
- Register each function in `install()` via `session.define_function(Arc::new(MyFn))`.
- Each function is a struct implementing `DaftScalarFunction` with:
- `name(&self) -> &CStr` — use `c"<extension_name>_<fn_name>"` prefix to avoid collisions.
- `return_field(&self, args: &[Field]) -> DaftResult<Field>` — validate arg count and types,
return `Err(DaftError::TypeError(...))` for violations.
- `call(&self, args: &[ArrayRef]) -> DaftResult<ArrayRef>` — compute over Arrow arrays,
propagate nulls, return `Err(DaftError::RuntimeError(...))` for failures.
- `return_field(&self, args: &[ArrowSchema]) -> DaftResult<ArrowSchema>` — use `.as_raw()` to
borrow as arrow-rs `FFI_ArrowSchema` for type checking, then `.into()` to return output.
- `call(&self, args: &[ArrowData]) -> DaftResult<ArrowData>` — use `ArrowData::take_arg` then
`.into()` to convert to arrow-rs FFI types, compute, then `.into()` to return the result.

## Python conventions

Expand Down
5 changes: 2 additions & 3 deletions examples/hello/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,5 @@ name = "hello"
crate-type = ["cdylib"]

[dependencies]
daft-ext = {path = "../../src/daft-ext"}
arrow-array = {version = "57.1.0", features = ["chrono-tz"]}
arrow-schema = "57.1.0"
daft-ext = {path = "../../src/daft-ext", features = ["arrow-58"]}
arrow = {version = "58", features = ["ffi"]}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note the different major version now

36 changes: 27 additions & 9 deletions examples/hello/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::{ffi::CStr, sync::Arc};

use arrow_array::{Array, ArrayRef, builder::StringBuilder, cast::AsArray};
use arrow_schema::{DataType, Field};
use arrow::{
array::{Array, builder::StringBuilder, cast::AsArray},
datatypes::{DataType, Field},
};
use daft_ext::prelude::*;

// ── Module ──────────────────────────────────────────────────────────
Expand All @@ -24,24 +26,35 @@ impl DaftScalarFunction for Greet {
c"greet"
}

fn return_field(&self, args: &[Field]) -> DaftResult<Field> {
fn return_field(&self, args: &[ArrowSchema]) -> DaftResult<ArrowSchema> {
if args.len() != 1 {
return Err(DaftError::TypeError(format!(
"greet: expected 1 argument, got {}",
args.len()
)));
}
if *args[0].data_type() != DataType::Utf8 && *args[0].data_type() != DataType::LargeUtf8 {
let field = Field::try_from(&args[0])?;
let dt = field.data_type();
if *dt != DataType::Utf8 && *dt != DataType::LargeUtf8 {
return Err(DaftError::TypeError(format!(
"greet: expected string argument, got {:?}",
args[0].data_type()
dt
)));
}
Ok(Field::new("greet", DataType::Utf8, true))
Ok(ArrowSchema::try_from(&Field::new(
"greet",
DataType::Utf8,
true,
))?)
}

fn call(&self, args: &[ArrayRef]) -> DaftResult<ArrayRef> {
let names = args[0].as_string::<i64>();
fn call(&self, args: Vec<ArrowData>) -> DaftResult<ArrowData> {
let data = args.into_iter().next().unwrap();
let ffi_array: arrow::ffi::FFI_ArrowArray = data.array.into();
let ffi_schema: arrow::ffi::FFI_ArrowSchema = data.schema.into();
let arrow_data = unsafe { arrow::ffi::from_ffi(ffi_array, &ffi_schema) }?;
let input = arrow::array::make_array(arrow_data);
let names = input.as_string::<i64>();
let mut builder = StringBuilder::with_capacity(names.len(), names.len() * 16);
for i in 0..names.len() {
if names.is_null(i) {
Expand All @@ -50,6 +63,11 @@ impl DaftScalarFunction for Greet {
builder.append_value(format!("Hello, {}!", names.value(i)));
}
}
Ok(Arc::new(builder.finish()))
let output = builder.finish();
let (out_arr, out_sch) = arrow::ffi::to_ffi(&output.to_data())?;
Ok(ArrowData {
array: out_arr.into(),
schema: out_sch.into(),
})
}
}
33 changes: 26 additions & 7 deletions src/daft-ext-abi/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
[dependencies]
arrow = {workspace = true, features = ["ffi"]}
arrow-array = {workspace = true}

[lints]
workspace = true

[package]
description = "Daft extension C ABI contract"
edition = {workspace = true}
name = "daft-ext-abi"
version = {workspace = true}

[package.metadata.cargo-machete]
ignored = [
"arrow-data-56",
"arrow-data-57",
"arrow-data-58",
"arrow-schema-56",
"arrow-schema-57",
"arrow-schema-58"
]

[features]
arrow-56 = ["dep:arrow-schema-56", "dep:arrow-data-56"]
arrow-57 = ["dep:arrow-schema-57", "dep:arrow-data-57"]
arrow-58 = ["dep:arrow-schema-58", "dep:arrow-data-58"]

[dependencies]
arrow-schema-56 = {package = "arrow-schema", version = "56", features = ["ffi"], optional = true}
arrow-data-56 = {package = "arrow-data", version = "56", features = ["ffi"], optional = true}
arrow-schema-57 = {package = "arrow-schema", version = "57", features = ["ffi"], optional = true}
arrow-data-57 = {package = "arrow-data", version = "57", features = ["ffi"], optional = true}
arrow-schema-58 = {package = "arrow-schema", version = "58", features = ["ffi"], optional = true}
arrow-data-58 = {package = "arrow-data", version = "58", features = ["ffi"], optional = true}

[lints]
workspace = true
Loading
Loading