Skip to content

reject event if field count exceeds 250 #1311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 14, 2025
9 changes: 9 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,15 @@ pub struct Options {

#[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
pub ms_clarity_tag: Option<String>,

#[arg(
long,
env = "P_OTEL_ATTRIBUTES_ALLOWED_LIMIT",
default_value = "200",
value_parser = validation::validate_otel_attributes_allowed_limit,
help = "allowed limit for otel attributes"
)]
pub otel_attributes_allowed_limit: usize,
}

#[derive(Parser, Debug)]
Expand Down
4 changes: 4 additions & 0 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::metadata::SchemaVersion;
use crate::option::Mode;
use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
use crate::otel::otel_utils::OtelError;
use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::storage::{ObjectStorageError, StreamType};
Expand Down Expand Up @@ -467,6 +468,8 @@ pub enum PostError {
KnownFormat(#[from] known_schema::Error),
#[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")]
IncorrectLogFormat(String),
#[error("OtelError: {0}")]
OtelError(#[from] OtelError),
}

impl actix_web::ResponseError for PostError {
Expand Down Expand Up @@ -495,6 +498,7 @@ impl actix_web::ResponseError for PostError {
PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST,
PostError::KnownFormat(_) => StatusCode::BAD_REQUEST,
PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST,
PostError::OtelError(_) => StatusCode::BAD_REQUEST,
}
}

Expand Down
9 changes: 6 additions & 3 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,24 @@ pub async fn flatten_and_push_logs(
LogSource::OtelLogs => {
//custom flattening required for otel logs
let logs: LogsData = serde_json::from_value(json)?;
for record in flatten_otel_logs(&logs) {
let records = flatten_otel_logs(&logs)?;
for record in records {
push_logs(stream_name, record, log_source, p_custom_fields).await?;
}
}
LogSource::OtelTraces => {
//custom flattening required for otel traces
let traces: TracesData = serde_json::from_value(json)?;
for record in flatten_otel_traces(&traces) {
let records = flatten_otel_traces(&traces)?;
for record in records {
push_logs(stream_name, record, log_source, p_custom_fields).await?;
}
}
LogSource::OtelMetrics => {
//custom flattening required for otel metrics
let metrics: MetricsData = serde_json::from_value(json)?;
for record in flatten_otel_metrics(metrics) {
let records = flatten_otel_metrics(metrics)?;
for record in records {
push_logs(stream_name, record, log_source, p_custom_fields).await?;
}
}
Expand Down
15 changes: 15 additions & 0 deletions src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,19 @@ pub mod validation {
Err("Invalid value for max disk usage. It should be given as 90.0 for 90%".to_string())
}
}

/// Parses and validates the `P_OTEL_ATTRIBUTES_ALLOWED_LIMIT` CLI/env value.
///
/// Accepts an integer string in the range `1..=200` and returns it as `usize`.
/// Returns a human-readable error message (used directly by clap) when the
/// value is not an integer or falls outside the allowed range.
pub fn validate_otel_attributes_allowed_limit(s: &str) -> Result<usize, String> {
    // Single source of truth for the upper bound; keeps the range check and
    // the error message from drifting apart.
    const MAX_OTEL_ATTRIBUTES_LIMIT: usize = 200;
    match s.parse::<usize>() {
        Ok(size) if (1..=MAX_OTEL_ATTRIBUTES_LIMIT).contains(&size) => Ok(size),
        Ok(_) => Err(format!(
            "Invalid value for size. It should be between 1 and {}",
            MAX_OTEL_ATTRIBUTES_LIMIT
        )),
        Err(_) => Err("Invalid value for size. It should be given as integer value".to_string()),
    }
}
}
84 changes: 46 additions & 38 deletions src/otel/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,38 @@
*
*/

use std::collections::HashSet;

use crate::parseable::PARSEABLE;

use super::otel_utils::collect_json_from_values;
use super::otel_utils::convert_epoch_nano_to_timestamp;
use super::otel_utils::insert_attributes;
use super::otel_utils::OtelError;
use opentelemetry_proto::tonic::logs::v1::LogRecord;
use opentelemetry_proto::tonic::logs::v1::LogsData;
use opentelemetry_proto::tonic::logs::v1::ScopeLogs;
use opentelemetry_proto::tonic::logs::v1::SeverityNumber;
use serde_json::Map;
use serde_json::Value;

use super::otel_utils::add_other_attributes_if_not_empty;
use super::otel_utils::collect_json_from_values;
use super::otel_utils::convert_epoch_nano_to_timestamp;
use super::otel_utils::insert_attributes;
use super::otel_utils::merge_attributes_in_json;

pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [
pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 16] = [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use of this list now? We're defaulting to separate columns right

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this list was initially maintained to store the known field list along with the known log format name in the stream info, with an idea that UI can use the fields list to apply quick filters

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Identifying useful columns is difficult, and the chances of getting it wrong are high. In the UX, we're working on a way that lets users decide what is important to them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can create an issue and work on it in a separate PR. A similar change needs to be made wherever we detect the schema and add fields to stream_info, beyond just OTEL.

"scope_name",
"scope_version",
"scope_log_schema_url",
"scope_dropped_attributes_count",
"resource_dropped_attributes_count",
"schema_url",
"time_unix_nano",
"observed_time_unix_nano",
"severity_number",
"severity_text",
"body",
"flags",
"log_record_dropped_attributes_count",
"span_id",
"trace_id",
"event_name",
];
/// otel log event has severity number
/// there is a mapping of severity number to severity text provided in proto
Expand All @@ -60,7 +72,6 @@ fn flatten_severity(severity_number: i32) -> Map<String, Value> {
/// this function is called recursively for each log record object in the otel logs
pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
let mut log_record_json: Map<String, Value> = Map::new();
let mut other_attributes = Map::new();
log_record_json.insert(
"time_unix_nano".to_string(),
Value::String(convert_epoch_nano_to_timestamp(
Expand All @@ -83,11 +94,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
log_record_json.insert(key.to_owned(), body_json[key].to_owned());
}
}
insert_attributes(
&mut log_record_json,
&log_record.attributes,
&mut other_attributes,
);
insert_attributes(&mut log_record_json, &log_record.attributes);
log_record_json.insert(
"log_record_dropped_attributes_count".to_string(),
Value::Number(log_record.dropped_attributes_count.into()),
Expand All @@ -106,9 +113,6 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
Value::String(hex::encode(&log_record.trace_id)),
);

// Add the `other_attributes` to the log record json
add_other_attributes_if_not_empty(&mut log_record_json, &other_attributes);

log_record_json
}

Expand All @@ -117,18 +121,13 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
let mut vec_scope_log_json = Vec::new();
let mut scope_log_json = Map::new();
let mut other_attributes = Map::new();
if let Some(scope) = &scope_log.scope {
scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
scope_log_json.insert(
"scope_version".to_string(),
Value::String(scope.version.clone()),
);
insert_attributes(
&mut scope_log_json,
&scope.attributes,
&mut other_attributes,
);
insert_attributes(&mut scope_log_json, &scope.attributes);
scope_log_json.insert(
"scope_dropped_attributes_count".to_string(),
Value::Number(scope.dropped_attributes_count.into()),
Expand All @@ -146,26 +145,19 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
vec_scope_log_json.push(combined_json);
}

// Add the `other_attributes` to the scope log json
merge_attributes_in_json(other_attributes, &mut vec_scope_log_json);

vec_scope_log_json
}

/// this function performs the custom flattening of the otel logs
/// and returns a `Vec` of `Value::Object` of the flattened json
pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
pub fn flatten_otel_logs(message: &LogsData) -> Result<Vec<Value>, OtelError> {
let mut vec_otel_json = Vec::new();
let known_fields: HashSet<&str> = OTEL_LOG_KNOWN_FIELD_LIST.iter().cloned().collect();

for record in &message.resource_logs {
let mut resource_log_json = Map::new();
let mut other_attributes = Map::new();
if let Some(resource) = &record.resource {
insert_attributes(
&mut resource_log_json,
&resource.attributes,
&mut other_attributes,
);
insert_attributes(&mut resource_log_json, &resource.attributes);
resource_log_json.insert(
"resource_dropped_attributes_count".to_string(),
Value::Number(resource.dropped_attributes_count.into()),
Expand All @@ -176,19 +168,35 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
for scope_log in &record.scope_logs {
vec_resource_logs_json.extend(flatten_scope_log(scope_log));
}

resource_log_json.insert(
"schema_url".to_string(),
Value::String(record.schema_url.clone()),
);

for resource_logs_json in &mut vec_resource_logs_json {
resource_logs_json.extend(resource_log_json.clone());
}

// Add the `other_attributes` to the resource log json
merge_attributes_in_json(other_attributes, &mut vec_resource_logs_json);

vec_otel_json.extend(vec_resource_logs_json);
let attribute_count = resource_logs_json
.iter()
.filter(|(key, _)| !known_fields.contains(key.as_str()))
.count();
// Check if the number of attributes exceeds the allowed limit
if attribute_count > PARSEABLE.options.otel_attributes_allowed_limit {
tracing::error!(
"OTEL logs ingestion failed because the number of attributes ({}) exceeded the threshold of {}",
attribute_count,
PARSEABLE.options.otel_attributes_allowed_limit
);
return Err(OtelError::AttributeCountExceeded(
attribute_count,
PARSEABLE.options.otel_attributes_allowed_limit,
));
}

vec_otel_json.push(Value::Object(resource_logs_json.clone()));
}
}
vec_otel_json.into_iter().map(Value::Object).collect()

Ok(vec_otel_json)
}
Loading
Loading