Skip to content

Commit 5f15ea0

Browse files
committed
chore: clean up
1 parent e1ee279 commit 5f15ea0

File tree

12 files changed

+33
-25
lines changed

12 files changed

+33
-25
lines changed

server/src/catalog.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,7 @@ async fn create_manifest(
230230
.ok_or(IOError::new(
231231
ErrorKind::Other,
232232
"Failed to create upper bound for manifest",
233-
))
234-
.map_err(ObjectStorageError::IoError)?,
233+
))?,
235234
)
236235
.and_utc();
237236

server/src/event/writer/file_writer.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,7 @@ impl FileWriter {
4949
) -> Result<(), StreamWriterError> {
5050
match self.get_mut(schema_key) {
5151
Some(writer) => {
52-
writer
53-
.writer
54-
.write(record)
55-
.map_err(StreamWriterError::Writer)?;
52+
writer.writer.write(record)?;
5653
}
5754
// entry is not present thus we create it
5855
None => {
@@ -100,8 +97,6 @@ fn init_new_stream_writer_file(
10097
let mut stream_writer = StreamWriter::try_new(file, &record.schema())
10198
.expect("File and RecordBatch both are checked");
10299

103-
stream_writer
104-
.write(record)
105-
.map_err(StreamWriterError::Writer)?;
100+
stream_writer.write(record)?;
106101
Ok((path, stream_writer))
107102
}

server/src/handlers/http/cluster/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -456,12 +456,12 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
456456
.await;
457457

458458
if let Ok(res) = res {
459-
let text = res.text().await.map_err(PostError::NetworkError)?;
459+
let text = res.text().await?;
460460
let lines: Vec<Result<String, std::io::Error>> =
461461
text.lines().map(|line| Ok(line.to_owned())).collect_vec();
462462

463463
let sample = prometheus_parse::Scrape::parse(lines.into_iter())
464-
.map_err(|err| PostError::CustomError(err.to_string()))?
464+
.map_err(PostError::Error)?
465465
.samples;
466466

467467
dresses.push(Metrics::from_prometheus_samples(

server/src/handlers/http/ingest.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
124124
let object_store_format = glob_storage
125125
.get_object_store_format(&stream_name)
126126
.await
127-
.map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?;
127+
.map_err(|_| PostError::StreamNotFound(stream_name.clone()))?;
128128

129129
let time_partition = object_store_format.time_partition;
130130
let time_partition_limit = object_store_format.time_partition_limit;
@@ -387,6 +387,9 @@ pub enum PostError {
387387
#[error("{0}")]
388388
CreateStream(#[from] CreateStreamError),
389389
#[error("Error: {0}")]
390+
Error(std::io::Error),
391+
#[allow(unused)]
392+
#[error("Error: {0}")]
390393
CustomError(String),
391394
#[error("Error: {0}")]
392395
NetworkError(#[from] reqwest::Error),
@@ -415,6 +418,7 @@ impl actix_web::ResponseError for PostError {
415418
PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
416419
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
417420
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
421+
PostError::Error(_) => StatusCode::INTERNAL_SERVER_ERROR,
418422
}
419423
}
420424

server/src/handlers/http/query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::utils::actix::extract_session_key_from_req;
5353
#[derive(Debug, serde::Deserialize, serde::Serialize)]
5454
#[serde(rename_all = "camelCase")]
5555
pub struct Query {
56-
pub pub query: String,
56+
pub query: String,
5757
pub start_time: String,
5858
pub end_time: String,
5959
#[serde(default)]

server/src/handlers/http/users/dashboards.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ pub async fn delete(req: HttpRequest) -> Result<HttpResponse, PostError> {
9898
// version: String,
9999
// name: String,
100100
// id: String,
101-
// time-filter: `type_not_defined`
101+
// time_filter: TimeFilter
102102
// refresh_interval: u64,
103103
// pannels: Vec<Pannel>,
104104
// }
@@ -112,6 +112,12 @@ pub async fn delete(req: HttpRequest) -> Result<HttpResponse, PostError> {
112112
// headers: Vec<String>,
113113
// dimensions: (u64, u64),
114114
// }
115+
//
116+
// #[derive(Debug, Serialize, Deserialize)]
117+
// pub struct TimeFilter {
118+
// to: String,
119+
// from: String
120+
// }
115121

116122
#[derive(Debug, thiserror::Error)]
117123
pub enum DashboardError {

server/src/handlers/http/users/filters.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,5 +161,5 @@ impl actix_web::ResponseError for FiltersError {
161161
// stream_name: String,
162162
// filter_name: String,
163163
// query: String,
164-
// time-filter: `type_not_defined`
164+
// time-filter: TimeFilter
165165
// }

server/src/query/listing_table_builder.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ impl ListingTableBuilder {
167167
})
168168
.try_collect()
169169
.await
170-
// TODO: make the err map better
171170
.map_err(|err| DataFusionError::External(Box::new(err)))?;
172171

173172
let mut res = res.into_iter().flatten().collect_vec();

server/src/query/stream_schema_provider.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,8 @@ async fn collect_from_snapshot(
180180
.map(|item| item.manifest_path)
181181
.collect(),
182182
)
183-
.await
184-
.map_err(DataFusionError::ObjectStore)?;
183+
.await?;
184+
185185
let mut manifest_files: Vec<_> = manifest_files
186186
.into_iter()
187187
.flat_map(|file| file.files)

server/src/response.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ impl QueryResponse {
3333
pub fn to_http(&self) -> Result<impl Responder, QueryError> {
3434
log::info!("{}", "Returning query results");
3535
let records: Vec<&RecordBatch> = self.records.iter().collect();
36-
let mut json_records = record_batches_to_json(&records)
37-
.map_err(|err| QueryError::JsonParse(err.to_string()))?;
36+
let mut json_records = record_batches_to_json(&records)?;
37+
3838
if self.fill_null {
3939
for map in &mut json_records {
4040
for field in &self.fields {

0 commit comments

Comments
 (0)