reject event if fields count exceed 250 #1311
Changes from 6 commits
@@ -368,6 +368,15 @@ pub struct Options {

    #[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
    pub ms_clarity_tag: Option<String>,

    #[arg(
        long,
        env = "P_DATASET_FIELDS_ALLOWED_LIMIT",
        default_value = "250",
        value_parser = validation::validate_dataset_fields_allowed_limit,
        help = "allowed limit for fields count in a dataset"
    )]
    pub dataset_fields_allowed_limit: usize,
}

#[derive(Parser, Debug)]
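To make the wiring above concrete, here is a minimal, self-contained sketch of the same clap pattern: a flag that can also be supplied through an environment variable, falls back to a default, and runs a custom `value_parser`. It assumes clap 4 with the `derive` and `env` features; `Opts` and `validate_limit` are illustrative names, not the Parseable types.

```rust
use clap::Parser;

// Illustrative stand-in for validation::validate_dataset_fields_allowed_limit.
fn validate_limit(s: &str) -> Result<usize, String> {
    let size: usize = s.parse().map_err(|_| "expected an integer".to_string())?;
    if (1..=250).contains(&size) {
        Ok(size)
    } else {
        Err("value must be between 1 and 250".to_string())
    }
}

#[derive(Parser, Debug)]
struct Opts {
    /// Allowed limit for fields count in a dataset.
    #[arg(
        long,
        env = "P_DATASET_FIELDS_ALLOWED_LIMIT",
        default_value = "250",
        value_parser = validate_limit
    )]
    dataset_fields_allowed_limit: usize,
}

fn main() {
    // Resolution order: command-line flag, then env var, then the default value.
    let opts = Opts::parse();
    println!("dataset fields limit = {}", opts.dataset_fields_allowed_limit);
}
```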
@@ -467,6 +467,8 @@ pub enum PostError {
    KnownFormat(#[from] known_schema::Error),
    #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")]
    IncorrectLogFormat(String),
    #[error("Ingestion has been stopped for dataset {0} as fields count {1} exceeds the allowed limit of {2}. Please create a new dataset.")]
    FieldsLimitExceeded(String, usize, usize),
}

impl actix_web::ResponseError for PostError {

@@ -495,6 +497,7 @@ impl actix_web::ResponseError for PostError {
            PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST,
            PostError::KnownFormat(_) => StatusCode::BAD_REQUEST,
            PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST,
            PostError::FieldsLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST,
        }
    }
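For readers unfamiliar with the pattern, here is a compact sketch of how a thiserror variant with positional fields maps to a 400 response. `DemoError` is a made-up stand-in for `PostError`, and the snippet assumes the actix-web and thiserror crates used elsewhere in the project.

```rust
use actix_web::http::StatusCode;

#[derive(Debug, thiserror::Error)]
enum DemoError {
    // {0}, {1}, {2} refer to the tuple fields of the variant, in order.
    #[error("fields count {1} for dataset {0} exceeds the allowed limit of {2}")]
    FieldsLimitExceeded(String, usize, usize),
}

impl actix_web::ResponseError for DemoError {
    fn status_code(&self) -> StatusCode {
        match self {
            // An over-limit dataset is a client-side condition, hence 400 Bad Request.
            DemoError::FieldsLimitExceeded(..) => StatusCode::BAD_REQUEST,
        }
    }
}

fn main() {
    let err = DemoError::FieldsLimitExceeded("app_logs".into(), 260, 250);
    println!("{err}"); // Display text comes from the #[error(...)] attribute
}
```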
@@ -47,13 +47,18 @@ use crate::{
const IGNORE_HEADERS: [&str; 3] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY, EXTRACT_LOG_KEY];
const MAX_CUSTOM_FIELDS: usize = 10;
const MAX_FIELD_VALUE_LENGTH: usize = 100;
// Maximum allowed count for fields in a dataset
pub const DATASET_FIELDS_ALLOWED_LIMIT: usize = 250;

pub async fn flatten_and_push_logs(
    json: Value,
    stream_name: &str,
    log_source: &LogSource,
    p_custom_fields: &HashMap<String, String>,
) -> Result<(), PostError> {
    // Verify the dataset fields count
    verify_dataset_fields_count(stream_name)?;

    match log_source {
        LogSource::Kinesis => {
            //custom flattening required for Amazon Kinesis
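The call above is the enforcement point: before any flattening, the stream's current schema size is checked against the configured limit. As a quick illustration of the thresholds that `verify_dataset_fields_count` (added later in this diff) applies, here is a hypothetical helper, assuming the default limit of 250 and the 80% warning threshold used below.

```rust
// Hypothetical helper, not part of the PR: condenses the warn/reject decision.
fn fields_check(fields_count: usize, allowed_limit: usize) -> &'static str {
    // Warnings start above 80% of the limit; with the default 250 that is 200 fields.
    let warn_threshold = (0.8 * allowed_limit as f64) as usize;
    if fields_count > allowed_limit {
        "reject"
    } else if fields_count > warn_threshold {
        "warn"
    } else {
        "ok"
    }
}

fn main() {
    assert_eq!(fields_check(200, 250), "ok");     // at the warning threshold: still fine
    assert_eq!(fields_check(201, 250), "warn");   // above 200: warn, but keep ingesting
    assert_eq!(fields_check(250, 250), "warn");   // at the limit: still accepted
    assert_eq!(fields_check(251, 250), "reject"); // past the limit: event is rejected
}
```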
@@ -205,6 +210,39 @@ pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap<String, Strin
    p_custom_fields
}

fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> {
    let fields_count = PARSEABLE
        .get_stream(stream_name)?
        .get_schema()
        .fields()
        .len();
    let dataset_fields_warn_threshold = 0.8 * DATASET_FIELDS_ALLOWED_LIMIT as f64;

    // Warn if the fields count exceeds the warning threshold
    if fields_count > dataset_fields_warn_threshold as usize {
        tracing::warn!(
            "Fields count {0} for dataset {1} has exceeded the warning threshold of {2} fields. Parseable recommends creating a new dataset.",
            fields_count,
            stream_name,
            dataset_fields_warn_threshold
        );
    }

    // Return an error if the fields count exceeds the allowed limit
    if fields_count > PARSEABLE.options.dataset_fields_allowed_limit {
        tracing::error!(
            "Ingestion has been stopped for dataset {0} as fields count {1} exceeds the allowed limit of {2}. Please create a new dataset.",
            stream_name,
            fields_count,
            PARSEABLE.options.dataset_fields_allowed_limit
        );
        return Err(PostError::FieldsLimitExceeded(
            stream_name.to_string(),
            fields_count,
            PARSEABLE.options.dataset_fields_allowed_limit,
        ));
    }
    Ok(())
}

Review comment on the tracing::error! message: Please make this a string and use the same string that is put in the Error definition.
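One way that suggestion could be implemented (an assumption, not what this PR does) is to construct the error first and log its `Display` output, so the log line and the returned error always share the single string in the `#[error(...)]` attribute. `DemoPostError` below is an illustrative stand-in.

```rust
// Hypothetical sketch: log and return the same error Display text exactly once.
#[derive(Debug, thiserror::Error)]
enum DemoPostError {
    #[error("Ingestion has been stopped for dataset {0} as fields count {1} exceeds the allowed limit of {2}. Please create a new dataset.")]
    FieldsLimitExceeded(String, usize, usize),
}

fn check_fields(stream: &str, fields: usize, limit: usize) -> Result<(), DemoPostError> {
    if fields > limit {
        let err = DemoPostError::FieldsLimitExceeded(stream.to_string(), fields, limit);
        // tracing's %-sigil records the Display impl, i.e. the same message as the error.
        tracing::error!(%err, "rejecting ingestion");
        return Err(err);
    }
    Ok(())
}

fn main() {
    // 260 fields against a limit of 250: logged once and returned as the same error.
    assert!(check_fields("app_logs", 260, 250).is_err());
}
```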
#[cfg(test)]
mod tests {
    use super::*;
@@ -91,6 +91,7 @@ pub mod validation {
        path::{Path, PathBuf},
    };

    use crate::handlers::http::modal::utils::ingest_utils::DATASET_FIELDS_ALLOWED_LIMIT;
    use path_clean::PathClean;

    use super::{Compression, Mode};

@@ -173,4 +174,19 @@ pub mod validation {
            Err("Invalid value for max disk usage. It should be given as 90.0 for 90%".to_string())
        }
    }

    pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result<usize, String> {
        if let Ok(size) = s.parse::<usize>() {
            if (1..=DATASET_FIELDS_ALLOWED_LIMIT).contains(&size) {
                Ok(size)
            } else {
                Err(format!(
                    "Invalid value for P_DATASET_FIELDS_ALLOWED_LIMIT. It should be between 1 and {}",
                    DATASET_FIELDS_ALLOWED_LIMIT
                ))
            }
        } else {
            Err("Invalid value for P_DATASET_FIELDS_ALLOWED_LIMIT. It should be given as integer value".to_string())
        }
    }
}
@@ -15,27 +15,33 @@
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */

use super::otel_utils::collect_json_from_values;
use super::otel_utils::convert_epoch_nano_to_timestamp;
use super::otel_utils::insert_attributes;
use opentelemetry_proto::tonic::logs::v1::LogRecord;
use opentelemetry_proto::tonic::logs::v1::LogsData;
use opentelemetry_proto::tonic::logs::v1::ScopeLogs;
use opentelemetry_proto::tonic::logs::v1::SeverityNumber;
use serde_json::Map;
use serde_json::Value;

use super::otel_utils::add_other_attributes_if_not_empty;
use super::otel_utils::collect_json_from_values;
use super::otel_utils::convert_epoch_nano_to_timestamp;
use super::otel_utils::insert_attributes;
use super::otel_utils::merge_attributes_in_json;

pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [
pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 16] = [
    "scope_name",
    "scope_version",
    "scope_log_schema_url",
    "scope_dropped_attributes_count",
    "resource_dropped_attributes_count",
    "schema_url",
    "time_unix_nano",
    "observed_time_unix_nano",
    "severity_number",
    "severity_text",
    "body",
    "flags",
    "log_record_dropped_attributes_count",
    "span_id",
    "trace_id",
    "event_name",
];

Review discussion on this list:

What is the use of this list now? We're defaulting to separate columns, right?

This list was initially maintained to store the known field list along with the known log format name in the stream info, with the idea that the UI can use the fields list to apply quick filters.

Identifying useful columns is difficult; the chances of getting it wrong are high. In UX we're working on a way that lets the user decide what is important for them.

We can create an issue and work on it in a separate PR. A similar change needs to be done at the place where we detect schema and add fields to stream_info, other than otel.

/// otel log event has severity number
/// there is a mapping of severity number to severity text provided in proto
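As a hedged illustration of the quick-filter idea mentioned in the discussion (hypothetical code, not part of the PR), a UI could partition a dataset's columns into known OTEL fields and custom attribute columns:

```rust
// Hypothetical stand-in for OTEL_LOG_KNOWN_FIELD_LIST, trimmed for brevity.
const KNOWN_FIELDS: [&str; 4] = ["severity_text", "body", "trace_id", "span_id"];

fn is_known_field(column: &str) -> bool {
    KNOWN_FIELDS.contains(&column)
}

fn main() {
    let columns = ["body", "severity_text", "k8s_pod_name"];
    // Known OTEL fields could be surfaced as quick filters; the rest stay as custom columns.
    let (known, custom): (Vec<&str>, Vec<&str>) =
        columns.into_iter().partition(|c| is_known_field(c));
    println!("quick filters: {known:?}, other columns: {custom:?}");
}
```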
@@ -60,7 +66,6 @@ fn flatten_severity(severity_number: i32) -> Map<String, Value> {
/// this function is called recursively for each log record object in the otel logs
pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
    let mut log_record_json: Map<String, Value> = Map::new();
    let mut other_attributes = Map::new();
    log_record_json.insert(
        "time_unix_nano".to_string(),
        Value::String(convert_epoch_nano_to_timestamp(

@@ -83,11 +88,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
            log_record_json.insert(key.to_owned(), body_json[key].to_owned());
        }
    }
    insert_attributes(
        &mut log_record_json,
        &log_record.attributes,
        &mut other_attributes,
    );
    insert_attributes(&mut log_record_json, &log_record.attributes);
    log_record_json.insert(
        "log_record_dropped_attributes_count".to_string(),
        Value::Number(log_record.dropped_attributes_count.into()),

@@ -106,9 +107,6 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
        Value::String(hex::encode(&log_record.trace_id)),
    );

    // Add the `other_attributes` to the log record json
    add_other_attributes_if_not_empty(&mut log_record_json, &other_attributes);

    log_record_json
}
@@ -117,18 +115,13 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
    let mut vec_scope_log_json = Vec::new();
    let mut scope_log_json = Map::new();
    let mut other_attributes = Map::new();
    if let Some(scope) = &scope_log.scope {
        scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
        scope_log_json.insert(
            "scope_version".to_string(),
            Value::String(scope.version.clone()),
        );
        insert_attributes(
            &mut scope_log_json,
            &scope.attributes,
            &mut other_attributes,
        );
        insert_attributes(&mut scope_log_json, &scope.attributes);
        scope_log_json.insert(
            "scope_dropped_attributes_count".to_string(),
            Value::Number(scope.dropped_attributes_count.into()),

@@ -146,26 +139,17 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
        vec_scope_log_json.push(combined_json);
    }

    // Add the `other_attributes` to the scope log json
    merge_attributes_in_json(other_attributes, &mut vec_scope_log_json);

    vec_scope_log_json
}

/// this function performs the custom flattening of the otel logs
/// and returns a `Vec` of `Value::Object` of the flattened json
pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
    let mut vec_otel_json = Vec::new();

    for record in &message.resource_logs {
        let mut resource_log_json = Map::new();
        let mut other_attributes = Map::new();
        if let Some(resource) = &record.resource {
            insert_attributes(
                &mut resource_log_json,
                &resource.attributes,
                &mut other_attributes,
            );
            insert_attributes(&mut resource_log_json, &resource.attributes);
            resource_log_json.insert(
                "resource_dropped_attributes_count".to_string(),
                Value::Number(resource.dropped_attributes_count.into()),

@@ -176,19 +160,18 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
        for scope_log in &record.scope_logs {
            vec_resource_logs_json.extend(flatten_scope_log(scope_log));
        }

        resource_log_json.insert(
            "schema_url".to_string(),
            Value::String(record.schema_url.clone()),
        );

        for resource_logs_json in &mut vec_resource_logs_json {
            resource_logs_json.extend(resource_log_json.clone());
        }

        // Add the `other_attributes` to the resource log json
        merge_attributes_in_json(other_attributes, &mut vec_resource_logs_json);

        vec_otel_json.extend(vec_resource_logs_json);
        vec_otel_json.push(Value::Object(resource_logs_json.clone()));
    }
    }
    vec_otel_json.into_iter().map(Value::Object).collect()

    vec_otel_json
}