Commit b8371f5

ENH: Implement to_iceberg (#61507)
1 parent de357d3 commit b8371f5

7 files changed: +224 −7

doc/source/getting_started/install.rst

Lines changed: 1 addition & 1 deletion

@@ -308,7 +308,7 @@ Dependency Minimum Version pip ex
 `zlib <https://github.com/madler/zlib>`__ hdf5 Compression for HDF5
 `fastparquet <https://github.com/dask/fastparquet>`__ 2024.2.0 - Parquet reading / writing (pyarrow is default)
 `pyarrow <https://github.com/apache/arrow>`__ 10.0.1 parquet, feather Parquet, ORC, and feather reading / writing
-`PyIceberg <https://py.iceberg.apache.org/>`__ 0.7.1 iceberg Apache Iceberg reading
+`PyIceberg <https://py.iceberg.apache.org/>`__ 0.7.1 iceberg Apache Iceberg reading / writing
 `pyreadstat <https://github.com/Roche/pyreadstat>`__ 1.2.6 spss SPSS files (.sav) reading
 `odfpy <https://github.com/eea/odfpy>`__ 1.4.1 excel Open document format (.odf, .ods, .odt) reading / writing
 ====================================================== ================== ================ ==========================================================

doc/source/reference/io.rst

Lines changed: 1 addition & 0 deletions

@@ -162,6 +162,7 @@ Iceberg
    :toctree: api/

    read_iceberg
+   DataFrame.to_iceberg

 .. warning:: ``read_iceberg`` is experimental and may change without warning.

doc/source/user_guide/io.rst

Lines changed: 25 additions & 2 deletions

@@ -29,7 +29,7 @@ The pandas I/O API is a set of top level ``reader`` functions accessed like
 binary,`HDF5 Format <https://support.hdfgroup.org/documentation/hdf5/latest/_intro_h_d_f5.html>`__, :ref:`read_hdf<io.hdf5>`, :ref:`to_hdf<io.hdf5>`
 binary,`Feather Format <https://github.com/wesm/feather>`__, :ref:`read_feather<io.feather>`, :ref:`to_feather<io.feather>`
 binary,`Parquet Format <https://parquet.apache.org/>`__, :ref:`read_parquet<io.parquet>`, :ref:`to_parquet<io.parquet>`
-binary,`Apache Iceberg <https://iceberg.apache.org/>`__, :ref:`read_iceberg<io.iceberg>` , NA
+binary,`Apache Iceberg <https://iceberg.apache.org/>`__, :ref:`read_iceberg<io.iceberg>` , :ref:`to_iceberg<io.iceberg>`
 binary,`ORC Format <https://orc.apache.org/>`__, :ref:`read_orc<io.orc>`, :ref:`to_orc<io.orc>`
 binary,`Stata <https://en.wikipedia.org/wiki/Stata>`__, :ref:`read_stata<io.stata_reader>`, :ref:`to_stata<io.stata_writer>`
 binary,`SAS <https://en.wikipedia.org/wiki/SAS_(software)>`__, :ref:`read_sas<io.sas_reader>` , NA

@@ -5417,7 +5417,7 @@ engines to safely work with the same tables at the same time.

 Iceberg support predicate pushdown and column pruning, which are available to pandas
 users via the ``row_filter`` and ``selected_fields`` parameters of the :func:`~pandas.read_iceberg`
-function. This is convenient to extract from large tables a subset that fits in memory asa
+function. This is convenient to extract from large tables a subset that fits in memory as a
 pandas ``DataFrame``.

 Internally, pandas uses PyIceberg_ to query Iceberg.

@@ -5497,6 +5497,29 @@ parameter:
 Reading a particular snapshot is also possible providing the snapshot ID as an argument to
 ``snapshot_id``.

+A ``DataFrame`` can be saved to Iceberg with the :meth:`DataFrame.to_iceberg`
+method:
+
+.. code-block:: python
+
+    df.to_iceberg("my_table", catalog_name="my_catalog")
+
+The catalog is specified in the same way as for :func:`read_iceberg`, with the
+``catalog_name`` and ``catalog_properties`` parameters.
+
+The location of the table can be specified with the ``location`` parameter:
+
+.. code-block:: python
+
+    df.to_iceberg(
+        "my_table",
+        catalog_name="my_catalog",
+        location="s3://my-data-lake/my-iceberg-tables",
+    )
+
+Properties can be added to the table snapshot by passing a dictionary to the
+``snapshot_properties`` parameter.
+
 More information about the Iceberg format can be found in the `Apache Iceberg official
 page <https://iceberg.apache.org/>`__.

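The last added paragraph above (``snapshot_properties``) is the only one without an accompanying snippet; a short example in the same style could look like this (the property name is illustrative, not part of the commit):

```rst
.. code-block:: python

    df.to_iceberg(
        "my_table",
        catalog_name="my_catalog",
        snapshot_properties={"written-by": "my-etl-job"},
    )
```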
doc/source/whatsnew/v3.0.0.rst

Lines changed: 1 addition & 1 deletion

@@ -79,7 +79,7 @@ Other enhancements
 - :py:class:`frozenset` elements in pandas objects are now natively printed (:issue:`60690`)
 - Add ``"delete_rows"`` option to ``if_exists`` argument in :meth:`DataFrame.to_sql` deleting all records of the table before inserting data (:issue:`37210`).
 - Added half-year offset classes :class:`HalfYearBegin`, :class:`HalfYearEnd`, :class:`BHalfYearBegin` and :class:`BHalfYearEnd` (:issue:`60928`)
-- Added support to read from Apache Iceberg tables with the new :func:`read_iceberg` function (:issue:`61383`)
+- Added support to read from and write to Apache Iceberg tables with the new :func:`read_iceberg` function and :meth:`DataFrame.to_iceberg` method (:issue:`61383`)
 - Errors occurring during SQL I/O will now throw a generic :class:`.DatabaseError` instead of the raw Exception type from the underlying driver manager library (:issue:`60748`)
 - Implemented :meth:`Series.str.isascii` (:issue:`59091`)
 - Improved deprecation message for offset aliases (:issue:`60820`)

pandas/core/frame.py

Lines changed: 56 additions & 0 deletions

@@ -3547,6 +3547,62 @@ def to_xml(
         return xml_formatter.write_output()

+    def to_iceberg(
+        self,
+        table_identifier: str,
+        catalog_name: str | None = None,
+        *,
+        catalog_properties: dict[str, Any] | None = None,
+        location: str | None = None,
+        append: bool = False,
+        snapshot_properties: dict[str, str] | None = None,
+    ) -> None:
+        """
+        Write a DataFrame to an Apache Iceberg table.
+
+        .. versionadded:: 3.0.0
+
+        .. warning::
+
+           to_iceberg is experimental and may change without warning.
+
+        Parameters
+        ----------
+        table_identifier : str
+            Table identifier.
+        catalog_name : str, optional
+            The name of the catalog.
+        catalog_properties : dict of {str: str}, optional
+            The properties that are used next to the catalog configuration.
+        location : str, optional
+            Location for the table.
+        append : bool, default False
+            If ``True``, append data to the table, instead of replacing the content.
+        snapshot_properties : dict of {str: str}, optional
+            Custom properties to be added to the snapshot summary.
+
+        See Also
+        --------
+        read_iceberg : Read an Apache Iceberg table.
+        DataFrame.to_parquet : Write a DataFrame in Parquet format.
+
+        Examples
+        --------
+        >>> df = pd.DataFrame(data={"col1": [1, 2], "col2": [4, 3]})
+        >>> df.to_iceberg("my_table", catalog_name="my_catalog")  # doctest: +SKIP
+        """
+        from pandas.io.iceberg import to_iceberg
+
+        to_iceberg(
+            self,
+            table_identifier,
+            catalog_name,
+            catalog_properties=catalog_properties,
+            location=location,
+            append=append,
+            snapshot_properties=snapshot_properties,
+        )
+
     # ----------------------------------------------------------------------
     @doc(INFO_DOCSTRING, **frame_sub_kwargs)
     def info(

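The new ``DataFrame.to_iceberg`` method above is a thin wrapper: it does a lazy import inside the method body and forwards everything to the module-level writer, so ``pyiceberg`` is only needed when the method is actually called. A minimal standalone sketch of that pattern (hypothetical names, not pandas internals):

```python
def _module_to_iceberg(df, table_identifier, catalog_name=None, *,
                       append=False, snapshot_properties=None):
    # Stand-in for pandas.io.iceberg.to_iceberg; just records the call
    # so the forwarding behavior is observable.
    return {
        "df": df,
        "table": table_identifier,
        "catalog": catalog_name,
        "append": append,
        "snapshot_properties": snapshot_properties or {},
    }


class FrameSketch:
    """Stand-in for DataFrame, showing only the wrapper method."""

    def to_iceberg(self, table_identifier, catalog_name=None, *,
                   append=False, snapshot_properties=None):
        # Real pandas imports pandas.io.iceberg here, inside the method,
        # to avoid a hard dependency on pyiceberg at import time.
        return _module_to_iceberg(
            self,
            table_identifier,
            catalog_name,
            append=append,
            snapshot_properties=snapshot_properties,
        )


call = FrameSketch().to_iceberg("ns.my_table", "my_catalog", append=True)
```

The keyword-only marker (`*`) mirrors the committed signature: everything after ``catalog_name`` must be passed by name.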
pandas/io/iceberg.py

Lines changed: 59 additions & 1 deletion

@@ -10,6 +10,7 @@
 def read_iceberg(
     table_identifier: str,
     catalog_name: str | None = None,
+    *,
     catalog_properties: dict[str, Any] | None = None,
     row_filter: str | None = None,
     selected_fields: tuple[str] | None = None,

@@ -21,6 +22,8 @@ def read_iceberg(
     """
     Read an Apache Iceberg table into a pandas DataFrame.

+    .. versionadded:: 3.0.0
+
     .. warning::

        read_iceberg is experimental and may change without warning.

@@ -71,7 +74,6 @@
     """
     pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
     pyiceberg_expressions = import_optional_dependency("pyiceberg.expressions")
-
     if catalog_properties is None:
         catalog_properties = {}
     catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)

@@ -91,3 +93,59 @@ def read_iceberg(
         limit=limit,
     )
     return result.to_pandas()
+
+
+def to_iceberg(
+    df: DataFrame,
+    table_identifier: str,
+    catalog_name: str | None = None,
+    *,
+    catalog_properties: dict[str, Any] | None = None,
+    location: str | None = None,
+    append: bool = False,
+    snapshot_properties: dict[str, str] | None = None,
+) -> None:
+    """
+    Write a DataFrame to an Apache Iceberg table.
+
+    .. versionadded:: 3.0.0
+
+    Parameters
+    ----------
+    table_identifier : str
+        Table identifier.
+    catalog_name : str, optional
+        The name of the catalog.
+    catalog_properties : dict of {str: str}, optional
+        The properties that are used next to the catalog configuration.
+    location : str, optional
+        Location for the table.
+    append : bool, default False
+        If ``True``, append data to the table, instead of replacing the content.
+    snapshot_properties : dict of {str: str}, optional
+        Custom properties to be added to the snapshot summary.
+
+    See Also
+    --------
+    read_iceberg : Read an Apache Iceberg table.
+    DataFrame.to_parquet : Write a DataFrame in Parquet format.
+    """
+    pa = import_optional_dependency("pyarrow")
+    pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
+    if catalog_properties is None:
+        catalog_properties = {}
+    catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
+    arrow_table = pa.Table.from_pandas(df)
+    table = catalog.create_table_if_not_exists(
+        identifier=table_identifier,
+        schema=arrow_table.schema,
+        location=location,
+        # we could add `partition_spec`, `sort_order` and `properties` in the
+        # future, but it may not be trivial without exposing PyIceberg objects
+    )
+    if snapshot_properties is None:
+        snapshot_properties = {}
+    if append:
+        table.append(arrow_table, snapshot_properties=snapshot_properties)
+    else:
+        table.overwrite(arrow_table, snapshot_properties=snapshot_properties)

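The tail of ``to_iceberg`` above dispatches on ``append``: ``table.append`` adds rows to the existing snapshot, ``table.overwrite`` replaces the contents, and either way the snapshot properties travel with the write. A toy in-memory stand-in (not PyIceberg) that isolates just that branch:

```python
class ToyTable:
    """In-memory stand-in for a PyIceberg table, tracking rows and snapshots."""

    def __init__(self):
        self.rows = []
        self.snapshots = []  # (operation, snapshot_properties) per write

    def append(self, rows, snapshot_properties=None):
        self.rows.extend(rows)
        self.snapshots.append(("append", dict(snapshot_properties or {})))

    def overwrite(self, rows, snapshot_properties=None):
        self.rows = list(rows)
        self.snapshots.append(("overwrite", dict(snapshot_properties or {})))


def write(table, rows, append=False, snapshot_properties=None):
    # Mirrors the final branch of to_iceberg: default the properties,
    # then either append to or overwrite the table.
    if snapshot_properties is None:
        snapshot_properties = {}
    if append:
        table.append(rows, snapshot_properties=snapshot_properties)
    else:
        table.overwrite(rows, snapshot_properties=snapshot_properties)


t = ToyTable()
write(t, [1, 2])                       # overwrite: rows become [1, 2]
write(t, [3], append=True)             # append: rows become [1, 2, 3]
write(t, [9], snapshot_properties={"written-by": "pandas"})  # overwrite again
```

Note that the default (``append=False``) is destructive: the third write discards the first two, which is exactly why the docstring phrases it as "replacing the content".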
pandas/tests/io/test_iceberg.py

Lines changed: 81 additions & 2 deletions

@@ -22,7 +22,7 @@
 pyiceberg_catalog = pytest.importorskip("pyiceberg.catalog")
 pq = pytest.importorskip("pyarrow.parquet")

-Catalog = collections.namedtuple("Catalog", ["name", "uri"])
+Catalog = collections.namedtuple("Catalog", ["name", "uri", "warehouse"])


 @pytest.fixture

@@ -58,7 +58,7 @@ def catalog(request, tmp_path):

     importlib.reload(pyiceberg_catalog)  # needed to reload the config file

-    yield Catalog(name=catalog_name or "default", uri=uri)
+    yield Catalog(name=catalog_name or "default", uri=uri, warehouse=warehouse)

     if catalog_name is not None:
         config_path.unlink()

@@ -141,3 +141,82 @@ def test_read_with_limit(self, catalog):
             limit=2,
         )
         tm.assert_frame_equal(result, expected)
+
+    def test_write(self, catalog):
+        df = pd.DataFrame(
+            {
+                "A": [1, 2, 3],
+                "B": ["foo", "foo", "foo"],
+            }
+        )
+        df.to_iceberg(
+            "ns.new_table",
+            catalog_properties={"uri": catalog.uri},
+            location=catalog.warehouse,
+        )
+        result = read_iceberg(
+            "ns.new_table",
+            catalog_properties={"uri": catalog.uri},
+        )
+        tm.assert_frame_equal(result, df)
+
+    @pytest.mark.parametrize("catalog", ["default", "pandas_tests"], indirect=True)
+    def test_write_by_catalog_name(self, catalog):
+        df = pd.DataFrame(
+            {
+                "A": [1, 2, 3],
+                "B": ["foo", "foo", "foo"],
+            }
+        )
+        df.to_iceberg(
+            "ns.new_table",
+            catalog_name=catalog.name,
+        )
+        result = read_iceberg(
+            "ns.new_table",
+            catalog_name=catalog.name,
+        )
+        tm.assert_frame_equal(result, df)
+
+    def test_write_existing_table_with_append_true(self, catalog):
+        original = read_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+        )
+        new = pd.DataFrame(
+            {
+                "A": [1, 2, 3],
+                "B": ["foo", "foo", "foo"],
+            }
+        )
+        expected = pd.concat([original, new], ignore_index=True)
+        new.to_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+            location=catalog.warehouse,
+            append=True,
+        )
+        result = read_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+        )
+        tm.assert_frame_equal(result, expected)
+
+    def test_write_existing_table_with_append_false(self, catalog):
+        df = pd.DataFrame(
+            {
+                "A": [1, 2, 3],
+                "B": ["foo", "foo", "foo"],
+            }
+        )
+        df.to_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+            location=catalog.warehouse,
+            append=False,
+        )
+        result = read_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+        )
+        tm.assert_frame_equal(result, df)

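The added tests all follow the same round-trip pattern: write with ``to_iceberg``, read back with ``read_iceberg``, and compare against the expected frame. A toy harness (``ToyCatalog`` is a stand-in, not a real Iceberg catalog) showing the same pattern, including the append-vs-replace cases the tests cover:

```python
class ToyCatalog:
    """Dict-backed stand-in modeling only the semantics the tests exercise."""

    def __init__(self):
        self._tables = {}

    def to_iceberg(self, identifier, rows, append=False):
        # append=True extends an existing table; otherwise replace it,
        # matching test_write_existing_table_with_append_true/false above.
        if append and identifier in self._tables:
            self._tables[identifier].extend(rows)
        else:
            self._tables[identifier] = list(rows)

    def read_iceberg(self, identifier):
        return list(self._tables[identifier])


cat = ToyCatalog()
cat.to_iceberg("ns.my_table", [{"A": 1, "B": "foo"}, {"A": 2, "B": "foo"}])
cat.to_iceberg("ns.my_table", [{"A": 3, "B": "foo"}], append=True)
roundtrip = cat.read_iceberg("ns.my_table")
```

The real tests use ``tm.assert_frame_equal`` for the comparison; here a plain equality check on the row dicts plays the same role.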