From c5bebb82430fa7216a715fc8405a0b4df05e3be3 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 28 May 2025 00:01:29 -0400 Subject: [PATCH 1/2] fix: flattening for kinesis current: server expects decoded data to be a json object and adds timestamp and recordId to each json object change: the decoded data can be either a json object or a heavily nested json server decodes the data, flattens it (using generic flattening), then adds timestamp and recordId to each flattened json object also, all flattened json objects are converted as json array then ingested using common ingestion flow --- src/handlers/http/kinesis.rs | 44 +++++++++++-------- src/handlers/http/modal/utils/ingest_utils.rs | 6 +-- src/utils/json/mod.rs | 6 +-- 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/src/handlers/http/kinesis.rs b/src/handlers/http/kinesis.rs index e2f245f73..d5b80514b 100644 --- a/src/handlers/http/kinesis.rs +++ b/src/handlers/http/kinesis.rs @@ -21,6 +21,8 @@ use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::str; +use crate::utils::json::flatten::{generic_flattening, has_more_than_max_allowed_levels}; + #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct Message { @@ -57,29 +59,35 @@ struct Data { // "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a", // "timestamp": "1704964113659" // } -pub fn flatten_kinesis_logs(message: Message) -> Vec { +pub async fn flatten_kinesis_logs(message: Message) -> Result, anyhow::Error> { let mut vec_kinesis_json = Vec::new(); for record in message.records.iter() { let bytes = STANDARD.decode(record.data.clone()).unwrap(); - let json_string: String = String::from_utf8(bytes).unwrap(); - let json: serde_json::Value = serde_json::from_str(&json_string).unwrap(); - let mut kinesis_json: Map = match serde_json::from_value(json) { - Ok(value) => value, - Err(error) => panic!("Failed to deserialize JSON: {}", error), - }; - - kinesis_json.insert( - "requestId".to_owned(), - Value::String(message.request_id.clone()), - ); - kinesis_json.insert( - "timestamp".to_owned(), - Value::String(message.timestamp.to_string()), - ); + if let Ok(json_string) = String::from_utf8(bytes) { + let json: serde_json::Value = serde_json::from_str(&json_string).unwrap(); + if !has_more_than_max_allowed_levels(&json, 1) { + let flattened_json_arr = generic_flattening(&json)?; + for flattened_json in flattened_json_arr { + let mut kinesis_json: Map = + match serde_json::from_value(flattened_json) { + Ok(value) => value, + Err(error) => panic!("Failed to deserialize JSON: {}", error), + }; + kinesis_json.insert( + "requestId".to_owned(), + Value::String(message.request_id.clone()), + ); + kinesis_json.insert( + "timestamp".to_owned(), + Value::String(message.timestamp.to_string()), + ); - vec_kinesis_json.push(Value::Object(kinesis_json)); + vec_kinesis_json.push(Value::Object(kinesis_json)); + } + } + } } - vec_kinesis_json + Ok(vec_kinesis_json) } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 04efcbba4..ee1a90cfd 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -61,9 +61,9 @@ pub async fn flatten_and_push_logs( LogSource::Kinesis => { //custom flattening required for Amazon Kinesis let message: Message = serde_json::from_value(json)?; - for record in flatten_kinesis_logs(message) { - push_logs(stream_name, record, log_source, p_custom_fields).await?; - } + let flattened_kinesis_data = flatten_kinesis_logs(message).await?; + let record = convert_to_array(flattened_kinesis_data)?; + push_logs(stream_name, record, log_source, p_custom_fields).await?; } LogSource::OtelLogs => { //custom flattening required for otel logs diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index e087536be..cb1e2fb81 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -44,10 +44,8 @@ pub fn flatten_json_body( // Flatten the json body only if new schema and has less than 4 levels of nesting let mut nested_value = if schema_version == SchemaVersion::V1 && !has_more_than_max_allowed_levels(&body, 1) - && matches!( - log_source, - LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis - ) { + && matches!(log_source, LogSource::Json | LogSource::Custom(_)) + { let flattened_json = generic_flattening(&body)?; convert_to_array(flattened_json)? } else { From 0f95717b589edc09343a96a595f74807fe179227 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 28 May 2025 00:26:08 -0400 Subject: [PATCH 2/2] error handling --- src/handlers/http/kinesis.rs | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/src/handlers/http/kinesis.rs b/src/handlers/http/kinesis.rs index d5b80514b..c714729c7 100644 --- a/src/handlers/http/kinesis.rs +++ b/src/handlers/http/kinesis.rs @@ -63,17 +63,18 @@ pub async fn flatten_kinesis_logs(message: Message) -> Result, anyhow let mut vec_kinesis_json = Vec::new(); for record in message.records.iter() { - let bytes = STANDARD.decode(record.data.clone()).unwrap(); + let bytes = STANDARD.decode(record.data.clone())?; if let Ok(json_string) = String::from_utf8(bytes) { - let json: serde_json::Value = serde_json::from_str(&json_string).unwrap(); + let json: serde_json::Value = serde_json::from_str(&json_string)?; + // Check if the JSON has more than the allowed levels of nesting + // If it has less than or equal to the allowed levels, we flatten it. + // If it has more than the allowed levels, we just push it as is + // without flattening or modifying it. if !has_more_than_max_allowed_levels(&json, 1) { let flattened_json_arr = generic_flattening(&json)?; for flattened_json in flattened_json_arr { let mut kinesis_json: Map = - match serde_json::from_value(flattened_json) { - Ok(value) => value, - Err(error) => panic!("Failed to deserialize JSON: {}", error), - }; + serde_json::from_value(flattened_json)?; kinesis_json.insert( "requestId".to_owned(), Value::String(message.request_id.clone()), @@ -85,7 +86,27 @@ pub async fn flatten_kinesis_logs(message: Message) -> Result, anyhow vec_kinesis_json.push(Value::Object(kinesis_json)); } + } else { + // If the JSON has more than the allowed levels, we just push it as is + // without flattening or modifying it. + // This is a fallback to ensure we don't lose data. + tracing::warn!( + "Kinesis log with requestId {} and timestamp {} has more than the allowed levels of nesting, skipping flattening for this record.", + message.request_id, message.timestamp + ); + vec_kinesis_json.push(json); } + } else { + tracing::error!( + "Failed to decode base64 data for kinesis log with requestId {} and timestamp {}", + message.request_id, + message.timestamp + ); + return Err(anyhow::anyhow!( + "Failed to decode base64 data for record with requestId {} and timestamp {}", + message.request_id, + message.timestamp + )); } }