diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 830409d3d..89c28c372 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -162,43 +162,25 @@ pub trait EventFormat: Sized { )); }; - // prepare the record batch and new fields to be added - let mut new_schema = Arc::new(Schema::new(schema)); - if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) { + if !is_schema_matching(&schema, storage_schema, static_schema_flag) { return Err(anyhow!("Schema mismatch")); } - new_schema = - update_field_type_in_schema(new_schema, None, time_partition, None, schema_version); - let rb = Self::decode(data, new_schema.clone())?; + // prepare the record batch and new fields to be added + let new_schema = update_field_type_in_schema( + Arc::new(Schema::new(schema)), + None, + time_partition, + None, + schema_version, + ); + let rb = Self::decode(data, new_schema.clone())?; let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?; Ok((rb, is_first)) } - fn is_schema_matching( - new_schema: Arc, - storage_schema: &HashMap>, - static_schema_flag: bool, - ) -> bool { - if !static_schema_flag { - return true; - } - for field in new_schema.fields() { - let Some(storage_field) = storage_schema.get(field.name()) else { - return false; - }; - if field.name() != storage_field.name() { - return false; - } - if field.data_type() != storage_field.data_type() { - return false; - } - } - true - } - #[allow(clippy::too_many_arguments)] fn into_event( self, @@ -214,6 +196,24 @@ pub trait EventFormat: Sized { ) -> Result; } +/// Determines if a schema matches the storage schema based on configuration. +/// Returns `true` if the schemas match according to the rules: +/// - If `static_schema_flag` is `false`, always returns `true` (flexible schema mode) +/// - If `static_schema_flag` is `true`, returns `true` only if all fields in `schema` +/// exist in `storage_schema` with exactly matching properties +fn is_schema_matching( + schema: &[Arc], + storage_schema: &HashMap>, + static_schema_flag: bool, +) -> bool { + !static_schema_flag + || !schema.iter().any(|field| { + storage_schema + .get(field.name()) + .is_none_or(|storage_field| storage_field != field) + }) +} + pub fn get_existing_field_names( inferred_schema: Arc, existing_schema: Option<&HashMap>>, @@ -357,3 +357,120 @@ pub fn override_data_type( Arc::new(Schema::new(updated_schema)) } + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, sync::Arc}; + + use arrow_schema::{DataType, Field}; + + use super::*; + + // Helper function to create a test field + fn create_field(name: &str, data_type: DataType) -> Arc { + Arc::new(Field::new(name.to_string(), data_type, true)) + } + + // Helper function to create a storage schema HashMap + fn create_storage_schema(fields: &[Arc]) -> HashMap> { + let mut storage_schema = HashMap::new(); + for field in fields { + storage_schema.insert(field.name().to_string(), field.clone()); + } + storage_schema + } + + #[test] + fn test_static_schema_flag_false() { + // When static_schema_flag is false, should always return true regardless of schemas + let field1 = create_field("id", DataType::Int32); + let field2 = create_field("name", DataType::Utf8); + + let schema = [field1.clone(), field2.clone()]; + let storage_schema = create_storage_schema(&[field1.clone()]); // Missing field2 + + // Even though schemas don't match, function should return true because static_schema_flag is false + assert!(is_schema_matching(&schema, &storage_schema, false)); + } + + #[test] + fn test_identical_schemas() { + // When schemas are identical, should return true + let field1 = create_field("id", DataType::Int32); + let field2 = create_field("name", DataType::Utf8); + + let schema = [field1.clone(), field2.clone()]; + let storage_schema = create_storage_schema(&schema); + + assert!(is_schema_matching(&schema, &storage_schema, true)); + } + + #[test] + fn test_missing_field_in_storage() { + // When storage schema is missing a field from new schema, should return false + let field1 = create_field("id", DataType::Int32); + let field2 = create_field("name", DataType::Utf8); + + let schema = [field1.clone(), field2.clone()]; + let storage_schema = create_storage_schema(&[field1.clone()]); // Missing field2 + + assert!(!is_schema_matching(&schema, &storage_schema, true)); + } + + #[test] + fn test_different_data_type() { + // When field has different data type, should return false + let field1 = create_field("id", DataType::Int32); + // Same name but different type + let field1_different_type = create_field("id", DataType::Int64); + let field2 = create_field("name", DataType::Utf8); + + let schema = [field1.clone(), field2.clone()]; + let storage_schema = create_storage_schema(&[field1_different_type, field2.clone()]); + + assert!(!is_schema_matching(&schema, &storage_schema, true)); + } + + #[test] + fn test_extra_fields_in_storage() { + // When storage schema has extra fields not in new schema, should still return true + // This is because we only check if fields in new_schema exist in storage_schema + let field1 = create_field("id", DataType::Int32); + let field2 = create_field("name", DataType::Utf8); + let extra_field = create_field("extra", DataType::Boolean); + + let schema = [field1.clone(), field2.clone()]; + let storage_schema = create_storage_schema(&[field1.clone(), field2.clone(), extra_field]); + + assert!(is_schema_matching(&schema, &storage_schema, true)); + } + + #[test] + fn test_empty_new_schema() { + // When new schema is empty, should return true + let field1 = create_field("id", DataType::Int32); + + let storage_schema = create_storage_schema(&[field1.clone()]); + + assert!(is_schema_matching(&[], &storage_schema, true)); + } + + #[test] + fn test_empty_storage_schema() { + // When storage schema is empty but new schema has fields, should return false + let field1 = create_field("id", DataType::Int32); + + let schema = [field1.clone()]; + let empty_storage_schema = HashMap::new(); + + assert!(!is_schema_matching(&schema, &empty_storage_schema, true)); + } + + #[test] + fn test_both_empty_schemas() { + // When both schemas are empty, should return true + let empty_storage_schema = HashMap::new(); + + assert!(is_schema_matching(&[], &empty_storage_schema, true)); + } +}