Skip to content

Commit 4023b56

Browse files
authored
fix: issues cause by prev commit
1 parent c33d8e9 commit 4023b56

File tree

6 files changed

+40
-29
lines changed

6 files changed

+40
-29
lines changed

server/src/event.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ impl Event {
7474
&key,
7575
self.rb.clone(),
7676
self.parsed_timestamp,
77-
self.custom_partition_values,
77+
&self.custom_partition_values,
7878
)?;
7979

8080
metadata::STREAM_INFO.update_stats(
@@ -96,18 +96,17 @@ impl Event {
9696
Ok(())
9797
}
9898

99-
pub fn process_unchecked(self) -> Result<Self, PostError> {
99+
pub fn process_unchecked(&self) -> Result<(), PostError> {
100100
let key = get_schema_key(&self.rb.schema().fields);
101101

102102
Self::process_event(
103103
&self.stream_name,
104104
&key,
105105
self.rb.clone(),
106106
self.parsed_timestamp,
107+
&self.custom_partition_values,
107108
)
108-
.map_err(PostError::Event)?;
109-
110-
Ok(self)
109+
.map_err(PostError::Event)
111110
}
112111

113112
pub fn clear(&self, stream_name: &str) {
@@ -121,14 +120,14 @@ impl Event {
121120
schema_key: &str,
122121
rb: RecordBatch,
123122
parsed_timestamp: NaiveDateTime,
124-
custom_partition_values: HashMap<String, String>,
123+
custom_partition_values: &HashMap<String, String>,
125124
) -> Result<(), EventError> {
126125
STREAM_WRITERS.append_to_local(
127126
stream_name,
128127
schema_key,
129128
rb,
130129
parsed_timestamp,
131-
custom_partition_values,
130+
custom_partition_values.clone(),
132131
)?;
133132
Ok(())
134133
}

server/src/event/writer.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl Writer {
5353
schema_key: &str,
5454
rb: RecordBatch,
5555
parsed_timestamp: NaiveDateTime,
56-
custom_partition_values: HashMap<String, String>,
56+
custom_partition_values: &HashMap<String, String>,
5757
) -> Result<(), StreamWriterError> {
5858
let rb = utils::arrow::replace_columns(
5959
rb.schema(),
@@ -102,15 +102,22 @@ impl WriterTable {
102102
schema_key,
103103
record,
104104
parsed_timestamp,
105-
custom_partition_values,
105+
&custom_partition_values,
106106
)?;
107107
}
108108
None => {
109109
drop(hashmap_guard);
110110
let map = self.write().unwrap();
111111
// check for race condition
112112
// if map contains entry then just
113-
self.handle_missing_writer(map, stream_name, schema_key, record, parsed_timestamp)?;
113+
self.handle_missing_writer(
114+
map,
115+
stream_name,
116+
schema_key,
117+
record,
118+
parsed_timestamp,
119+
&custom_partition_values,
120+
)?;
114121
}
115122
};
116123
Ok(())
@@ -123,13 +130,15 @@ impl WriterTable {
123130
schema_key: &str,
124131
record: RecordBatch,
125132
parsed_timestamp: NaiveDateTime,
133+
custom_partition_values: &HashMap<String, String>,
126134
) -> Result<(), StreamWriterError> {
127135
if CONFIG.parseable.mode != Mode::Query {
128136
stream_writer.lock().unwrap().push(
129137
stream_name,
130138
schema_key,
131139
record,
132140
parsed_timestamp,
141+
custom_partition_values,
133142
)?;
134143
} else {
135144
stream_writer
@@ -148,6 +157,7 @@ impl WriterTable {
148157
schema_key: &str,
149158
record: RecordBatch,
150159
parsed_timestamp: NaiveDateTime,
160+
custom_partition_values: &HashMap<String, String>,
151161
) -> Result<(), StreamWriterError> {
152162
match map.get(stream_name) {
153163
Some(writer) => {

server/src/event/writer/file_writer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ impl FileWriter {
4545
schema_key: &str,
4646
record: &RecordBatch,
4747
parsed_timestamp: NaiveDateTime,
48-
custom_partition_values: HashMap<String, String>,
48+
custom_partition_values: &HashMap<String, String>,
4949
) -> Result<(), StreamWriterError> {
5050
match self.get_mut(schema_key) {
5151
Some(writer) => {
@@ -89,7 +89,7 @@ fn init_new_stream_writer_file(
8989
schema_key: &str,
9090
record: &RecordBatch,
9191
parsed_timestamp: NaiveDateTime,
92-
custom_partition_values: HashMap<String, String>,
92+
custom_partition_values: &HashMap<String, String>,
9393
) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
9494
let dir = StorageDir::new(stream_name);
9595
let path = dir.path_by_current_time(schema_key, parsed_timestamp, custom_partition_values);

server/src/handlers/http/ingest.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,16 +102,19 @@ pub async fn push_logs_unchecked(
102102
batches: RecordBatch,
103103
stream_name: &str,
104104
) -> Result<event::Event, PostError> {
105-
event::Event {
105+
let unchecked_event = event::Event {
106106
rb: batches,
107107
stream_name: stream_name.to_string(),
108108
origin_format: "json",
109109
origin_size: 0,
110110
parsed_timestamp: Utc::now().naive_utc(),
111111
time_partition: None,
112-
is_first_event: true, // NOTE: Maybe should be false
113-
}
114-
.process_unchecked()
112+
is_first_event: true, // NOTE: Maybe should be false
113+
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
114+
};
115+
unchecked_event.process_unchecked()?;
116+
117+
Ok(unchecked_event)
115118
}
116119

117120
async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {

server/src/storage/staging.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use std::{
5252
};
5353

5454
const ARROW_FILE_EXTENSION: &str = "data.arrows";
55-
const PARQUET_FILE_EXTENSION: &str = "data.parquet";
55+
// const PARQUET_FILE_EXTENSION: &str = "data.parquet";
5656

5757
#[derive(Debug)]
5858
pub struct StorageDir {
@@ -68,7 +68,7 @@ impl StorageDir {
6868

6969
pub fn file_time_suffix(
7070
time: NaiveDateTime,
71-
custom_partition_values: HashMap<String, String>,
71+
custom_partition_values: &HashMap<String, String>,
7272
extention: &str,
7373
) -> String {
7474
let mut uri = utils::date_to_prefix(time.date())
@@ -90,7 +90,7 @@ impl StorageDir {
9090
fn filename_by_time(
9191
stream_hash: &str,
9292
time: NaiveDateTime,
93-
custom_partition_values: HashMap<String, String>,
93+
custom_partition_values: &HashMap<String, String>,
9494
) -> String {
9595
format!(
9696
"{}.{}",
@@ -102,7 +102,7 @@ impl StorageDir {
102102
fn filename_by_current_time(
103103
stream_hash: &str,
104104
parsed_timestamp: NaiveDateTime,
105-
custom_partition_values: HashMap<String, String>,
105+
custom_partition_values: &HashMap<String, String>,
106106
) -> String {
107107
Self::filename_by_time(stream_hash, parsed_timestamp, custom_partition_values)
108108
}
@@ -111,7 +111,7 @@ impl StorageDir {
111111
&self,
112112
stream_hash: &str,
113113
parsed_timestamp: NaiveDateTime,
114-
custom_partition_values: HashMap<String, String>,
114+
custom_partition_values: &HashMap<String, String>,
115115
) -> PathBuf {
116116
let server_time_in_min = Utc::now().format("%Y%m%dT%H%M").to_string();
117117
let mut filename =
@@ -201,13 +201,12 @@ impl StorageDir {
201201
}
202202
}
203203

204-
#[allow(unused)]
205-
pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf {
206-
let data_path = CONFIG.parseable.local_stream_data_path(stream_name);
207-
let dir = StorageDir::file_time_suffix(time, HashMap::new(), PARQUET_FILE_EXTENSION);
208-
209-
data_path.join(dir)
210-
}
204+
// pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf {
205+
// let data_path = CONFIG.parseable.local_stream_data_path(stream_name);
206+
// let dir = StorageDir::file_time_suffix(time, &HashMap::new(), PARQUET_FILE_EXTENSION);
207+
//
208+
// data_path.join(dir)
209+
// }
211210

212211
pub fn convert_disk_files_to_parquet(
213212
stream: &str,

server/src/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub fn date_to_prefix(date: NaiveDate) -> String {
6363
date.replace("UTC", "")
6464
}
6565

66-
pub fn custom_partition_to_prefix(custom_partition: HashMap<String, String>) -> String {
66+
pub fn custom_partition_to_prefix(custom_partition: &HashMap<String, String>) -> String {
6767
let mut prefix = String::default();
6868
for (key, value) in custom_partition.iter().sorted_by_key(|v| v.0) {
6969
prefix.push_str(&format!("{key}={value}/", key = key, value = value));

0 commit comments

Comments
 (0)