Skip to content

fix: Raise error by default on invalid CSV quotes #22876

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/polars-io/src/csv/read/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,8 @@ pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
/// * `projection` - Indices of the columns to project.
/// * `buffers` - Parsed output will be written to these buffers. Except for UTF8 data. The offsets of the
/// fields are written to the buffers. The UTF8 data will be parsed later.
///
/// Returns the number of bytes parsed successfully.
#[allow(clippy::too_many_arguments)]
pub(super) fn parse_lines(
mut bytes: &[u8],
Expand Down
24 changes: 23 additions & 1 deletion crates/polars-io/src/csv/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ impl<'a> CoreReader<'a> {
Ok(df)
}

// The code adheres to RFC 4180 in a strict sense, unless explicitly documented otherwise.
// Malformed CSV is common, see e.g. the use of lazy_quotes, whitespace and comments.
// In case malformed CSV is detected, a warning or an error will be issued.
// Not all malformed CSV will be detected, as that would impact performance.
fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
let (bytes, _) = self.find_starting_point(
bytes,
Expand Down Expand Up @@ -390,10 +394,12 @@ impl<'a> CoreReader<'a> {

let counter = CountLines::new(self.parse_options.quote_char, self.parse_options.eol_char);
let mut total_offset = 0;
let mut previous_total_offset = 0;
let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)
&& self.schema.iter_fields().any(|f| f.dtype().is_string());

pool.scope(|s| {
// Pass 1: identify chunks for parallel processing (line parsing).
loop {
let b = unsafe { bytes.get_unchecked(total_offset..) };
if b.is_empty() {
Expand All @@ -402,6 +408,9 @@ impl<'a> CoreReader<'a> {
debug_assert!(
total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char
);

// Count is the number of rows for the next chunk. In case of malformed CSV data,
// count may not be as expected.
let (count, position) = counter.find_next(b, &mut chunk_size);
debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);

Expand All @@ -420,10 +429,12 @@ impl<'a> CoreReader<'a> {
let end = total_offset + position + 1;
let b = unsafe { bytes.get_unchecked(total_offset..end) };

previous_total_offset = total_offset;
total_offset = end;
(b, count)
};

// Pass 2: process each individual chunk in parallel (field parsing)
if !b.is_empty() {
let results = results.clone();
let projection = projection.as_ref();
Expand All @@ -441,7 +452,18 @@ impl<'a> CoreReader<'a> {
let result = slf
.read_chunk(b, projection, 0, count, Some(0), b.len())
.and_then(|mut df| {
debug_assert!(df.height() <= count);

// Check malformed
if df.height() > count || (df.height() < count && slf.parse_options.comment_prefix.is_none()) {
// Note: in case data is malformed, df.height() is more likely to be correct than count.
let msg = format!("CSV malformed: expected {} rows, actual {} rows, in chunk starting at byte offset {}, length {}",
count, df.height(), previous_total_offset, b.len());
if slf.ignore_errors {
polars_warn!(msg);
} else {
polars_bail!(ComputeError: msg);
}
}

if slf.n_rows.is_some() {
total_line_count.fetch_add(df.height(), Ordering::Relaxed);
Expand Down
14 changes: 8 additions & 6 deletions crates/polars-io/src/csv/read/splitfields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod inner {
Some((self.v, need_escaping))
}

fn eof_oel(&self, current_ch: u8) -> bool {
fn eof_eol(&self, current_ch: u8) -> bool {
current_ch == self.separator || current_ch == self.eol_char
}
}
Expand Down Expand Up @@ -89,7 +89,7 @@ mod inner {
in_field = !in_field;
}

if !in_field && self.eof_oel(c) {
if !in_field && self.eof_eol(c) {
if c == self.eol_char {
// SAFETY:
// we are in bounds
Expand All @@ -109,7 +109,7 @@ mod inner {

idx as usize
} else {
match self.v.iter().position(|&c| self.eof_oel(c)) {
match self.v.iter().position(|&c| self.eof_eol(c)) {
None => return self.finish(needs_escaping),
Some(idx) => unsafe {
// SAFETY:
Expand Down Expand Up @@ -202,7 +202,7 @@ mod inner {
Some((self.v, need_escaping))
}

fn eof_oel(&self, current_ch: u8) -> bool {
fn eof_eol(&self, current_ch: u8) -> bool {
current_ch == self.separator || current_ch == self.eol_char
}
}
Expand Down Expand Up @@ -255,6 +255,7 @@ mod inner {
// SAFETY:
// we have checked bounds
let pos = if self.quoting && unsafe { *self.v.get_unchecked(0) } == self.quote_char {
// Start of an enclosed field
let mut total_idx = 0;
needs_escaping = true;
let mut not_in_field_previous_iter = true;
Expand Down Expand Up @@ -324,7 +325,7 @@ mod inner {
in_field = !in_field;
}

if !in_field && self.eof_oel(c) {
if !in_field && self.eof_eol(c) {
if c == self.eol_char {
// SAFETY:
// we are in bounds
Expand Down Expand Up @@ -352,6 +353,7 @@ mod inner {
}
total_idx
} else {
// Start of an unenclosed field
let mut total_idx = 0;

loop {
Expand All @@ -376,7 +378,7 @@ mod inner {
total_idx += SIMD_SIZE;
}
} else {
match bytes.iter().position(|&c| self.eof_oel(c)) {
match bytes.iter().position(|&c| self.eof_eol(c)) {
None => return self.finish(needs_escaping),
Some(idx) => {
total_idx += idx;
Expand Down
21 changes: 20 additions & 1 deletion crates/polars-stream/src/nodes/io_sources/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use async_trait::async_trait;
use polars_core::StringCacheHolder;
use polars_core::prelude::{Column, Field};
use polars_core::schema::{SchemaExt, SchemaRef};
use polars_error::{PolarsResult, polars_bail, polars_err};
use polars_error::{PolarsResult, polars_bail, polars_err, polars_warn};
use polars_io::RowIndex;
use polars_io::cloud::CloudOptions;
use polars_io::prelude::_csv_read_internal::{
Expand Down Expand Up @@ -679,6 +679,25 @@ impl ChunkReader {
let height = df.height();
let n_lines_is_correct = df.height() == n_lines;

// Check malformed
if df.height() > n_lines
|| (df.height() < n_lines && self.parse_options.comment_prefix.is_none())
{
// Note: in case data is malformed, df.height() is more likely to be correct than n_lines.
let msg = format!(
"CSV malformed: expected {} rows, actual {} rows, in chunk starting at row_offset {}, length {}",
n_lines,
df.height(),
chunk_row_offset,
chunk.len()
);
if self.ignore_errors {
polars_warn!(msg);
} else {
polars_bail!(ComputeError: msg);
}
}

if slice != NO_SLICE {
assert!(slice != SLICE_ENDED);
assert!(n_lines_is_correct);
Expand Down
3 changes: 3 additions & 0 deletions py-polars/polars/io/csv/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ def read_csv(
r"""
Read a CSV file into a DataFrame.

Polars expects CSV data to strictly conform to RFC 4180, unless documented
otherwise. Malformed data, though common, may lead to undefined behavior.

.. versionchanged:: 0.20.31
The `dtypes` parameter was renamed `schema_overrides`.
.. versionchanged:: 0.20.4
Expand Down
25 changes: 25 additions & 0 deletions py-polars/tests/unit/io/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -2605,3 +2605,28 @@ def test_csv_write_scalar_empty_chunk_20273(filter_value: int, expected: str) ->
df2 = pl.DataFrame({"c": [99]})
df3 = df1.join(df2, how="cross").filter(pl.col("a").eq(filter_value))
assert df3.write_csv() == expected


def test_csv_malformed_quote_in_unenclosed_field_22395() -> None:
    """Malformed CSV (stray quote inside an unenclosed field) must raise by default.

    With ``ignore_errors=True`` a ``UserWarning`` is issued instead of raising.
    Both the eager (``read_csv``) and lazy (``scan_csv``) paths are checked, and
    both the short (non-SIMD) and long (SIMD, > 64 bytes) parsing code paths.
    """
    # Note - the malformed detection logic is very basic, and fails to detect many
    # types at this point (for example: 'a,b"c,x"y' will not be detected).
    # Below is one pattern that will be flagged (odd number of quotes in a row).
    malformed = b"""\
a,b,x"y
a,x"y,c
x"y,b,c
"""
    # short: non-SIMD code path
    with pytest.raises(pl.exceptions.ComputeError):
        pl.read_csv(malformed, has_header=False)
    with pytest.raises(pl.exceptions.ComputeError):
        pl.scan_csv(malformed, has_header=False).collect()
    with pytest.warns(UserWarning):
        pl.read_csv(malformed, has_header=False, ignore_errors=True)

    # long: trigger SIMD code path (> 64 bytes)
    malformed_long = malformed + ("k,l,m\n" * 10).encode()
    with pytest.raises(pl.exceptions.ComputeError):
        pl.read_csv(malformed_long, has_header=False)
    with pytest.raises(pl.exceptions.ComputeError):
        pl.scan_csv(malformed_long, has_header=False).collect()
Loading