Skip to content

Commit 17d07d6

Browse files
committed
fix: issue creating multiple cache files
1 parent 54db5ef commit 17d07d6

File tree

2 files changed

+23
-12
lines changed

2 files changed

+23
-12
lines changed

server/src/handlers/http/query.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,15 @@ pub async fn put_results_in_cache(
207207
}
208208

209209
let user_id = user_id.ok_or(CacheError::Other("User Id not provided"))?;
210+
let mut cache = query_cache_manager.get_cache(stream, user_id).await?;
211+
212+
let cache_key = CacheMetadata::new(query.clone(), start.clone(), end.clone());
213+
214+
// guard to stop multiple caching of the same content
215+
if let Some(path) = cache.get_file(&cache_key) {
216+
log::info!("File already exists in cache, Removing old file");
217+
cache.delete(&cache_key, path).await?;
218+
}
210219

211220
if let Err(err) = query_cache_manager
212221
.create_parquet_cache(stream, records, user_id, start, end, query)
@@ -262,7 +271,7 @@ pub async fn get_results_from_cache(
262271

263272
let (start, end) = parse_human_time(start_time, end_time)?;
264273

265-
let file_path = query_cache.get_file(CacheMetadata::new(
274+
let file_path = query_cache.get_file(&CacheMetadata::new(
266275
query.to_string(),
267276
start.to_rfc3339(),
268277
end.to_rfc3339(),

server/src/querycache.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,16 +75,23 @@ impl QueryCache {
7575
}
7676
}
7777

78-
pub fn get_file(&mut self, key: CacheMetadata) -> Option<PathBuf> {
79-
self.files.get(&key).cloned()
78+
pub fn get_file(&mut self, key: &CacheMetadata) -> Option<PathBuf> {
79+
self.files.get(key).cloned()
8080
}
8181

8282
pub fn used_cache_size(&self) -> u64 {
8383
self.current_size
8484
}
8585

86-
pub fn remove(&mut self, key: CacheMetadata) -> Option<PathBuf> {
87-
self.files.remove(&key)
86+
pub fn remove(&mut self, key: &CacheMetadata) -> Option<PathBuf> {
87+
self.files.remove(key)
88+
}
89+
90+
pub async fn delete(&mut self, key: &CacheMetadata, path: PathBuf) -> Result<(), CacheError> {
91+
self.files.delete(key);
92+
AsyncFs::remove_file(path).await?;
93+
94+
Ok(())
8895
}
8996

9097
pub fn queries(&self) -> Vec<&CacheMetadata> {
@@ -257,7 +264,7 @@ impl QueryCacheManager {
257264
) -> Result<(), CacheError> {
258265
let mut cache = self.get_cache(stream, user_id).await?;
259266

260-
if let Some(remove_result) = cache.remove(key) {
267+
if let Some(remove_result) = cache.remove(&key) {
261268
self.put_cache(stream, &cache, user_id).await?;
262269
tokio::spawn(fs::remove_file(remove_result));
263270
Ok(())
@@ -339,12 +346,7 @@ impl QueryCacheManager {
339346
return Ok(());
340347
};
341348

342-
343-
let mut arrow_writer = AsyncArrowWriter::try_new(
344-
parquet_file,
345-
sch,
346-
Some(props),
347-
)?;
349+
let mut arrow_writer = AsyncArrowWriter::try_new(parquet_file, sch, Some(props))?;
348350

349351
for record in records {
350352
if let Err(e) = arrow_writer.write(record).await {

0 commit comments

Comments
 (0)