Skip to content

refactor: schema mismatch check #1259

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 145 additions & 27 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,18 @@ pub trait EventFormat: Sized {
)),
);

// prepare the record batch and new fields to be added
let mut new_schema = Arc::new(Schema::new(schema));
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
if !is_schema_matching(&schema, storage_schema, static_schema_flag) {
return Err(anyhow!("Schema mismatch"));
}
new_schema =
update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);

// prepare the record batch and new fields to be added
let new_schema = update_field_type_in_schema(
Arc::new(Schema::new(schema)),
None,
time_partition,
None,
schema_version,
);

let mut rb = Self::decode(data, new_schema.clone())?;
rb = replace_columns(
Expand All @@ -190,28 +195,6 @@ pub trait EventFormat: Sized {
Ok((rb, is_first))
}

/// Reports whether the incoming `new_schema` is acceptable for the stored schema.
///
/// When `static_schema_flag` is `false` the check is skipped and `true` is returned.
/// Otherwise every field of `new_schema` must be found in `storage_schema` (looked up
/// by field name) and the stored field must have the same name and the same data type.
/// Fields that exist only in `storage_schema` do not affect the result.
fn is_schema_matching(
    new_schema: Arc<Schema>,
    storage_schema: &HashMap<String, Arc<Field>>,
    static_schema_flag: bool,
) -> bool {
    // Nothing to enforce unless the static schema flag is set.
    if !static_schema_flag {
        return true;
    }

    new_schema.fields().iter().all(|incoming| {
        storage_schema
            .get(incoming.name())
            .is_some_and(|stored| {
                // Only name and data type are compared here; other field
                // properties are not inspected by this check.
                incoming.name() == stored.name() && incoming.data_type() == stored.data_type()
            })
    })
}

#[allow(clippy::too_many_arguments)]
fn into_event(
self,
Expand All @@ -226,6 +209,24 @@ pub trait EventFormat: Sized {
) -> Result<Event, AnyError>;
}

/// Checks an incoming event schema against the stored schema.
///
/// - If `static_schema_flag` is `false`, no comparison is made and the result is `true`.
/// - If `static_schema_flag` is `true`, the result is `true` only when every field in
///   `schema` is present in `storage_schema` (looked up by field name) and compares
///   equal to the stored field. Fields that exist only in `storage_schema` are ignored,
///   so an empty `schema` always matches.
fn is_schema_matching(
    schema: &[Arc<Field>],
    storage_schema: &HashMap<String, Arc<Field>>,
    static_schema_flag: bool,
) -> bool {
    // Nothing to enforce unless the static schema flag is set.
    if !static_schema_flag {
        return true;
    }

    // A missing entry or an unequal stored field makes the whole schema a mismatch.
    schema.iter().all(|incoming| {
        storage_schema
            .get(incoming.name())
            .is_some_and(|stored| stored == incoming)
    })
}

pub fn get_existing_field_names(
inferred_schema: Arc<Schema>,
existing_schema: Option<&HashMap<String, Arc<Field>>>,
Expand Down Expand Up @@ -369,3 +370,120 @@ pub fn override_data_type(

Arc::new(Schema::new(updated_schema))
}

#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_schema::{DataType, Field};

    use super::*;

    /// Builds a nullable field with the given name and data type.
    fn create_field(name: &str, data_type: DataType) -> Arc<Field> {
        let nullable = true;
        Arc::new(Field::new(name, data_type, nullable))
    }

    /// Builds a storage schema map keyed by each field's name.
    fn create_storage_schema(fields: &[Arc<Field>]) -> HashMap<String, Arc<Field>> {
        fields
            .iter()
            .map(|field| (field.name().to_owned(), Arc::clone(field)))
            .collect()
    }

    #[test]
    fn test_static_schema_flag_false() {
        let id = create_field("id", DataType::Int32);
        let name = create_field("name", DataType::Utf8);

        // Storage deliberately lacks `name`.
        let storage_schema = create_storage_schema(&[Arc::clone(&id)]);
        let schema = [id, name];

        // The mismatch is irrelevant because the static schema flag is off.
        assert!(is_schema_matching(&schema, &storage_schema, false));
    }

    #[test]
    fn test_identical_schemas() {
        let schema = [
            create_field("id", DataType::Int32),
            create_field("name", DataType::Utf8),
        ];
        let storage_schema = create_storage_schema(&schema);

        // Storage was built from the very same fields, so the schemas match.
        assert!(is_schema_matching(&schema, &storage_schema, true));
    }

    #[test]
    fn test_missing_field_in_storage() {
        let id = create_field("id", DataType::Int32);
        let name = create_field("name", DataType::Utf8);

        // Storage deliberately lacks `name`.
        let storage_schema = create_storage_schema(&[Arc::clone(&id)]);
        let schema = [id, name];

        // A field unknown to storage is a mismatch in static mode.
        assert!(!is_schema_matching(&schema, &storage_schema, true));
    }

    #[test]
    fn test_different_data_type() {
        let id = create_field("id", DataType::Int32);
        let name = create_field("name", DataType::Utf8);
        // Same name as `id`, but stored with a different data type.
        let stored_id = create_field("id", DataType::Int64);

        let storage_schema = create_storage_schema(&[stored_id, Arc::clone(&name)]);
        let schema = [id, name];

        assert!(!is_schema_matching(&schema, &storage_schema, true));
    }

    #[test]
    fn test_extra_fields_in_storage() {
        let id = create_field("id", DataType::Int32);
        let name = create_field("name", DataType::Utf8);
        let extra = create_field("extra", DataType::Boolean);

        let storage_schema =
            create_storage_schema(&[Arc::clone(&id), Arc::clone(&name), extra]);
        let schema = [id, name];

        // Only the incoming fields are looked up, so surplus storage fields are fine.
        assert!(is_schema_matching(&schema, &storage_schema, true));
    }

    #[test]
    fn test_empty_new_schema() {
        let storage_schema = create_storage_schema(&[create_field("id", DataType::Int32)]);

        // With no incoming fields there is nothing that could mismatch.
        assert!(is_schema_matching(&[], &storage_schema, true));
    }

    #[test]
    fn test_empty_storage_schema() {
        let schema = [create_field("id", DataType::Int32)];
        let empty_storage_schema = HashMap::new();

        // The incoming field cannot be found in an empty storage schema.
        assert!(!is_schema_matching(&schema, &empty_storage_schema, true));
    }

    #[test]
    fn test_both_empty_schemas() {
        let empty_storage_schema = HashMap::new();

        // No incoming fields and no stored fields still counts as a match.
        assert!(is_schema_matching(&[], &empty_storage_schema, true));
    }
}
Loading