Skip to content

Commit c5bebb8

Browse files
fix: flattening for kinesis
Current behavior: the server expects the decoded data to be a JSON object and adds timestamp and requestId to each JSON object. Change: the decoded data can be either a flat JSON object or heavily nested JSON. The server decodes the data, flattens it (using generic flattening), then adds timestamp and requestId to each flattened JSON object. All flattened JSON objects are then converted to a JSON array and ingested using the common ingestion flow.
1 parent 232e031 commit c5bebb8

File tree

3 files changed

+31
-25
lines changed

3 files changed

+31
-25
lines changed

src/handlers/http/kinesis.rs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use serde::{Deserialize, Serialize};
2121
use serde_json::{Map, Value};
2222
use std::str;
2323

24+
use crate::utils::json::flatten::{generic_flattening, has_more_than_max_allowed_levels};
25+
2426
#[derive(Serialize, Deserialize, Debug)]
2527
#[serde(rename_all = "camelCase")]
2628
pub struct Message {
@@ -57,29 +59,35 @@ struct Data {
5759
// "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a",
5860
// "timestamp": "1704964113659"
5961
// }
60-
pub fn flatten_kinesis_logs(message: Message) -> Vec<Value> {
62+
pub async fn flatten_kinesis_logs(message: Message) -> Result<Vec<Value>, anyhow::Error> {
6163
let mut vec_kinesis_json = Vec::new();
6264

6365
for record in message.records.iter() {
6466
let bytes = STANDARD.decode(record.data.clone()).unwrap();
65-
let json_string: String = String::from_utf8(bytes).unwrap();
66-
let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
67-
let mut kinesis_json: Map<String, Value> = match serde_json::from_value(json) {
68-
Ok(value) => value,
69-
Err(error) => panic!("Failed to deserialize JSON: {}", error),
70-
};
71-
72-
kinesis_json.insert(
73-
"requestId".to_owned(),
74-
Value::String(message.request_id.clone()),
75-
);
76-
kinesis_json.insert(
77-
"timestamp".to_owned(),
78-
Value::String(message.timestamp.to_string()),
79-
);
67+
if let Ok(json_string) = String::from_utf8(bytes) {
68+
let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
69+
if !has_more_than_max_allowed_levels(&json, 1) {
70+
let flattened_json_arr = generic_flattening(&json)?;
71+
for flattened_json in flattened_json_arr {
72+
let mut kinesis_json: Map<String, Value> =
73+
match serde_json::from_value(flattened_json) {
74+
Ok(value) => value,
75+
Err(error) => panic!("Failed to deserialize JSON: {}", error),
76+
};
77+
kinesis_json.insert(
78+
"requestId".to_owned(),
79+
Value::String(message.request_id.clone()),
80+
);
81+
kinesis_json.insert(
82+
"timestamp".to_owned(),
83+
Value::String(message.timestamp.to_string()),
84+
);
8085

81-
vec_kinesis_json.push(Value::Object(kinesis_json));
86+
vec_kinesis_json.push(Value::Object(kinesis_json));
87+
}
88+
}
89+
}
8290
}
8391

84-
vec_kinesis_json
92+
Ok(vec_kinesis_json)
8593
}

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ pub async fn flatten_and_push_logs(
6161
LogSource::Kinesis => {
6262
//custom flattening required for Amazon Kinesis
6363
let message: Message = serde_json::from_value(json)?;
64-
for record in flatten_kinesis_logs(message) {
65-
push_logs(stream_name, record, log_source, p_custom_fields).await?;
66-
}
64+
let flattened_kinesis_data = flatten_kinesis_logs(message).await?;
65+
let record = convert_to_array(flattened_kinesis_data)?;
66+
push_logs(stream_name, record, log_source, p_custom_fields).await?;
6767
}
6868
LogSource::OtelLogs => {
6969
//custom flattening required for otel logs

src/utils/json/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,8 @@ pub fn flatten_json_body(
4444
// Flatten the json body only if new schema and has less than 4 levels of nesting
4545
let mut nested_value = if schema_version == SchemaVersion::V1
4646
&& !has_more_than_max_allowed_levels(&body, 1)
47-
&& matches!(
48-
log_source,
49-
LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis
50-
) {
47+
&& matches!(log_source, LogSource::Json | LogSource::Custom(_))
48+
{
5149
let flattened_json = generic_flattening(&body)?;
5250
convert_to_array(flattened_json)?
5351
} else {

0 commit comments

Comments
 (0)