feat: python based catalog and schema provider #1156

Merged
merged 18 commits into from
Jul 2, 2025
19 changes: 19 additions & 0 deletions Cargo.lock

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -37,6 +37,7 @@ substrait = ["dep:datafusion-substrait"]
tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "sync"] }
pyo3 = { version = "0.24", features = ["extension-module", "abi3", "abi3-py39"] }
pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"]}
pyo3-log = "0.12.4"
arrow = { version = "55.1.0", features = ["pyarrow"] }
datafusion = { version = "48.0.0", features = ["avro", "unicode_expressions"] }
datafusion-substrait = { version = "48.0.0", optional = true }
@@ -49,6 +50,7 @@ async-trait = "0.1.88"
futures = "0.3"
object_store = { version = "0.12.1", features = ["aws", "gcp", "azure", "http"] }
url = "2"
log = "0.4.27"

[build-dependencies]
prost-types = "0.13.1" # keep in line with `datafusion-substrait`
56 changes: 56 additions & 0 deletions docs/source/user-guide/data-sources.rst
@@ -185,3 +185,59 @@ the interface as describe in the :ref:`Custom Table Provider <io_custom_table_pr
section. This is an advanced topic, but a
`user example <https://github.com/apache/datafusion-python/tree/main/examples/ffi-table-provider>`_
is provided in the DataFusion repository.

Catalog
=======

A common technique for organizing tables is a three-level hierarchy. DataFusion supports this
organization through the :py:class:`~datafusion.catalog.Catalog`,
:py:class:`~datafusion.catalog.Schema`, and :py:class:`~datafusion.catalog.Table` classes. By
default, a :py:class:`~datafusion.context.SessionContext` comes with a single Catalog named
``datafusion`` and a single Schema named ``default``.

The default implementation stores the catalog and schema in memory. You can add further
in-memory catalogs and schemas, as in the following example:

.. code-block:: python

    from datafusion import SessionContext
    from datafusion.catalog import Catalog, Schema

    ctx = SessionContext()

    my_catalog = Catalog.memory_catalog()
    my_schema = Schema.memory_schema()

    my_catalog.register_schema("my_schema_name", my_schema)

    ctx.register_catalog_provider("my_catalog_name", my_catalog)

You can then register tables in ``my_schema`` and access them either through the DataFrame
API or via SQL commands such as ``"SELECT * FROM my_catalog_name.my_schema_name.my_table"``.
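
To make the three-level naming concrete, the following is a minimal plain-Python sketch of
how a table reference with one, two, or three parts maps onto a catalog, schema, and table
name. The function ``resolve_table_reference`` is hypothetical and is not part of the
``datafusion`` package. It only illustrates the idea under the assumption that missing parts
fall back to the default names ``datafusion`` and ``default``, and it ignores quoted
identifiers.

```python
DEFAULT_CATALOG = "datafusion"
DEFAULT_SCHEMA = "default"


def resolve_table_reference(
    reference: str,
    default_catalog: str = DEFAULT_CATALOG,
    default_schema: str = DEFAULT_SCHEMA,
) -> tuple[str, str, str]:
    """Split a dotted table reference into (catalog, schema, table).

    Hypothetical helper for illustration only; quoted identifiers that
    contain dots are not handled.
    """
    parts = reference.split(".")
    if len(parts) == 1:
        # Bare table name: use the default catalog and schema.
        return (default_catalog, default_schema, parts[0])
    if len(parts) == 2:
        # schema.table: use the default catalog.
        return (default_catalog, parts[0], parts[1])
    if len(parts) == 3:
        # Fully qualified catalog.schema.table.
        return (parts[0], parts[1], parts[2])
    raise ValueError(f"Too many parts in table reference: {reference!r}")
```

With this sketch, ``resolve_table_reference("my_table")`` falls back to both defaults, while
``resolve_table_reference("my_catalog_name.my_schema_name.my_table")`` is returned unchanged
as three parts.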

User Defined Catalog and Schema
-------------------------------

If the in-memory catalogs are insufficient for your use case, there are two approaches you can
take to implement a custom catalog and/or schema. The discussion below describes how to
implement a Catalog, but the approach for a Schema is nearly identical.

DataFusion supports Catalogs written in either Rust or Python. If you write a Catalog in Rust,
you will need to export it as a Python library via PyO3. There is a complete example of a
catalog implemented this way in the
`examples folder <https://github.com/apache/datafusion-python/tree/main/examples/>`_
of our repository. Writing catalog providers in Rust can typically lead to significant
performance improvements over the Python-based approach.

To implement a Catalog in Python, you will need to inherit from the abstract base class
:py:class:`~datafusion.catalog.CatalogProvider`. There are examples in the
`unit tests <https://github.com/apache/datafusion-python/tree/main/python/tests>`_ of
implementing a basic Catalog in Python where we simply keep a dictionary of the
registered Schemas.

One important note for developers is that when a Catalog is defined in Python, there are
two different ways of accessing it. First, we register the catalog with a Rust
wrapper. This allows any Rust-based code to call the Python functions as necessary.
Second, if the user accesses the Catalog via the Python API, we identify this and return
the original Python object that implements the Catalog. This is an important distinction
for developers because we do *not* return a Python wrapper around the Rust wrapper of the
original Python object.
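
The two access paths can be modeled with a toy example. Everything here is a hypothetical
stand-in written in plain Python: ``ToyRustWrapper`` plays the role of the Rust wrapper and
``ToyContext`` plays the role of the session context. It is a sketch of the behavior
described above, not the actual implementation.

```python
class ToyRustWrapper:
    """Stand-in for the Rust wrapper that forwards calls to the Python object."""

    def __init__(self, py_catalog) -> None:
        self.py_catalog = py_catalog

    def schema_names(self):
        # Path 1: engine-side code calls into the Python implementation.
        return self.py_catalog.schema_names()


class ToyContext:
    """Stand-in for a session context holding wrapped catalogs."""

    def __init__(self) -> None:
        self._catalogs: dict[str, ToyRustWrapper] = {}

    def register_catalog_provider(self, name: str, provider) -> None:
        # The Python catalog is stored behind the wrapper.
        self._catalogs[name] = ToyRustWrapper(provider)

    def engine_schema_names(self, name: str):
        # Engine-side access goes through the wrapper.
        return self._catalogs[name].schema_names()

    def catalog(self, name: str):
        # Path 2: Python-side access unwraps and returns the original object.
        return self._catalogs[name].py_catalog


class MyPythonCatalog:
    """Hypothetical user-defined catalog with a single fixed schema name."""

    def schema_names(self):
        return {"my_schema_name"}


toy_ctx = ToyContext()
original = MyPythonCatalog()
toy_ctx.register_catalog_provider("my_catalog_name", original)
```

In this model, ``toy_ctx.catalog("my_catalog_name") is original`` holds, so any extra
attributes or methods defined on the user's class remain reachable from Python.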
1 change: 1 addition & 0 deletions examples/datafusion-ffi-example/Cargo.lock

1 change: 1 addition & 0 deletions examples/datafusion-ffi-example/Cargo.toml
@@ -27,6 +27,7 @@ pyo3 = { version = "0.23", features = ["extension-module", "abi3", "abi3-py39"]
arrow = { version = "55.0.0" }
arrow-array = { version = "55.0.0" }
arrow-schema = { version = "55.0.0" }
async-trait = "0.1.88"

[build-dependencies]
pyo3-build-config = "0.23"
@@ -0,0 +1,60 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import pyarrow as pa
from datafusion import SessionContext
from datafusion_ffi_example import MyCatalogProvider


def test_catalog_provider():
    ctx = SessionContext()

    my_catalog_name = "my_catalog"
    expected_schema_name = "my_schema"
    expected_table_name = "my_table"
    expected_table_columns = ["units", "price"]

    catalog_provider = MyCatalogProvider()
    ctx.register_catalog_provider(my_catalog_name, catalog_provider)
    my_catalog = ctx.catalog(my_catalog_name)

    my_catalog_schemas = my_catalog.names()
    assert expected_schema_name in my_catalog_schemas
    my_database = my_catalog.database(expected_schema_name)
    assert expected_table_name in my_database.names()
    my_table = my_database.table(expected_table_name)
    assert expected_table_columns == my_table.schema.names

    result = ctx.table(
        f"{my_catalog_name}.{expected_schema_name}.{expected_table_name}"
    ).collect()
    assert len(result) == 2

    col0_result = [r.column(0) for r in result]
    col1_result = [r.column(1) for r in result]
    expected_col0 = [
        pa.array([10, 20, 30], type=pa.int32()),
        pa.array([5, 7], type=pa.int32()),
    ]
    expected_col1 = [
        pa.array([1, 2, 5], type=pa.float64()),
        pa.array([1.5, 2.5], type=pa.float64()),
    ]
    assert col0_result == expected_col0
    assert col1_result == expected_col1
179 changes: 179 additions & 0 deletions examples/datafusion-ffi-example/src/catalog_provider.rs
@@ -0,0 +1,179 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use pyo3::{pyclass, pymethods, Bound, PyResult, Python};
use std::{any::Any, fmt::Debug, sync::Arc};

use arrow::datatypes::Schema;
use async_trait::async_trait;
use datafusion::{
    catalog::{
        CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider, TableProvider,
    },
    common::exec_err,
    datasource::MemTable,
    error::{DataFusionError, Result},
};
use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
use pyo3::types::PyCapsule;

pub fn my_table() -> Arc<dyn TableProvider + 'static> {
    use arrow::datatypes::{DataType, Field};
    use datafusion::common::record_batch;

    let schema = Arc::new(Schema::new(vec![
        Field::new("units", DataType::Int32, true),
        Field::new("price", DataType::Float64, true),
    ]));

    let partitions = vec![
        record_batch!(
            ("units", Int32, vec![10, 20, 30]),
            ("price", Float64, vec![1.0, 2.0, 5.0])
        )
        .unwrap(),
        record_batch!(
            ("units", Int32, vec![5, 7]),
            ("price", Float64, vec![1.5, 2.5])
        )
        .unwrap(),
    ];

    Arc::new(MemTable::try_new(schema, vec![partitions]).unwrap())
}

#[derive(Debug)]
pub struct FixedSchemaProvider {
    inner: MemorySchemaProvider,
}

impl Default for FixedSchemaProvider {
    fn default() -> Self {
        let inner = MemorySchemaProvider::new();

        let table = my_table();

        let _ = inner.register_table("my_table".to_string(), table).unwrap();

        Self { inner }
    }
}

#[async_trait]
impl SchemaProvider for FixedSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        self.inner.table_names()
    }

    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
        self.inner.table(name).await
    }

    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> Result<Option<Arc<dyn TableProvider>>> {
        self.inner.register_table(name, table)
    }

    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        self.inner.deregister_table(name)
    }

    fn table_exist(&self, name: &str) -> bool {
        self.inner.table_exist(name)
    }
}

/// This catalog provider is intended only for unit tests. Its `Default` implementation
/// prepopulates it with a single schema, `my_schema`, which contains one table, `my_table`.
#[pyclass(
    name = "MyCatalogProvider",
    module = "datafusion_ffi_example",
    subclass
)]
#[derive(Debug)]
pub(crate) struct MyCatalogProvider {
    inner: MemoryCatalogProvider,
}

impl Default for MyCatalogProvider {
    fn default() -> Self {
        let inner = MemoryCatalogProvider::new();

        let schema_name: &str = "my_schema";
        let _ = inner.register_schema(schema_name, Arc::new(FixedSchemaProvider::default()));

        Self { inner }
    }
}

impl CatalogProvider for MyCatalogProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema_names(&self) -> Vec<String> {
        self.inner.schema_names()
    }

    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.inner.schema(name)
    }

    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        self.inner.register_schema(name, schema)
    }

    fn deregister_schema(
        &self,
        name: &str,
        cascade: bool,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        self.inner.deregister_schema(name, cascade)
    }
}

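#[pymethods]
impl MyCatalogProvider {
    #[new]
    pub fn new() -> Self {
        // Note: `inner` starts out as an empty `MemoryCatalogProvider` here.
        Self {
            inner: Default::default(),
        }
    }

    /// Exports a catalog provider to Python as a `PyCapsule` holding an
    /// `FFI_CatalogProvider`.
    pub fn __datafusion_catalog_provider__<'py>(
        &self,
        py: Python<'py>,
    ) -> PyResult<Bound<'py, PyCapsule>> {
        let name = cr"datafusion_catalog_provider".into();
        // The exported provider is a fresh, prepopulated `MyCatalogProvider::default()`,
        // not `self`.
        let catalog_provider =
            FFI_CatalogProvider::new(Arc::new(MyCatalogProvider::default()), None);

        PyCapsule::new(py, catalog_provider, Some(name))
    }
}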
3 changes: 3 additions & 0 deletions examples/datafusion-ffi-example/src/lib.rs
@@ -15,16 +15,19 @@
// specific language governing permissions and limitations
// under the License.

use crate::catalog_provider::MyCatalogProvider;
use crate::table_function::MyTableFunction;
use crate::table_provider::MyTableProvider;
use pyo3::prelude::*;

pub(crate) mod catalog_provider;
pub(crate) mod table_function;
pub(crate) mod table_provider;

#[pymodule]
fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<MyTableProvider>()?;
    m.add_class::<MyTableFunction>()?;
    m.add_class::<MyCatalogProvider>()?;
    Ok(())
}