Skip to content

Commit a4ca399

Browse files
fix: flattening for kinesis (#1329)
current: server expects decoded data to be a json object and adds timestamp and recordId to each json object change: the decoded data can be either a json object or a heavily nested json server decodes the data, flattens it (using generic flattening), then adds timestamp and recordId to each flattened json object also, all flattened json objects are converted as json array then ingested using common ingestion flow
1 parent d4bf193 commit a4ca399

File tree

3 files changed

+53
-26
lines changed

3 files changed

+53
-26
lines changed

src/handlers/http/kinesis.rs

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use serde::{Deserialize, Serialize};
2121
use serde_json::{Map, Value};
2222
use std::str;
2323

24+
use crate::utils::json::flatten::{generic_flattening, has_more_than_max_allowed_levels};
25+
2426
#[derive(Serialize, Deserialize, Debug)]
2527
#[serde(rename_all = "camelCase")]
2628
pub struct Message {
@@ -57,29 +59,56 @@ struct Data {
5759
// "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a",
5860
// "timestamp": "1704964113659"
5961
// }
60-
pub fn flatten_kinesis_logs(message: Message) -> Vec<Value> {
62+
pub async fn flatten_kinesis_logs(message: Message) -> Result<Vec<Value>, anyhow::Error> {
6163
let mut vec_kinesis_json = Vec::new();
6264

6365
for record in message.records.iter() {
64-
let bytes = STANDARD.decode(record.data.clone()).unwrap();
65-
let json_string: String = String::from_utf8(bytes).unwrap();
66-
let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
67-
let mut kinesis_json: Map<String, Value> = match serde_json::from_value(json) {
68-
Ok(value) => value,
69-
Err(error) => panic!("Failed to deserialize JSON: {}", error),
70-
};
71-
72-
kinesis_json.insert(
73-
"requestId".to_owned(),
74-
Value::String(message.request_id.clone()),
75-
);
76-
kinesis_json.insert(
77-
"timestamp".to_owned(),
78-
Value::String(message.timestamp.to_string()),
79-
);
66+
let bytes = STANDARD.decode(record.data.clone())?;
67+
if let Ok(json_string) = String::from_utf8(bytes) {
68+
let json: serde_json::Value = serde_json::from_str(&json_string)?;
69+
// Check if the JSON has more than the allowed levels of nesting
70+
// If it has less than or equal to the allowed levels, we flatten it.
71+
// If it has more than the allowed levels, we just push it as is
72+
// without flattening or modifying it.
73+
if !has_more_than_max_allowed_levels(&json, 1) {
74+
let flattened_json_arr = generic_flattening(&json)?;
75+
for flattened_json in flattened_json_arr {
76+
let mut kinesis_json: Map<String, Value> =
77+
serde_json::from_value(flattened_json)?;
78+
kinesis_json.insert(
79+
"requestId".to_owned(),
80+
Value::String(message.request_id.clone()),
81+
);
82+
kinesis_json.insert(
83+
"timestamp".to_owned(),
84+
Value::String(message.timestamp.to_string()),
85+
);
8086

81-
vec_kinesis_json.push(Value::Object(kinesis_json));
87+
vec_kinesis_json.push(Value::Object(kinesis_json));
88+
}
89+
} else {
90+
// If the JSON has more than the allowed levels, we just push it as is
91+
// without flattening or modifying it.
92+
// This is a fallback to ensure we don't lose data.
93+
tracing::warn!(
94+
"Kinesis log with requestId {} and timestamp {} has more than the allowed levels of nesting, skipping flattening for this record.",
95+
message.request_id, message.timestamp
96+
);
97+
vec_kinesis_json.push(json);
98+
}
99+
} else {
100+
tracing::error!(
101+
"Failed to decode base64 data for kinesis log with requestId {} and timestamp {}",
102+
message.request_id,
103+
message.timestamp
104+
);
105+
return Err(anyhow::anyhow!(
106+
"Failed to decode base64 data for record with requestId {} and timestamp {}",
107+
message.request_id,
108+
message.timestamp
109+
));
110+
}
82111
}
83112

84-
vec_kinesis_json
113+
Ok(vec_kinesis_json)
85114
}

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ pub async fn flatten_and_push_logs(
6161
LogSource::Kinesis => {
6262
//custom flattening required for Amazon Kinesis
6363
let message: Message = serde_json::from_value(json)?;
64-
for record in flatten_kinesis_logs(message) {
65-
push_logs(stream_name, record, log_source, p_custom_fields).await?;
66-
}
64+
let flattened_kinesis_data = flatten_kinesis_logs(message).await?;
65+
let record = convert_to_array(flattened_kinesis_data)?;
66+
push_logs(stream_name, record, log_source, p_custom_fields).await?;
6767
}
6868
LogSource::OtelLogs => {
6969
//custom flattening required for otel logs

src/utils/json/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,8 @@ pub fn flatten_json_body(
4444
// Flatten the json body only if new schema and has less than 4 levels of nesting
4545
let mut nested_value = if schema_version == SchemaVersion::V1
4646
&& !has_more_than_max_allowed_levels(&body, 1)
47-
&& matches!(
48-
log_source,
49-
LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis
50-
) {
47+
&& matches!(log_source, LogSource::Json | LogSource::Custom(_))
48+
{
5149
let flattened_json = generic_flattening(&body)?;
5250
convert_to_array(flattened_json)?
5351
} else {

0 commit comments

Comments
 (0)