diff --git a/src/cli.rs b/src/cli.rs index d98499783..2bd86c5a4 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -35,6 +35,7 @@ use crate::{ pub const DEFAULT_USERNAME: &str = "admin"; pub const DEFAULT_PASSWORD: &str = "admin"; +pub const DATASET_FIELD_COUNT_LIMIT: usize = 250; #[derive(Parser)] #[command( name = "parseable", @@ -368,6 +369,15 @@ pub struct Options { #[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")] pub ms_clarity_tag: Option, + + #[arg( + long, + env = "P_DATASET_FIELD_COUNT_LIMIT", + default_value_t = DATASET_FIELD_COUNT_LIMIT, + value_parser = validation::validate_dataset_fields_allowed_limit, + help = "total number of fields recommended in a dataset" + )] + pub dataset_fields_allowed_limit: usize, } #[derive(Parser, Debug)] diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index a742057f9..8e252b14f 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -467,6 +467,8 @@ pub enum PostError { KnownFormat(#[from] known_schema::Error), #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")] IncorrectLogFormat(String), + #[error("Failed to ingest events in dataset {0}. Total number of fields {1} exceeds the permissible limit of {2}. We recommend creating a new dataset beyond {2} for better query performance.")] + FieldsCountLimitExceeded(String, usize, usize), } impl actix_web::ResponseError for PostError { @@ -495,6 +497,7 @@ impl actix_web::ResponseError for PostError { PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST, PostError::KnownFormat(_) => StatusCode::BAD_REQUEST, PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST, + PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST, } } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 9b32b924b..6cffb6d99 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -54,6 +54,9 @@ pub async fn flatten_and_push_logs( log_source: &LogSource, p_custom_fields: &HashMap, ) -> Result<(), PostError> { + // Verify the dataset fields count + verify_dataset_fields_count(stream_name)?; + match log_source { LogSource::Kinesis => { //custom flattening required for Amazon Kinesis @@ -205,6 +208,38 @@ pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap Result<(), PostError> { + let fields_count = PARSEABLE + .get_stream(stream_name)? + .get_schema() + .fields() + .len(); + let dataset_fields_warn_threshold = 0.8 * PARSEABLE.options.dataset_fields_allowed_limit as f64; + // Check if the fields count exceeds the warn threshold + if fields_count > dataset_fields_warn_threshold as usize { + tracing::warn!( + "Dataset {0} has {1} fields, which exceeds the warning threshold of {2}. Ingestion will not be possible after reaching {3} fields. We recommend creating a new dataset.", + stream_name, + fields_count, + dataset_fields_warn_threshold as usize, + PARSEABLE.options.dataset_fields_allowed_limit + ); + } + // Check if the fields count exceeds the limit + // Return an error if the fields count exceeds the limit + if fields_count > PARSEABLE.options.dataset_fields_allowed_limit { + let error = PostError::FieldsCountLimitExceeded( + stream_name.to_string(), + fields_count, + PARSEABLE.options.dataset_fields_allowed_limit, + ); + tracing::error!("{}", error); + // Return an error if the fields count exceeds the limit + return Err(error); + } + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/option.rs b/src/option.rs index 26b5f4664..db9c94097 100644 --- a/src/option.rs +++ b/src/option.rs @@ -91,6 +91,7 @@ pub mod validation { path::{Path, PathBuf}, }; + use crate::cli::DATASET_FIELD_COUNT_LIMIT; use path_clean::PathClean; use super::{Compression, Mode}; @@ -173,4 +174,19 @@ pub mod validation { Err("Invalid value for max disk usage. It should be given as 90.0 for 90%".to_string()) } } + + pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result { + if let Ok(size) = s.parse::() { + if (1..=DATASET_FIELD_COUNT_LIMIT).contains(&size) { + Ok(size) + } else { + Err(format!( + "Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be between 1 and {}", + DATASET_FIELD_COUNT_LIMIT + )) + } + } else { + Err("Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be given as integer value".to_string()) + } + } } diff --git a/src/otel/logs.rs b/src/otel/logs.rs index dfd60b8ec..f301869d2 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -15,7 +15,9 @@ * along with this program. If not, see . * */ - +use super::otel_utils::collect_json_from_values; +use super::otel_utils::convert_epoch_nano_to_timestamp; +use super::otel_utils::insert_attributes; use opentelemetry_proto::tonic::logs::v1::LogRecord; use opentelemetry_proto::tonic::logs::v1::LogsData; use opentelemetry_proto::tonic::logs::v1::ScopeLogs; @@ -23,19 +25,23 @@ use opentelemetry_proto::tonic::logs::v1::SeverityNumber; use serde_json::Map; use serde_json::Value; -use super::otel_utils::add_other_attributes_if_not_empty; -use super::otel_utils::collect_json_from_values; -use super::otel_utils::convert_epoch_nano_to_timestamp; -use super::otel_utils::insert_attributes; -use super::otel_utils::merge_attributes_in_json; - -pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [ +pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 16] = [ + "scope_name", + "scope_version", + "scope_log_schema_url", + "scope_dropped_attributes_count", + "resource_dropped_attributes_count", + "schema_url", "time_unix_nano", + "observed_time_unix_nano", "severity_number", "severity_text", "body", + "flags", + "log_record_dropped_attributes_count", "span_id", "trace_id", + "event_name", ]; /// otel log event has severity number /// there is a mapping of severity number to severity text provided in proto @@ -60,7 +66,6 @@ fn flatten_severity(severity_number: i32) -> Map { /// this function is called recursively for each log record object in the otel logs pub fn flatten_log_record(log_record: &LogRecord) -> Map { let mut log_record_json: Map = Map::new(); - let mut other_attributes = Map::new(); log_record_json.insert( "time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -83,11 +88,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { log_record_json.insert(key.to_owned(), body_json[key].to_owned()); } } - insert_attributes( - &mut log_record_json, - &log_record.attributes, - &mut other_attributes, - ); + insert_attributes(&mut log_record_json, &log_record.attributes); log_record_json.insert( "log_record_dropped_attributes_count".to_string(), Value::Number(log_record.dropped_attributes_count.into()), @@ -106,9 +107,6 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { Value::String(hex::encode(&log_record.trace_id)), ); - // Add the `other_attributes` to the log record json - add_other_attributes_if_not_empty(&mut log_record_json, &other_attributes); - log_record_json } @@ -117,18 +115,13 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { let mut vec_scope_log_json = Vec::new(); let mut scope_log_json = Map::new(); - let mut other_attributes = Map::new(); if let Some(scope) = &scope_log.scope { scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); scope_log_json.insert( "scope_version".to_string(), Value::String(scope.version.clone()), ); - insert_attributes( - &mut scope_log_json, - &scope.attributes, - &mut other_attributes, - ); + insert_attributes(&mut scope_log_json, &scope.attributes); scope_log_json.insert( "scope_dropped_attributes_count".to_string(), Value::Number(scope.dropped_attributes_count.into()), @@ -146,9 +139,6 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { vec_scope_log_json.push(combined_json); } - // Add the `other_attributes` to the scope log json - merge_attributes_in_json(other_attributes, &mut vec_scope_log_json); - vec_scope_log_json } @@ -156,16 +146,10 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { /// and returns a `Vec` of `Value::Object` of the flattened json pub fn flatten_otel_logs(message: &LogsData) -> Vec { let mut vec_otel_json = Vec::new(); - for record in &message.resource_logs { let mut resource_log_json = Map::new(); - let mut other_attributes = Map::new(); if let Some(resource) = &record.resource { - insert_attributes( - &mut resource_log_json, - &resource.attributes, - &mut other_attributes, - ); + insert_attributes(&mut resource_log_json, &resource.attributes); resource_log_json.insert( "resource_dropped_attributes_count".to_string(), Value::Number(resource.dropped_attributes_count.into()), @@ -176,6 +160,7 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec { for scope_log in &record.scope_logs { vec_resource_logs_json.extend(flatten_scope_log(scope_log)); } + resource_log_json.insert( "schema_url".to_string(), Value::String(record.schema_url.clone()), @@ -183,12 +168,10 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec { for resource_logs_json in &mut vec_resource_logs_json { resource_logs_json.extend(resource_log_json.clone()); - } - - // Add the `other_attributes` to the resource log json - merge_attributes_in_json(other_attributes, &mut vec_resource_logs_json); - vec_otel_json.extend(vec_resource_logs_json); + vec_otel_json.push(Value::Object(resource_logs_json.clone())); + } } - vec_otel_json.into_iter().map(Value::Object).collect() + + vec_otel_json } diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 97a27ad74..4edff77fe 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -15,7 +15,6 @@ * along with this program. If not, see . * */ - use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue; use opentelemetry_proto::tonic::metrics::v1::{ exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, Exemplar, @@ -24,16 +23,44 @@ use opentelemetry_proto::tonic::metrics::v1::{ use serde_json::{Map, Value}; use super::otel_utils::{ - add_other_attributes_if_not_empty, convert_epoch_nano_to_timestamp, insert_attributes, - insert_number_if_some, merge_attributes_in_json, + convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some, }; -pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 5] = [ +pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 34] = [ "metric_name", "metric_description", "metric_unit", "start_time_unix_nano", "time_unix_nano", + "exemplar_time_unix_nano", + "exemplar_span_id", + "exemplar_trace_id", + "exemplar_value", + "data_point_value", + "data_point_count", + "data_point_sum", + "data_point_bucket_counts", + "data_point_explicit_bounds", + "data_point_scale", + "data_point_zero_count", + "data_point_flags", + "data_point_flags_description", + "positive_offset", + "positive_bucket_count", + "negative_offset", + "negative_bucket_count", + "quantile", + "value", + "is_monotonic", + "aggregation_temporality", + "aggregation_temporality_description", + "metric_type", + "scope_name", + "scope_version", + "scope_schema_url", + "scope_dropped_attributes_count", + "resource_dropped_attributes_count", + "resource_schema_url", ]; /// otel metrics event has json array for exemplar @@ -45,12 +72,7 @@ fn flatten_exemplar(exemplars: &[Exemplar]) -> Vec> { .iter() .map(|exemplar| { let mut exemplar_json = Map::new(); - let mut other_attributes = Map::new(); - insert_attributes( - &mut exemplar_json, - &exemplar.filtered_attributes, - &mut other_attributes, - ); + insert_attributes(&mut exemplar_json, &exemplar.filtered_attributes); exemplar_json.insert( "exemplar_time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -83,7 +105,6 @@ fn flatten_exemplar(exemplars: &[Exemplar]) -> Vec> { } } } - add_other_attributes_if_not_empty(&mut exemplar_json, &other_attributes); exemplar_json }) .collect() @@ -98,12 +119,7 @@ fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec Vec Vec> { let mut data_points_json = Vec::new(); for data_point in &histogram.data_points { let mut data_point_json = Map::new(); - let mut other_attributes = Map::new(); - insert_attributes( - &mut data_point_json, - &data_point.attributes, - &mut other_attributes, - ); + insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -253,16 +261,13 @@ fn flatten_histogram(histogram: &Histogram) -> Vec> { "data_point_explicit_bounds".to_string(), data_point_explicit_bounds, ); - let mut exemplar_json = flatten_exemplar(&data_point.exemplars); + let exemplar_json = flatten_exemplar(&data_point.exemplars); if !exemplar_json.is_empty() { - merge_attributes_in_json(other_attributes, &mut exemplar_json); for exemplar in exemplar_json { for (key, value) in exemplar { data_point_json.insert(key, value); } } - } else { - add_other_attributes_if_not_empty(&mut data_point_json, &other_attributes); } data_point_json.extend(flatten_data_point_flags(data_point.flags)); @@ -309,12 +314,7 @@ fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec Vec Vec> { let mut data_points_json = Vec::new(); for data_point in &summary.data_points { let mut data_point_json = Map::new(); - let mut other_attributes = Map::new(); - insert_attributes( - &mut data_point_json, - &data_point.attributes, - &mut other_attributes, - ); + insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -445,8 +437,6 @@ fn flatten_summary(summary: &Summary) -> Vec> { ), ); - add_other_attributes_if_not_empty(&mut data_point_json, &other_attributes); - data_points_json.push(data_point_json); } data_points_json @@ -460,7 +450,6 @@ fn flatten_summary(summary: &Summary) -> Vec> { pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec> { let mut data_points_json = Vec::new(); let mut metric_json = Map::new(); - let mut other_attributes = Map::new(); let mut metric_type = String::default(); match &metrics_record.data { Some(metric::Data::Gauge(gauge)) => { @@ -498,11 +487,7 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec Value::String(metrics_record.unit.clone()), ); metric_json.insert("metric_type".to_string(), Value::String(metric_type)); - insert_attributes( - &mut metric_json, - &metrics_record.metadata, - &mut other_attributes, - ); + insert_attributes(&mut metric_json, &metrics_record.metadata); for data_point_json in &mut data_points_json { for (key, value) in &metric_json { data_point_json.insert(key.clone(), value.clone()); @@ -511,7 +496,6 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec if data_points_json.is_empty() { data_points_json.push(metric_json); } - merge_attributes_in_json(other_attributes, &mut data_points_json); data_points_json } @@ -521,25 +505,21 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { let mut vec_otel_json = Vec::new(); for record in &message.resource_metrics { let mut resource_metrics_json = Map::new(); - let mut other_attributes = Map::new(); if let Some(resource) = &record.resource { - insert_attributes( - &mut resource_metrics_json, - &resource.attributes, - &mut other_attributes, - ); + insert_attributes(&mut resource_metrics_json, &resource.attributes); resource_metrics_json.insert( "resource_dropped_attributes_count".to_string(), Value::Number(resource.dropped_attributes_count.into()), ); } + let mut vec_scope_metrics_json = Vec::new(); for scope_metric in &record.scope_metrics { let mut scope_metrics_json = Map::new(); - let mut scope_other_attributes = Map::new(); for metrics_record in &scope_metric.metrics { vec_scope_metrics_json.extend(flatten_metrics_record(metrics_record)); } + if let Some(scope) = &scope_metric.scope { scope_metrics_json .insert("scope_name".to_string(), Value::String(scope.name.clone())); @@ -547,18 +527,15 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { "scope_version".to_string(), Value::String(scope.version.clone()), ); - insert_attributes( - &mut scope_metrics_json, - &scope.attributes, - &mut scope_other_attributes, - ); + insert_attributes(&mut scope_metrics_json, &scope.attributes); scope_metrics_json.insert( "scope_dropped_attributes_count".to_string(), Value::Number(scope.dropped_attributes_count.into()), ); } + scope_metrics_json.insert( - "scope_metrics_schema_url".to_string(), + "scope_schema_url".to_string(), Value::String(scope_metric.schema_url.clone()), ); @@ -567,21 +544,23 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { scope_metric_json.insert(key.clone(), value.clone()); } } - merge_attributes_in_json(scope_other_attributes, &mut vec_scope_metrics_json); } + resource_metrics_json.insert( - "resource_metrics_schema_url".to_string(), + "resource_schema_url".to_string(), Value::String(record.schema_url.clone()), ); + for resource_metric_json in &mut vec_scope_metrics_json { for (key, value) in &resource_metrics_json { resource_metric_json.insert(key.clone(), value.clone()); } + + vec_otel_json.push(Value::Object(resource_metric_json.clone())); } - merge_attributes_in_json(other_attributes, &mut vec_scope_metrics_json); - vec_otel_json.extend(vec_scope_metrics_json); } - vec_otel_json.into_iter().map(Value::Object).collect() + + vec_otel_json } /// otel metrics event has json object for aggregation temporality diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 3c37c0e0a..9cf0e1644 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -22,10 +22,6 @@ use opentelemetry_proto::tonic::common::v1::{ }; use serde_json::{Map, Value}; -/// Prefixes of attribute keys that should be preserved as individual fields in flattened output. -/// Other attributes will be collected in a separate JSON object under `other_attributes`. -const KNOWN_ATTRIBUTES_PREFIX: [&str; 6] = ["http", "url", "service", "os", "host", "telemetry"]; -pub const OTHER_ATTRIBUTES_KEY: &str = "other_attributes"; // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map { let mut value_json: Map = Map::new(); @@ -153,24 +149,14 @@ pub fn value_to_string(value: serde_json::Value) -> String { } } -pub fn flatten_attributes( - attributes: &Vec, - other_attributes_json: &mut Map, -) -> Map { +pub fn flatten_attributes(attributes: &[KeyValue]) -> Map { let mut attributes_json: Map = Map::new(); for attribute in attributes { let key = &attribute.key; let value = &attribute.value; let value_json = collect_json_from_values(value, &key.to_string()); for (attr_key, attr_val) in &value_json { - if KNOWN_ATTRIBUTES_PREFIX - .iter() - .any(|prefix| attr_key.starts_with(prefix)) - { - attributes_json.insert(attr_key.clone(), attr_val.clone()); - } else { - other_attributes_json.insert(attr_key.clone(), attr_val.clone()); - } + attributes_json.insert(attr_key.clone(), attr_val.clone()); } } attributes_json @@ -196,12 +182,8 @@ pub fn insert_bool_if_some(map: &mut Map, key: &str, option: &Opt } } -pub fn insert_attributes( - map: &mut Map, - attributes: &Vec, - other_attributes_json: &mut Map, -) { - let attributes_json = flatten_attributes(attributes, other_attributes_json); +pub fn insert_attributes(map: &mut Map, attributes: &[KeyValue]) { + let attributes_json = flatten_attributes(attributes); for (key, value) in attributes_json { map.insert(key, value); } @@ -211,111 +193,3 @@ pub fn convert_epoch_nano_to_timestamp(epoch_ns: i64) -> String { let dt = DateTime::from_timestamp_nanos(epoch_ns).naive_utc(); dt.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string() } - -/// fetch `other_attributes` from array of JSON objects -/// merge them with the provided attributes -/// and return the merged array of JSON object -pub fn merge_attributes_in_json( - attributes: Map, - vec_json: &mut Vec>, -) { - if attributes.is_empty() { - return; - } - - for json in vec_json { - let merged_attributes = match json.get(OTHER_ATTRIBUTES_KEY) { - Some(Value::String(attrs_str)) => { - merge_with_existing_attributes(&attributes, attrs_str) - } - Some(Value::Object(existing_attrs)) => { - merge_with_existing_object(&attributes, existing_attrs) - } - _ => serialize_attributes(&attributes), - }; - - if let Some(merged_str) = merged_attributes { - json.insert(OTHER_ATTRIBUTES_KEY.to_string(), Value::String(merged_str)); - } - } -} - -/// Merge attributes with an existing JSON string of attributes -fn merge_with_existing_attributes( - attributes: &Map, - attrs_str: &str, -) -> Option { - match serde_json::from_str::>(attrs_str) { - Ok(mut existing_attrs) => { - for (key, value) in attributes { - existing_attrs.insert(key.clone(), value.clone()); - } - serde_json::to_string(&existing_attrs).ok() - } - Err(e) => { - tracing::warn!("failed to deserialize existing attributes: {e}"); - None - } - } -} - -/// Merge attributes with an existing JSON object of attributes -fn merge_with_existing_object( - attributes: &Map, - existing_attrs: &Map, -) -> Option { - let mut merged_attrs = existing_attrs.clone(); - for (key, value) in attributes { - merged_attrs.insert(key.clone(), value.clone()); - } - serde_json::to_string(&merged_attrs).ok() -} - -/// Serialize attributes into a JSON string -fn serialize_attributes(attributes: &Map) -> Option { - serde_json::to_string(attributes).ok() -} - -/// fetch `other_attributes` from array of JSON objects -/// and merge them into a single map -/// and return the merged map -pub fn fetch_attributes_from_json(json_arr: &Vec>) -> Map { - let mut merged_attributes = Map::new(); - - for json in json_arr { - if let Some(Value::String(attrs_str)) = json.get(OTHER_ATTRIBUTES_KEY) { - if let Ok(attrs) = serde_json::from_str::>(attrs_str) { - for (key, value) in attrs { - merged_attributes.insert(key, value); - } - } - } - } - merged_attributes -} - -/// convert attributes map to a string -/// and return the string -/// if serialisation fails, return an empty string -pub fn fetch_attributes_string(attributes: &Map) -> String { - match serde_json::to_string(attributes) { - Ok(s) => s, - Err(e) => { - tracing::warn!("failed to serialise OTEL other_attributes: {e}"); - String::default() - } - } -} - -/// add `other_attributes` to the JSON object -/// if `other_attributes` is not empty -/// and return the JSON object -pub fn add_other_attributes_if_not_empty( - json: &mut Map, - other_attributes: &Map, -) { - if !other_attributes.is_empty() { - let attrs_str = fetch_attributes_string(other_attributes); - json.insert(OTHER_ATTRIBUTES_KEY.to_string(), Value::String(attrs_str)); - } -} diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 87e4556a8..f243ecacf 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -15,7 +15,6 @@ * along with this program. If not, see . * */ - use opentelemetry_proto::tonic::trace::v1::span::Event; use opentelemetry_proto::tonic::trace::v1::span::Link; use opentelemetry_proto::tonic::trace::v1::ScopeSpans; @@ -24,18 +23,20 @@ use opentelemetry_proto::tonic::trace::v1::Status; use opentelemetry_proto::tonic::trace::v1::TracesData; use serde_json::{Map, Value}; -use super::otel_utils::add_other_attributes_if_not_empty; use super::otel_utils::convert_epoch_nano_to_timestamp; -use super::otel_utils::fetch_attributes_from_json; use super::otel_utils::insert_attributes; -use super::otel_utils::merge_attributes_in_json; -pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 15] = [ +pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 30] = [ + "scope_name", + "scope_version", + "scope_schema_url", + "scope_dropped_attributes_count", + "resource_schema_url", + "resource_dropped_attributes_count", "span_trace_id", "span_span_id", "span_name", "span_parent_span_id", - "flags", "name", "span_kind", "span_kind_description", @@ -43,6 +44,16 @@ pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 15] = [ "span_end_time_unix_nano", "event_name", "event_time_unix_nano", + "event_dropped_attributes_count", + "link_span_id", + "link_trace_id", + "link_dropped_attributes_count", + "span_dropped_events_count", + "span_dropped_links_count", + "span_dropped_attributes_count", + "span_trace_state", + "span_flags", + "span_flags_description", "span_status_code", "span_status_description", "span_status_message", @@ -52,7 +63,6 @@ pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 15] = [ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { let mut vec_scope_span_json = Vec::new(); let mut scope_span_json = Map::new(); - let mut other_attributes = Map::new(); for span in &scope_span.spans { let span_record_json = flatten_span_record(span); vec_scope_span_json.extend(span_record_json); @@ -64,11 +74,7 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { "scope_version".to_string(), Value::String(scope.version.clone()), ); - insert_attributes( - &mut scope_span_json, - &scope.attributes, - &mut other_attributes, - ); + insert_attributes(&mut scope_span_json, &scope.attributes); scope_span_json.insert( "scope_dropped_attributes_count".to_string(), Value::Number(scope.dropped_attributes_count.into()), @@ -83,12 +89,10 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { for span_json in &mut vec_scope_span_json { span_json.insert( - "schema_url".to_string(), + "scope_schema_url".to_string(), Value::String(scope_span.schema_url.clone()), ); } - // Add the `other_attributes` to the scope span json - merge_attributes_in_json(other_attributes, &mut vec_scope_span_json); vec_scope_span_json } @@ -100,13 +104,8 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { for record in &message.resource_spans { let mut resource_span_json = Map::new(); - let mut other_attributes = Map::new(); if let Some(resource) = &record.resource { - insert_attributes( - &mut resource_span_json, - &resource.attributes, - &mut other_attributes, - ); + insert_attributes(&mut resource_span_json, &resource.attributes); resource_span_json.insert( "resource_dropped_attributes_count".to_string(), Value::Number(resource.dropped_attributes_count.into()), @@ -120,7 +119,7 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { } resource_span_json.insert( - "schema_url".to_string(), + "resource_schema_url".to_string(), Value::String(record.schema_url.clone()), ); @@ -128,13 +127,12 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { for (key, value) in &resource_span_json { resource_spans_json.insert(key.clone(), value.clone()); } + + vec_otel_json.push(Value::Object(resource_spans_json.clone())); } - // Add the `other_attributes` to the resource span json - merge_attributes_in_json(other_attributes, &mut vec_resource_spans_json); - vec_otel_json.extend(vec_resource_spans_json); } - vec_otel_json.into_iter().map(Value::Object).collect() + vec_otel_json } /// otel traces has json array of events @@ -145,7 +143,6 @@ fn flatten_events(events: &[Event]) -> Vec> { .iter() .map(|event| { let mut event_json = Map::new(); - let mut other_attributes = Map::new(); event_json.insert( "event_time_unix_nano".to_string(), Value::String( @@ -153,13 +150,11 @@ fn flatten_events(events: &[Event]) -> Vec> { ), ); event_json.insert("event_name".to_string(), Value::String(event.name.clone())); - insert_attributes(&mut event_json, &event.attributes, &mut other_attributes); + insert_attributes(&mut event_json, &event.attributes); event_json.insert( "event_dropped_attributes_count".to_string(), Value::Number(event.dropped_attributes_count.into()), ); - - add_other_attributes_if_not_empty(&mut event_json, &other_attributes); event_json }) .collect() @@ -173,7 +168,6 @@ fn flatten_links(links: &[Link]) -> Vec> { .iter() .map(|link| { let mut link_json = Map::new(); - let mut other_attributes = Map::new(); link_json.insert( "link_span_id".to_string(), Value::String(hex::encode(&link.span_id)), @@ -183,13 +177,11 @@ fn flatten_links(links: &[Link]) -> Vec> { Value::String(hex::encode(&link.trace_id)), ); - insert_attributes(&mut link_json, &link.attributes, &mut other_attributes); + insert_attributes(&mut link_json, &link.attributes); link_json.insert( "link_dropped_attributes_count".to_string(), Value::Number(link.dropped_attributes_count.into()), ); - - add_other_attributes_if_not_empty(&mut link_json, &other_attributes); link_json }) .collect() @@ -274,7 +266,6 @@ fn flatten_kind(kind: i32) -> Map { /// this function is called recursively for each span record object in the otel traces event fn flatten_span_record(span_record: &Span) -> Vec> { let mut span_records_json = Vec::new(); - let mut other_attributes = Map::new(); let mut span_record_json = Map::new(); span_record_json.insert( "span_trace_id".to_string(), @@ -310,40 +301,20 @@ fn flatten_span_record(span_record: &Span) -> Vec> { span_record.end_time_unix_nano as i64, )), ); - insert_attributes( - &mut span_record_json, - &span_record.attributes, - &mut other_attributes, - ); + insert_attributes(&mut span_record_json, &span_record.attributes); span_record_json.insert( "span_dropped_attributes_count".to_string(), Value::Number(span_record.dropped_attributes_count.into()), ); let events_json = flatten_events(&span_record.events); - // fetch all other_attributes from the events_json - let events_other_attributes = fetch_attributes_from_json(&events_json); span_records_json.extend(events_json); span_record_json.insert( "span_dropped_events_count".to_string(), Value::Number(span_record.dropped_events_count.into()), ); let links_json = flatten_links(&span_record.links); - // fetch all other_attributes from the links_json - let links_other_attributes = fetch_attributes_from_json(&links_json); span_records_json.extend(links_json); - // merge all other_attributes from the events_json and links_json - if !other_attributes.is_empty() - || !events_other_attributes.is_empty() - || !links_other_attributes.is_empty() - { - for (key, value) in &events_other_attributes { - other_attributes.insert(key.clone(), value.clone()); - } - for (key, value) in &links_other_attributes { - other_attributes.insert(key.clone(), value.clone()); - } - add_other_attributes_if_not_empty(&mut span_record_json, &other_attributes); - } + span_record_json.insert( "span_dropped_links_count".to_string(), Value::Number(span_record.dropped_links_count.into()),