From dfca20a09e4c68dff8f7a6bb99599a0f2887f5b6 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 20 May 2025 07:27:49 -0400 Subject: [PATCH 1/3] add env for max level of flattening allowed for events env `P_MAX_FLATTEN_LEVEL` to control the maximum level of flattening allowed default to 10 this is to ensure nested list type fields do not get created eg. with current implementation of hard coded level of 4, field gets created with data type - ``` { "name": "Records_resources_accountId", "data_type": { "List": { "name": "item", "data_type": { "List": { "name": "item", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } }, "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } }, "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } ``` after this change, data type changes to - ``` { "name": "Records_resources_accountId", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } ``` --- src/cli.rs | 10 +++++++ src/handlers/http/modal/utils/ingest_utils.rs | 2 +- src/utils/json/flatten.rs | 30 ++++++------------- src/utils/json/mod.rs | 4 +-- 4 files changed, 22 insertions(+), 24 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 2bd86c5a4..26cab2e95 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -378,6 +378,16 @@ pub struct Options { help = "total number of fields recommended in a dataset" )] pub dataset_fields_allowed_limit: usize, + + // maximum level of flattening allowed for events + // this is to prevent nested list type fields from getting created + #[arg( + long, + env = "P_MAX_FLATTEN_LEVEL", + default_value = "10", + help = "Maximum level of flattening allowed for events" + )] + pub event_flatten_level: usize, } #[derive(Parser, Debug)] diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 6cffb6d99..04efcbba4 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -62,7 +62,7 @@ pub async fn flatten_and_push_logs( //custom flattening required for Amazon Kinesis let message: Message = serde_json::from_value(json)?; for record in flatten_kinesis_logs(message) { - push_logs(stream_name, record, &LogSource::default(), p_custom_fields).await?; + push_logs(stream_name, record, log_source, p_custom_fields).await?; } } LogSource::OtelLogs => { diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index 06f1e7201..113fbbe8e 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -25,6 +25,8 @@ use serde_json::value::Value; use thiserror::Error; +use crate::parseable::PARSEABLE; + #[derive(Error, Debug)] pub enum JsonFlattenError { #[error("Cannot flatten this JSON")] @@ -314,21 +316,21 @@ pub fn generic_flattening(value: &Value) -> Result, JsonFlattenError> } /// recursively checks the level of nesting for the serde Value -/// if Value has more than 4 levels of hierarchy, returns true -/// example - +/// if Value has more than configured `P_MAX_FLATTEN_LEVEL` levels of hierarchy, returns true +/// example - if `P_MAX_FLATTEN_LEVEL` is 4, and the JSON is /// 1. `{"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}` ~> returns true /// 2. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> returns false -pub fn has_more_than_four_levels(value: &Value, current_level: usize) -> bool { - if current_level > 4 { +pub fn has_more_than_max_allowed_levels(value: &Value, current_level: usize) -> bool { + if current_level > PARSEABLE.options.event_flatten_level { return true; } match value { Value::Array(arr) => arr .iter() - .any(|item| has_more_than_four_levels(item, current_level)), + .any(|item| has_more_than_max_allowed_levels(item, current_level)), Value::Object(map) => map .values() - .any(|val| has_more_than_four_levels(val, current_level + 1)), + .any(|val| has_more_than_max_allowed_levels(val, current_level + 1)), _ => false, } } @@ -344,9 +346,7 @@ pub fn convert_to_array(flattened: Vec) -> Result Result { // Flatten the json body only if new schema and has less than 4 levels of nesting let mut nested_value = if schema_version == SchemaVersion::V1 - && !has_more_than_four_levels(&body, 1) + && !has_more_than_max_allowed_levels(&body, 1) && matches!( log_source, LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis From b7f8aff89dee1cf8196067bd7cee665212cc21ee Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 20 May 2025 08:26:19 -0400 Subject: [PATCH 2/3] remove test because of cli options dependency --- src/utils/json/mod.rs | 100 ------------------------------------------ 1 file changed, 100 deletions(-) diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 9a77d986c..e087536be 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -144,50 +144,9 @@ where #[cfg(test)] mod tests { - use crate::event::format::LogSource; - use super::*; use serde::{Deserialize, Serialize}; use serde_json::json; - - #[test] - fn hierarchical_json_flattening_success() { - let value = json!({"a":{"b":{"e":["a","b"]}}}); - let expected = json!([{"a_b_e": "a"}, {"a_b_e": "b"}]); - assert_eq!( - flatten_json_body( - value, - None, - None, - None, - crate::metadata::SchemaVersion::V1, - false, - &LogSource::default() - ) - .unwrap(), - expected - ); - } - - #[test] - fn hierarchical_json_flattening_failure() { - let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}); - let expected = json!({"a_b_c_d_e": ["a","b"]}); - assert_eq!( - flatten_json_body( - value, - None, - None, - None, - crate::metadata::SchemaVersion::V1, - false, - &LogSource::default() - ) - .unwrap(), - expected - ); - } - #[derive(Serialize, Deserialize)] struct TestBool { #[serde( @@ -353,63 +312,4 @@ mod tests { flattened_json ); } - - #[test] - fn arr_obj_with_nested_type_v1() { - let json = json!([ - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1}] - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1, "b": 2}] - }, - ]); - let flattened_json = flatten_json_body( - json, - None, - None, - None, - SchemaVersion::V1, - false, - &crate::event::format::LogSource::default(), - ) - .unwrap(); - - assert_eq!( - json!([ - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - "c_a": 1, - }, - { - "a": 1, - "b": "hello", - "c_a": 1, - "c_b": 2, - }, - ]), - flattened_json - ); - } } From 96b724d00c5eba318134c54cda3aa8e0f17dd0db Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 21 May 2025 11:46:28 -0400 Subject: [PATCH 3/3] flatten fix for empty array --- src/utils/json/flatten.rs | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index 113fbbe8e..11b41beb5 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -276,19 +276,31 @@ pub fn generic_flattening(value: &Value) -> Result, JsonFlattenError> let results = map .iter() .fold(vec![Map::new()], |results, (key, val)| match val { - Value::Array(arr) => arr - .iter() - .flat_map(|flatten_item| { - generic_flattening(flatten_item).unwrap_or_default() - }) - .flat_map(|flattened_item| { - results.iter().map(move |result| { - let mut new_obj = result.clone(); - new_obj.insert(key.clone(), flattened_item.clone()); - new_obj - }) - }) - .collect(), + Value::Array(arr) => { + if arr.is_empty() { + // Insert empty array for this key in all current results + results + .into_iter() + .map(|mut result| { + result.insert(key.clone(), Value::Array(vec![])); + result + }) + .collect() + } else { + arr.iter() + .flat_map(|flatten_item| { + generic_flattening(flatten_item).unwrap_or_default() + }) + .flat_map(|flattened_item| { + results.iter().map(move |result| { + let mut new_obj = result.clone(); + new_obj.insert(key.clone(), flattened_item.clone()); + new_obj + }) + }) + .collect() + } + } Value::Object(_) => generic_flattening(val) .unwrap_or_default() .iter()