[WIP] ENH: support reading directory in read_csv #61275

Open · wants to merge 28 commits into main

Commits (28)
84d6bd3
Add Pandas Cookbook to Book Recommendations (#61271)
WillAyd Apr 11, 2025
16cf492
bug fix
fangchenli Apr 12, 2025
b69fad1
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli Apr 13, 2025
822dffc
fix win related error
fangchenli Apr 13, 2025
5637dca
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli Apr 18, 2025
3905f1c
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli Apr 21, 2025
361c41c
add encoding
fangchenli Apr 21, 2025
02f93bd
fix import
fangchenli Apr 21, 2025
c77158e
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli May 4, 2025
179f911
format
fangchenli May 4, 2025
d7bef62
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli May 6, 2025
db1c7ed
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli May 8, 2025
91a7956
improve test
fangchenli May 8, 2025
8b5cdd4
debug for new fsspec
fangchenli May 8, 2025
13c1258
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli May 9, 2025
abce2fd
debug min version fsspec
fangchenli May 9, 2025
70bcb2a
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli May 9, 2025
b99b641
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli May 11, 2025
14d7afc
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli May 13, 2025
2a445f3
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli May 19, 2025
3173270
format
fangchenli May 22, 2025
f94a0bf
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli May 22, 2025
38bed64
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli May 24, 2025
2a66b92
Merge remote-tracking branch 'upstream' into read-csv-from-directory
fangchenli May 30, 2025
a2b65e1
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli Jun 2, 2025
d77d290
fix format
fangchenli Jun 2, 2025
2eee5e2
fix test
fangchenli Jun 5, 2025
b6b48e9
Merge remote-tracking branch 'upstream/main' into read-csv-from-direc…
fangchenli Jun 7, 2025
147 changes: 146 additions & 1 deletion pandas/io/common.py
@@ -10,6 +10,7 @@
 from collections import defaultdict
 from collections.abc import (
     Hashable,
+    Iterable,
     Mapping,
     Sequence,
 )
@@ -26,7 +27,10 @@
 )
 import mmap
 import os
-from pathlib import Path
+from pathlib import (
+    Path,
+    PurePosixPath,
+)
 import re
 import tarfile
 from typing import (
@@ -42,6 +46,7 @@
     overload,
 )
 from urllib.parse import (
+    unquote,
     urljoin,
     urlparse as parse_url,
     uses_netloc,
@@ -55,6 +60,7 @@
     BaseBuffer,
     ReadCsvBuffer,
 )
+from pandas.compat import is_platform_windows
 from pandas.compat._optional import import_optional_dependency
 from pandas.util._decorators import doc
 from pandas.util._exceptions import find_stack_level
@@ -1282,3 +1288,142 @@ def dedup_names(
             counts[col] = cur_count + 1

     return names
+
+
+def _infer_protocol(path: str) -> str:
+    # Treat Windows drive letters like C:\ as local file paths
+    if is_platform_windows() and re.match(r"^[a-zA-Z]:[\\/]", path):
+        return "file"
+
+    if is_fsspec_url(path):
+        parsed = parse_url(path)
+        return parsed.scheme
+    return "file"
+
+
+def _match_file(
+    path: Path | PurePosixPath, extensions: set[str] | None, glob: str | None
+) -> bool:
+    """
+    Check if the file matches the given extensions and glob pattern.
+
+    Parameters
+    ----------
+    path : Path or PurePosixPath
+        The file path to check.
+    extensions : set[str] or None
+        A set of lower-cased file extensions to match against,
+        or None to match any extension.
+    glob : str or None
+        A glob pattern to match against, or None to match any name.
+
+    Returns
+    -------
+    bool
+        True if the file matches the extensions and glob pattern, False otherwise.
+    """
+    return (extensions is None or path.suffix.lower() in extensions) and (
+        glob is None or path.match(glob)
+    )
+
+
+def _resolve_local_path(path_str: str) -> Path:
+    # Windows drive-letter paths (e.g. C:\data) would be misread by urlparse,
+    # which treats the drive letter as a URL scheme, so handle them directly.
+    if is_platform_windows() and re.match(r"^[a-zA-Z]:[\\/]", path_str):
+        return Path(path_str)
+    parsed = parse_url(path_str)
+    if is_platform_windows() and parsed.netloc:
+        return Path(f"{parsed.netloc}{parsed.path}")
+    return Path(unquote(parsed.path))
+
+
+def iterdir(
+    path: FilePath | BaseBuffer,
+    extensions: str | Iterable[str] | None = None,
+    glob: str | None = None,
+) -> list[Path | PurePosixPath] | BaseBuffer:
+    """Return the file paths in a directory (no nesting allowed).
+
+    Supports:
+    - Local paths (str, os.PathLike)
+    - file:// URLs
+    - Remote paths (e.g., s3://) via fsspec (if installed)
+
+    Parameters
+    ----------
+    path : FilePath or BaseBuffer
+        Path to the directory (local or remote); file-like objects
+        are returned unchanged.
+    extensions : str or iterable of str, optional
+        Only return files with the given extension(s). Case-insensitive.
+        If None, all files are returned.
+    glob : str, optional
+        Only return files matching the given glob pattern.
+        If None, all files are returned.
+
+    Returns
+    -------
+    list of pathlib.Path or pathlib.PurePosixPath, or BaseBuffer
+        File paths within the directory, or the original file-like object.
+
+    Raises
+    ------
+    TypeError
+        If the given path is neither a path name nor a file-like object.
+    NotADirectoryError
+        If the given path is not a directory.
+    ImportError
+        If fsspec is required but not installed.
+    """
+    if hasattr(path, "read") or hasattr(path, "write"):
+        return path
+
+    if not isinstance(path, (str, os.PathLike)):
+        raise TypeError(
+            f"Expected file path name or file-like object, got {type(path)} type"
+        )
+
+    if extensions is not None:
+        if isinstance(extensions, str):
+            extensions = {extensions.lower()}
+        else:
+            extensions = {ext.lower() for ext in extensions}
+
+    path_str = os.fspath(path)
+    scheme = _infer_protocol(path_str)
+
+    if scheme == "file":
+        resolved_path = _resolve_local_path(path_str)
+        if resolved_path.is_file():
+            # A single file that fails the filters yields no paths at all,
+            # rather than falling through to iterdir() and raising.
+            if _match_file(resolved_path, extensions, glob):
+                return [resolved_path]
+            return []
+
+        result = []
+        for entry in resolved_path.iterdir():
+            if entry.is_file() and _match_file(entry, extensions, glob):
+                result.append(entry)
+        return result
+
+    # Remote paths
+    fsspec = import_optional_dependency("fsspec", extra=scheme)
+    fs = fsspec.filesystem(scheme)
+    path_without_scheme = fsspec.core.strip_protocol(path_str)
+    if fs.isfile(path_without_scheme):
+        # Wrap in PurePosixPath so _match_file can inspect suffix and name.
+        path_obj = PurePosixPath(path_without_scheme)
+        if _match_file(path_obj, extensions, glob):
+            return [path_obj]
+        return []
+
+    result = []
+    for file in fs.ls(path_without_scheme, detail=True):
+        if file["type"] == "file":
+            path_obj = PurePosixPath(file["name"])
+            if _match_file(path_obj, extensions, glob):
+                result.append(path_obj)
+    return result
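
For reviewers, a minimal usage sketch of the helper added above (iterdir is internal to pandas.io.common, not public API; the directory and file names are hypothetical):

    import io

    from pandas.io.common import iterdir

    # Suppose ./data contains a.csv, b.CSV, and notes.txt. Extension
    # matching is case-insensitive, so both CSV files match.
    files = iterdir("data", extensions=".csv")
    assert sorted(p.name.lower() for p in files) == ["a.csv", "b.csv"]

    # A glob pattern narrows the selection further.
    only_a = iterdir("data", extensions=".csv", glob="a*")

    # File-like objects pass through unchanged.
    buf = io.StringIO("a,b\n1,2\n")
    assert iterdir(buf) is buf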
49 changes: 38 additions & 11 deletions pandas/io/parsers/readers.py
@@ -55,6 +55,7 @@
 from pandas.io.common import (
     IOHandles,
     get_handle,
+    iterdir,
     stringify_path,
     validate_header_arg,
 )
@@ -73,6 +74,7 @@
 if TYPE_CHECKING:
     from collections.abc import (
         Callable,
+        Generator,
         Hashable,
         Iterable,
         Mapping,
@@ -668,9 +670,23 @@ def _validate_names(names: Sequence[Hashable] | None) -> None:
             raise ValueError("Names should be an ordered collection.")


+def _multi_file_generator(
+    list_of_files: list[str], kwds
+) -> Generator[DataFrame] | Generator[TextFileReader]:
+    """Yield a DataFrame, or a TextFileReader, for each file."""
+    for file in list_of_files:
+        parser = TextFileReader(file, **kwds)
+
+        if kwds.get("chunksize", None) or kwds.get("iterator", False):
+            yield parser
+        else:
+            with parser:
+                yield parser.read(kwds.get("nrows", None))
+
+
 def _read(
     filepath_or_buffer: FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str], kwds
-) -> DataFrame | TextFileReader:
+) -> DataFrame | TextFileReader | Generator[DataFrame] | Generator[TextFileReader]:
     """Generic reader of line files."""
     # if we pass a date_format and parse_dates=False, we should not parse the
     # dates GH#44366
@@ -709,14 +725,26 @@ def _read(
     # Check for duplicates in names.
     _validate_names(kwds.get("names", None))

-    # Create the parser.
-    parser = TextFileReader(filepath_or_buffer, **kwds)
+    extensions = kwds.get("extensions", None)
+    glob = kwds.get("glob", None)
+    files = iterdir(filepath_or_buffer, extensions, glob)

-    if chunksize or iterator:
-        return parser
+    if isinstance(files, list) and not files:
+        raise FileNotFoundError(
+            f"No files found in {filepath_or_buffer} "
+            f"with extension(s) {extensions} and glob pattern {glob}"
+        )

-    with parser:
-        return parser.read(nrows)
+    if (isinstance(files, list) and len(files) == 1) or not isinstance(files, list):
+        file = files[0] if isinstance(files, list) else files
+        parser = TextFileReader(file, **kwds)
+
+        if chunksize or iterator:
+            return parser
+
+        with parser:
+            return parser.read(nrows)
+    return _multi_file_generator(files, kwds)


 @overload
Expand Down Expand Up @@ -932,10 +960,9 @@ def read_table(
skipfooter: int = 0,
nrows: int | None = None,
# NA and Missing Data Handling
na_values: Hashable
| Iterable[Hashable]
| Mapping[Hashable, Iterable[Hashable]]
| None = None,
na_values: (
Hashable | Iterable[Hashable] | Mapping[Hashable, Iterable[Hashable]] | None
) = None,
keep_default_na: bool = True,
na_filter: bool = True,
skip_blank_lines: bool = True,
Expand Down
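
To make the new control flow in _read concrete, here is a hedged sketch of what a directory read would look like from the user side. The excerpt reads extensions and glob out of kwds but does not show read_csv's public signature, so whether and how those keywords are exposed is an assumption, as is the "data/" directory:

    import pandas as pd

    # Multiple matching files: _read returns a generator that yields one
    # DataFrame per file via _multi_file_generator.
    frames = pd.read_csv("data/")          # a generator, not a DataFrame
    combined = pd.concat(frames, ignore_index=True)

    # Exactly one matching file (or a file-like object): behavior is
    # unchanged, returning a single DataFrame, or a TextFileReader when
    # iterator/chunksize is set.
    df = pd.read_csv("data/only_file.csv")

    # No matching files: FileNotFoundError, echoing the filters back.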
29 changes: 29 additions & 0 deletions pandas/tests/io/conftest.py
@@ -223,3 +223,32 @@ def compression_format(request):
 @pytest.fixture(params=_compression_formats_params)
 def compression_ext(request):
     return request.param[0]
+
+
+@pytest.fixture
+def local_csv_directory(tmp_path):
+    """
+    Fixture to create a directory with dummy CSV files for testing.
+    """
+    for i in range(3):
+        file_path = tmp_path / f"{i}.csv"
+        file_path.touch()
+    return tmp_path
+
+
+@pytest.fixture
+def remote_csv_directory(monkeypatch):
+    _ = pytest.importorskip("fsspec", reason="fsspec is required for remote tests")
+
+    from fsspec.implementations.memory import MemoryFileSystem
+
+    fs = MemoryFileSystem()
+    fs.store.clear()
+
+    dir_name = "remote-bucket"
+    fs.pipe(f"{dir_name}/a.csv", b"a,b,c\n1,2,3\n")
+    fs.pipe(f"{dir_name}/b.csv", b"a,b,c\n4,5,6\n")
+    fs.pipe(f"{dir_name}/nested/ignored.csv", b"x,y,z\n")
+
+    monkeypatch.setattr("fsspec.filesystem", lambda _: fs)
+    return f"s3://{dir_name}"
13 changes: 11 additions & 2 deletions pandas/tests/io/parser/test_compression.py
@@ -34,6 +34,15 @@ def parser_and_data(all_parsers, csv1):
     return parser, data, expected


+@pytest.fixture
+def empty_zip_file(tmp_path):
+    # Create an empty zip file for testing
+    zip_path = tmp_path / "empty.zip"
+    with zipfile.ZipFile(zip_path, "w"):
+        pass
+    return zip_path
+
+
 @pytest.mark.parametrize("compression", ["zip", "infer", "zip2"])
 def test_zip(parser_and_data, compression):
     parser, data, expected = parser_and_data
@@ -158,14 +167,14 @@ def test_compression_utf_encoding(all_parsers, csv_dir_path, utf_value, encoding


 @pytest.mark.parametrize("invalid_compression", ["sfark", "bz3", "zipper"])
-def test_invalid_compression(all_parsers, invalid_compression):
+def test_invalid_compression(all_parsers, empty_zip_file, invalid_compression):
     parser = all_parsers
     compress_kwargs = {"compression": invalid_compression}

     msg = f"Unrecognized compression type: {invalid_compression}"

     with pytest.raises(ValueError, match=msg):
-        parser.read_csv("test_file.zip", **compress_kwargs)
+        parser.read_csv(empty_zip_file, **compress_kwargs)


 def test_compression_tar_archive(all_parsers, csv_dir_path):
37 changes: 37 additions & 0 deletions pandas/tests/io/parser/test_directory.py
@@ -0,0 +1,37 @@
+from csv import (
+    DictWriter,
+    reader as csv_reader,
+)
+
+import pytest
+
+
+@pytest.fixture
+def directory_data():
+    return ["a", "b", "c"], [
+        {"first": {"a": 1, "b": 2, "c": 3}},
+        {"second": {"a": 4, "b": 5, "c": 6}},
+        {"third": {"a": 7, "b": 8, "c": 9}},
+    ]
+
+
+@pytest.fixture
+def directory_data_to_file(tmp_path, directory_data):
+    field_names, data_list = directory_data
+    for data in data_list:
+        file_name = next(iter(data.keys()))
+        path = tmp_path / f"{file_name}.csv"
+        with path.open("w", newline="", encoding="utf-8") as file:
+            writer = DictWriter(file, fieldnames=field_names)
+            writer.writeheader()
+            writer.writerow(data[file_name])
+    return tmp_path
+
+
+def test_directory_data(directory_data_to_file):
+    assert len(list(directory_data_to_file.iterdir())) == 3
+    for file in directory_data_to_file.iterdir():
+        with file.open(encoding="utf-8") as f:
+            reader = csv_reader(f)
+            header = next(reader)
+            assert header == ["a", "b", "c"]
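
The new module currently validates only the fixture itself; a follow-up test exercising read_csv on the directory might look like this (hypothetical, assuming the generator-of-DataFrames return described in _read above):

    import pandas as pd


    def test_read_csv_directory(directory_data_to_file):
        frames = list(pd.read_csv(directory_data_to_file))
        assert len(frames) == 3
        for df in frames:
            assert list(df.columns) == ["a", "b", "c"]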
9 changes: 2 additions & 7 deletions pandas/tests/io/parser/test_unsupported.py
@@ -185,15 +185,10 @@ def test_close_file_handle_on_invalid_usecols(all_parsers):
     os.unlink(fname)


-def test_invalid_file_inputs(request, all_parsers):
+def test_invalid_file_inputs(all_parsers):
     # GH#45957
     parser = all_parsers
-    if parser.engine == "python":
-        request.applymarker(
-            pytest.mark.xfail(reason=f"{parser.engine} engine supports lists.")
-        )
-
-    with pytest.raises(ValueError, match="Invalid"):
+    with pytest.raises(TypeError, match="Expected file path name or file-like"):
         parser.read_csv([])