From 0eefe7786f0f350fbf6fe0ec464cb2775b7797f0 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 12 May 2025 08:35:42 -0400 Subject: [PATCH 01/11] remove other_attributes from otel logs/traces/metrics keep all attributes as individual columns in the ingested event expose env `P_OTEL_ATTRIBUTES_ALLOWED_LIMIT` to configure the allowed limit for attributes count if attributes count in flattened event > the allowed limit log the error, and reject the event Fixes: #1310 --- src/cli.rs | 8 + src/handlers/http/ingest.rs | 4 + src/handlers/http/modal/utils/ingest_utils.rs | 9 +- src/otel/logs.rs | 84 +++++----- src/otel/metrics.rs | 150 +++++++++--------- src/otel/otel_utils.rs | 136 +--------------- src/otel/traces.rs | 105 ++++++------ 7 files changed, 201 insertions(+), 295 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index d98499783..731b3909e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -368,6 +368,14 @@ pub struct Options { #[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")] pub ms_clarity_tag: Option, + + #[arg( + long, + env = "P_OTEL_ATTRIBUTES_ALLOWED_LIMIT", + default_value = "200", + help = "allowed limit for otel attributes" + )] + pub otel_attributes_allowed_limit: usize, } #[derive(Parser, Debug)] diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index a742057f9..d40c821a2 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -35,6 +35,7 @@ use crate::metadata::SchemaVersion; use crate::option::Mode; use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST; use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST; +use crate::otel::otel_utils::OtelError; use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST; use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::storage::{ObjectStorageError, StreamType}; @@ -467,6 +468,8 @@ pub enum PostError { KnownFormat(#[from] known_schema::Error), #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")] IncorrectLogFormat(String), + #[error("OtelError: {0}")] + OtelError(#[from] OtelError), } impl actix_web::ResponseError for PostError { @@ -495,6 +498,7 @@ impl actix_web::ResponseError for PostError { PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST, PostError::KnownFormat(_) => StatusCode::BAD_REQUEST, PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST, + PostError::OtelError(_) => StatusCode::EXPECTATION_FAILED, } } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 9b32b924b..1754e59c4 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -65,21 +65,24 @@ pub async fn flatten_and_push_logs( LogSource::OtelLogs => { //custom flattening required for otel logs let logs: LogsData = serde_json::from_value(json)?; - for record in flatten_otel_logs(&logs) { + let records = flatten_otel_logs(&logs)?; + for record in records { push_logs(stream_name, record, log_source, p_custom_fields).await?; } } LogSource::OtelTraces => { //custom flattening required for otel traces let traces: TracesData = serde_json::from_value(json)?; - for record in flatten_otel_traces(&traces) { + let records = flatten_otel_traces(&traces)?; + for record in records { push_logs(stream_name, record, log_source, p_custom_fields).await?; } } LogSource::OtelMetrics => { //custom flattening required for otel metrics let metrics: MetricsData = serde_json::from_value(json)?; - for record in flatten_otel_metrics(metrics) { + 
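// A minimal sketch of the env-backed clap option added in src/cli.rs above;
// `DemoOptions` is a hypothetical stand-in for Parseable's `Options` struct,
// and it assumes clap is built with the "derive" and "env" features.
use clap::Parser;

#[derive(Parser, Debug)]
struct DemoOptions {
    #[arg(
        long,
        env = "P_OTEL_ATTRIBUTES_ALLOWED_LIMIT",
        default_value = "200",
        help = "allowed limit for otel attributes"
    )]
    otel_attributes_allowed_limit: usize,
}

fn main() {
    // clap resolves the CLI flag first, then the env var, then the default.
    let opts = DemoOptions::parse();
    println!("limit = {}", opts.otel_attributes_allowed_limit);
}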
let records = flatten_otel_metrics(metrics)?; + for record in records { push_logs(stream_name, record, log_source, p_custom_fields).await?; } } diff --git a/src/otel/logs.rs b/src/otel/logs.rs index dfd60b8ec..a9e8bd478 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -16,6 +16,14 @@ * */ +use std::collections::HashSet; + +use crate::parseable::PARSEABLE; + +use super::otel_utils::collect_json_from_values; +use super::otel_utils::convert_epoch_nano_to_timestamp; +use super::otel_utils::insert_attributes; +use super::otel_utils::OtelError; use opentelemetry_proto::tonic::logs::v1::LogRecord; use opentelemetry_proto::tonic::logs::v1::LogsData; use opentelemetry_proto::tonic::logs::v1::ScopeLogs; @@ -23,19 +31,23 @@ use opentelemetry_proto::tonic::logs::v1::SeverityNumber; use serde_json::Map; use serde_json::Value; -use super::otel_utils::add_other_attributes_if_not_empty; -use super::otel_utils::collect_json_from_values; -use super::otel_utils::convert_epoch_nano_to_timestamp; -use super::otel_utils::insert_attributes; -use super::otel_utils::merge_attributes_in_json; - -pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [ +pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 16] = [ + "scope_name", + "scope_version", + "scope_log_schema_url", + "scope_dropped_attributes_count", + "resource_dropped_attributes_count", + "resource_schema_url", "time_unix_nano", + "observed_time_unix_nano", "severity_number", "severity_text", "body", + "flags", + "log_record_dropped_attributes_count", "span_id", "trace_id", + "event_name", ]; /// otel log event has severity number /// there is a mapping of severity number to severity text provided in proto @@ -60,7 +72,6 @@ fn flatten_severity(severity_number: i32) -> Map { /// this function is called recursively for each log record object in the otel logs pub fn flatten_log_record(log_record: &LogRecord) -> Map { let mut log_record_json: Map = Map::new(); - let mut other_attributes = Map::new(); log_record_json.insert( "time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -83,11 +94,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { log_record_json.insert(key.to_owned(), body_json[key].to_owned()); } } - insert_attributes( - &mut log_record_json, - &log_record.attributes, - &mut other_attributes, - ); + insert_attributes(&mut log_record_json, &log_record.attributes); log_record_json.insert( "log_record_dropped_attributes_count".to_string(), Value::Number(log_record.dropped_attributes_count.into()), @@ -106,9 +113,6 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { Value::String(hex::encode(&log_record.trace_id)), ); - // Add the `other_attributes` to the log record json - add_other_attributes_if_not_empty(&mut log_record_json, &other_attributes); - log_record_json } @@ -117,18 +121,13 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { let mut vec_scope_log_json = Vec::new(); let mut scope_log_json = Map::new(); - let mut other_attributes = Map::new(); if let Some(scope) = &scope_log.scope { scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); scope_log_json.insert( "scope_version".to_string(), Value::String(scope.version.clone()), ); - insert_attributes( - &mut scope_log_json, - &scope.attributes, - &mut other_attributes, - ); + insert_attributes(&mut scope_log_json, &scope.attributes); scope_log_json.insert( "scope_dropped_attributes_count".to_string(), Value::Number(scope.dropped_attributes_count.into()), @@ -146,26 
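// Sketch of the attribute-count guard the flatten_otel_* functions now apply,
// reduced to plain serde_json types so it runs standalone; the constants
// stand in for OTEL_LOG_KNOWN_FIELD_LIST and
// PARSEABLE.options.otel_attributes_allowed_limit.
use std::collections::HashSet;
use serde_json::{json, Map, Value};

const ALLOWED_LIMIT: usize = 200;
const KNOWN_FIELDS: [&str; 3] = ["time_unix_nano", "severity_text", "body"];

fn check_attribute_count(record: &Map<String, Value>) -> Result<(), String> {
    let known: HashSet<&str> = KNOWN_FIELDS.iter().copied().collect();
    // Any key that is not a known OTEL field is a user attribute column.
    let attribute_count = record
        .keys()
        .filter(|key| !known.contains(key.as_str()))
        .count();
    if attribute_count > ALLOWED_LIMIT {
        return Err(format!(
            "attribute count {attribute_count} exceeds the threshold of {ALLOWED_LIMIT}"
        ));
    }
    Ok(())
}

fn main() {
    let record = json!({"time_unix_nano": "t", "body": "hello", "user.id": 7});
    assert!(check_attribute_count(record.as_object().unwrap()).is_ok());
}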
+145,19 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { vec_scope_log_json.push(combined_json); } - // Add the `other_attributes` to the scope log json - merge_attributes_in_json(other_attributes, &mut vec_scope_log_json); - vec_scope_log_json } /// this function performs the custom flattening of the otel logs /// and returns a `Vec` of `Value::Object` of the flattened json -pub fn flatten_otel_logs(message: &LogsData) -> Vec { +pub fn flatten_otel_logs(message: &LogsData) -> Result, OtelError> { let mut vec_otel_json = Vec::new(); + let known_fields: HashSet<&str> = OTEL_LOG_KNOWN_FIELD_LIST.iter().cloned().collect(); for record in &message.resource_logs { let mut resource_log_json = Map::new(); - let mut other_attributes = Map::new(); if let Some(resource) = &record.resource { - insert_attributes( - &mut resource_log_json, - &resource.attributes, - &mut other_attributes, - ); + insert_attributes(&mut resource_log_json, &resource.attributes); resource_log_json.insert( "resource_dropped_attributes_count".to_string(), Value::Number(resource.dropped_attributes_count.into()), @@ -176,6 +168,7 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec { for scope_log in &record.scope_logs { vec_resource_logs_json.extend(flatten_scope_log(scope_log)); } + resource_log_json.insert( "schema_url".to_string(), Value::String(record.schema_url.clone()), @@ -183,12 +176,27 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec { for resource_logs_json in &mut vec_resource_logs_json { resource_logs_json.extend(resource_log_json.clone()); - } - - // Add the `other_attributes` to the resource log json - merge_attributes_in_json(other_attributes, &mut vec_resource_logs_json); - vec_otel_json.extend(vec_resource_logs_json); + let attribute_count = resource_logs_json + .keys() + .filter(|key| !known_fields.contains(key.as_str())) + .count(); + // Check if the number of attributes exceeds the allowed limit + if attribute_count > PARSEABLE.options.otel_attributes_allowed_limit { + tracing::error!( + "OTEL logs ingestion failed because the number of attributes ({}) exceeded the threshold of {}", + attribute_count, + PARSEABLE.options.otel_attributes_allowed_limit + ); + return Err(OtelError::AttributeCountExceeded( + attribute_count, + PARSEABLE.options.otel_attributes_allowed_limit, + )); + } + + vec_otel_json.push(Value::Object(resource_logs_json.clone())); + } } - vec_otel_json.into_iter().map(Value::Object).collect() + + Ok(vec_otel_json) } diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 97a27ad74..02b3227ce 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -16,6 +16,8 @@ * */ +use std::collections::HashSet; + use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue; use opentelemetry_proto::tonic::metrics::v1::{ exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, Exemplar, @@ -23,17 +25,49 @@ use opentelemetry_proto::tonic::metrics::v1::{ }; use serde_json::{Map, Value}; +use crate::otel::otel_utils::OtelError; +use crate::parseable::PARSEABLE; + use super::otel_utils::{ - add_other_attributes_if_not_empty, convert_epoch_nano_to_timestamp, insert_attributes, - insert_number_if_some, merge_attributes_in_json, + convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some, }; -pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 5] = [ +pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 35] = [ "metric_name", "metric_description", "metric_unit", "start_time_unix_nano", "time_unix_nano", + 
"exemplar_time_unix_nano", + "exemplar_span_id", + "exemplar_trace_id", + "exemplar_value", + "data_point_value", + "data_point_count", + "data_point_sum", + "data_point_bucket_counts", + "data_point_explicit_bounds", + "data_point_scale", + "data_point_zero_count", + "data_point_flags", + "data_point_flags_description", + "positive_offset", + "positive_bucket_count", + "negative_offset", + "negative_bucket_count", + "quantile", + "value", + "is_monotonic", + "aggregation_temporality", + "aggregation_temporality_description", + "metric_type", + "scope_name", + "scope_version", + "scope_metrics_schema_url", + "scope_dropped_attributes_count", + "resource_dropped_attributes_count", + "resource_schema_url", + "resource_metrics_schema_url", ]; /// otel metrics event has json array for exemplar @@ -45,12 +79,7 @@ fn flatten_exemplar(exemplars: &[Exemplar]) -> Vec> { .iter() .map(|exemplar| { let mut exemplar_json = Map::new(); - let mut other_attributes = Map::new(); - insert_attributes( - &mut exemplar_json, - &exemplar.filtered_attributes, - &mut other_attributes, - ); + insert_attributes(&mut exemplar_json, &exemplar.filtered_attributes); exemplar_json.insert( "exemplar_time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -83,7 +112,6 @@ fn flatten_exemplar(exemplars: &[Exemplar]) -> Vec> { } } } - add_other_attributes_if_not_empty(&mut exemplar_json, &other_attributes); exemplar_json }) .collect() @@ -98,12 +126,7 @@ fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec Vec Vec> { let mut data_points_json = Vec::new(); for data_point in &histogram.data_points { let mut data_point_json = Map::new(); - let mut other_attributes = Map::new(); - insert_attributes( - &mut data_point_json, - &data_point.attributes, - &mut other_attributes, - ); + insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -253,16 +268,13 @@ fn flatten_histogram(histogram: &Histogram) -> Vec> { "data_point_explicit_bounds".to_string(), data_point_explicit_bounds, ); - let mut exemplar_json = flatten_exemplar(&data_point.exemplars); + let exemplar_json = flatten_exemplar(&data_point.exemplars); if !exemplar_json.is_empty() { - merge_attributes_in_json(other_attributes, &mut exemplar_json); for exemplar in exemplar_json { for (key, value) in exemplar { data_point_json.insert(key, value); } } - } else { - add_other_attributes_if_not_empty(&mut data_point_json, &other_attributes); } data_point_json.extend(flatten_data_point_flags(data_point.flags)); @@ -309,12 +321,7 @@ fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec Vec Vec> { let mut data_points_json = Vec::new(); for data_point in &summary.data_points { let mut data_point_json = Map::new(); - let mut other_attributes = Map::new(); - insert_attributes( - &mut data_point_json, - &data_point.attributes, - &mut other_attributes, - ); + insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -445,8 +444,6 @@ fn flatten_summary(summary: &Summary) -> Vec> { ), ); - add_other_attributes_if_not_empty(&mut data_point_json, &other_attributes); - data_points_json.push(data_point_json); } data_points_json @@ -460,7 +457,6 @@ fn flatten_summary(summary: &Summary) -> Vec> { pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec> { let mut data_points_json = Vec::new(); let mut 
metric_json = Map::new(); - let mut other_attributes = Map::new(); let mut metric_type = String::default(); match &metrics_record.data { Some(metric::Data::Gauge(gauge)) => { @@ -498,11 +494,7 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec Value::String(metrics_record.unit.clone()), ); metric_json.insert("metric_type".to_string(), Value::String(metric_type)); - insert_attributes( - &mut metric_json, - &metrics_record.metadata, - &mut other_attributes, - ); + insert_attributes(&mut metric_json, &metrics_record.metadata); for data_point_json in &mut data_points_json { for (key, value) in &metric_json { data_point_json.insert(key.clone(), value.clone()); @@ -511,35 +503,32 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec if data_points_json.is_empty() { data_points_json.push(metric_json); } - merge_attributes_in_json(other_attributes, &mut data_points_json); data_points_json } /// this function performs the custom flattening of the otel metrics /// and returns a `Vec` of `Value::Object` of the flattened json -pub fn flatten_otel_metrics(message: MetricsData) -> Vec { +pub fn flatten_otel_metrics(message: MetricsData) -> Result, OtelError> { let mut vec_otel_json = Vec::new(); + let known_fields: HashSet<&str> = OTEL_METRICS_KNOWN_FIELD_LIST.iter().cloned().collect(); + for record in &message.resource_metrics { let mut resource_metrics_json = Map::new(); - let mut other_attributes = Map::new(); if let Some(resource) = &record.resource { - insert_attributes( - &mut resource_metrics_json, - &resource.attributes, - &mut other_attributes, - ); + insert_attributes(&mut resource_metrics_json, &resource.attributes); resource_metrics_json.insert( "resource_dropped_attributes_count".to_string(), Value::Number(resource.dropped_attributes_count.into()), ); } + let mut vec_scope_metrics_json = Vec::new(); for scope_metric in &record.scope_metrics { let mut scope_metrics_json = Map::new(); - let mut scope_other_attributes = Map::new(); for metrics_record in &scope_metric.metrics { vec_scope_metrics_json.extend(flatten_metrics_record(metrics_record)); } + if let Some(scope) = &scope_metric.scope { scope_metrics_json .insert("scope_name".to_string(), Value::String(scope.name.clone())); @@ -547,16 +536,13 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { "scope_version".to_string(), Value::String(scope.version.clone()), ); - insert_attributes( - &mut scope_metrics_json, - &scope.attributes, - &mut scope_other_attributes, - ); + insert_attributes(&mut scope_metrics_json, &scope.attributes); scope_metrics_json.insert( "scope_dropped_attributes_count".to_string(), Value::Number(scope.dropped_attributes_count.into()), ); } + scope_metrics_json.insert( "scope_metrics_schema_url".to_string(), Value::String(scope_metric.schema_url.clone()), @@ -567,21 +553,41 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { scope_metric_json.insert(key.clone(), value.clone()); } } - merge_attributes_in_json(scope_other_attributes, &mut vec_scope_metrics_json); } + resource_metrics_json.insert( "resource_metrics_schema_url".to_string(), Value::String(record.schema_url.clone()), ); + for resource_metric_json in &mut vec_scope_metrics_json { for (key, value) in &resource_metrics_json { resource_metric_json.insert(key.clone(), value.clone()); } + + let attribute_count = resource_metric_json + .keys() + .filter(|key| !known_fields.contains(key.as_str())) + .count(); + + // Check if the number of attributes exceeds the allowed limit + if attribute_count > 
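// Standalone sketch of the merge step flatten_metrics_record performs above:
// metric-level fields (name, unit, type) are copied into every flattened
// data-point row so each row is self-describing; names here are illustrative.
use serde_json::{Map, Value};

fn merge_into_rows(metric_json: &Map<String, Value>, rows: &mut Vec<Map<String, Value>>) {
    if rows.is_empty() {
        // A metric without data points still yields one metadata-only row.
        rows.push(metric_json.clone());
        return;
    }
    for row in rows.iter_mut() {
        for (key, value) in metric_json {
            row.insert(key.clone(), value.clone());
        }
    }
}

fn main() {
    let mut metric = Map::new();
    metric.insert("metric_name".to_string(), Value::String("cpu".to_string()));
    let mut rows = vec![Map::new()];
    merge_into_rows(&metric, &mut rows);
    assert_eq!(rows[0]["metric_name"], Value::String("cpu".to_string()));
}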
PARSEABLE.options.otel_attributes_allowed_limit { + tracing::error!( + "OTEL metrics ingestion failed because the number of attributes ({}) exceeded the threshold of {}", + attribute_count, + PARSEABLE.options.otel_attributes_allowed_limit + ); + return Err(OtelError::AttributeCountExceeded( + attribute_count, + PARSEABLE.options.otel_attributes_allowed_limit, + )); + } + + vec_otel_json.push(Value::Object(resource_metric_json.clone())); } - merge_attributes_in_json(other_attributes, &mut vec_scope_metrics_json); - vec_otel_json.extend(vec_scope_metrics_json); } - vec_otel_json.into_iter().map(Value::Object).collect() + + Ok(vec_otel_json) } /// otel metrics event has json object for aggregation temporality diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 3c37c0e0a..eeec03cef 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -22,10 +22,6 @@ use opentelemetry_proto::tonic::common::v1::{ }; use serde_json::{Map, Value}; -/// Prefixes of attribute keys that should be preserved as individual fields in flattened output. -/// Other attributes will be collected in a separate JSON object under `other_attributes`. -const KNOWN_ATTRIBUTES_PREFIX: [&str; 6] = ["http", "url", "service", "os", "host", "telemetry"]; -pub const OTHER_ATTRIBUTES_KEY: &str = "other_attributes"; // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map { let mut value_json: Map = Map::new(); @@ -153,24 +149,14 @@ pub fn value_to_string(value: serde_json::Value) -> String { } } -pub fn flatten_attributes( - attributes: &Vec, - other_attributes_json: &mut Map, -) -> Map { +pub fn flatten_attributes(attributes: &Vec) -> Map { let mut attributes_json: Map = Map::new(); for attribute in attributes { let key = &attribute.key; let value = &attribute.value; let value_json = collect_json_from_values(value, &key.to_string()); for (attr_key, attr_val) in &value_json { - if KNOWN_ATTRIBUTES_PREFIX - .iter() - .any(|prefix| attr_key.starts_with(prefix)) - { - attributes_json.insert(attr_key.clone(), attr_val.clone()); - } else { - other_attributes_json.insert(attr_key.clone(), attr_val.clone()); - } + attributes_json.insert(attr_key.clone(), attr_val.clone()); } } attributes_json @@ -196,12 +182,8 @@ pub fn insert_bool_if_some(map: &mut Map, key: &str, option: &Opt } } -pub fn insert_attributes( - map: &mut Map, - attributes: &Vec, - other_attributes_json: &mut Map, -) { - let attributes_json = flatten_attributes(attributes, other_attributes_json); +pub fn insert_attributes(map: &mut Map, attributes: &Vec) { + let attributes_json = flatten_attributes(attributes); for (key, value) in attributes_json { map.insert(key, value); } @@ -212,110 +194,8 @@ pub fn convert_epoch_nano_to_timestamp(epoch_ns: i64) -> String { dt.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string() } -/// fetch `other_attributes` from array of JSON objects -/// merge them with the provided attributes -/// and return the merged array of JSON object -pub fn merge_attributes_in_json( - attributes: Map, - vec_json: &mut Vec>, -) { - if attributes.is_empty() { - return; - } - - for json in vec_json { - let merged_attributes = match json.get(OTHER_ATTRIBUTES_KEY) { - Some(Value::String(attrs_str)) => { - merge_with_existing_attributes(&attributes, attrs_str) - } - Some(Value::Object(existing_attrs)) => { - merge_with_existing_object(&attributes, existing_attrs) - } - _ => serialize_attributes(&attributes), - }; - - if let 
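// Sketch of the simplified attribute handling above: every attribute becomes
// its own column, with no `other_attributes` side-channel. Tuples stand in
// for the OTLP `KeyValue` type, whose values the real code first converts
// through collect_json_from_values.
use serde_json::{Map, Value};

fn insert_attributes_demo(map: &mut Map<String, Value>, attributes: &[(String, Value)]) {
    for (key, value) in attributes {
        map.insert(key.clone(), value.clone());
    }
}

fn main() {
    let mut record = Map::new();
    let attrs = vec![
        ("service.name".to_string(), Value::String("api".to_string())),
        ("user.id".to_string(), Value::from(42)),
    ];
    // &Vec<_> coerces to &[_], which is why the later refactor in this series
    // can change &Vec<KeyValue> parameters to &[KeyValue] without churn.
    insert_attributes_demo(&mut record, &attrs);
    assert_eq!(record.len(), 2);
}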
Some(merged_str) = merged_attributes { - json.insert(OTHER_ATTRIBUTES_KEY.to_string(), Value::String(merged_str)); - } - } -} - -/// Merge attributes with an existing JSON string of attributes -fn merge_with_existing_attributes( - attributes: &Map, - attrs_str: &str, -) -> Option { - match serde_json::from_str::>(attrs_str) { - Ok(mut existing_attrs) => { - for (key, value) in attributes { - existing_attrs.insert(key.clone(), value.clone()); - } - serde_json::to_string(&existing_attrs).ok() - } - Err(e) => { - tracing::warn!("failed to deserialize existing attributes: {e}"); - None - } - } -} - -/// Merge attributes with an existing JSON object of attributes -fn merge_with_existing_object( - attributes: &Map, - existing_attrs: &Map, -) -> Option { - let mut merged_attrs = existing_attrs.clone(); - for (key, value) in attributes { - merged_attrs.insert(key.clone(), value.clone()); - } - serde_json::to_string(&merged_attrs).ok() -} - -/// Serialize attributes into a JSON string -fn serialize_attributes(attributes: &Map) -> Option { - serde_json::to_string(attributes).ok() -} - -/// fetch `other_attributes` from array of JSON objects -/// and merge them into a single map -/// and return the merged map -pub fn fetch_attributes_from_json(json_arr: &Vec>) -> Map { - let mut merged_attributes = Map::new(); - - for json in json_arr { - if let Some(Value::String(attrs_str)) = json.get(OTHER_ATTRIBUTES_KEY) { - if let Ok(attrs) = serde_json::from_str::>(attrs_str) { - for (key, value) in attrs { - merged_attributes.insert(key, value); - } - } - } - } - merged_attributes -} - -/// convert attributes map to a string -/// and return the string -/// if serialisation fails, return an empty string -pub fn fetch_attributes_string(attributes: &Map) -> String { - match serde_json::to_string(attributes) { - Ok(s) => s, - Err(e) => { - tracing::warn!("failed to serialise OTEL other_attributes: {e}"); - String::default() - } - } -} - -/// add `other_attributes` to the JSON object -/// if `other_attributes` is not empty -/// and return the JSON object -pub fn add_other_attributes_if_not_empty( - json: &mut Map, - other_attributes: &Map, -) { - if !other_attributes.is_empty() { - let attrs_str = fetch_attributes_string(other_attributes); - json.insert(OTHER_ATTRIBUTES_KEY.to_string(), Value::String(attrs_str)); - } +#[derive(Debug, thiserror::Error)] +pub enum OtelError { + #[error("Ingestion failed because the attributes count {0} exceeded the threshold of {1}")] + AttributeCountExceeded(usize, usize), } diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 87e4556a8..3c9ff3c5d 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -16,6 +16,8 @@ * */ +use std::collections::HashSet; + use opentelemetry_proto::tonic::trace::v1::span::Event; use opentelemetry_proto::tonic::trace::v1::span::Link; use opentelemetry_proto::tonic::trace::v1::ScopeSpans; @@ -24,13 +26,19 @@ use opentelemetry_proto::tonic::trace::v1::Status; use opentelemetry_proto::tonic::trace::v1::TracesData; use serde_json::{Map, Value}; -use super::otel_utils::add_other_attributes_if_not_empty; +use crate::otel::otel_utils::OtelError; +use crate::parseable::PARSEABLE; + use super::otel_utils::convert_epoch_nano_to_timestamp; -use super::otel_utils::fetch_attributes_from_json; use super::otel_utils::insert_attributes; -use super::otel_utils::merge_attributes_in_json; -pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 15] = [ +pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 31] = [ + "scope_name", + "scope_version", + "scope_schema_url", + 
"scope_dropped_attributes_count", + "resource_schema_url", + "resource_dropped_attributes_count", "span_trace_id", "span_span_id", "span_name", @@ -43,6 +51,16 @@ pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 15] = [ "span_end_time_unix_nano", "event_name", "event_time_unix_nano", + "event_dropped_attributes_count", + "link_span_id", + "link_trace_id", + "link_dropped_attributes_count", + "span_dropped_events_count", + "span_dropped_links_count", + "span_dropped_attributes_count", + "span_trace_state", + "span_flags", + "span_flags_description", "span_status_code", "span_status_description", "span_status_message", @@ -52,7 +70,6 @@ pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 15] = [ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { let mut vec_scope_span_json = Vec::new(); let mut scope_span_json = Map::new(); - let mut other_attributes = Map::new(); for span in &scope_span.spans { let span_record_json = flatten_span_record(span); vec_scope_span_json.extend(span_record_json); @@ -64,11 +81,7 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { "scope_version".to_string(), Value::String(scope.version.clone()), ); - insert_attributes( - &mut scope_span_json, - &scope.attributes, - &mut other_attributes, - ); + insert_attributes(&mut scope_span_json, &scope.attributes); scope_span_json.insert( "scope_dropped_attributes_count".to_string(), Value::Number(scope.dropped_attributes_count.into()), @@ -87,26 +100,20 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { Value::String(scope_span.schema_url.clone()), ); } - // Add the `other_attributes` to the scope span json - merge_attributes_in_json(other_attributes, &mut vec_scope_span_json); vec_scope_span_json } /// this function performs the custom flattening of the otel traces event /// and returns a `Vec` of `Value::Object` of the flattened json -pub fn flatten_otel_traces(message: &TracesData) -> Vec { +pub fn flatten_otel_traces(message: &TracesData) -> Result, OtelError> { let mut vec_otel_json = Vec::new(); + let known_fields: HashSet<&str> = OTEL_TRACES_KNOWN_FIELD_LIST.iter().cloned().collect(); for record in &message.resource_spans { let mut resource_span_json = Map::new(); - let mut other_attributes = Map::new(); if let Some(resource) = &record.resource { - insert_attributes( - &mut resource_span_json, - &resource.attributes, - &mut other_attributes, - ); + insert_attributes(&mut resource_span_json, &resource.attributes); resource_span_json.insert( "resource_dropped_attributes_count".to_string(), Value::Number(resource.dropped_attributes_count.into()), @@ -128,13 +135,30 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { for (key, value) in &resource_span_json { resource_spans_json.insert(key.clone(), value.clone()); } + + let attribute_count = resource_spans_json + .keys() + .filter(|key| !known_fields.contains(key.as_str())) + .count(); + + // Check if the number of attributes exceeds the allowed limit + if attribute_count > PARSEABLE.options.otel_attributes_allowed_limit { + tracing::error!( + "OTEL traces ingestion failed because the number of attributes ({}) exceeded the threshold of {}", + attribute_count, + PARSEABLE.options.otel_attributes_allowed_limit + ); + return Err(OtelError::AttributeCountExceeded( + attribute_count, + PARSEABLE.options.otel_attributes_allowed_limit, + )); + } + + vec_otel_json.push(Value::Object(resource_spans_json.clone())); } - // Add the `other_attributes` to the resource span json - merge_attributes_in_json(other_attributes, &mut vec_resource_spans_json); - 
vec_otel_json.extend(vec_resource_spans_json); } - vec_otel_json.into_iter().map(Value::Object).collect() + Ok(vec_otel_json) } /// otel traces has json array of events @@ -145,7 +169,6 @@ fn flatten_events(events: &[Event]) -> Vec> { .iter() .map(|event| { let mut event_json = Map::new(); - let mut other_attributes = Map::new(); event_json.insert( "event_time_unix_nano".to_string(), Value::String( @@ -153,13 +176,11 @@ fn flatten_events(events: &[Event]) -> Vec> { ), ); event_json.insert("event_name".to_string(), Value::String(event.name.clone())); - insert_attributes(&mut event_json, &event.attributes, &mut other_attributes); + insert_attributes(&mut event_json, &event.attributes); event_json.insert( "event_dropped_attributes_count".to_string(), Value::Number(event.dropped_attributes_count.into()), ); - - add_other_attributes_if_not_empty(&mut event_json, &other_attributes); event_json }) .collect() @@ -173,7 +194,6 @@ fn flatten_links(links: &[Link]) -> Vec> { .iter() .map(|link| { let mut link_json = Map::new(); - let mut other_attributes = Map::new(); link_json.insert( "link_span_id".to_string(), Value::String(hex::encode(&link.span_id)), @@ -183,13 +203,11 @@ fn flatten_links(links: &[Link]) -> Vec> { Value::String(hex::encode(&link.trace_id)), ); - insert_attributes(&mut link_json, &link.attributes, &mut other_attributes); + insert_attributes(&mut link_json, &link.attributes); link_json.insert( "link_dropped_attributes_count".to_string(), Value::Number(link.dropped_attributes_count.into()), ); - - add_other_attributes_if_not_empty(&mut link_json, &other_attributes); link_json }) .collect() @@ -274,7 +292,6 @@ fn flatten_kind(kind: i32) -> Map { /// this function is called recursively for each span record object in the otel traces event fn flatten_span_record(span_record: &Span) -> Vec> { let mut span_records_json = Vec::new(); - let mut other_attributes = Map::new(); let mut span_record_json = Map::new(); span_record_json.insert( "span_trace_id".to_string(), @@ -310,40 +327,20 @@ fn flatten_span_record(span_record: &Span) -> Vec> { span_record.end_time_unix_nano as i64, )), ); - insert_attributes( - &mut span_record_json, - &span_record.attributes, - &mut other_attributes, - ); + insert_attributes(&mut span_record_json, &span_record.attributes); span_record_json.insert( "span_dropped_attributes_count".to_string(), Value::Number(span_record.dropped_attributes_count.into()), ); let events_json = flatten_events(&span_record.events); - // fetch all other_attributes from the events_json - let events_other_attributes = fetch_attributes_from_json(&events_json); span_records_json.extend(events_json); span_record_json.insert( "span_dropped_events_count".to_string(), Value::Number(span_record.dropped_events_count.into()), ); let links_json = flatten_links(&span_record.links); - // fetch all other_attributes from the links_json - let links_other_attributes = fetch_attributes_from_json(&links_json); span_records_json.extend(links_json); - // merge all other_attributes from the events_json and links_json - if !other_attributes.is_empty() - || !events_other_attributes.is_empty() - || !links_other_attributes.is_empty() - { - for (key, value) in &events_other_attributes { - other_attributes.insert(key.clone(), value.clone()); - } - for (key, value) in &links_other_attributes { - other_attributes.insert(key.clone(), value.clone()); - } - add_other_attributes_if_not_empty(&mut span_record_json, &other_attributes); - } + span_record_json.insert( "span_dropped_links_count".to_string(), 
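// The span and trace ids above arrive as raw bytes in OTLP and are stored as
// hex string columns; a standalone sketch of that conversion with the `hex`
// crate, which the diff uses via hex::encode.
fn id_to_hex(id: &[u8]) -> String {
    hex::encode(id)
}

fn main() {
    // An 8-byte OTLP span id renders as 16 lowercase hex characters.
    assert_eq!(id_to_hex(&[0, 1, 2, 3, 4, 5, 6, 7]), "0001020304050607");
}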
Value::Number(span_record.dropped_links_count.into()), From 038df7c6fef6b8d1e62fd23c52d0bd8f0654dc07 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 12 May 2025 22:09:44 -0400 Subject: [PATCH 02/11] refactor --- src/handlers/http/ingest.rs | 2 +- src/otel/logs.rs | 6 +++--- src/otel/metrics.rs | 13 ++++++------- src/otel/otel_utils.rs | 7 ++++--- src/otel/traces.rs | 11 +++++------ 5 files changed, 19 insertions(+), 20 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index d40c821a2..6f52be525 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -498,7 +498,7 @@ impl actix_web::ResponseError for PostError { PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST, PostError::KnownFormat(_) => StatusCode::BAD_REQUEST, PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST, - PostError::OtelError(_) => StatusCode::EXPECTATION_FAILED, + PostError::OtelError(_) => StatusCode::BAD_REQUEST, } } diff --git a/src/otel/logs.rs b/src/otel/logs.rs index a9e8bd478..f7567f773 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -37,7 +37,7 @@ pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 16] = [ "scope_log_schema_url", "scope_dropped_attributes_count", "resource_dropped_attributes_count", - "resource_schema_url", + "schema_url", "time_unix_nano", "observed_time_unix_nano", "severity_number", @@ -178,8 +178,8 @@ pub fn flatten_otel_logs(message: &LogsData) -> Result, OtelError> { resource_logs_json.extend(resource_log_json.clone()); let attribute_count = resource_logs_json - .keys() - .filter(|key| !known_fields.contains(key.as_str())) + .iter() + .filter(|(key, _)| !known_fields.contains(key.as_str())) .count(); // Check if the number of attributes exceeds the allowed limit if attribute_count > PARSEABLE.options.otel_attributes_allowed_limit { diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 02b3227ce..a9c096071 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -32,7 +32,7 @@ use super::otel_utils::{ convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some, }; -pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 35] = [ +pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 34] = [ "metric_name", "metric_description", "metric_unit", @@ -63,11 +63,10 @@ pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 35] = [ "metric_type", "scope_name", "scope_version", - "scope_metrics_schema_url", + "scope_schema_url", "scope_dropped_attributes_count", "resource_dropped_attributes_count", "resource_schema_url", - "resource_metrics_schema_url", ]; /// otel metrics event has json array for exemplar @@ -544,7 +543,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Result, OtelErro } scope_metrics_json.insert( - "scope_metrics_schema_url".to_string(), + "scope_schema_url".to_string(), Value::String(scope_metric.schema_url.clone()), ); @@ -556,7 +555,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Result, OtelErro } resource_metrics_json.insert( - "resource_metrics_schema_url".to_string(), + "resource_schema_url".to_string(), Value::String(record.schema_url.clone()), ); @@ -566,8 +565,8 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Result, OtelErro } let attribute_count = resource_metric_json - .keys() - .filter(|key| !known_fields.contains(key.as_str())) + .iter() + .filter(|(key, _)| !known_fields.contains(key.as_str())) .count(); // Check if the number of attributes exceeds the allowed limit diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index eeec03cef..f8206a04c 100644 
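// Standalone rendition of the `OtelError` enum that otel_utils.rs gained in
// PATCH 01: thiserror derives Display from the #[error(...)] attribute, and
// that rendered message is what the ingest handler surfaces to clients.
use thiserror::Error;

#[derive(Debug, Error)]
enum DemoOtelError {
    #[error("Ingestion failed because the attributes count {0} exceeded the threshold of {1}")]
    AttributeCountExceeded(usize, usize),
}

fn main() {
    let err = DemoOtelError::AttributeCountExceeded(250, 200);
    assert_eq!(
        err.to_string(),
        "Ingestion failed because the attributes count 250 exceeded the threshold of 200"
    );
}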
--- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -20,6 +20,7 @@ use chrono::DateTime; use opentelemetry_proto::tonic::common::v1::{ any_value::Value as OtelValue, AnyValue, ArrayValue, KeyValue, KeyValueList, }; +use serde::Serialize; use serde_json::{Map, Value}; // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte @@ -149,7 +150,7 @@ pub fn value_to_string(value: serde_json::Value) -> String { } } -pub fn flatten_attributes(attributes: &Vec) -> Map { +pub fn flatten_attributes(attributes: &[KeyValue]) -> Map { let mut attributes_json: Map = Map::new(); for attribute in attributes { let key = &attribute.key; @@ -182,7 +183,7 @@ pub fn insert_bool_if_some(map: &mut Map, key: &str, option: &Opt } } -pub fn insert_attributes(map: &mut Map, attributes: &Vec) { +pub fn insert_attributes(map: &mut Map, attributes: &[KeyValue]) { let attributes_json = flatten_attributes(attributes); for (key, value) in attributes_json { map.insert(key, value); @@ -194,7 +195,7 @@ pub fn convert_epoch_nano_to_timestamp(epoch_ns: i64) -> String { dt.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string() } -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, Serialize)] pub enum OtelError { #[error("Ingestion failed because the attributes count {0} exceeded the threshold of {1}")] AttributeCountExceeded(usize, usize), diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 3c9ff3c5d..b8aa1afd6 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -32,7 +32,7 @@ use crate::parseable::PARSEABLE; use super::otel_utils::convert_epoch_nano_to_timestamp; use super::otel_utils::insert_attributes; -pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 31] = [ +pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 30] = [ "scope_name", "scope_version", "scope_schema_url", @@ -43,7 +43,6 @@ pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 31] = [ "span_span_id", "span_name", "span_parent_span_id", - "flags", "name", "span_kind", "span_kind_description", @@ -96,7 +95,7 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { for span_json in &mut vec_scope_span_json { span_json.insert( - "schema_url".to_string(), + "scope_schema_url".to_string(), Value::String(scope_span.schema_url.clone()), ); } @@ -127,7 +126,7 @@ pub fn flatten_otel_traces(message: &TracesData) -> Result, OtelError } resource_span_json.insert( - "schema_url".to_string(), + "resource_schema_url".to_string(), Value::String(record.schema_url.clone()), ); @@ -137,8 +136,8 @@ pub fn flatten_otel_traces(message: &TracesData) -> Result, OtelError } let attribute_count = resource_spans_json - .keys() - .filter(|key| !known_fields.contains(key.as_str())) + .iter() + .filter(|(key, _)| !known_fields.contains(key.as_str())) .count(); // Check if the number of attributes exceeds the allowed limit From a11790c31292316ca04ef4c8a9277d83bcbdcc31 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 13 May 2025 00:46:23 -0400 Subject: [PATCH 03/11] add validation for env --- src/cli.rs | 1 + src/option.rs | 15 +++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/src/cli.rs b/src/cli.rs index 731b3909e..12be3b3d7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -373,6 +373,7 @@ pub struct Options { long, env = "P_OTEL_ATTRIBUTES_ALLOWED_LIMIT", default_value = "200", + value_parser = validation::validate_otel_attributes_allowed_limit, help = "allowed limit for otel attributes" )] pub otel_attributes_allowed_limit: usize, diff --git a/src/option.rs b/src/option.rs index 26b5f4664..277247aeb 100644 --- 
a/src/option.rs +++ b/src/option.rs @@ -173,4 +173,19 @@ pub mod validation { Err("Invalid value for max disk usage. It should be given as 90.0 for 90%".to_string()) } } + + pub fn validate_otel_attributes_allowed_limit(s: &str) -> Result<usize, String> { + if let Ok(size) = s.parse::<usize>() { + if (1..=200).contains(&size) { + Ok(size) + } else { + Err(format!( + "Invalid value for size. It should be between 1 and {}", + 200 + )) + } + } else { + Err("Invalid value for size. It should be given as integer value".to_string()) + } + } } From 59582a6ce200b438c41edccbf061ff050130282a Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 13 May 2025 00:53:10 -0400 Subject: [PATCH 04/11] add const for OTEL_ATTRIBUTES_ALLOWED_LIMIT --- src/option.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/option.rs b/src/option.rs index 277247aeb..a0c84a0ed 100644 --- a/src/option.rs +++ b/src/option.rs @@ -95,6 +95,9 @@ pub mod validation { use super::{Compression, Mode}; + // Maximum allowed otel attributes per event + const OTEL_ATTRIBUTES_ALLOWED_LIMIT: usize = 200; + pub fn file_path(s: &str) -> Result<PathBuf, String> { if s.is_empty() { return Err("empty path".to_owned()); @@ -176,16 +179,16 @@ pub mod validation { pub fn validate_otel_attributes_allowed_limit(s: &str) -> Result<usize, String> { if let Ok(size) = s.parse::<usize>() { - if (1..=200).contains(&size) { + if (1..=OTEL_ATTRIBUTES_ALLOWED_LIMIT).contains(&size) { Ok(size) } else { Err(format!( - "Invalid value for size. It should be between 1 and {}", - 200 + "Invalid value for P_OTEL_ATTRIBUTES_ALLOWED_LIMIT. It should be between 1 and {}", + OTEL_ATTRIBUTES_ALLOWED_LIMIT )) } } else { - Err("Invalid value for size. It should be given as integer value".to_string()) + Err("Invalid value for P_OTEL_ATTRIBUTES_ALLOWED_LIMIT. 
It should be given as integer value".to_string()) } } } From 9045f6620afc0fd86e70dea0c1de1b5cf672b512 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 13 May 2025 02:26:38 -0400 Subject: [PATCH 05/11] remove attributes check from otel logs, traces, metrics add check for fields count in any dataset reject event if fields count exceeds `P_DATASET_FIELDS_ALLOWED_LIMIT` default value is set to 250 --- src/cli.rs | 10 +++---- src/handlers/http/ingest.rs | 7 ++--- src/handlers/http/modal/utils/ingest_utils.rs | 27 +++++++++++++---- src/option.rs | 14 ++++----- src/otel/logs.rs | 29 ++---------------- src/otel/metrics.rs | 30 ++----------------- src/otel/otel_utils.rs | 7 ----- src/otel/traces.rs | 29 ++---------------- 8 files changed, 42 insertions(+), 111 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 12be3b3d7..20fcfbd49 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -371,12 +371,12 @@ pub struct Options { #[arg( long, - env = "P_OTEL_ATTRIBUTES_ALLOWED_LIMIT", - default_value = "200", - value_parser = validation::validate_otel_attributes_allowed_limit, - help = "allowed limit for otel attributes" + env = "P_DATASET_FIELDS_ALLOWED_LIMIT", + default_value = "250", + value_parser = validation::validate_dataset_fields_allowed_limit, + help = "allowed limit for fields count in a dataset" )] - pub otel_attributes_allowed_limit: usize, + pub dataset_fields_allowed_limit: usize, } #[derive(Parser, Debug)] diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 6f52be525..50b9ec944 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -35,7 +35,6 @@ use crate::metadata::SchemaVersion; use crate::option::Mode; use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST; use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST; -use crate::otel::otel_utils::OtelError; use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST; use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::storage::{ObjectStorageError, StreamType}; @@ -468,8 +467,8 @@ pub enum PostError { KnownFormat(#[from] known_schema::Error), #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")] IncorrectLogFormat(String), - #[error("OtelError: {0}")] - OtelError(#[from] OtelError), + #[error("Ingestion failed for dataset {0} as fields count {1} exceeds the limit {2}, Parseable recommends creating a new dataset.")] + FieldsLimitExceeded(String, usize, usize), } impl actix_web::ResponseError for PostError { @@ -498,7 +497,7 @@ impl actix_web::ResponseError for PostError { PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST, PostError::KnownFormat(_) => StatusCode::BAD_REQUEST, PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST, - PostError::OtelError(_) => StatusCode::BAD_REQUEST, + PostError::FieldsLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST, } } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 1754e59c4..a5dcfa440 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -54,6 +54,24 @@ pub async fn flatten_and_push_logs( log_source: &LogSource, p_custom_fields: &HashMap, ) -> Result<(), PostError> { + // fetch the storage schema for the stream + let schema = PARSEABLE.get_stream(stream_name)?.get_schema(); + //fetch the fields count from the schema + let fields_count = schema.fields().len(); + if fields_count > PARSEABLE.options.dataset_fields_allowed_limit { + tracing::error!( + "Ingestion failed 
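// Sketch of the range-checked value parser pattern used for the env vars in
// this series: parse the string, then validate against an inclusive range.
// LIMIT_CEILING stands in for the OTEL_ATTRIBUTES_ALLOWED_LIMIT const above.
const LIMIT_CEILING: usize = 200;

fn validate_limit(s: &str) -> Result<usize, String> {
    match s.parse::<usize>() {
        Ok(size) if (1..=LIMIT_CEILING).contains(&size) => Ok(size),
        Ok(_) => Err(format!(
            "Invalid value. It should be between 1 and {LIMIT_CEILING}"
        )),
        Err(_) => Err("Invalid value. It should be given as integer value".to_string()),
    }
}

fn main() {
    assert_eq!(validate_limit("150"), Ok(150));
    assert!(validate_limit("0").is_err());
    assert!(validate_limit("abc").is_err());
}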
for dataset {0} as fields count {1} exceeds the limit {2}, Parseable recommends creating a new dataset.", + stream_name, + fields_count, + PARSEABLE.options.dataset_fields_allowed_limit); + // Return an error if the fields count exceeds the limit + return Err(PostError::FieldsLimitExceeded( + stream_name.to_string(), + fields_count, + PARSEABLE.options.dataset_fields_allowed_limit, + )); + } + match log_source { LogSource::Kinesis => { //custom flattening required for Amazon Kinesis @@ -65,24 +83,21 @@ pub async fn flatten_and_push_logs( LogSource::OtelLogs => { //custom flattening required for otel logs let logs: LogsData = serde_json::from_value(json)?; - let records = flatten_otel_logs(&logs)?; - for record in records { + for record in flatten_otel_logs(&logs) { push_logs(stream_name, record, log_source, p_custom_fields).await?; } } LogSource::OtelTraces => { //custom flattening required for otel traces let traces: TracesData = serde_json::from_value(json)?; - let records = flatten_otel_traces(&traces)?; - for record in records { + for record in flatten_otel_traces(&traces) { push_logs(stream_name, record, log_source, p_custom_fields).await?; } } LogSource::OtelMetrics => { //custom flattening required for otel metrics let metrics: MetricsData = serde_json::from_value(json)?; - let records = flatten_otel_metrics(metrics)?; - for record in records { + for record in flatten_otel_metrics(metrics) { push_logs(stream_name, record, log_source, p_custom_fields).await?; } } diff --git a/src/option.rs b/src/option.rs index a0c84a0ed..269d9c966 100644 --- a/src/option.rs +++ b/src/option.rs @@ -95,8 +95,8 @@ pub mod validation { use super::{Compression, Mode}; - // Maximum allowed otel attributes per event - const OTEL_ATTRIBUTES_ALLOWED_LIMIT: usize = 200; + // Maximum allowed count for fields in a dataset + const DATASET_FIELDS_ALLOWED_LIMIT: usize = 250; pub fn file_path(s: &str) -> Result { if s.is_empty() { @@ -177,18 +177,18 @@ pub mod validation { } } - pub fn validate_otel_attributes_allowed_limit(s: &str) -> Result { + pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result { if let Ok(size) = s.parse::() { - if (1..=OTEL_ATTRIBUTES_ALLOWED_LIMIT).contains(&size) { + if (1..=DATASET_FIELDS_ALLOWED_LIMIT).contains(&size) { Ok(size) } else { Err(format!( - "Invalid value for P_OTEL_ATTRIBUTES_ALLOWED_LIMIT. It should be between 1 and {}", - OTEL_ATTRIBUTES_ALLOWED_LIMIT + "Invalid value for P_DATASET_FIELDS_ALLOWED_LIMIT. It should be between 1 and {}", + DATASET_FIELDS_ALLOWED_LIMIT )) } } else { - Err("Invalid value for P_OTEL_ATTRIBUTES_ALLOWED_LIMIT. It should be given as integer value".to_string()) + Err("Invalid value for P_DATASET_FIELDS_ALLOWED_LIMIT. It should be given as integer value".to_string()) } } } diff --git a/src/otel/logs.rs b/src/otel/logs.rs index f7567f773..f301869d2 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -15,15 +15,9 @@ * along with this program. If not, see . 
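// Reduced sketch of the dataset-wide guard this patch moves to: compare the
// stream's current schema field count against the configured limit before
// accepting events. A String error stands in for PostError, and the limit
// argument stands in for PARSEABLE.options.dataset_fields_allowed_limit.
fn check_fields_count(stream: &str, fields_count: usize, limit: usize) -> Result<(), String> {
    if fields_count > limit {
        // The real code also logs via tracing::error! before returning.
        return Err(format!(
            "dataset {stream}: fields count {fields_count} exceeds the limit {limit}"
        ));
    }
    Ok(())
}

fn main() {
    assert!(check_fields_count("demo", 100, 250).is_ok());
    assert!(check_fields_count("demo", 300, 250).is_err());
}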
* */ - -use std::collections::HashSet; - -use crate::parseable::PARSEABLE; - use super::otel_utils::collect_json_from_values; use super::otel_utils::convert_epoch_nano_to_timestamp; use super::otel_utils::insert_attributes; -use super::otel_utils::OtelError; use opentelemetry_proto::tonic::logs::v1::LogRecord; use opentelemetry_proto::tonic::logs::v1::LogsData; use opentelemetry_proto::tonic::logs::v1::ScopeLogs; @@ -150,10 +144,8 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { /// this function performs the custom flattening of the otel logs /// and returns a `Vec` of `Value::Object` of the flattened json -pub fn flatten_otel_logs(message: &LogsData) -> Result, OtelError> { +pub fn flatten_otel_logs(message: &LogsData) -> Vec { let mut vec_otel_json = Vec::new(); - let known_fields: HashSet<&str> = OTEL_LOG_KNOWN_FIELD_LIST.iter().cloned().collect(); - for record in &message.resource_logs { let mut resource_log_json = Map::new(); if let Some(resource) = &record.resource { @@ -177,26 +169,9 @@ pub fn flatten_otel_logs(message: &LogsData) -> Result, OtelError> { for resource_logs_json in &mut vec_resource_logs_json { resource_logs_json.extend(resource_log_json.clone()); - let attribute_count = resource_logs_json - .iter() - .filter(|(key, _)| !known_fields.contains(key.as_str())) - .count(); - // Check if the number of attributes exceeds the allowed limit - if attribute_count > PARSEABLE.options.otel_attributes_allowed_limit { - tracing::error!( - "OTEL logs ingestion failed because the number of attributes ({}) exceeded the threshold of {}", - attribute_count, - PARSEABLE.options.otel_attributes_allowed_limit - ); - return Err(OtelError::AttributeCountExceeded( - attribute_count, - PARSEABLE.options.otel_attributes_allowed_limit, - )); - } - vec_otel_json.push(Value::Object(resource_logs_json.clone())); } } - Ok(vec_otel_json) + vec_otel_json } diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index a9c096071..4edff77fe 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -15,9 +15,6 @@ * along with this program. If not, see . 
* */ - -use std::collections::HashSet; - use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue; use opentelemetry_proto::tonic::metrics::v1::{ exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, Exemplar, @@ -25,9 +22,6 @@ use opentelemetry_proto::tonic::metrics::v1::{ }; use serde_json::{Map, Value}; -use crate::otel::otel_utils::OtelError; -use crate::parseable::PARSEABLE; - use super::otel_utils::{ convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some, }; @@ -507,10 +501,8 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec /// this function performs the custom flattening of the otel metrics /// and returns a `Vec` of `Value::Object` of the flattened json -pub fn flatten_otel_metrics(message: MetricsData) -> Result, OtelError> { +pub fn flatten_otel_metrics(message: MetricsData) -> Vec { let mut vec_otel_json = Vec::new(); - let known_fields: HashSet<&str> = OTEL_METRICS_KNOWN_FIELD_LIST.iter().cloned().collect(); - for record in &message.resource_metrics { let mut resource_metrics_json = Map::new(); if let Some(resource) = &record.resource { @@ -564,29 +556,11 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Result, OtelErro resource_metric_json.insert(key.clone(), value.clone()); } - let attribute_count = resource_metric_json - .iter() - .filter(|(key, _)| !known_fields.contains(key.as_str())) - .count(); - - // Check if the number of attributes exceeds the allowed limit - if attribute_count > PARSEABLE.options.otel_attributes_allowed_limit { - tracing::error!( - "OTEL metrics ingestion failed because the number of attributes ({}) exceeded the threshold of {}", - attribute_count, - PARSEABLE.options.otel_attributes_allowed_limit - ); - return Err(OtelError::AttributeCountExceeded( - attribute_count, - PARSEABLE.options.otel_attributes_allowed_limit, - )); - } - vec_otel_json.push(Value::Object(resource_metric_json.clone())); } } - Ok(vec_otel_json) + vec_otel_json } /// otel metrics event has json object for aggregation temporality diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index f8206a04c..9cf0e1644 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -20,7 +20,6 @@ use chrono::DateTime; use opentelemetry_proto::tonic::common::v1::{ any_value::Value as OtelValue, AnyValue, ArrayValue, KeyValue, KeyValueList, }; -use serde::Serialize; use serde_json::{Map, Value}; // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte @@ -194,9 +193,3 @@ pub fn convert_epoch_nano_to_timestamp(epoch_ns: i64) -> String { let dt = DateTime::from_timestamp_nanos(epoch_ns).naive_utc(); dt.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string() } - -#[derive(Debug, thiserror::Error, Serialize)] -pub enum OtelError { - #[error("Ingestion failed because the attributes count {0} exceeded the threshold of {1}")] - AttributeCountExceeded(usize, usize), -} diff --git a/src/otel/traces.rs b/src/otel/traces.rs index b8aa1afd6..f243ecacf 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -15,9 +15,6 @@ * along with this program. If not, see . 
* */ -use std::collections::HashSet; - use opentelemetry_proto::tonic::trace::v1::span::Event; use opentelemetry_proto::tonic::trace::v1::span::Link; use opentelemetry_proto::tonic::trace::v1::ScopeSpans; @@ -26,9 +23,6 @@ use opentelemetry_proto::tonic::trace::v1::Status; use opentelemetry_proto::tonic::trace::v1::TracesData; use serde_json::{Map, Value}; -use crate::otel::otel_utils::OtelError; -use crate::parseable::PARSEABLE; - use super::otel_utils::convert_epoch_nano_to_timestamp; use super::otel_utils::insert_attributes; @@ -105,9 +99,8 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec<Map<String, Value>> { /// this function performs the custom flattening of the otel traces event /// and returns a `Vec` of `Value::Object` of the flattened json -pub fn flatten_otel_traces(message: &TracesData) -> Result<Vec<Value>, OtelError> { +pub fn flatten_otel_traces(message: &TracesData) -> Vec<Value> { let mut vec_otel_json = Vec::new(); - let known_fields: HashSet<&str> = OTEL_TRACES_KNOWN_FIELD_LIST.iter().cloned().collect(); for record in &message.resource_spans { let mut resource_span_json = Map::new(); @@ -135,29 +128,11 @@ pub fn flatten_otel_traces(message: &TracesData) -> Result<Vec<Value>, OtelError> { resource_spans_json.insert(key.clone(), value.clone()); } - let attribute_count = resource_spans_json - .iter() - .filter(|(key, _)| !known_fields.contains(key.as_str())) - .count(); - - // Check if the number of attributes exceeds the allowed limit - if attribute_count > PARSEABLE.options.otel_attributes_allowed_limit { - tracing::error!( - "OTEL traces ingestion failed because the number of attributes ({}) exceeded the threshold of {}", - attribute_count, - PARSEABLE.options.otel_attributes_allowed_limit - ); - return Err(OtelError::AttributeCountExceeded( - attribute_count, - PARSEABLE.options.otel_attributes_allowed_limit, - )); - } - vec_otel_json.push(Value::Object(resource_spans_json.clone())); } } - Ok(vec_otel_json) + vec_otel_json } /// otel traces has json array of events From df59fde0bb54bdb77e7ec4362efa48f81453edae Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 13 May 2025 05:59:03 -0400 Subject: [PATCH 06/11] add warning threshold --- src/handlers/http/ingest.rs | 2 +- src/handlers/http/modal/utils/ingest_utils.rs | 54 +++++++++++++------ src/option.rs | 4 +- 3 files changed, 39 insertions(+), 21 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 50b9ec944..62f9fdce4 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -467,7 +467,7 @@ pub enum PostError { KnownFormat(#[from] known_schema::Error), #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")] IncorrectLogFormat(String), - #[error("Ingestion failed for dataset {0} as fields count {1} exceeds the limit {2}, Parseable recommends creating a new dataset.")] + #[error("Ingestion has been stopped for dataset {0} as fields count {1} exceeds the allowed limit of {2}. Please create a new dataset.")] FieldsLimitExceeded(String, usize, usize), diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index a5dcfa440..59a32d43f 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -47,6 +47,8 @@ use crate::{ const IGNORE_HEADERS: [&str; 3] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY, EXTRACT_LOG_KEY]; const MAX_CUSTOM_FIELDS: usize = 10; const MAX_FIELD_VALUE_LENGTH: usize = 100; +// Maximum allowed count for fields in a dataset +pub 
const DATASET_FIELDS_ALLOWED_LIMIT: usize = 250; pub async fn flatten_and_push_logs( json: Value, @@ -54,23 +56,8 @@ pub async fn flatten_and_push_logs( log_source: &LogSource, p_custom_fields: &HashMap, ) -> Result<(), PostError> { - // fetch the storage schema for the stream - let schema = PARSEABLE.get_stream(stream_name)?.get_schema(); - //fetch the fields count from the schema - let fields_count = schema.fields().len(); - if fields_count > PARSEABLE.options.dataset_fields_allowed_limit { - tracing::error!( - "Ingestion failed for dataset {0} as fields count {1} exceeds the limit {2}, Parseable recommends creating a new dataset.", - stream_name, - fields_count, - PARSEABLE.options.dataset_fields_allowed_limit); - // Return an error if the fields count exceeds the limit - return Err(PostError::FieldsLimitExceeded( - stream_name.to_string(), - fields_count, - PARSEABLE.options.dataset_fields_allowed_limit, - )); - } + // Verify the dataset fields count + verify_dataset_fields_count(stream_name)?; match log_source { LogSource::Kinesis => { @@ -223,6 +210,39 @@ pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap Result<(), PostError> { + let fields_count = PARSEABLE + .get_stream(stream_name)? + .get_schema() + .fields() + .len(); + let dataset_fields_warn_threshold = 0.8 * DATASET_FIELDS_ALLOWED_LIMIT as f64; + // Check if the fields count exceeds the warn threshold + if fields_count > dataset_fields_warn_threshold as usize { + tracing::warn!( + "Fields count {0} for dataset {1} has exceeded the warning threshold of {2} fields, Parseable recommends creating a new dataset.", + dataset_fields_warn_threshold, + stream_name, + dataset_fields_warn_threshold); + } + // Check if the fields count exceeds the limit + // Return an error if the fields count exceeds the limit + if fields_count > PARSEABLE.options.dataset_fields_allowed_limit { + tracing::error!( + "Ingestion has been stopped for dataset {0} as fields count {1} exceeds the allowed limit of {2}, Please create a new dataset.", + stream_name, + fields_count, + PARSEABLE.options.dataset_fields_allowed_limit); + // Return an error if the fields count exceeds the limit + return Err(PostError::FieldsLimitExceeded( + stream_name.to_string(), + fields_count, + PARSEABLE.options.dataset_fields_allowed_limit, + )); + } + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/option.rs b/src/option.rs index 269d9c966..4fba2bc00 100644 --- a/src/option.rs +++ b/src/option.rs @@ -91,13 +91,11 @@ pub mod validation { path::{Path, PathBuf}, }; + use crate::handlers::http::modal::utils::ingest_utils::DATASET_FIELDS_ALLOWED_LIMIT; use path_clean::PathClean; use super::{Compression, Mode}; - // Maximum allowed count for fields in a dataset - const DATASET_FIELDS_ALLOWED_LIMIT: usize = 250; - pub fn file_path(s: &str) -> Result { if s.is_empty() { return Err("empty path".to_owned()); From e46ee5fc28869263ca0fb00c03ed24494b8b2909 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 13 May 2025 06:09:16 -0400 Subject: [PATCH 07/11] correct warning message --- src/handlers/http/modal/utils/ingest_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 59a32d43f..5ea9ae492 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -221,7 +221,7 @@ fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> { if fields_count > 
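// Sketch of the 80% warning threshold arithmetic used above: the threshold
// is derived in floating point from the configured limit, then compared as
// an integer field count.
fn warn_threshold(limit: usize) -> usize {
    (0.8 * limit as f64) as usize
}

fn main() {
    let limit = 250;
    let threshold = warn_threshold(limit); // 200
    let fields_count = 210;
    if fields_count > threshold {
        println!("warning: {fields_count} of {limit} fields used");
    }
}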
     if fields_count > dataset_fields_warn_threshold as usize {
         tracing::warn!(
             "Fields count {0} for dataset {1} has exceeded the warning threshold of {2} fields. Parseable recommends creating a new dataset.",
-            dataset_fields_warn_threshold,
+            fields_count,
             stream_name,
             dataset_fields_warn_threshold);
     }

From cb1d328c85844d6e6421c0421ad0cc31218cb343 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com>
Date: Wed, 14 May 2025 17:44:52 +0530
Subject: [PATCH 08/11] Update src/handlers/http/ingest.rs

Co-authored-by: Nitish Tiwari
Signed-off-by: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com>
---
 src/handlers/http/ingest.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 62f9fdce4..9c2e99d91 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -467,7 +467,7 @@ pub enum PostError {
     KnownFormat(#[from] known_schema::Error),
     #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")]
     IncorrectLogFormat(String),
-    #[error("Ingestion has been stopped for dataset {0} as fields count {1} exceeds the allowed limit of {2}. Please create a new dataset.")]
+    #[error("Failed to ingest events in dataset {0}. Total number of fields {1} exceeds the permissible limit of {2}. We recommend creating a new dataset beyond {2} fields for better query performance.")]
     FieldsLimitExceeded(String, usize, usize),
 }

From 5ff122aaf0898099010a97d46633fcf65ca684b6 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Wed, 14 May 2025 08:47:59 -0400
Subject: [PATCH 09/11] incorporate suggestions

---
 src/cli.rs                                    |  7 +++---
 src/handlers/http/ingest.rs                   |  4 ++--
 src/handlers/http/modal/utils/ingest_utils.rs | 24 ++++++++-----------
 src/option.rs                                 | 10 ++++----
 4 files changed, 21 insertions(+), 24 deletions(-)

diff --git a/src/cli.rs b/src/cli.rs
index 20fcfbd49..2bd86c5a4 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -35,6 +35,7 @@ use crate::{
 pub const DEFAULT_USERNAME: &str = "admin";
 pub const DEFAULT_PASSWORD: &str = "admin";
+pub const DATASET_FIELD_COUNT_LIMIT: usize = 250;
 
 #[derive(Parser)]
 #[command(
     name = "parseable",
@@ -371,10 +372,10 @@ pub struct Options {
 
     #[arg(
         long,
-        env = "P_DATASET_FIELDS_ALLOWED_LIMIT",
-        default_value = "250",
+        env = "P_DATASET_FIELD_COUNT_LIMIT",
+        default_value_t = DATASET_FIELD_COUNT_LIMIT,
         value_parser = validation::validate_dataset_fields_allowed_limit,
-        help = "allowed limit for fields count in a dataset"
+        help = "total number of fields recommended in a dataset"
     )]
     pub dataset_fields_allowed_limit: usize,
 }

diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 9c2e99d91..8e252b14f 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -468,7 +468,7 @@ pub enum PostError {
     #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")]
     IncorrectLogFormat(String),
     #[error("Failed to ingest events in dataset {0}. Total number of fields {1} exceeds the permissible limit of {2}. We recommend creating a new dataset beyond {2} fields for better query performance.")]
-    FieldsLimitExceeded(String, usize, usize),
+    FieldsCountLimitExceeded(String, usize, usize),
 }
 
 impl actix_web::ResponseError for PostError {
@@ -497,7 +497,7 @@ impl actix_web::ResponseError for PostError {
             PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST,
             PostError::KnownFormat(_) => StatusCode::BAD_REQUEST,
             PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST,
-            PostError::FieldsLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST,
+            PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST,
         }
     }
 
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 5ea9ae492..0c5ae486f 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -47,8 +47,6 @@ use crate::{
 const IGNORE_HEADERS: [&str; 3] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY, EXTRACT_LOG_KEY];
 const MAX_CUSTOM_FIELDS: usize = 10;
 const MAX_FIELD_VALUE_LENGTH: usize = 100;
-// Maximum allowed count for fields in a dataset
-pub const DATASET_FIELDS_ALLOWED_LIMIT: usize = 250;
 
 pub async fn flatten_and_push_logs(
     json: Value,
@@ -216,29 +214,27 @@ fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> {
         .get_schema()
         .fields()
         .len();
-    let dataset_fields_warn_threshold = 0.8 * DATASET_FIELDS_ALLOWED_LIMIT as f64;
+    let dataset_fields_warn_threshold = 0.8 * PARSEABLE.options.dataset_fields_allowed_limit as f64;
     // Check if the fields count exceeds the warn threshold
     if fields_count > dataset_fields_warn_threshold as usize {
         tracing::warn!(
-            "Fields count {0} for dataset {1} has exceeded the warning threshold of {2} fields. Parseable recommends creating a new dataset.",
-            fields_count,
We recommend creating a new dataset.", stream_name, - dataset_fields_warn_threshold); + dataset_fields_warn_threshold, + PARSEABLE.options.dataset_fields_allowed_limit + ); } // Check if the fields count exceeds the limit // Return an error if the fields count exceeds the limit if fields_count > PARSEABLE.options.dataset_fields_allowed_limit { - tracing::error!( - "Ingestion has been stopped for dataset {0} as fields count {1} exceeds the allowed limit of {2}, Please create a new dataset.", - stream_name, - fields_count, - PARSEABLE.options.dataset_fields_allowed_limit); - // Return an error if the fields count exceeds the limit - return Err(PostError::FieldsLimitExceeded( + let error = PostError::FieldsCountLimitExceeded( stream_name.to_string(), fields_count, PARSEABLE.options.dataset_fields_allowed_limit, - )); + ); + tracing::error!("{}", error); + // Return an error if the fields count exceeds the limit + return Err(error); } Ok(()) } diff --git a/src/option.rs b/src/option.rs index 4fba2bc00..db9c94097 100644 --- a/src/option.rs +++ b/src/option.rs @@ -91,7 +91,7 @@ pub mod validation { path::{Path, PathBuf}, }; - use crate::handlers::http::modal::utils::ingest_utils::DATASET_FIELDS_ALLOWED_LIMIT; + use crate::cli::DATASET_FIELD_COUNT_LIMIT; use path_clean::PathClean; use super::{Compression, Mode}; @@ -177,16 +177,16 @@ pub mod validation { pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result { if let Ok(size) = s.parse::() { - if (1..=DATASET_FIELDS_ALLOWED_LIMIT).contains(&size) { + if (1..=DATASET_FIELD_COUNT_LIMIT).contains(&size) { Ok(size) } else { Err(format!( - "Invalid value for P_DATASET_FIELDS_ALLOWED_LIMIT. It should be between 1 and {}", - DATASET_FIELDS_ALLOWED_LIMIT + "Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be between 1 and {}", + DATASET_FIELD_COUNT_LIMIT )) } } else { - Err("Invalid value for P_DATASET_FIELDS_ALLOWED_LIMIT. It should be given as integer value".to_string()) + Err("Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be given as integer value".to_string()) } } } From 566ea152d19202fd0672c84618de69d6bf48b874 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 14 May 2025 08:54:40 -0400 Subject: [PATCH 10/11] convert threshold as usize --- src/handlers/http/modal/utils/ingest_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 0c5ae486f..241a652d8 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -220,7 +220,7 @@ fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> { tracing::warn!( "Total fields in dataset {0} has reached the warning threshold of {1}. Ingestion will not be possible after reaching {2} fields. 
We recommend creating a new dataset.", stream_name, - dataset_fields_warn_threshold, + dataset_fields_warn_threshold as usize, PARSEABLE.options.dataset_fields_allowed_limit ); } From 1636d3108139d1e7c36134366cba15b6beb0917a Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 14 May 2025 10:09:18 -0400 Subject: [PATCH 11/11] add field count to warning --- src/handlers/http/modal/utils/ingest_utils.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 241a652d8..6cffb6d99 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -218,8 +218,9 @@ fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> { // Check if the fields count exceeds the warn threshold if fields_count > dataset_fields_warn_threshold as usize { tracing::warn!( - "Total fields in dataset {0} has reached the warning threshold of {1}. Ingestion will not be possible after reaching {2} fields. We recommend creating a new dataset.", + "Dataset {0} has {1} fields, which exceeds the warning threshold of {2}. Ingestion will not be possible after reaching {3} fields. We recommend creating a new dataset.", stream_name, + fields_count, dataset_fields_warn_threshold as usize, PARSEABLE.options.dataset_fields_allowed_limit );