@@ -47,30 +47,17 @@ use crate::{
 const IGNORE_HEADERS: [&str; 3] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY, EXTRACT_LOG_KEY];
 const MAX_CUSTOM_FIELDS: usize = 10;
 const MAX_FIELD_VALUE_LENGTH: usize = 100;
+// Maximum allowed count for fields in a dataset
+pub const DATASET_FIELDS_ALLOWED_LIMIT: usize = 250;
 
 pub async fn flatten_and_push_logs(
     json: Value,
     stream_name: &str,
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
 ) -> Result<(), PostError> {
-    // fetch the storage schema for the stream
-    let schema = PARSEABLE.get_stream(stream_name)?.get_schema();
-    // fetch the fields count from the schema
-    let fields_count = schema.fields().len();
-    if fields_count > PARSEABLE.options.dataset_fields_allowed_limit {
-        tracing::error!(
-            "Ingestion failed for dataset {0} as fields count {1} exceeds the limit {2}, Parseable recommends creating a new dataset.",
-            stream_name,
-            fields_count,
-            PARSEABLE.options.dataset_fields_allowed_limit);
-        // Return an error if the fields count exceeds the limit
-        return Err(PostError::FieldsLimitExceeded(
-            stream_name.to_string(),
-            fields_count,
-            PARSEABLE.options.dataset_fields_allowed_limit,
-        ));
-    }
+    // Verify the dataset fields count
+    verify_dataset_fields_count(stream_name)?;
 
     match log_source {
         LogSource::Kinesis => {
@@ -223,6 +210,37 @@ pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap<String, String> {
     p_custom_fields
 }
 
+fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> {
+    let fields_count = PARSEABLE
+        .get_stream(stream_name)?
+        .get_schema()
+        .fields()
+        .len();
+    let dataset_fields_warn_threshold = 0.8 * DATASET_FIELDS_ALLOWED_LIMIT as f64;
+    // Warn if the fields count has crossed the warning threshold
+    if fields_count > dataset_fields_warn_threshold as usize {
+        tracing::warn!(
+            "Fields count {0} for dataset {1} has exceeded the warning threshold of {2} fields, Parseable recommends creating a new dataset.",
+            fields_count,
+            stream_name,
+            dataset_fields_warn_threshold);
+    }
+    // Return an error if the fields count exceeds the allowed limit
+    if fields_count > PARSEABLE.options.dataset_fields_allowed_limit {
+        tracing::error!(
+            "Ingestion has been stopped for dataset {0} as fields count {1} exceeds the allowed limit of {2}, please create a new dataset.",
+            stream_name,
+            fields_count,
+            PARSEABLE.options.dataset_fields_allowed_limit);
+        return Err(PostError::FieldsLimitExceeded(
+            stream_name.to_string(),
+            fields_count,
+            PARSEABLE.options.dataset_fields_allowed_limit,
+        ));
+    }
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
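
For context, here is a minimal, self-contained sketch of the two-tier check the new `verify_dataset_fields_count` helper performs. Only the 0.8 warn factor and the `DATASET_FIELDS_ALLOWED_LIMIT` of 250 come from the diff above; `check_fields_count` and `allowed_limit` are hypothetical stand-ins for the helper and `PARSEABLE.options.dataset_fields_allowed_limit`, not Parseable API.

```rust
// Illustrative sketch only, not part of this PR. `check_fields_count` and
// `allowed_limit` are hypothetical stand-ins for verify_dataset_fields_count
// and PARSEABLE.options.dataset_fields_allowed_limit.
const DATASET_FIELDS_ALLOWED_LIMIT: usize = 250;

fn check_fields_count(fields_count: usize, allowed_limit: usize) -> Result<(), String> {
    // Soft warning at 80% of the compile-time limit: 0.8 * 250 = 200 fields.
    let warn_threshold = (0.8 * DATASET_FIELDS_ALLOWED_LIMIT as f64) as usize;
    if fields_count > warn_threshold {
        eprintln!("warn: {fields_count} fields crossed the {warn_threshold}-field threshold");
    }
    // Hard stop at the configured limit: the ingest request is rejected outright.
    if fields_count > allowed_limit {
        return Err(format!("{fields_count} fields exceeds the limit of {allowed_limit}"));
    }
    Ok(())
}

fn main() {
    assert!(check_fields_count(150, DATASET_FIELDS_ALLOWED_LIMIT).is_ok());  // quiet
    assert!(check_fields_count(210, DATASET_FIELDS_ALLOWED_LIMIT).is_ok());  // warns, still ingests
    assert!(check_fields_count(260, DATASET_FIELDS_ALLOWED_LIMIT).is_err()); // rejected
}
```

The design point is the asymmetry between the tiers: crossing 80% of the limit only logs a warning, while crossing the configured limit fails the request, so operators get advance notice before writes to a wide dataset start being rejected.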