Commit 7416b7a

refactor
1 parent: 5d82a1a

File tree

4 files changed: +144, -107 lines

src/parseable/staging/reader.rs

Lines changed: 18 additions & 10 deletions
@@ -48,18 +48,26 @@ impl MergedRecordReader {
 
         for file in files {
             //remove empty files before reading
-            if file.metadata().unwrap().len() == 0 {
-                error!("Invalid file detected, removing it: {:?}", file);
-                remove_file(file).unwrap();
-            } else {
-                let Ok(reader) =
-                    StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None)
-                else {
-                    error!("Invalid file detected, ignoring it: {:?}", file);
+            match file.metadata() {
+                Err(err) => {
+                    error!("Error when trying to read file: {file:?}; error = {err}");
+                    continue;
+                }
+                Ok(metadata) if metadata.len() == 0 => {
+                    error!("Empty file detected, removing it: {:?}", file);
+                    remove_file(file).unwrap();
                     continue;
-                };
+                }
+                Ok(_) => {
+                    let Ok(reader) =
+                        StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None)
+                    else {
+                        error!("Invalid file detected, ignoring it: {:?}", file);
+                        continue;
+                    };
 
-                readers.push(reader);
+                    readers.push(reader);
+                }
             }
         }
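Note: the new control flow is easier to read in isolation. Below is a minimal stand-alone sketch of the same pattern, assuming plain std I/O and eprintln! in place of the crate's StreamReader and tracing macros; collect_readers and open_reader are hypothetical names, not the crate's API.

    use std::fs::{remove_file, File};
    use std::io::BufReader;
    use std::path::PathBuf;

    // Hypothetical stand-in for the crate's StreamReader-based construction.
    fn open_reader(file: &PathBuf) -> Option<BufReader<File>> {
        File::open(file).ok().map(BufReader::new)
    }

    fn collect_readers(files: &[PathBuf]) -> Vec<BufReader<File>> {
        let mut readers = Vec::new();
        for file in files {
            match file.metadata() {
                // Metadata unavailable: log and skip instead of panicking.
                Err(err) => {
                    eprintln!("Error when trying to read file: {file:?}; error = {err}");
                    continue;
                }
                // Empty staging file: best-effort delete, then move on.
                Ok(metadata) if metadata.len() == 0 => {
                    eprintln!("Empty file detected, removing it: {file:?}");
                    remove_file(file).ok();
                    continue;
                }
                // Otherwise try to build a reader, ignoring files that fail to open.
                Ok(_) => {
                    let Some(reader) = open_reader(file) else {
                        eprintln!("Invalid file detected, ignoring it: {file:?}");
                        continue;
                    };
                    readers.push(reader);
                }
            }
        }
        readers
    }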

src/parseable/streams.rs

Lines changed: 107 additions & 65 deletions
@@ -67,12 +67,16 @@ use super::{
 };
 
 /// Returns the filename for parquet if provided arrows file path is valid as per our expectation
-fn arrow_path_to_parquet(staging_path: &Path, path: &Path, random_string: &str) -> Option<PathBuf> {
+fn arrow_path_to_parquet(
+    stream_staging_path: &Path,
+    path: &Path,
+    random_string: &str,
+) -> Option<PathBuf> {
     let filename = path.file_stem()?.to_str()?;
     let (_, front) = filename.split_once('.')?;
     assert!(front.contains('.'), "contains the delim `.`");
     let filename_with_random_number = format!("{front}.{random_string}.parquet");
-    let mut parquet_path = staging_path.to_owned();
+    let mut parquet_path = stream_staging_path.to_owned();
     parquet_path.push(filename_with_random_number);
     Some(parquet_path)
 }
@@ -345,9 +349,10 @@ impl Stream {
         arrow_files.retain(|path| {
             let creation = path
                 .metadata()
-                .expect("Arrow file should exist on disk")
-                .created()
-                .expect("Creation time should be accessible");
+                .ok()
+                .and_then(|meta| meta.created().or_else(|_| meta.modified()).ok())
+                .expect("Arrow file should have a valid creation or modified time");
+
             // Compare if creation time is actually from previous minute
             minute_from_system_time(creation) < minute_from_system_time(exclude)
         });
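Background for this hunk: std::fs::Metadata::created() can return an error on filesystems that do not expose a birth time, so the filter now falls back to the modification time instead of panicking. A minimal std-only sketch of that fallback; best_effort_created is a hypothetical helper name used only for illustration.

    use std::fs;
    use std::path::Path;
    use std::time::SystemTime;

    // Best-effort "creation" time: prefer created(), fall back to modified()
    // where the filesystem does not support a birth time.
    fn best_effort_created(path: &Path) -> Option<SystemTime> {
        let meta = fs::metadata(path).ok()?;
        meta.created().or_else(|_| meta.modified()).ok()
    }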
@@ -594,7 +599,7 @@ impl Stream {
             .values()
             .map(|v| {
                 v.iter()
-                    .map(|file| file.metadata().unwrap().len())
+                    .filter_map(|file| file.metadata().ok().map(|meta| meta.len()))
                     .sum::<u64>()
             })
             .sum::<u64>();
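The size sum now silently skips files whose metadata can no longer be read (for example, files deleted concurrently) rather than unwrapping. The same fold in isolation, with a hypothetical helper name:

    use std::path::PathBuf;

    // Total on-disk size of a set of staged files, ignoring unreadable entries.
    fn staged_bytes(files: &[PathBuf]) -> u64 {
        files
            .iter()
            .filter_map(|file| file.metadata().ok().map(|meta| meta.len()))
            .sum()
    }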
@@ -624,92 +629,129 @@ impl Stream {
             return Ok(None);
         }
 
-        //find sum of arrow files in staging directory for a stream
         self.update_staging_metrics(&staging_files);
 
-        // warn!("staging files-\n{staging_files:?}\n");
         for (parquet_path, arrow_files) in staging_files {
             let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
                 continue;
             }
             let merged_schema = record_reader.merged_schema();
-
             let props = self.parquet_writer_props(&merged_schema, time_partition, custom_partition);
             schemas.push(merged_schema.clone());
             let schema = Arc::new(merged_schema);
-            let mut part_path = parquet_path.to_owned();
-            part_path.set_extension("part");
-            let mut part_file = OpenOptions::new()
-                .create(true)
-                .append(true)
-                .open(&part_path)
-                .map_err(|_| StagingError::Create)?;
-            let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?;
-            for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
-                writer.write(record)?;
-            }
-            writer.close()?;
 
-            if part_file.metadata().expect("File was just created").len()
-                < parquet::file::FOOTER_SIZE as u64
-            {
-                error!(
-                    "Invalid parquet file {part_path:?} detected for stream {}, removing it",
-                    &self.stream_name
-                );
-                remove_file(part_path).expect("File should be removable if it is invalid");
+            let part_path = parquet_path.with_extension("part");
+            if !self.write_parquet_part_file(
+                &part_path,
+                record_reader,
+                &schema,
+                &props,
+                time_partition,
+            )? {
                 continue;
             }
-            trace!("Parquet file successfully constructed");
 
-            if let Err(e) = std::fs::rename(&part_path, &parquet_path) {
+            if let Err(e) = self.finalize_parquet_file(&part_path, &parquet_path) {
                 error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}");
             } else {
-                // delete the files that were grouped to create parquet file
-                for (i, file) in arrow_files.iter().enumerate() {
-                    match file.metadata() {
-                        Ok(meta) => {
-                            let file_size = meta.len();
-                            match remove_file(file) {
-                                Ok(_) => {
-                                    metrics::STORAGE_SIZE
-                                        .with_label_values(&[
-                                            "staging",
-                                            &self.stream_name,
-                                            ARROW_FILE_EXTENSION,
-                                        ])
-                                        .sub(file_size as i64);
-                                }
-                                Err(e) => {
-                                    warn!("Failed to delete file {}: {e}", file.display());
-                                }
-                            }
-                        }
-                        Err(err) => {
-                            warn!("File ({}) not found; Error = {err}", file.display());
-                        }
-                    }
-
-                    // After deleting the last file, try to remove the inprocess directory
-                    if i == arrow_files.len() - 1 {
-                        if let Some(parent_dir) = file.parent() {
-                            if let Err(err) = fs::remove_dir(parent_dir) {
-                                warn!(
-                                    "Failed to remove inprocess directory {}: {err}",
-                                    parent_dir.display()
-                                );
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        if schemas.is_empty() {
-            return Ok(None);
-        }
-
-        Ok(Some(Schema::try_merge(schemas).unwrap()))
+                self.cleanup_arrow_files_and_dir(&arrow_files);
+            }
+        }
+
+        if schemas.is_empty() {
+            return Ok(None);
+        }
+
+        Ok(Some(Schema::try_merge(schemas).unwrap()))
+    }
+
+    fn write_parquet_part_file(
+        &self,
+        part_path: &Path,
+        record_reader: MergedReverseRecordReader,
+        schema: &Arc<Schema>,
+        props: &WriterProperties,
+        time_partition: Option<&String>,
+    ) -> Result<bool, StagingError> {
+        let mut part_file = OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(part_path)
+            .map_err(|_| StagingError::Create)?;
+        let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?;
+        for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) {
+            writer.write(record)?;
+        }
+        writer.close()?;
+
+        if part_file.metadata().expect("File was just created").len()
+            < parquet::file::FOOTER_SIZE as u64
+        {
+            error!(
+                "Invalid parquet file {part_path:?} detected for stream {}, removing it",
+                &self.stream_name
+            );
+            remove_file(part_path).expect("File should be removable if it is invalid");
+            return Ok(false);
+        }
+        trace!("Parquet file successfully constructed");
+        Ok(true)
+    }
+
+    fn finalize_parquet_file(&self, part_path: &Path, parquet_path: &Path) -> std::io::Result<()> {
+        std::fs::rename(part_path, parquet_path)
+    }
+
+    fn cleanup_arrow_files_and_dir(&self, arrow_files: &[PathBuf]) {
+        for (i, file) in arrow_files.iter().enumerate() {
+            match file.metadata() {
+                Ok(meta) => {
+                    let file_size = meta.len();
+                    match remove_file(file) {
+                        Ok(_) => {
+                            metrics::STORAGE_SIZE
+                                .with_label_values(&[
+                                    "staging",
+                                    &self.stream_name,
+                                    ARROW_FILE_EXTENSION,
+                                ])
+                                .sub(file_size as i64);
+                        }
+                        Err(e) => {
+                            warn!("Failed to delete file {}: {e}", file.display());
+                        }
+                    }
+                }
+                Err(err) => {
+                    warn!("File ({}) not found; Error = {err}", file.display());
+                }
+            }
+
+            // After deleting the last file, try to remove the inprocess directory if empty
+            if i == arrow_files.len() - 1 {
+                if let Some(parent_dir) = file.parent() {
+                    match fs::read_dir(parent_dir) {
+                        Ok(mut entries) => {
+                            if entries.next().is_none() {
+                                if let Err(err) = fs::remove_dir(parent_dir) {
+                                    warn!(
+                                        "Failed to remove inprocess directory {}: {err}",
+                                        parent_dir.display()
+                                    );
+                                }
+                            }
+                        }
+                        Err(err) => {
+                            warn!(
+                                "Failed to read inprocess directory {}: {err}",
+                                parent_dir.display()
+                            );
+                        }
+                    }
+                }
+            }
+        }
     }
 
     pub fn updated_schema(&self, current_schema: Schema) -> Schema {
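One behavioral change in the extracted cleanup helper: the inprocess directory is only removed when it is actually empty, rather than attempting removal unconditionally. The check itself is plain std::fs; a minimal sketch, with remove_dir_if_empty as a hypothetical helper name:

    use std::fs;
    use std::io;
    use std::path::Path;

    // Remove `dir` only if it contains no entries; a non-empty directory is left alone.
    // Illustrates the read_dir() -> next().is_none() check used above.
    fn remove_dir_if_empty(dir: &Path) -> io::Result<()> {
        let mut entries = fs::read_dir(dir)?;
        if entries.next().is_none() {
            fs::remove_dir(dir)?;
        }
        Ok(())
    }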

src/storage/object_storage.rs

Lines changed: 2 additions & 2 deletions
@@ -883,15 +883,15 @@ pub fn sync_all_streams(joinset: &mut JoinSet<Result<(), ObjectStorageError>>) {
         let start = Instant::now();
         info!("Starting object_store_sync for stream- {stream_name}");
         let result = object_store.upload_files_from_staging(&stream_name).await;
-        if let Err(e) = result {
+        if let Err(ref e) = result {
             error!("Failed to upload files from staging for stream {stream_name}: {e}");
         } else {
             info!(
                 "Completed object_store_sync for stream- {stream_name} in {} ms",
                 start.elapsed().as_millis()
             );
         }
-        Ok(())
+        result
     });
 }

src/sync.rs

Lines changed: 17 additions & 30 deletions
@@ -134,11 +134,7 @@ pub fn object_store_sync() -> (
                 sync_all_streams(&mut joinset)
             },
             Some(res) = joinset.join_next(), if !joinset.is_empty() => {
-                match res {
-                    Ok(Ok(_)) => info!("Successfully uploaded files to object store."),
-                    Ok(Err(err)) => warn!("Failed to upload files to object store. {err:?}"),
-                    Err(err) => error!("Issue joining object store sync task: {err}"),
-                }
+                log_join_result(res, "object store sync");
             },
             res = &mut inbox_rx => {
                 match res {
@@ -153,11 +149,7 @@ pub fn object_store_sync() -> (
             }
             // Drain remaining joinset tasks
             while let Some(res) = joinset.join_next().await {
-                match res {
-                    Ok(Ok(_)) => info!("Successfully uploaded files to object store."),
-                    Ok(Err(err)) => warn!("Failed to upload files to object store. {err:?}"),
-                    Err(err) => error!("Issue joining object store sync task: {err}"),
-                }
+                log_join_result(res, "object store sync");
             }
         }));
 
@@ -202,11 +194,7 @@ pub fn local_sync() -> (
             },
             // Joins and logs errors in spawned tasks
             Some(res) = joinset.join_next(), if !joinset.is_empty() => {
-                match res {
-                    Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
-                    Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
-                    Err(err) => error!("Issue joining flush+conversion task: {err}"),
-                }
+                log_join_result(res, "flush and convert");
             }
             res = &mut inbox_rx => {match res{
                 Ok(_) => break,
@@ -220,11 +208,7 @@ pub fn local_sync() -> (
 
             // Drain remaining joinset tasks
             while let Some(res) = joinset.join_next().await {
-                match res {
-                    Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
-                    Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
-                    Err(err) => error!("Issue joining flush+conversion task: {err}"),
-                }
+                log_join_result(res, "flush and convert");
             }
         }));
 
@@ -251,25 +235,28 @@ pub async fn sync_start() -> anyhow::Result<()> {
         .streams
         .flush_and_convert(&mut local_sync_joinset, true, false);
     while let Some(res) = local_sync_joinset.join_next().await {
-        match res {
-            Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
-            Ok(Err(err)) => return Err(err.into()),
-            Err(err) => error!("Failed to join async task: {err}"),
-        }
+        log_join_result(res, "flush and convert");
     }
 
     let mut object_store_joinset = JoinSet::new();
     sync_all_streams(&mut object_store_joinset);
     while let Some(res) = object_store_joinset.join_next().await {
-        match res {
-            Ok(Ok(_)) => info!("Successfully synced all data to S3."),
-            Ok(Err(err)) => return Err(err.into()),
-            Err(err) => error!("Failed to join async task: {err}"),
-        }
+        log_join_result(res, "object store sync");
     }
     Ok(())
 }
 
+fn log_join_result<T, E>(res: Result<Result<T, E>, tokio::task::JoinError>, context: &str)
+where
+    E: std::fmt::Debug,
+{
+    match res {
+        Ok(Ok(_)) => info!("Successfully completed {context}."),
+        Ok(Err(err)) => warn!("Failed to complete {context}. {err:?}"),
+        Err(err) => error!("Issue joining {context} task: {err}"),
+    }
+}
+
 /// A separate runtime for running all alert tasks
 #[tokio::main(flavor = "multi_thread")]
 pub async fn alert_runtime(mut rx: mpsc::Receiver<AlertTask>) -> Result<(), anyhow::Error> {
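The new log_join_result helper replaces the repeated three-arm match on JoinSet results. Below is the same helper as shown in the diff, paired with a hypothetical drain function to illustrate how it fits against JoinSet::join_next; the tracing macros are assumed to be in scope as elsewhere in the crate.

    use tokio::task::JoinSet;
    use tracing::{error, info, warn};

    // Same shape as the helper added in this commit: one place to log the three
    // possible outcomes of joining a spawned, fallible task.
    fn log_join_result<T, E>(res: Result<Result<T, E>, tokio::task::JoinError>, context: &str)
    where
        E: std::fmt::Debug,
    {
        match res {
            Ok(Ok(_)) => info!("Successfully completed {context}."),
            Ok(Err(err)) => warn!("Failed to complete {context}. {err:?}"),
            Err(err) => error!("Issue joining {context} task: {err}"),
        }
    }

    // Hypothetical usage: drain a JoinSet of fallible tasks, logging each outcome.
    async fn drain<E: std::fmt::Debug + 'static>(joinset: &mut JoinSet<Result<(), E>>, context: &str) {
        while let Some(res) = joinset.join_next().await {
            log_join_result(res, context);
        }
    }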
