Skip to content

feat: allow otel ingestion with /v1/logs endpoint #803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,43 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
Ok(())
}

// Handler for POST /v1/logs to ingest OTEL logs
// ingests events by extracting stream name from header
// creates if stream does not exist
pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
if let Some((_, stream_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name).await?;

//flatten logs
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
{
let log_source: String = log_source.to_str().unwrap().to_owned();
if log_source == LOG_SOURCE_OTEL {
let mut json = otel::flatten_otel_logs(&body);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
} else {
log::warn!("Unknown log source: {}", log_source);
return Err(PostError::CustomError("Unknown log source".to_string()));
}
} else {
return Err(PostError::CustomError(
"log source key header is missing".to_string(),
));
}
} else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
}
Ok(HttpResponse::Ok().finish())
}

async fn flatten_and_push_logs(
req: HttpRequest,
body: Bytes,
Expand All @@ -116,7 +153,9 @@ async fn flatten_and_push_logs(
let log_source: String = log_source.to_str().unwrap().to_owned();
match log_source.as_str() {
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
LOG_SOURCE_OTEL => json = otel::flatten_otel_logs(&body),
LOG_SOURCE_OTEL => {
json = otel::flatten_otel_logs(&body);
}
_ => {
log::warn!("Unknown log source: {}", log_source);
push_logs(stream_name.to_string(), req.clone(), body).await?;
Expand Down
22 changes: 12 additions & 10 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,18 @@ impl ParseableServer for IngestServer {
impl IngestServer {
// configure the api routes
fn configure_routes(config: &mut web::ServiceConfig, _oidc_client: Option<OpenIdClient>) {
config.service(
// Base path "{url}/api/v1"
web::scope(&base_path())
.service(Server::get_ingest_factory())
.service(Self::logstream_api())
.service(Server::get_about_factory())
.service(Self::analytics_factory())
.service(Server::get_liveness_factory())
.service(Server::get_readiness_factory()),
);
config
.service(
// Base path "{url}/api/v1"
web::scope(&base_path())
.service(Server::get_ingest_factory())
.service(Self::logstream_api())
.service(Server::get_about_factory())
.service(Self::analytics_factory())
.service(Server::get_liveness_factory())
.service(Server::get_readiness_factory()),
)
.service(Server::get_ingest_otel_factory());
}

fn analytics_factory() -> Scope {
Expand Down
12 changes: 12 additions & 0 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl Server {
.service(Self::get_oauth_webscope(oidc_client))
.service(Self::get_user_role_webscope()),
)
.service(Self::get_ingest_otel_factory())
.service(Self::get_generated());
}

Expand Down Expand Up @@ -347,6 +348,17 @@ impl Server {
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE))
}

// /v1/logs endpoint to be used for OTEL log ingestion only
pub fn get_ingest_otel_factory() -> Resource {
web::resource("/v1/logs")
.route(
web::post()
.to(ingest::ingest_otel_logs)
.authorize_for_stream(Action::Ingest),
)
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE))
}

// get the oauth webscope
pub fn get_oauth_webscope(oidc_client: Option<OpenIdClient>) -> Scope {
let oauth = web::scope("/o")
Expand Down
113 changes: 80 additions & 33 deletions server/src/handlers/http/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,18 @@ fn collect_json_from_any_value(
if value.array_val.is_some() {
let array_val = value.array_val.as_ref().unwrap();
let values = &array_val.values;

for value in values {
let value = &value.value;
value_json = collect_json_from_any_value(key, value.clone());
let array_value_json = collect_json_from_any_value(key, value.clone());
for key in array_value_json.keys() {
value_json.insert(
format!(
"{}_{}",
key.to_owned(),
value_to_string(array_value_json[key].to_owned())
),
array_value_json[key].to_owned(),
);
}
}
}

Expand All @@ -69,7 +77,22 @@ fn collect_json_from_any_value(
let kv_list_val = value.kv_list_val.unwrap();
for key_value in kv_list_val.values {
let value = key_value.value;
value_json = collect_json_from_values(&value, key);
if value.is_some() {
let value = value.unwrap();
let key_value_json = collect_json_from_any_value(key, value);

for key in key_value_json.keys() {
value_json.insert(
format!(
"{}_{}_{}",
key.to_owned(),
key_value.key,
value_to_string(key_value_json[key].to_owned())
),
key_value_json[key].to_owned(),
);
}
}
}
}
if value.bytes_val.is_some() {
Expand All @@ -96,6 +119,14 @@ fn collect_json_from_values(
value_json
}

fn value_to_string(value: serde_json::Value) -> String {
match value.clone() {
e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
Value::String(s) => s,
_ => "".to_string(),
}
}

pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
let mut vec_otel_json: Vec<BTreeMap<String, Value>> = Vec::new();
let body_str = std::str::from_utf8(body).unwrap();
Expand All @@ -117,27 +148,33 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
}
}
}
if resource.dropped_attributes_count > 0 {
if resource.dropped_attributes_count.is_some() {
otel_json.insert(
"resource_dropped_attributes_count".to_string(),
Value::Number(serde_json::Number::from(resource.dropped_attributes_count)),
Value::Number(serde_json::Number::from(
resource.dropped_attributes_count.unwrap(),
)),
);
}
}

for scope_logs in record.scope_logs.iter() {
for scope_log in scope_logs.iter() {
for instrumentation_scope in scope_log.scope.iter() {
if !instrumentation_scope.name.is_empty() {
if instrumentation_scope.name.is_some() {
otel_json.insert(
"instrumentation_scope_name".to_string(),
Value::String(instrumentation_scope.name.to_string()),
Value::String(
instrumentation_scope.name.as_ref().unwrap().to_string(),
),
);
}
if !instrumentation_scope.version.is_empty() {
if instrumentation_scope.version.is_some() {
otel_json.insert(
"instrumentation_scope_version".to_string(),
Value::String(instrumentation_scope.version.to_string()),
Value::String(
instrumentation_scope.version.as_ref().unwrap().to_string(),
),
);
}
let attributes = &instrumentation_scope.attributes;
Expand All @@ -154,37 +191,45 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
}
}
}
if instrumentation_scope.dropped_attributes_count > 0 {
if instrumentation_scope.dropped_attributes_count.is_some() {
otel_json.insert(
"instrumentation_scope_dropped_attributes_count".to_string(),
Value::Number(serde_json::Number::from(
instrumentation_scope.dropped_attributes_count,
instrumentation_scope.dropped_attributes_count.unwrap(),
)),
);
}
}

for log_record in scope_log.log_records.iter() {
let mut log_record_json: BTreeMap<String, Value> = BTreeMap::new();
if !log_record.time_unix_nano > 0 {
if log_record.time_unix_nano.is_some() {
log_record_json.insert(
"time_unix_nano".to_string(),
Value::String(log_record.time_unix_nano.to_string()),
Value::String(
log_record.time_unix_nano.as_ref().unwrap().to_string(),
),
);
}
if !log_record.observed_time_unix_nano > 0 {
if log_record.observed_time_unix_nano.is_some() {
log_record_json.insert(
"observed_time_unix_nano".to_string(),
Value::String(log_record.observed_time_unix_nano.to_string()),
Value::String(
log_record
.observed_time_unix_nano
.as_ref()
.unwrap()
.to_string(),
),
);
}
if log_record.severity_number > 0 {
let severity_number: i32 = log_record.severity_number;
if log_record.severity_number.is_some() {
let severity_number: i32 = log_record.severity_number.unwrap();
log_record_json.insert(
"severity_number".to_string(),
Value::Number(serde_json::Number::from(severity_number)),
);
if log_record.severity_text.is_empty() {
if log_record.severity_text.is_none() {
log_record_json.insert(
"severity_text".to_string(),
Value::String(
Expand All @@ -193,10 +238,12 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
);
}
}
if !log_record.severity_text.is_empty() {
if log_record.severity_text.is_some() {
log_record_json.insert(
"severity_text".to_string(),
Value::String(log_record.severity_text.to_string()),
Value::String(
log_record.severity_text.as_ref().unwrap().to_string(),
),
);
}

Expand All @@ -221,17 +268,17 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
}
}

if log_record.dropped_attributes_count > 0 {
if log_record.dropped_attributes_count.is_some() {
log_record_json.insert(
"log_record_dropped_attributes_count".to_string(),
Value::Number(serde_json::Number::from(
log_record.dropped_attributes_count,
log_record.dropped_attributes_count.unwrap(),
)),
);
}

if log_record.flags > 0 {
let flags: u32 = log_record.flags;
if log_record.flags.is_some() {
let flags: u32 = log_record.flags.unwrap();
log_record_json.insert(
"flags_number".to_string(),
Value::Number(serde_json::Number::from(flags)),
Expand All @@ -242,17 +289,17 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
);
}

if !log_record.span_id.is_empty() {
if log_record.span_id.is_some() {
log_record_json.insert(
"span_id".to_string(),
Value::String(log_record.span_id.to_string()),
Value::String(log_record.span_id.as_ref().unwrap().to_string()),
);
}

if !log_record.trace_id.is_empty() {
if log_record.trace_id.is_some() {
log_record_json.insert(
"trace_id".to_string(),
Value::String(log_record.trace_id.to_string()),
Value::String(log_record.trace_id.as_ref().unwrap().to_string()),
);
}
for key in log_record_json.keys() {
Expand All @@ -261,18 +308,18 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
vec_otel_json.push(otel_json.clone());
}

if !scope_log.schema_url.is_empty() {
if scope_log.schema_url.is_some() {
otel_json.insert(
"scope_log_schema_url".to_string(),
Value::String(scope_log.schema_url.to_string()),
Value::String(scope_log.schema_url.as_ref().unwrap().to_string()),
);
}
}
}
if !record.schema_url.is_empty() {
if record.schema_url.is_some() {
otel_json.insert(
"resource_schema_url".to_string(),
Value::String(record.schema_url.to_string()),
Value::String(record.schema_url.as_ref().unwrap().to_string()),
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
/// since oneof in AnyValue does not allow repeated fields.
pub struct ArrayValue {
/// Array of values. The array may be empty (contain 0 elements).
pub values: Vec<AnyValue>,
pub values: Vec<Value>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -83,13 +83,13 @@
/// such as the fully qualified name and version.
pub struct InstrumentationScope {
/// An empty instrumentation scope name means the name is unknown.
pub name: String,
pub version: String,
pub name: Option<String>,
pub version: Option<String>,
/// Additional attributes that describe the scope. \[Optional\].
/// Attribute keys MUST be unique (it is not allowed to have more than one
/// attribute with the same key).
pub attributes: Option<Vec<KeyValue>>,
#[serde(rename = "droppedAttributesCount")]
pub dropped_attributes_count: u32,
pub dropped_attributes_count: Option<u32>,
}

Loading
Loading