
Commit 9045f66

Remove the attributes count check from OTEL logs, traces, and metrics.
Add a fields count check for every dataset: reject an event if the dataset's fields count exceeds `P_DATASET_FIELDS_ALLOWED_LIMIT` (default 250).
1 parent 59582a6 commit 9045f66

8 files changed (+42, -111 lines)


src/cli.rs

Lines changed: 5 additions & 5 deletions

@@ -371,12 +371,12 @@ pub struct Options {

     #[arg(
         long,
-        env = "P_OTEL_ATTRIBUTES_ALLOWED_LIMIT",
-        default_value = "200",
-        value_parser = validation::validate_otel_attributes_allowed_limit,
-        help = "allowed limit for otel attributes"
+        env = "P_DATASET_FIELDS_ALLOWED_LIMIT",
+        default_value = "250",
+        value_parser = validation::validate_dataset_fields_allowed_limit,
+        help = "allowed limit for fields count in a dataset"
     )]
-    pub otel_attributes_allowed_limit: usize,
+    pub dataset_fields_allowed_limit: usize,
 }

 #[derive(Parser, Debug)]
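
For reference, a self-contained sketch of how the renamed option behaves under clap. This mirrors the arg above but is not the repo's code; it assumes clap 4 with the derive and env features, and the long flag name is simply clap's default kebab-case derivation of the field name:

    use clap::Parser;

    fn parse_limit(s: &str) -> Result<usize, String> {
        // Same bounds as the validator in src/option.rs below.
        match s.parse::<usize>() {
            Ok(n) if (1..=250).contains(&n) => Ok(n),
            _ => Err("value must be an integer between 1 and 250".to_string()),
        }
    }

    #[derive(Parser, Debug)]
    struct Demo {
        #[arg(
            long,
            env = "P_DATASET_FIELDS_ALLOWED_LIMIT",
            default_value = "250",
            value_parser = parse_limit,
            help = "allowed limit for fields count in a dataset"
        )]
        dataset_fields_allowed_limit: usize,
    }

    fn main() {
        // clap derives the long flag `--dataset-fields-allowed-limit` from the
        // field name; the env var and the flag both override the default of 250.
        let demo = Demo::parse();
        println!("limit = {}", demo.dataset_fields_allowed_limit);
    }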

src/handlers/http/ingest.rs

Lines changed: 3 additions & 4 deletions

@@ -35,7 +35,6 @@ use crate::metadata::SchemaVersion;
 use crate::option::Mode;
 use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
 use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
-use crate::otel::otel_utils::OtelError;
 use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
 use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::storage::{ObjectStorageError, StreamType};
@@ -468,8 +467,8 @@ pub enum PostError {
     KnownFormat(#[from] known_schema::Error),
     #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")]
     IncorrectLogFormat(String),
-    #[error("OtelError: {0}")]
-    OtelError(#[from] OtelError),
+    #[error("Ingestion failed for dataset {0} as fields count {1} exceeds the limit {2}, Parseable recommends creating a new dataset.")]
+    FieldsLimitExceeded(String, usize, usize),
 }

 impl actix_web::ResponseError for PostError {
@@ -498,7 +497,7 @@ impl actix_web::ResponseError for PostError {
             PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST,
             PostError::KnownFormat(_) => StatusCode::BAD_REQUEST,
             PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST,
-            PostError::OtelError(_) => StatusCode::BAD_REQUEST,
+            PostError::FieldsLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST,
         }
     }
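
To illustrate how the new variant's #[error(...)] template renders, here is a self-contained sketch (not repo code; the dataset name and counts are hypothetical):

    use thiserror::Error;

    #[derive(Debug, Error)]
    enum DemoError {
        // Same positional template as PostError::FieldsLimitExceeded above.
        #[error("Ingestion failed for dataset {0} as fields count {1} exceeds the limit {2}, Parseable recommends creating a new dataset.")]
        FieldsLimitExceeded(String, usize, usize),
    }

    fn main() {
        let err = DemoError::FieldsLimitExceeded("app_logs".to_string(), 260, 250);
        // This is the message an ingest request would receive; the ResponseError
        // impl above maps the real variant to HTTP 400 (BAD_REQUEST).
        println!("{err}");
    }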

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 21 additions & 6 deletions

@@ -54,6 +54,24 @@ pub async fn flatten_and_push_logs(
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
 ) -> Result<(), PostError> {
+    // fetch the storage schema for the stream
+    let schema = PARSEABLE.get_stream(stream_name)?.get_schema();
+    //fetch the fields count from the schema
+    let fields_count = schema.fields().len();
+    if fields_count > PARSEABLE.options.dataset_fields_allowed_limit {
+        tracing::error!(
+            "Ingestion failed for dataset {0} as fields count {1} exceeds the limit {2}, Parseable recommends creating a new dataset.",
+            stream_name,
+            fields_count,
+            PARSEABLE.options.dataset_fields_allowed_limit);
+        // Return an error if the fields count exceeds the limit
+        return Err(PostError::FieldsLimitExceeded(
+            stream_name.to_string(),
+            fields_count,
+            PARSEABLE.options.dataset_fields_allowed_limit,
+        ));
+    }
+
     match log_source {
         LogSource::Kinesis => {
             //custom flattening required for Amazon Kinesis
@@ -65,24 +83,21 @@ pub async fn flatten_and_push_logs(
         LogSource::OtelLogs => {
             //custom flattening required for otel logs
             let logs: LogsData = serde_json::from_value(json)?;
-            let records = flatten_otel_logs(&logs)?;
-            for record in records {
+            for record in flatten_otel_logs(&logs) {
                 push_logs(stream_name, record, log_source, p_custom_fields).await?;
             }
         }
         LogSource::OtelTraces => {
             //custom flattening required for otel traces
             let traces: TracesData = serde_json::from_value(json)?;
-            let records = flatten_otel_traces(&traces)?;
-            for record in records {
+            for record in flatten_otel_traces(&traces) {
                 push_logs(stream_name, record, log_source, p_custom_fields).await?;
             }
         }
         LogSource::OtelMetrics => {
             //custom flattening required for otel metrics
             let metrics: MetricsData = serde_json::from_value(json)?;
-            let records = flatten_otel_metrics(metrics)?;
-            for record in records {
+            for record in flatten_otel_metrics(metrics) {
                 push_logs(stream_name, record, log_source, p_custom_fields).await?;
             }
         }
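
Note that the guard above runs once per ingest request, before any flattening, and compares the limit against the dataset's already-stored schema rather than the incoming event, so a dataset that has grown past the limit rejects every further event. A self-contained sketch of that early-return pattern (stand-in types; the real code uses the stream's Arrow schema and PostError::FieldsLimitExceeded as shown in the diff):

    #[derive(Debug, PartialEq)]
    struct FieldsLimitExceeded {
        dataset: String,
        fields_count: usize,
        limit: usize,
    }

    fn check_fields_limit(dataset: &str, fields_count: usize, limit: usize) -> Result<(), FieldsLimitExceeded> {
        if fields_count > limit {
            // Reject before doing any flattening or push work.
            return Err(FieldsLimitExceeded {
                dataset: dataset.to_string(),
                fields_count,
                limit,
            });
        }
        Ok(())
    }

    fn main() {
        assert!(check_fields_limit("app_logs", 250, 250).is_ok());  // at the limit: allowed
        assert!(check_fields_limit("app_logs", 251, 250).is_err()); // over the limit: rejected
        println!("fields-limit guard behaves as expected");
    }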

src/option.rs

Lines changed: 7 additions & 7 deletions

@@ -95,8 +95,8 @@ pub mod validation {

     use super::{Compression, Mode};

-    // Maximum allowed otel attributes per event
-    const OTEL_ATTRIBUTES_ALLOWED_LIMIT: usize = 200;
+    // Maximum allowed count for fields in a dataset
+    const DATASET_FIELDS_ALLOWED_LIMIT: usize = 250;

     pub fn file_path(s: &str) -> Result<PathBuf, String> {
         if s.is_empty() {
@@ -177,18 +177,18 @@ pub mod validation {
         }
     }

-    pub fn validate_otel_attributes_allowed_limit(s: &str) -> Result<usize, String> {
+    pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result<usize, String> {
         if let Ok(size) = s.parse::<usize>() {
-            if (1..=OTEL_ATTRIBUTES_ALLOWED_LIMIT).contains(&size) {
+            if (1..=DATASET_FIELDS_ALLOWED_LIMIT).contains(&size) {
                 Ok(size)
             } else {
                 Err(format!(
-                    "Invalid value for P_OTEL_ATTRIBUTES_ALLOWED_LIMIT. It should be between 1 and {}",
-                    OTEL_ATTRIBUTES_ALLOWED_LIMIT
+                    "Invalid value for P_DATASET_FIELDS_ALLOWED_LIMIT. It should be between 1 and {}",
+                    DATASET_FIELDS_ALLOWED_LIMIT
                 ))
             }
         } else {
-            Err("Invalid value for P_OTEL_ATTRIBUTES_ALLOWED_LIMIT. It should be given as integer value".to_string())
+            Err("Invalid value for P_DATASET_FIELDS_ALLOWED_LIMIT. It should be given as integer value".to_string())
         }
     }
 }
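
A hedged unit-test sketch for the renamed validator (not part of this commit; it assumes the test module sits alongside the validation module in src/option.rs):

    #[cfg(test)]
    mod dataset_fields_limit_tests {
        use super::validation::validate_dataset_fields_allowed_limit;

        #[test]
        fn accepts_values_inside_the_range() {
            assert_eq!(validate_dataset_fields_allowed_limit("1"), Ok(1));
            assert_eq!(validate_dataset_fields_allowed_limit("250"), Ok(250));
        }

        #[test]
        fn rejects_out_of_range_or_non_integer_input() {
            assert!(validate_dataset_fields_allowed_limit("0").is_err());   // below 1
            assert!(validate_dataset_fields_allowed_limit("251").is_err()); // above the 250 cap
            assert!(validate_dataset_fields_allowed_limit("abc").is_err()); // not an integer
        }
    }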

src/otel/logs.rs

Lines changed: 2 additions & 27 deletions

@@ -15,15 +15,9 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  *
  */
-
-use std::collections::HashSet;
-
-use crate::parseable::PARSEABLE;
-
 use super::otel_utils::collect_json_from_values;
 use super::otel_utils::convert_epoch_nano_to_timestamp;
 use super::otel_utils::insert_attributes;
-use super::otel_utils::OtelError;
 use opentelemetry_proto::tonic::logs::v1::LogRecord;
 use opentelemetry_proto::tonic::logs::v1::LogsData;
 use opentelemetry_proto::tonic::logs::v1::ScopeLogs;
@@ -150,10 +144,8 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {

 /// this function performs the custom flattening of the otel logs
 /// and returns a `Vec` of `Value::Object` of the flattened json
-pub fn flatten_otel_logs(message: &LogsData) -> Result<Vec<Value>, OtelError> {
+pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
     let mut vec_otel_json = Vec::new();
-    let known_fields: HashSet<&str> = OTEL_LOG_KNOWN_FIELD_LIST.iter().cloned().collect();
-
     for record in &message.resource_logs {
         let mut resource_log_json = Map::new();
         if let Some(resource) = &record.resource {
@@ -177,26 +169,9 @@ pub fn flatten_otel_logs(message: &LogsData) -> Result<Vec<Value>, OtelError> {
         for resource_logs_json in &mut vec_resource_logs_json {
             resource_logs_json.extend(resource_log_json.clone());

-            let attribute_count = resource_logs_json
-                .iter()
-                .filter(|(key, _)| !known_fields.contains(key.as_str()))
-                .count();
-            // Check if the number of attributes exceeds the allowed limit
-            if attribute_count > PARSEABLE.options.otel_attributes_allowed_limit {
-                tracing::error!(
-                    "OTEL logs ingestion failed because the number of attributes ({}) exceeded the threshold of {}",
-                    attribute_count,
-                    PARSEABLE.options.otel_attributes_allowed_limit
-                );
-                return Err(OtelError::AttributeCountExceeded(
-                    attribute_count,
-                    PARSEABLE.options.otel_attributes_allowed_limit,
-                ));
-            }
-
             vec_otel_json.push(Value::Object(resource_logs_json.clone()));
         }
     }

-    Ok(vec_otel_json)
+    vec_otel_json
 }

src/otel/metrics.rs

Lines changed: 2 additions & 28 deletions

@@ -15,19 +15,13 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  *
  */
-
-use std::collections::HashSet;
-
 use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue;
 use opentelemetry_proto::tonic::metrics::v1::{
     exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, Exemplar,
     ExponentialHistogram, Gauge, Histogram, Metric, MetricsData, NumberDataPoint, Sum, Summary,
 };
 use serde_json::{Map, Value};

-use crate::otel::otel_utils::OtelError;
-use crate::parseable::PARSEABLE;
-
 use super::otel_utils::{
     convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some,
 };
@@ -507,10 +501,8 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec<Map<String, Value>> {

 /// this function performs the custom flattening of the otel metrics
 /// and returns a `Vec` of `Value::Object` of the flattened json
-pub fn flatten_otel_metrics(message: MetricsData) -> Result<Vec<Value>, OtelError> {
+pub fn flatten_otel_metrics(message: MetricsData) -> Vec<Value> {
     let mut vec_otel_json = Vec::new();
-    let known_fields: HashSet<&str> = OTEL_METRICS_KNOWN_FIELD_LIST.iter().cloned().collect();
-
     for record in &message.resource_metrics {
         let mut resource_metrics_json = Map::new();
         if let Some(resource) = &record.resource {
@@ -564,29 +556,11 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Result<Vec<Value>, OtelError> {
                 resource_metric_json.insert(key.clone(), value.clone());
             }

-            let attribute_count = resource_metric_json
-                .iter()
-                .filter(|(key, _)| !known_fields.contains(key.as_str()))
-                .count();
-
-            // Check if the number of attributes exceeds the allowed limit
-            if attribute_count > PARSEABLE.options.otel_attributes_allowed_limit {
-                tracing::error!(
-                    "OTEL metrics ingestion failed because the number of attributes ({}) exceeded the threshold of {}",
-                    attribute_count,
-                    PARSEABLE.options.otel_attributes_allowed_limit
-                );
-                return Err(OtelError::AttributeCountExceeded(
-                    attribute_count,
-                    PARSEABLE.options.otel_attributes_allowed_limit,
-                ));
-            }
-
             vec_otel_json.push(Value::Object(resource_metric_json.clone()));
         }
     }

-    Ok(vec_otel_json)
+    vec_otel_json
 }

 /// otel metrics event has json object for aggregation temporality

src/otel/otel_utils.rs

Lines changed: 0 additions & 7 deletions

@@ -20,7 +20,6 @@ use chrono::DateTime;
 use opentelemetry_proto::tonic::common::v1::{
     any_value::Value as OtelValue, AnyValue, ArrayValue, KeyValue, KeyValueList,
 };
-use serde::Serialize;
 use serde_json::{Map, Value};

 // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte
@@ -194,9 +193,3 @@ pub fn convert_epoch_nano_to_timestamp(epoch_ns: i64) -> String {
     let dt = DateTime::from_timestamp_nanos(epoch_ns).naive_utc();
     dt.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string()
 }
-
-#[derive(Debug, thiserror::Error, Serialize)]
-pub enum OtelError {
-    #[error("Ingestion failed because the attributes count {0} exceeded the threshold of {1}")]
-    AttributeCountExceeded(usize, usize),
-}

src/otel/traces.rs

Lines changed: 2 additions & 27 deletions

@@ -15,9 +15,6 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  *
  */
-
-use std::collections::HashSet;
-
 use opentelemetry_proto::tonic::trace::v1::span::Event;
 use opentelemetry_proto::tonic::trace::v1::span::Link;
 use opentelemetry_proto::tonic::trace::v1::ScopeSpans;
@@ -26,9 +23,6 @@ use opentelemetry_proto::tonic::trace::v1::Status;
 use opentelemetry_proto::tonic::trace::v1::TracesData;
 use serde_json::{Map, Value};

-use crate::otel::otel_utils::OtelError;
-use crate::parseable::PARSEABLE;
-
 use super::otel_utils::convert_epoch_nano_to_timestamp;
 use super::otel_utils::insert_attributes;

@@ -105,9 +99,8 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec<Map<String, Value>> {

 /// this function performs the custom flattening of the otel traces event
 /// and returns a `Vec` of `Value::Object` of the flattened json
-pub fn flatten_otel_traces(message: &TracesData) -> Result<Vec<Value>, OtelError> {
+pub fn flatten_otel_traces(message: &TracesData) -> Vec<Value> {
     let mut vec_otel_json = Vec::new();
-    let known_fields: HashSet<&str> = OTEL_TRACES_KNOWN_FIELD_LIST.iter().cloned().collect();

     for record in &message.resource_spans {
         let mut resource_span_json = Map::new();
@@ -135,29 +128,11 @@ pub fn flatten_otel_traces(message: &TracesData) -> Result<Vec<Value>, OtelError> {
             resource_spans_json.insert(key.clone(), value.clone());
         }

-            let attribute_count = resource_spans_json
-                .iter()
-                .filter(|(key, _)| !known_fields.contains(key.as_str()))
-                .count();
-
-            // Check if the number of attributes exceeds the allowed limit
-            if attribute_count > PARSEABLE.options.otel_attributes_allowed_limit {
-                tracing::error!(
-                    "OTEL traces ingestion failed because the number of attributes ({}) exceeded the threshold of {}",
-                    attribute_count,
-                    PARSEABLE.options.otel_attributes_allowed_limit
-                );
-                return Err(OtelError::AttributeCountExceeded(
-                    attribute_count,
-                    PARSEABLE.options.otel_attributes_allowed_limit,
-                ));
-            }
-
             vec_otel_json.push(Value::Object(resource_spans_json.clone()));
         }
     }

-    Ok(vec_otel_json)
+    vec_otel_json
 }

 /// otel traces has json array of events
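
With the same change applied in logs.rs and metrics.rs above, all three flatten_* helpers are now infallible. Quoting the ingest_utils.rs hunk earlier in this commit, the caller contract changes like this (fragment, not standalone code):

    // Old contract: the helper could fail with OtelError and needed `?`:
    //     let records = flatten_otel_traces(&traces)?;
    //     for record in records { ... }
    // New contract: the helper returns Vec<Value> and is iterated directly; the
    // only rejection path left is the dataset fields count check in
    // flatten_and_push_logs:
    for record in flatten_otel_traces(&traces) {
        push_logs(stream_name, record, log_source, p_custom_fields).await?;
    }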
