Skip to content

Commit caa9604

Browse files
authored
fix: Strict JSON value to fail on duplicate keys when ingesting (#1334)
Implements a `StrictValue` for JSON deserialization that flags duplicate keys in JSON map instead of accepting the last key (like the default implementation). Added unit tests for verifying deserialization. Resolves: #1217
1 parent b93ae42 commit caa9604

File tree

3 files changed

+403
-12
lines changed

3 files changed

+403
-12
lines changed

src/handlers/http/ingest.rs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use arrow_array::RecordBatch;
2424
use bytes::Bytes;
2525
use chrono::Utc;
2626
use http::StatusCode;
27-
use serde_json::Value;
2827

2928
use crate::event::error::EventError;
3029
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
@@ -39,7 +38,7 @@ use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
3938
use crate::parseable::{StreamNotFound, PARSEABLE};
4039
use crate::storage::{ObjectStorageError, StreamType};
4140
use crate::utils::header_parsing::ParseHeaderError;
42-
use crate::utils::json::flatten::JsonFlattenError;
41+
use crate::utils::json::{flatten::JsonFlattenError, strict::StrictValue};
4342

4443
use super::logstream::error::{CreateStreamError, StreamError};
4544
use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header};
@@ -51,7 +50,7 @@ use super::users::filters::FiltersError;
5150
// creates if stream does not exist
5251
pub async fn ingest(
5352
req: HttpRequest,
54-
Json(mut json): Json<Value>,
53+
Json(json): Json<StrictValue>,
5554
) -> Result<HttpResponse, PostError> {
5655
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
5756
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -83,6 +82,8 @@ pub async fn ingest(
8382

8483
let mut p_custom_fields = get_custom_fields_from_header(&req);
8584

85+
let mut json = json.into_inner();
86+
8687
let fields = match &log_source {
8788
LogSource::Custom(src) => KNOWN_SCHEMA_LIST.extract_from_inline_log(
8889
&mut json,
@@ -127,13 +128,13 @@ pub async fn ingest(
127128

128129
pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
129130
let size: usize = body.len();
130-
let json: Value = serde_json::from_slice(&body)?;
131+
let json: StrictValue = serde_json::from_slice(&body)?;
131132
let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
132133
let mut p_custom_fields = HashMap::new();
133134
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
134135
p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string());
135136
// For internal streams, use old schema
136-
format::json::Event::new(json)
137+
format::json::Event::new(json.into_inner())
137138
.into_event(
138139
stream_name,
139140
size as u64,
@@ -155,7 +156,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
155156
// creates if stream does not exist
156157
pub async fn handle_otel_logs_ingestion(
157158
req: HttpRequest,
158-
Json(json): Json<Value>,
159+
Json(json): Json<StrictValue>,
159160
) -> Result<HttpResponse, PostError> {
160161
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
161162
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -205,7 +206,13 @@ pub async fn handle_otel_logs_ingestion(
205206

206207
let p_custom_fields = get_custom_fields_from_header(&req);
207208

208-
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
209+
flatten_and_push_logs(
210+
json.into_inner(),
211+
&stream_name,
212+
&log_source,
213+
&p_custom_fields,
214+
)
215+
.await?;
209216

210217
Ok(HttpResponse::Ok().finish())
211218
}
@@ -215,7 +222,7 @@ pub async fn handle_otel_logs_ingestion(
215222
// creates if stream does not exist
216223
pub async fn handle_otel_metrics_ingestion(
217224
req: HttpRequest,
218-
Json(json): Json<Value>,
225+
Json(json): Json<StrictValue>,
219226
) -> Result<HttpResponse, PostError> {
220227
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
221228
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -263,7 +270,13 @@ pub async fn handle_otel_metrics_ingestion(
263270

264271
let p_custom_fields = get_custom_fields_from_header(&req);
265272

266-
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
273+
flatten_and_push_logs(
274+
json.into_inner(),
275+
&stream_name,
276+
&log_source,
277+
&p_custom_fields,
278+
)
279+
.await?;
267280

268281
Ok(HttpResponse::Ok().finish())
269282
}
@@ -273,7 +286,7 @@ pub async fn handle_otel_metrics_ingestion(
273286
// creates if stream does not exist
274287
pub async fn handle_otel_traces_ingestion(
275288
req: HttpRequest,
276-
Json(json): Json<Value>,
289+
Json(json): Json<StrictValue>,
277290
) -> Result<HttpResponse, PostError> {
278291
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
279292
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -322,7 +335,13 @@ pub async fn handle_otel_traces_ingestion(
322335

323336
let p_custom_fields = get_custom_fields_from_header(&req);
324337

325-
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
338+
flatten_and_push_logs(
339+
json.into_inner(),
340+
&stream_name,
341+
&log_source,
342+
&p_custom_fields,
343+
)
344+
.await?;
326345

327346
Ok(HttpResponse::Ok().finish())
328347
}
@@ -333,7 +352,7 @@ pub async fn handle_otel_traces_ingestion(
333352
pub async fn post_event(
334353
req: HttpRequest,
335354
stream_name: Path<String>,
336-
Json(mut json): Json<Value>,
355+
Json(json): Json<StrictValue>,
337356
) -> Result<HttpResponse, PostError> {
338357
let stream_name = stream_name.into_inner();
339358

@@ -369,6 +388,7 @@ pub async fn post_event(
369388
.get(EXTRACT_LOG_KEY)
370389
.and_then(|h| h.to_str().ok());
371390
let mut p_custom_fields = get_custom_fields_from_header(&req);
391+
let mut json = json.into_inner();
372392
match &log_source {
373393
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
374394
return Err(PostError::OtelNotSupported)

src/utils/json/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::event::format::LogSource;
2828
use crate::metadata::SchemaVersion;
2929

3030
pub mod flatten;
31+
pub mod strict;
3132

3233
/// calls the function `flatten_json` which results Vec<Value> or Error
3334
/// in case when Vec<Value> is returned, converts the Vec<Value> to Value of Array

0 commit comments

Comments
 (0)