Commit 9362f53

feat: python based catalog and schema provider (#1156)
* Exposing FFI to python
* Exposing FFI to python
* Workin progress on python catalog
* Flushing out schema and catalog providers
* Adding implementation of python based catalog and schema providers
* Small updates after rebase
* Add default in memory options for adding schema and catalogs
* Add support for creating in memory catalog and schema
* Update from database to schema in unit tests
* xfailed label no longer applies to these unit tests
* Defining abstract methods for catalog and schema providers
* Working through issues between custom catalog and build in schema
* Check types on schema provider to return
* Add docstring
* Add documentation about how to use catalog and schema providers
* Re-add module to all after rebase
* Minor bugfix
* Clippy updates from the new rust version

---------

Co-authored-by: renato2099 <[email protected]>
1 parent 9545634 commit 9362f53

62 files changed: +1340 -258 lines changed

Cargo.lock

Lines changed: 19 additions & 0 deletions

Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -37,6 +37,7 @@ substrait = ["dep:datafusion-substrait"]
 tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 pyo3 = { version = "0.24", features = ["extension-module", "abi3", "abi3-py39"] }
 pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"]}
+pyo3-log = "0.12.4"
 arrow = { version = "55.1.0", features = ["pyarrow"] }
 datafusion = { version = "48.0.0", features = ["avro", "unicode_expressions"] }
 datafusion-substrait = { version = "48.0.0", optional = true }
@@ -49,6 +50,7 @@ async-trait = "0.1.88"
 futures = "0.3"
 object_store = { version = "0.12.1", features = ["aws", "gcp", "azure", "http"] }
 url = "2"
+log = "0.4.27"
 
 [build-dependencies]
 prost-types = "0.13.1" # keep in line with `datafusion-substrait`
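The two new dependencies, `log` and `pyo3-log`, are commonly paired to route Rust `log` records into Python's standard `logging` module. The visible diff does not show where (or whether) that bridge is initialised, so the following is only a sketch of the Python side. It assumes the bridge is installed and that records arrive on a logger whose name begins with `datafusion`; that name is an assumption, since pyo3-log derives logger names from the Rust module path. `ListHandler` and `capture` are hypothetical helpers.

```python
import logging


class ListHandler(logging.Handler):
    """Collects log messages in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(
            f"{record.name}:{record.levelname}:{record.getMessage()}"
        )


def capture(logger_name: str, level: int = logging.DEBUG) -> ListHandler:
    """Attach an in-memory handler to the named logger and return it."""
    handler = ListHandler()
    logger = logging.getLogger(logger_name)
    logger.setLevel(level)
    logger.addHandler(handler)
    return handler


# "datafusion" is an assumed logger name, not one confirmed by this commit.
handler = capture("datafusion")

# Stand-in for a record forwarded from Rust: child loggers propagate
# their records to handlers attached to the parent logger.
logging.getLogger("datafusion.catalog").debug("schema registered")
```

Setting the level on the parent logger is enough here because child loggers inherit their effective level from the nearest configured ancestor.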

docs/source/user-guide/data-sources.rst

Lines changed: 56 additions & 0 deletions
@@ -185,3 +185,59 @@ the interface as describe in the :ref:`Custom Table Provider <io_custom_table_pr
 section. This is an advanced topic, but a
 `user example <https://github.com/apache/datafusion-python/tree/main/examples/ffi-table-provider>`_
 is provided in the DataFusion repository.
+
+Catalog
+=======
+
+A common technique for organizing tables is a three-level hierarchy. DataFusion
+supports this form of organization using the :py:class:`~datafusion.catalog.Catalog`,
+:py:class:`~datafusion.catalog.Schema`, and :py:class:`~datafusion.catalog.Table` classes. By default,
+a :py:class:`~datafusion.context.SessionContext` comes with a single Catalog and a single Schema
+with the names ``datafusion`` and ``public``, respectively.
+
+The default implementation keeps the catalog and schema in memory. Additional
+in-memory catalogs and schemas can be added, as in the following
+example:
+
+.. code-block:: python
+
+    from datafusion.catalog import Catalog, Schema
+
+    my_catalog = Catalog.memory_catalog()
+    my_schema = Schema.memory_schema()
+
+    my_catalog.register_schema("my_schema_name", my_schema)
+
+    ctx.register_catalog_provider("my_catalog_name", my_catalog)
+
+You could then register tables in ``my_schema`` and access them either through the DataFrame
+API or via SQL commands such as ``"SELECT * from my_catalog_name.my_schema_name.my_table"``.
+
+User Defined Catalog and Schema
+-------------------------------
+
+If the in-memory catalogs are insufficient for your uses, there are two approaches you can take
+to implementing a custom catalog and/or schema. In the discussion below, we describe how to
+implement these for a Catalog, but the approach to implementing a Schema is nearly
+identical.
+
+DataFusion supports Catalogs written in either Rust or Python. If you write a Catalog in Rust,
+you will need to export it as a Python library via PyO3. There is a complete example of a
+catalog implemented this way in the
+`examples folder <https://github.com/apache/datafusion-python/tree/main/examples/>`_
+of our repository. Writing catalog providers in Rust can typically lead to significant
+performance improvements over the Python-based approach.
+
+To implement a Catalog in Python, you will need to inherit from the abstract base class
+:py:class:`~datafusion.catalog.CatalogProvider`. There are examples in the
+`unit tests <https://github.com/apache/datafusion-python/tree/main/python/tests>`_ of
+implementing a basic Catalog in Python where we simply keep a dictionary of the
+registered Schemas.
+
+One important note for developers is that when we have a Catalog defined in Python, we have
+two different ways of accessing this Catalog. First, we register the catalog with a Rust
+wrapper. This allows any Rust-based code to call the Python functions as necessary.
+Second, if the user accesses the Catalog via the Python API, we identify this and return
+the original Python object that implements the Catalog. This is an important distinction
+for developers because we do *not* return a Python wrapper around the Rust wrapper of the
+original Python object.
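The dictionary-backed approach mentioned in the documentation above might look roughly like the following. This is a standalone sketch, not the code from the unit tests: `DictCatalog` is a hypothetical name, and it deliberately does not inherit from `datafusion.catalog.CatalogProvider` so that it runs without the library installed. Its method names simply mirror the Rust `CatalogProvider` trait used elsewhere in this commit (`schema_names`, `schema`, `register_schema`, `deregister_schema`). The abstract methods the Python base class actually requires should be checked against its docstrings.

```python
from __future__ import annotations

from typing import Any


class DictCatalog:
    """Hypothetical catalog that keeps registered schemas in a plain dict."""

    def __init__(self) -> None:
        self._schemas: dict[str, Any] = {}

    def schema_names(self) -> set[str]:
        return set(self._schemas)

    def schema(self, name: str) -> Any | None:
        # Return the exact object that was registered, or None if unknown.
        return self._schemas.get(name)

    def register_schema(self, name: str, schema: Any) -> Any | None:
        # Like the Rust trait, hand back whatever was previously
        # registered under this name (None if nothing was).
        previous = self._schemas.get(name)
        self._schemas[name] = schema
        return previous

    def deregister_schema(self, name: str, cascade: bool = True) -> Any | None:
        # `cascade` is accepted for signature parity only; this sketch ignores it.
        return self._schemas.pop(name, None)


catalog = DictCatalog()
sales = object()  # stand-in for a schema provider
catalog.register_schema("sales", sales)
```

Because lookups return the stored object itself, `catalog.schema("sales") is sales` holds. That is the same identity property the developer note describes for Python-defined catalogs accessed through the Python API.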

examples/datafusion-ffi-example/Cargo.lock

Lines changed: 1 addition & 0 deletions

examples/datafusion-ffi-example/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ pyo3 = { version = "0.23", features = ["extension-module", "abi3", "abi3-py39"]
 arrow = { version = "55.0.0" }
 arrow-array = { version = "55.0.0" }
 arrow-schema = { version = "55.0.0" }
+async-trait = "0.1.88"
 
 [build-dependencies]
 pyo3-build-config = "0.23"
Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pyarrow as pa
+from datafusion import SessionContext
+from datafusion_ffi_example import MyCatalogProvider
+
+
+def test_catalog_provider():
+    ctx = SessionContext()
+
+    my_catalog_name = "my_catalog"
+    expected_schema_name = "my_schema"
+    expected_table_name = "my_table"
+    expected_table_columns = ["units", "price"]
+
+    catalog_provider = MyCatalogProvider()
+    ctx.register_catalog_provider(my_catalog_name, catalog_provider)
+    my_catalog = ctx.catalog(my_catalog_name)
+
+    my_catalog_schemas = my_catalog.names()
+    assert expected_schema_name in my_catalog_schemas
+    my_database = my_catalog.database(expected_schema_name)
+    assert expected_table_name in my_database.names()
+    my_table = my_database.table(expected_table_name)
+    assert expected_table_columns == my_table.schema.names
+
+    result = ctx.table(
+        f"{my_catalog_name}.{expected_schema_name}.{expected_table_name}"
+    ).collect()
+    assert len(result) == 2
+
+    col0_result = [r.column(0) for r in result]
+    col1_result = [r.column(1) for r in result]
+    expected_col0 = [
+        pa.array([10, 20, 30], type=pa.int32()),
+        pa.array([5, 7], type=pa.int32()),
+    ]
+    expected_col1 = [
+        pa.array([1, 2, 5], type=pa.float64()),
+        pa.array([1.5, 2.5], type=pa.float64()),
+    ]
+    assert col0_result == expected_col0
+    assert col1_result == expected_col1
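The final assertions compare column values batch by batch rather than as one concatenated column, which is why the `len(result) == 2` check matters: the example table is built from two record batches. Below is a plain-Python sketch of the same shape. It uses dictionaries as stand-ins for Arrow record batches, an assumption made only so the snippet runs without `pyarrow`.

```python
# Each dict stands in for one record batch: column name -> values.
batches = [
    {"units": [10, 20, 30], "price": [1.0, 2.0, 5.0]},
    {"units": [5, 7], "price": [1.5, 2.5]},
]

# One list per batch, mirroring `[r.column(0) for r in result]` in the test.
units_per_batch = [batch["units"] for batch in batches]
price_per_batch = [batch["price"] for batch in batches]

# Flattening discards the batch boundaries that the test checks.
all_units = [value for column in units_per_batch for value in column]
```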
Lines changed: 179 additions & 0 deletions
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::{pyclass, pymethods, Bound, PyResult, Python};
+use std::{any::Any, fmt::Debug, sync::Arc};
+
+use arrow::datatypes::Schema;
+use async_trait::async_trait;
+use datafusion::{
+    catalog::{
+        CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider, TableProvider,
+    },
+    common::exec_err,
+    datasource::MemTable,
+    error::{DataFusionError, Result},
+};
+use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
+use pyo3::types::PyCapsule;
+
+pub fn my_table() -> Arc<dyn TableProvider + 'static> {
+    use arrow::datatypes::{DataType, Field};
+    use datafusion::common::record_batch;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("units", DataType::Int32, true),
+        Field::new("price", DataType::Float64, true),
+    ]));
+
+    let partitions = vec![
+        record_batch!(
+            ("units", Int32, vec![10, 20, 30]),
+            ("price", Float64, vec![1.0, 2.0, 5.0])
+        )
+        .unwrap(),
+        record_batch!(
+            ("units", Int32, vec![5, 7]),
+            ("price", Float64, vec![1.5, 2.5])
+        )
+        .unwrap(),
+    ];
+
+    Arc::new(MemTable::try_new(schema, vec![partitions]).unwrap())
+}
+
+#[derive(Debug)]
+pub struct FixedSchemaProvider {
+    inner: MemorySchemaProvider,
+}
+
+impl Default for FixedSchemaProvider {
+    fn default() -> Self {
+        let inner = MemorySchemaProvider::new();
+
+        let table = my_table();
+
+        let _ = inner.register_table("my_table".to_string(), table).unwrap();
+
+        Self { inner }
+    }
+}
+
+#[async_trait]
+impl SchemaProvider for FixedSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.inner.table_names()
+    }
+
+    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
+        self.inner.table(name).await
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> Result<Option<Arc<dyn TableProvider>>> {
+        self.inner.register_table(name, table)
+    }
+
+    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+        self.inner.deregister_table(name)
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.inner.table_exist(name)
+    }
+}
+
+/// This catalog provider is intended only for unit tests. Its `Default` impl prepopulates
+/// one schema, `my_schema`, which contains a single table, `my_table`.
+#[pyclass(
+    name = "MyCatalogProvider",
+    module = "datafusion_ffi_example",
+    subclass
+)]
+#[derive(Debug)]
+pub(crate) struct MyCatalogProvider {
+    inner: MemoryCatalogProvider,
+}
+
+impl Default for MyCatalogProvider {
+    fn default() -> Self {
+        let inner = MemoryCatalogProvider::new();
+
+        let schema_name: &str = "my_schema";
+        let _ = inner.register_schema(schema_name, Arc::new(FixedSchemaProvider::default()));
+
+        Self { inner }
+    }
+}
+
+impl CatalogProvider for MyCatalogProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        self.inner.schema_names()
+    }
+
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        self.inner.schema(name)
+    }
+
+    fn register_schema(
+        &self,
+        name: &str,
+        schema: Arc<dyn SchemaProvider>,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        self.inner.register_schema(name, schema)
+    }
+
+    fn deregister_schema(
+        &self,
+        name: &str,
+        cascade: bool,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        self.inner.deregister_schema(name, cascade)
+    }
+}
+
+#[pymethods]
+impl MyCatalogProvider {
+    #[new]
+    pub fn new() -> Self {
+        Self {
+            inner: Default::default(),
+        }
+    }
+
+    pub fn __datafusion_catalog_provider__<'py>(
+        &self,
+        py: Python<'py>,
+    ) -> PyResult<Bound<'py, PyCapsule>> {
+        let name = cr"datafusion_catalog_provider".into();
+        let catalog_provider =
+            FFI_CatalogProvider::new(Arc::new(MyCatalogProvider::default()), None);
+
+        PyCapsule::new(py, catalog_provider, Some(name))
+    }
+}
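`MyCatalogProvider` advertises itself through the `__datafusion_catalog_provider__` method, which returns a PyCapsule named `datafusion_catalog_provider`. The consuming side is not part of the visible diff. One plausible way for a consumer to tell a capsule-exporting Rust provider apart from a pure-Python provider is duck typing on that method. In the sketch below, `classify_provider` and both stand-in classes are hypothetical, and the capsule is faked with a tuple so the snippet runs without any compiled extension.

```python
CAPSULE_ATTR = "__datafusion_catalog_provider__"
CAPSULE_NAME = "datafusion_catalog_provider"


class FakeRustProvider:
    """Stand-in for a PyO3 class such as MyCatalogProvider."""

    def __datafusion_catalog_provider__(self):
        # A real implementation returns a PyCapsule; a (name, payload)
        # tuple fakes one here.
        return (CAPSULE_NAME, object())


class PurePythonProvider:
    """Stand-in for a catalog written entirely in Python."""

    def schema_names(self):
        return set()


def classify_provider(provider) -> str:
    """Return 'ffi' for capsule exporters and 'python' for everything else."""
    export = getattr(provider, CAPSULE_ATTR, None)
    if callable(export):
        name, _payload = export()
        # Checking the capsule name guards against unrelated capsules.
        if name != CAPSULE_NAME:
            raise ValueError(f"unexpected capsule name: {name!r}")
        return "ffi"
    return "python"
```

Dispatching on a dunder method rather than on a base class means the example crate does not have to link against the Python package's own types. That decoupling is the usual motivation for a capsule-based protocol.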

examples/datafusion-ffi-example/src/lib.rs

Lines changed: 3 additions & 0 deletions
@@ -15,16 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::catalog_provider::MyCatalogProvider;
 use crate::table_function::MyTableFunction;
 use crate::table_provider::MyTableProvider;
 use pyo3::prelude::*;
 
+pub(crate) mod catalog_provider;
 pub(crate) mod table_function;
 pub(crate) mod table_provider;
 
 #[pymodule]
 fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<MyTableProvider>()?;
     m.add_class::<MyTableFunction>()?;
+    m.add_class::<MyCatalogProvider>()?;
     Ok(())
 }
