Commit bc30047

fix: bug if user_id is not provided
1 parent 89d2cda commit bc30047

2 files changed: +129 -8 lines changed

server/src/handlers/http/query.rs

Lines changed: 109 additions & 7 deletions
@@ -87,12 +87,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon

     let cache_results = req.headers().get(CACHE_RESULTS_HEADER_KEY);
     let show_cached = req.headers().get(CACHE_VIEW_HEADER_KEY);
-    let user_id = req
-        .headers()
-        .get(USER_ID_HEADER_KEY)
-        .ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?
-        .to_str()
-        .map_err(|err| anyhow!(err))?;
+    let user_id = req.headers().get(USER_ID_HEADER_KEY);

     // deal with cached data
     if let Ok(results) = get_results_from_cache(
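
With this change the handler no longer rejects the request up front when the user-id header is missing; user_id stays an Option<&HeaderValue> and is only resolved inside the cache helpers that actually need it (put_results_in_cache and get_results_from_cache, added below). A minimal standalone sketch of that resolution pattern, assuming the http crate's HeaderValue and plain anyhow errors in place of QueryError:

use anyhow::anyhow;
use http::HeaderValue;

// Hypothetical helper mirroring the new pattern: the missing-header error is
// raised only where a user id is actually required, not for every query.
fn require_user_id(user_id: Option<&HeaderValue>) -> anyhow::Result<&str> {
    user_id
        .ok_or_else(|| anyhow!("User Id not provided"))?
        .to_str()
        .map_err(|err| anyhow!(err))
}

fn main() {
    let header = HeaderValue::from_static("alice");
    assert_eq!(require_user_id(Some(&header)).unwrap(), "alice");
    assert!(require_user_id(None).is_err());
}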
@@ -156,7 +151,114 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
     Ok(response)
 }

-fn authorize_and_set_filter_tags(
+async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> {
+    if CONFIG.parseable.mode == Mode::Query {
+        for table in tables {
+            if let Ok(new_schema) = fetch_schema(&table).await {
+                // commit schema merges the schema internally and updates the schema in storage.
+                commit_schema_to_storage(&table, new_schema.clone()).await?;
+
+                commit_schema(&table, Arc::new(new_schema))?;
+            }
+        }
+    }
+
+    Ok(())
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn put_results_in_cache(
+    cache_results: Option<&HeaderValue>,
+    user_id: Option<&HeaderValue>,
+    query_cache_manager: Option<&QueryCacheManager>,
+    stream: &str,
+    records: &[RecordBatch],
+    start: String,
+    end: String,
+    query: String,
+) {
+    match (cache_results, query_cache_manager) {
+        (Some(_), None) => {
+            log::warn!(
+                "Instructed to cache query results but Query Caching is not Enabled in Server"
+            );
+        }
+        // do cache
+        (Some(_), Some(query_cache_manager)) => {
+            let user_id = user_id
+                .expect("User Id was provided")
+                .to_str()
+                .expect("is proper ASCII");
+
+            if let Err(err) = query_cache_manager
+                .create_parquet_cache(stream, records, user_id, start, end, query)
+                .await
+            {
+                log::error!("Error occured while caching query results: {:?}", err);
+                if query_cache_manager
+                    .clear_cache(stream, user_id)
+                    .await
+                    .is_err()
+                {
+                    log::error!("Error Clearing Unwanted files from cache dir");
+                }
+            }
+        }
+        (None, _) => {}
+    }
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn get_results_from_cache(
+    show_cached: Option<&HeaderValue>,
+    query_cache_manager: Option<&QueryCacheManager>,
+    stream: &str,
+    user_id: Option<&HeaderValue>,
+    start_time: &str,
+    end_time: &str,
+    query: &str,
+    send_null: bool,
+    send_fields: bool,
+) -> Result<QueryResponse, QueryError> {
+    match (show_cached, query_cache_manager) {
+        (Some(_), None) => {
+            log::warn!(
+                "Instructed to show cached results but Query Caching is not Enabled on Server"
+            );
+            None
+        }
+        (Some(_), Some(query_cache_manager)) => {
+            let user_id = user_id
+                .ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?
+                .to_str()
+                .map_err(|err| anyhow!(err))?;
+
+            let mut query_cache = query_cache_manager.get_cache(stream, user_id).await?;
+
+            let (start, end) = parse_human_time(start_time, end_time)?;
+            let key = format!("{}-{}-{}", start.to_rfc3339(), end.to_rfc3339(), query);
+
+            let file_path = query_cache.get_file(key);
+            if let Some(file_path) = file_path {
+                let (records, fields) = query_cache.get_cached_records(&file_path).await?;
+                let response = QueryResponse {
+                    records,
+                    fields,
+                    fill_null: send_null,
+                    with_fields: send_fields,
+                };
+
+                Some(Ok(response))
+            } else {
+                None
+            }
+        }
+        (_, _) => None,
+    }
+    .map_or_else(|| Err(QueryError::CacheMiss), |ret_val| ret_val)
+}
+
+pub fn authorize_and_set_filter_tags(
     query: &mut LogicalQuery,
     permissions: Vec<Permission>,
     table_name: &str,
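
Note that get_results_from_cache above derives its cache lookup key purely from the parsed time range and the query text. A small sketch of that key format, with a hypothetical helper name and chrono types assumed:

use chrono::{DateTime, Duration, Utc};

// Hypothetical helper reproducing the key format used for cache lookups:
// "<start RFC3339>-<end RFC3339>-<query>".
fn cache_key(start: DateTime<Utc>, end: DateTime<Utc>, query: &str) -> String {
    format!("{}-{}-{}", start.to_rfc3339(), end.to_rfc3339(), query)
}

fn main() {
    let end = Utc::now();
    let start = end - Duration::minutes(10);
    println!("{}", cache_key(start, end, "select * from frontend"));
}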

server/src/querycache.rs

Lines changed: 20 additions & 1 deletion
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
 use arrow_array::RecordBatch;
 use chrono::Utc;
 use futures::TryStreamExt;
@@ -8,6 +26,7 @@ use itertools::Itertools;
 use object_store::{local::LocalFileSystem, ObjectStore};
 use once_cell::sync::OnceCell;
 use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder};
+use std::collections::HashMap;
 use std::path::{Path, PathBuf};
 use tokio::fs as AsyncFs;
 use tokio::{fs, sync::Mutex};
@@ -273,7 +292,7 @@ impl QueryCacheManager {
         AsyncFs::create_dir_all(parquet_path.parent().expect("parent path exists")).await?;
         let parquet_file = AsyncFs::File::create(&parquet_path).await?;
         let time_partition = STREAM_INFO.get_time_partition(table_name)?;
-        let props = parquet_writer_props(time_partition.clone(), 0).build();
+        let props = parquet_writer_props(time_partition.clone(), 0, HashMap::new()).build();

         let mut arrow_writer = AsyncArrowWriter::try_new(
             parquet_file,
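
The functional change in this file is that the project's parquet_writer_props helper now takes a third argument, with the cache path passing an empty HashMap, so the cached parquet output itself should be unchanged. For context, persisting record batches to a parquet file follows the standard arrow/parquet pattern; the sketch below uses the blocking ArrowWriter and made-up names purely for illustration (the cache itself uses AsyncArrowWriter with tokio and its own writer properties, as shown in the diff):

use std::{fs::File, sync::Arc};

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};

// Rough sketch only: serialize record batches to a parquet file, which is the
// essence of what the cache write path does.
fn write_cache_file(path: &str, batches: &[RecordBatch]) -> anyhow::Result<()> {
    let schema = batches
        .first()
        .map(|batch| batch.schema())
        .ok_or_else(|| anyhow::anyhow!("no record batches to cache"))?;
    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(File::create(path)?, schema, Some(props))?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.close()?;
    Ok(())
}

fn main() -> anyhow::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![1_i64, 2, 3])) as ArrayRef],
    )?;
    write_cache_file("cached_query.parquet", &[batch])
}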
