Commit ef62fa8

Add compression_level support to ParquetWriterOptions and enhance write_parquet to accept full options object (#1169)
* feat: Add Parquet writer option autodetection
* Add compression_level to ParquetWriterOptions
* fix ruff errors
* feat: Add overloads for write_parquet method to support various compression options
1 parent 0d3c37f commit ef62fa8
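
The key behavior this commit adds: when a compression_level is supplied, ParquetWriterOptions folds it into the compression codec string rather than storing it separately. A standalone sketch of that composition rule (fold_level is a hypothetical helper for illustration only; the real logic lives in ParquetWriterOptions.__init__, shown in the diff below):

    def fold_level(compression: str, compression_level: int | None) -> str:
        # Hypothetical illustration of the rule added in this commit:
        # "gzip" + level 6 -> "gzip(6)"; with no level, the string passes through.
        if compression_level is not None:
            return f"{compression}({compression_level})"
        return compression

    assert fold_level("gzip", 6) == "gzip(6)"
    assert fold_level("zstd(3)", None) == "zstd(3)"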

File tree

3 files changed: +54 −5 lines changed


python/datafusion/__init__.py

Lines changed: 1 addition & 2 deletions
@@ -30,13 +30,12 @@
 except ImportError:
     import importlib_metadata

-from datafusion.col import col, column
-
 from . import functions, object_store, substrait, unparser

 # The following imports are okay to remain as opaque to the user.
 from ._internal import Config
 from .catalog import Catalog, Database, Table
+from .col import col, column
 from .common import (
     DFSchema,
 )

python/datafusion/dataframe.py

Lines changed: 37 additions & 3 deletions
@@ -191,6 +191,7 @@ def __init__(
         writer_version: str = "1.0",
         skip_arrow_metadata: bool = False,
         compression: Optional[str] = "zstd(3)",
+        compression_level: Optional[int] = None,
         dictionary_enabled: Optional[bool] = True,
         dictionary_page_size_limit: int = 1024 * 1024,
         statistics_enabled: Optional[str] = "page",
@@ -213,7 +214,10 @@ def __init__(
         self.write_batch_size = write_batch_size
         self.writer_version = writer_version
         self.skip_arrow_metadata = skip_arrow_metadata
-        self.compression = compression
+        if compression_level is not None:
+            self.compression = f"{compression}({compression_level})"
+        else:
+            self.compression = compression
         self.dictionary_enabled = dictionary_enabled
         self.dictionary_page_size_limit = dictionary_page_size_limit
         self.statistics_enabled = statistics_enabled
@@ -870,10 +874,34 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
         """
         self.df.write_csv(str(path), with_header)

+    @overload
+    def write_parquet(
+        self,
+        path: str | pathlib.Path,
+        compression: str,
+        compression_level: int | None = None,
+    ) -> None: ...
+
+    @overload
+    def write_parquet(
+        self,
+        path: str | pathlib.Path,
+        compression: Compression = Compression.ZSTD,
+        compression_level: int | None = None,
+    ) -> None: ...
+
+    @overload
+    def write_parquet(
+        self,
+        path: str | pathlib.Path,
+        compression: ParquetWriterOptions,
+        compression_level: None = None,
+    ) -> None: ...
+
     def write_parquet(
         self,
         path: str | pathlib.Path,
-        compression: Union[str, Compression] = Compression.ZSTD,
+        compression: Union[str, Compression, ParquetWriterOptions] = Compression.ZSTD,
         compression_level: int | None = None,
     ) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a Parquet file.
@@ -894,7 +922,13 @@ def write_parquet(
             recommended range is 1 to 22, with the default being 4. Higher levels
             provide better compression but slower speed.
         """
-        # Convert string to Compression enum if necessary
+        if isinstance(compression, ParquetWriterOptions):
+            if compression_level is not None:
+                msg = "compression_level should be None when using ParquetWriterOptions"
+                raise ValueError(msg)
+            self.write_parquet_with_options(path, compression)
+            return
+
         if isinstance(compression, str):
             compression = Compression.from_str(compression)
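
Taken together, the overloads let write_parquet accept a codec string, a Compression enum value, or a complete ParquetWriterOptions object (in which case compression_level must be left as None, per the ValueError above). A minimal usage sketch, assuming the import locations match this repository's layout (SessionContext exported from datafusion; Compression and ParquetWriterOptions defined in datafusion.dataframe):

    from datafusion import SessionContext
    from datafusion.dataframe import Compression, ParquetWriterOptions

    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3]})

    # Codec as a plain string, with the level passed separately.
    df.write_parquet("out_str/", compression="zstd", compression_level=4)

    # Codec as a Compression enum value.
    df.write_parquet("out_enum/", compression=Compression.GZIP, compression_level=6)

    # Full options object; the level is already folded into the options
    # ("gzip(6)"), so compression_level must stay None here.
    options = ParquetWriterOptions(compression="gzip", compression_level=6)
    df.write_parquet("out_opts/", options)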

python/tests/test_dataframe.py

Lines changed: 16 additions & 0 deletions
@@ -2038,6 +2038,22 @@ def test_write_parquet_with_options_column_options(df, tmp_path):
     assert col["encodings"] == result["encodings"]


+def test_write_parquet_options(df, tmp_path):
+    options = ParquetWriterOptions(compression="gzip", compression_level=6)
+    df.write_parquet(str(tmp_path), options)
+
+    result = pq.read_table(str(tmp_path)).to_pydict()
+    expected = df.to_pydict()
+
+    assert result == expected
+
+
+def test_write_parquet_options_error(df, tmp_path):
+    options = ParquetWriterOptions(compression="gzip", compression_level=6)
+    with pytest.raises(ValueError):
+        df.write_parquet(str(tmp_path), options, compression_level=1)
+
+
 def test_dataframe_export(df) -> None:
     # Guarantees that we have the canonical implementation
     # reading our dataframe export
