Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ bytes = "1.11.0"
chrono = "0.4.38"
chrono-tz = "0.10.4"
comfy-table = "7.1.1"
csv = "1.3"
common-daft-config = {path = "src/common/daft-config"}
common-display = {path = "src/common/display", default-features = false}
common-error = {path = "src/common/error", default-features = false}
Expand Down
1 change: 0 additions & 1 deletion src/daft-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ arrow = {workspace = true}
arrow-row = {workspace = true}
bincode = {workspace = true}
bytemuck = {version = "1", features = ["derive"]}
daft-arrow = {path = "../daft-arrow"}
chrono = {workspace = true}
chrono-tz = {workspace = true}
comfy-table = {workspace = true}
Expand Down
1 change: 0 additions & 1 deletion src/daft-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//!
//! This module re-exports commonly used items from the Daft core library.

pub use daft_arrow::bitmap;
pub use daft_schema::image_property::ImageProperty;
// Re-export core series structures
pub use daft_schema::schema::{Schema, SchemaRef};
Expand Down
1 change: 0 additions & 1 deletion src/daft-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod arrow;
pub mod display;
pub mod identity_hash_set;
pub(crate) mod ord;
Expand Down
4 changes: 3 additions & 1 deletion src/daft-csv/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
[dependencies]
arrow-schema = {workspace = true}
async-compat = {workspace = true}
daft-arrow = {path = "../daft-arrow"}
async-stream = {workspace = true}
common-error = {path = "../common/error", default-features = false}
common-py-serde = {path = "../common/py-serde", default-features = false}
common-runtime = {path = "../common/runtime", default-features = false}
csv = {workspace = true}
csv-async = "1.3.0"
daft-compression = {path = "../daft-compression", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-decoding = {path = "../daft-decoding"}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-recordbatch = {path = "../daft-recordbatch", default-features = false}
daft-schema = {path = "../daft-schema", default-features = false}
futures = {workspace = true}
memchr = "2.7.6"
parking_lot = {workspace = true}
Expand Down
6 changes: 3 additions & 3 deletions src/daft-csv/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![allow(deprecated, reason = "arrow2 migration")]

use common_error::DaftError;
use snafu::Snafu;

Expand All @@ -23,13 +21,15 @@ pub enum Error {
IOError { source: daft_io::Error },
#[snafu(display("{source}"))]
CSVError { source: csv_async::Error },
#[snafu(display("{source}"))]
SyncCSVError { source: csv::Error },
#[snafu(display("Invalid char: {}", val))]
WrongChar {
source: std::char::TryFromCharError,
val: char,
},
#[snafu(display("{source}"))]
ArrowError { source: daft_arrow::error::Error },
ArrowError { source: arrow_schema::ArrowError },
#[snafu(display("Error joining spawned task: {}", source))]
JoinError { source: tokio::task::JoinError },
#[snafu(display(
Expand Down
99 changes: 60 additions & 39 deletions src/daft-csv/src/local.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
use core::str;
use std::{io::Read, num::NonZeroUsize, sync::Arc};

use arrow_schema::Field as ArrowField;
use common_error::DaftResult;
use daft_arrow::{
datatypes::Field,
io::csv::{
read::{Reader, ReaderBuilder},
read_async::local_read_rows,
},
};
use daft_core::{
prelude::{Schema, Series},
utils::arrow::cast_array_for_daft_if_needed,
};
use daft_core::prelude::{Schema, Series};
use daft_decoding::deserialize::deserialize_column;
use daft_dsl::{Expr, expr::bound_expr::BoundExpr, optimization::get_required_columns};
use daft_io::{IOClient, IOStatsRef};
Expand All @@ -26,7 +17,7 @@ use smallvec::SmallVec;
use snafu::ResultExt;

use crate::{
ArrowSnafu, CsvConvertOptions, CsvParseOptions, CsvReadOptions, JoinSnafu,
CsvConvertOptions, CsvParseOptions, CsvReadOptions, JoinSnafu, SyncCSVSnafu,
metadata::read_csv_schema_single,
read::{fields_to_projection_indices, tables_concat},
};
Expand Down Expand Up @@ -176,7 +167,9 @@ pub async fn read_csv_local(
io_stats,
)
.await?;
return Ok(RecordBatch::empty(Some(Arc::new(schema.into()))));
return Ok(RecordBatch::empty(Some(Arc::new(Schema::try_from(
&schema,
)?))));
}
let concated_table = tables_concat(collected_tables)?;

Expand Down Expand Up @@ -235,12 +228,19 @@ pub async fn stream_csv_local(
let (schema, estimated_mean_row_size, estimated_std_row_size) =
get_schema_and_estimators(uri, &convert_options, &parse_options, io_client, io_stats)
.await?;
let num_fields = schema.fields.len();
let fields: Vec<ArrowField> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
let num_fields = fields.len();
let projection_indices =
fields_to_projection_indices(&schema.fields, &convert_options.clone().include_columns);
fields_to_projection_indices(&fields, &convert_options.clone().include_columns);
let fields_subset = projection_indices
.iter()
.map(|i| schema.fields.get(*i).unwrap().into())
.map(|i| {
let f = fields.get(*i).unwrap();
daft_core::datatypes::Field::new(
f.name(),
daft_schema::dtype::DataType::try_from(f.data_type()).unwrap(),
)
})
.collect::<Vec<daft_core::datatypes::Field>>();
let read_schema = Arc::new(Schema::new(fields_subset));
let read_daft_fields = Arc::new(
Expand Down Expand Up @@ -282,7 +282,7 @@ pub async fn stream_csv_local(
projection_indices,
read_daft_fields,
read_schema,
schema.fields,
fields,
include_columns,
predicate,
limit,
Expand All @@ -297,7 +297,7 @@ async fn get_schema_and_estimators(
parse_options: &CsvParseOptions,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
) -> DaftResult<(daft_arrow::datatypes::Schema, f64, f64)> {
) -> DaftResult<(arrow_schema::Schema, f64, f64)> {
let (inferred_schema, read_stats) = read_csv_schema_single(
uri,
parse_options.clone(),
Expand All @@ -309,21 +309,23 @@ async fn get_schema_and_estimators(
.await?;

let mut schema = if let Some(schema) = convert_options.schema.clone() {
schema.to_arrow2()?
schema.to_arrow()?
} else {
inferred_schema.to_arrow2()?
inferred_schema.to_arrow()?
};
// Rename fields, if necessary.
if let Some(column_names) = convert_options.column_names.clone() {
schema = schema
.fields
.into_iter()
.zip(column_names.iter())
.map(|(field, name)| {
Field::new(name, field.data_type, field.is_nullable).with_metadata(field.metadata)
})
.collect::<Vec<_>>()
.into();
schema = arrow_schema::Schema::new(
schema
.fields()
.iter()
.zip(column_names.iter())
.map(|(field, name)| {
ArrowField::new(name, field.data_type().clone(), field.is_nullable())
.with_metadata(field.metadata().clone())
})
.collect::<Vec<_>>(),
);
}
Ok((
schema,
Expand Down Expand Up @@ -546,7 +548,7 @@ fn stream_csv_as_tables(
projection_indices: Arc<Vec<usize>>,
read_daft_fields: Arc<Vec<Arc<daft_core::datatypes::Field>>>,
read_schema: Arc<Schema>,
fields: Vec<Field>,
fields: Vec<ArrowField>,
include_columns: Option<Vec<String>>,
predicate: Option<Arc<Expr>>,
limit: Option<usize>,
Expand Down Expand Up @@ -842,7 +844,7 @@ fn collect_tables<R>(
parse_options: &CsvParseOptions,
byte_reader: R,
projection_indices: Arc<Vec<usize>>,
fields: Vec<Field>,
fields: Vec<ArrowField>,
read_daft_fields: Arc<Vec<Arc<daft_core::datatypes::Field>>>,
read_schema: Arc<Schema>,
csv_buffer: &mut CsvBuffer,
Expand All @@ -853,7 +855,7 @@ fn collect_tables<R>(
where
R: std::io::Read,
{
let rdr = ReaderBuilder::new()
let rdr = csv::ReaderBuilder::new()
.has_headers(has_header)
.delimiter(parse_options.delimiter)
.double_quote(parse_options.double_quote)
Expand All @@ -877,12 +879,34 @@ where
)
}

/// Reads CSV rows from a sync reader into the provided byte records buffer.
/// Returns (rows_read, has_more).
fn local_read_rows<R: std::io::Read>(
    reader: &mut csv::Reader<R>,
    rows: &mut [csv::ByteRecord],
    limit: Option<usize>,
) -> Result<(usize, bool), csv::Error> {
    // Fill at most `limit` records (no limit ⇒ bounded only by the slice length).
    let max_rows = limit.unwrap_or(usize::MAX).min(rows.len());
    let mut filled = 0;
    while filled < max_rows {
        // `read_byte_record` yields false once the underlying reader is exhausted.
        if !reader.read_byte_record(&mut rows[filled])? {
            return Ok((filled, false));
        }
        filled += 1;
    }
    // Stopped because the buffer (or limit) was reached, not EOF: more may remain.
    Ok((filled, true))
}

/// Helper function that consumes a CSV reader and turns it into a vector of Daft tables.
#[allow(clippy::too_many_arguments)]
fn parse_csv_chunk<R>(
mut reader: Reader<R>,
mut reader: csv::Reader<R>,
projection_indices: Arc<Vec<usize>>,
fields: Vec<daft_arrow::datatypes::Field>,
fields: Vec<ArrowField>,
read_daft_fields: Arc<Vec<Arc<daft_core::datatypes::Field>>>,
read_schema: Arc<Schema>,
csv_buffer: &mut CsvBuffer,
Expand All @@ -898,7 +922,7 @@ where
loop {
let (rows_read, has_more) =
local_read_rows(&mut reader, csv_buffer.buffer.as_mut_slice(), local_limit)
.context(ArrowSnafu {})?;
.context(SyncCSVSnafu {})?;
let chunk = projection_indices
.par_iter()
.enumerate()
Expand All @@ -909,10 +933,7 @@ where
fields[*proj_idx].data_type().clone(),
0,
);
Series::from_arrow(
read_daft_fields[i].clone(),
cast_array_for_daft_if_needed(deserialized_col?).into(),
)
Series::from_arrow(read_daft_fields[i].clone(), deserialized_col?)
})
.collect::<DaftResult<Vec<Series>>>()?;
let num_rows = chunk.first().map(|s| s.len()).unwrap_or(0);
Expand Down
7 changes: 3 additions & 4 deletions src/daft-csv/src/local/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,25 @@ use std::{
sync::{Arc, Weak},
};

use daft_arrow::io::csv::read;
use parking_lot::{Mutex, RwLock};

// The default size of a slab used for reading CSV files in chunks. Currently set to 4 MiB. This can be tuned.
pub const SLABSIZE: usize = 4 * 1024 * 1024;

#[derive(Clone, Debug, Default)]
pub struct CsvSlab(Vec<read::ByteRecord>);
pub struct CsvSlab(Vec<csv::ByteRecord>);

impl CsvSlab {
fn new(record_size: usize, num_fields: usize, num_rows: usize) -> Self {
Self(vec![
read::ByteRecord::with_capacity(record_size, num_fields);
csv::ByteRecord::with_capacity(record_size, num_fields);
num_rows
])
}
}

impl Deref for CsvSlab {
type Target = Vec<read::ByteRecord>;
type Target = Vec<csv::ByteRecord>;

fn deref(&self) -> &Self::Target {
&self.0
Expand Down
Loading
Loading