Skip to content

Commit 566730e

Browse files
committed
clean up
1 parent 7cc7bcd commit 566730e

File tree

5 files changed

+55
-39
lines changed

5 files changed

+55
-39
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,11 @@ pub async fn sync_cache_with_ingestors(
5959
ingestor: IngestorMetadata,
6060
body: bool,
6161
) -> Result<(), StreamError> {
62-
if !utils::check_liveness(&ingestor.domain_name).await {
63-
return Ok(());
64-
}
65-
let request_body: Bytes = Bytes::from(body.to_string());
66-
let client = reqwest::Client::new();
67-
let resp = client
62+
reqwest::Client::new()
6863
.put(url)
6964
.header(header::CONTENT_TYPE, "application/json")
7065
.header(header::AUTHORIZATION, ingestor.token)
71-
.body(request_body)
66+
.body(Bytes::from(body.to_string()))
7267
.send()
7368
.await
7469
.map_err(|err| {
@@ -83,13 +78,15 @@ pub async fn sync_cache_with_ingestors(
8378

8479
// if the response is not successful, log the error and return a custom error
8580
// this could be a bit too much, but we need to be sure it covers all cases
81+
/*
8682
if !resp.status().is_success() {
8783
log::error!(
8884
"failed to set cache: {}\nResponse Returned: {:?}",
8985
ingestor.domain_name,
9086
resp.text().await
9187
);
9288
}
89+
*/
9390

9491
Ok(())
9592
}
@@ -570,20 +567,17 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
570567
.run(move || async {
571568
let result: Result<(), PostError> = async {
572569
let cluster_metrics = fetch_cluster_metrics().await;
573-
if let Ok(metrics) = cluster_metrics {
570+
if let Ok(metrics) = cluster_metrics{
574571
if !metrics.is_empty() {
575572
log::info!("Cluster metrics fetched successfully from all ingestors");
576573
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
577574
let stream_name = INTERNAL_STREAM_NAME;
578575

579-
if matches!(
580-
ingest_internal_stream(
581-
stream_name.to_string(),
582-
bytes::Bytes::from(metrics_bytes),
583-
)
584-
.await,
585-
Ok(())
586-
) {
576+
if ingest_internal_stream(
577+
stream_name,
578+
metrics_bytes,
579+
)
580+
.await.is_ok() {
587581
log::info!(
588582
"Cluster metrics successfully ingested into internal stream"
589583
);

server/src/handlers/http/ingest.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,16 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
7171
}
7272
}
7373

74-
pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
75-
create_stream_if_not_exists(&stream_name).await?;
74+
pub async fn ingest_internal_stream(stream_name: &str, body: Vec<u8>) -> Result<(), PostError> {
75+
create_stream_if_not_exists(stream_name).await?;
7676
let size: usize = body.len();
7777
let parsed_timestamp = Utc::now().naive_utc();
7878
let (rb, is_first) = {
7979
let body_val: Value = serde_json::from_slice(&body)?;
8080
let hash_map = STREAM_INFO.read().unwrap();
8181
let schema = hash_map
82-
.get(&stream_name)
83-
.ok_or(PostError::StreamNotFound(stream_name.clone()))?
82+
.get(stream_name)
83+
.ok_or(PostError::StreamNotFound(stream_name.to_owned()))?
8484
.schema
8585
.clone();
8686
let event = format::json::Event {
@@ -92,7 +92,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
9292
};
9393
event::Event {
9494
rb,
95-
stream_name,
95+
stream_name: stream_name.to_string(),
9696
origin_format: "json",
9797
origin_size: size as u64,
9898
is_first_event: is_first,

server/src/handlers/http/logstream.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ pub async fn put_enable_cache(
430430
stream_name
431431
);
432432

433-
super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), *body).await?;
433+
super::cluster::sync_cache_with_ingestors(&url, ingestor, *body).await?;
434434
}
435435
}
436436
Mode::Ingest => {
@@ -439,22 +439,22 @@ pub async fn put_enable_cache(
439439
}
440440
// here the ingest server has not found the stream
441441
// so it should check if the stream exists in storage
442-
let check = storage
442+
let check_if_stream_exists = storage
443443
.list_streams()
444444
.await?
445445
.iter()
446-
.map(|stream| stream.name.clone())
446+
.map(|stream| &stream.name)
447447
.contains(&stream_name);
448448

449-
if !check {
450-
log::error!("Stream {} not found", stream_name.clone());
449+
if !check_if_stream_exists {
450+
log::error!("Stream {} not found", &stream_name);
451451
return Err(StreamError::StreamNotFound(stream_name.clone()));
452452
}
453453
metadata::STREAM_INFO
454454
.upsert_stream_info(
455455
&*storage,
456456
LogStream {
457-
name: stream_name.clone().to_owned(),
457+
name: stream_name.clone(),
458458
},
459459
)
460460
.await

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,29 @@ impl IngestServer {
189189
.authorize_for_stream(Action::GetStats),
190190
),
191191
)
192+
.service(
193+
web::scope("/retention")
194+
// PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream
195+
.route(
196+
"",
197+
web::put()
198+
.to(logstream::put_retention)
199+
.authorize_for_stream(Action::PutRetention),
200+
)
201+
// GET "/logstream/{logstream}/retention" ==> Get retention for given logstream
202+
.route(
203+
"",
204+
web::get()
205+
.to(logstream::get_retention)
206+
.authorize_for_stream(Action::GetRetention),
207+
)
208+
.route(
209+
"/cleanup",
210+
web::post()
211+
.to(logstream::retention_cleanup)
212+
.authorize_for_stream(Action::PutRetention),
213+
),
214+
)
192215
.service(
193216
web::resource("/cache")
194217
// PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream
@@ -203,15 +226,6 @@ impl IngestServer {
203226
.to(logstream::get_cache_enabled)
204227
.authorize_for_stream(Action::GetCacheEnabled),
205228
),
206-
)
207-
.service(
208-
web::scope("/retention").service(
209-
web::resource("/cleanup").route(
210-
web::post()
211-
.to(logstream::retention_cleanup)
212-
.authorize_for_stream(Action::PutRetention),
213-
),
214-
),
215229
),
216230
)
217231
}

server/src/handlers/http/modal/server.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -304,29 +304,37 @@ impl Server {
304304
),
305305
)
306306
.service(
307-
web::resource("/retention")
307+
web::scope("/retention")
308308
// PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream
309309
.route(
310+
"",
310311
web::put()
311312
.to(logstream::put_retention)
312313
.authorize_for_stream(Action::PutRetention),
313314
)
314315
// GET "/logstream/{logstream}/retention" ==> Get retention for given logstream
315316
.route(
317+
"",
316318
web::get()
317319
.to(logstream::get_retention)
318320
.authorize_for_stream(Action::GetRetention),
321+
)
322+
.route(
323+
"/cleanup",
324+
web::post()
325+
.to(logstream::retention_cleanup)
326+
.authorize_for_stream(Action::PutRetention),
319327
),
320328
)
321329
.service(
322330
web::resource("/cache")
323-
// PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream
331+
// PUT "/logstream/{logstream}/cache" ==> Set cache for given logstream
324332
.route(
325333
web::put()
326334
.to(logstream::put_enable_cache)
327335
.authorize_for_stream(Action::PutCacheEnabled),
328336
)
329-
// GET "/logstream/{logstream}/cache" ==> Get retention for given logstream
337+
// GET "/logstream/{logstream}/cache" ==> Get cache for given logstream
330338
.route(
331339
web::get()
332340
.to(logstream::get_cache_enabled)

0 commit comments

Comments
 (0)