Skip to content

runtime/core: refactor streaming reporting #1948

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 23 additions & 4 deletions runtimes/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
rttrace = []

[dependencies]
pingora = { version = "0.4", features = [ "lb", "openssl" ] }
pingora = { version = "0.4", features = ["lb", "openssl"] }
anyhow = "1.0.76"
base64 = "0.21.5"
gjson = "0.8.1"
Expand All @@ -17,6 +17,7 @@ prost-types = "0.12.3"
serde = "1.0.193"
serde_json = { version = "1.0.108", features = ["raw_value"] }
tokio = { version = "1.35.1", features = ["sync"] }
tokio-stream = "0.1.17"
tokio-nsq = "0.14.0"
xid = "1.0.3"
log = { version = "0.4.20", features = ["kv_unstable", "kv_unstable_serde"] }
Expand All @@ -31,7 +32,7 @@ tokio-postgres = { version = "0.7.10", features = [
] }
tokio-util = "0.7.10"
tokio-tungstenite = "0.21.0"
futures-util = "0.3.30"
futures-util = "0.3.31"
rand = "0.8.5"
env_logger = "0.10.1"
google-cloud-pubsub = "0.22.1"
Expand Down Expand Up @@ -86,12 +87,30 @@ tower-http = { version = "0.5.2", features = ["fs"] }
google-cloud-storage = "0.22.1"
serde_path_to_error = "0.1.16"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["alloc", "ansi", "env-filter", "fmt", "matchers", "nu-ansi-term", "once_cell", "regex", "registry", "sharded-slab", "smallvec", "std", "thread_local", "tracing"], default-features = false }
tracing-subscriber = { version = "0.3.18", features = [
"alloc",
"ansi",
"env-filter",
"fmt",
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"registry",
"sharded-slab",
"smallvec",
"std",
"thread_local",
"tracing",
], default-features = false }
thiserror = "1.0.64"
async-stream = "0.3.6"
md5 = "0.7.0"
aws-sdk-s3 = "1.58.0"
aws-smithy-types = { version = "1.2.8", features = ["byte-stream-poll-next", "rt-tokio"] }
aws-smithy-types = { version = "1.2.8", features = [
"byte-stream-poll-next",
"rt-tokio",
] }
percent-encoding = "2.3.1"
aws-credential-types = "1.2.1"
regex = "1.11.1"
Expand Down
250 changes: 91 additions & 159 deletions runtimes/core/src/trace/log.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::api::reqauth::platform;
use bytes::Bytes;
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::model;
use crate::trace::eventbuf::signed_to_unsigned_i64;
Expand Down Expand Up @@ -58,11 +56,8 @@ pub(super) struct TraceEvent {

impl Reporter {
pub async fn start_reporting(self) {
let mut inner = Box::new(InnerTraceEventStream {
rx: self.rx,
anchor: self.anchor.clone(),
current: None,
});
let mut rx = self.rx;
let anchor = self.anchor.clone();
let trace_time_anchor = self.anchor.trace_header();

let trace_headers = {
Expand Down Expand Up @@ -94,151 +89,97 @@ impl Reporter {
};

loop {
// Wait for at least one entry on rx before we open an HTTP request.
{
let Some(event) = inner.rx.recv().await else {
// The stream is closed. This only happens if all senders have been dropped,
// which should never happen in regular use.
return;
};
inner.current = Some(StreamingTraceEvent {
event,
next: EventStreamState::Header,
});
};

// Construct the body stream.
let mut no_data_ticker = tokio::time::interval(std::time::Duration::from_millis(1000));
no_data_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let stream = TraceEventStream {
inner: inner.as_mut() as *mut InnerTraceEventStream,
num_events_this_tick: 1,
no_data_ticker,
};

let req = self
.http_client
.post(self.config.trace_endpoint.clone())
.headers(trace_headers.clone())
.build();
let mut req = match req {
Ok(req) => req,
Err(err) => {
log::error!("failed to build trace request: {:?}", err);
continue;
}
};

if let Err(err) = self
.config
.platform_validator
.sign_outgoing_request(&mut req)
{
log::error!("failed to sign trace request: {:?}", err);
continue;
}

*req.body_mut() = Some(reqwest::Body::wrap_stream(stream));

let result = self.http_client.execute(req).await;
match result {
Ok(resp) if !resp.status().is_success() => {
let status = resp.status();
let body = resp.text().await.unwrap_or_else(|_| String::new());
log::error!("failed to send trace: HTTP {}: {}", status, body);
}
Err(err) => {
log::error!("failed to send trace: {}", err);
}
_ => {}
}
}
}
}

/// Streaming request body for a single trace upload.
///
/// `start_reporting` builds one of these for each HTTP request and hands it to
/// `reqwest::Body::wrap_stream`. Its `Stream` impl yields, for every event, the
/// encoded header followed by the event data, and ends the body once a tick of
/// `no_data_ticker` passes without any event having been received.
struct TraceEventStream {
/// Raw pointer to the boxed state owned by `start_reporting`.
/// Dereferenced (unsafely) at the top of `poll_next`.
inner: *mut InnerTraceEventStream,

// The number of events received since the last tick.
// Starts at 1 because `start_reporting` has already received the first
// event before constructing the stream.
num_events_this_tick: usize,

/// Ticks to detect when there is no data to close the stream.
no_data_ticker: tokio::time::Interval,
}

// SAFETY: the TraceEventStream only contains `poll_next` which requires a mutable reference
// to self. Therefore it is never called concurrently. The lifetime of inner is guaranteed
// to exceed the lifetime of the stream.
//
// NOTE(review): the lifetime guarantee is not enforced by the type system. It holds only
// if the HTTP client never polls the body stream after `start_reporting` has returned or
// its future has been dropped — confirm against the client's body-handling behaviour.
unsafe impl Send for TraceEventStream {}
unsafe impl Sync for TraceEventStream {}

/// State shared between `start_reporting` and the body streams it creates.
///
/// `start_reporting` boxes a single instance before entering its loop and
/// gives every `TraceEventStream` a raw pointer to it.
struct InnerTraceEventStream {
/// Receiving end of the channel carrying trace events to report.
rx: tokio::sync::mpsc::UnboundedReceiver<TraceEvent>,
/// Time anchor passed to `StreamingTraceEvent::header` when encoding an event header.
anchor: TimeAnchor,

/// Current item received from rx and being streamed.
/// Reset to `None` once the event's data chunk has been yielded.
current: Option<StreamingTraceEvent>,
}

impl futures_core::stream::Stream for TraceEventStream {
type Item = Result<Bytes, Infallible>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// Safety: the inner pointer is boxed and never moved, and is kept alive
// by the start_reporting method for the lifetime of the stream.
let inner = unsafe { &mut *self.inner };

{
// If we have a current item, return it.
if inner.current.is_some() {
let next = inner.current.as_ref().unwrap().next;
return match next {
EventStreamState::Header => {
inner.current.as_mut().unwrap().next = EventStreamState::Data;
Poll::Ready(Some(Ok(inner
.current
.as_ref()
.unwrap()
.header(&inner.anchor))))
let mut body_sender = None;

let timeout_duration = std::time::Duration::from_millis(1000);
let timeout_future = Box::pin(tokio::time::sleep(timeout_duration));
let mut no_data_timeout = timeout_future;

loop {
tokio::select! {
event = rx.recv() => {
match event {
Some(event) => {
// Wait for at least one entry on rx before we open an HTTP request.
if body_sender.is_none() {
// Create a channel for the streaming body
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<bytes::Bytes, std::convert::Infallible>>();
let body = reqwest::Body::wrap_stream(UnboundedReceiverStream::new(rx));

let mut req = self.http_client
.post(self.config.trace_endpoint.clone())
.headers(trace_headers.clone())
.body(body)
.build()
.unwrap_or_else(|err| {
log::error!("failed to build trace request: {:?}", err);
panic!("Failed to build trace request");
});

if let Err(err) = self.config.platform_validator.sign_outgoing_request(&mut req) {
log::error!("failed to sign trace request: {:?}", err);
continue;
}

// Start the request
let request = self.http_client.execute(req);
tokio::spawn(async move {
match request.await {
Ok(resp) if !resp.status().is_success() => {
let status = resp.status();
let body = resp.text().await.unwrap_or_else(|_| String::new());
log::error!("failed to send trace: HTTP {}: {}", status, body);
}
Err(err) => {
log::error!("failed to send trace: {}", err);
}
_ => {}
}
});

body_sender = Some(tx);
}

// Add the event to the stream
if let Some(sender) = &body_sender {
let streaming_event = StreamingTraceEvent {
event,
};

// Send header
let header = streaming_event.header(&anchor);
if let Err(err) = sender.send(Ok(header)) {
log::error!("failed to send trace header: {:?}", err);
break;
}

// Send data
let data = streaming_event.event.data.clone();
if let Err(err) = sender.send(Ok(data)) {
log::error!("failed to send trace data: {:?}", err);
break;
}
}

// Reset the timeout
no_data_timeout = Box::pin(tokio::time::sleep(timeout_duration));
}
None => {
// The stream is closed. This only happens if all senders have been dropped,
// which should never happen in regular use.
return;
}
}
}
EventStreamState::Data => {
let data = inner.current.as_ref().unwrap().event.data.clone(); // cheap clone
inner.current = None;
Poll::Ready(Some(Ok(data)))
_ = &mut no_data_timeout => {
// Timeout reached with no new events
if let Some(sender) = body_sender {
// Close the stream and wait for a new event
drop(sender);
}
break;
}
};
}
}

// Check if the no-data-ticker is ready.
match self.no_data_ticker.poll_tick(cx) {
Poll::Ready(_) => {
// If we have received no events since the last tick, close the stream.
if self.num_events_this_tick == 0 {
return Poll::Ready(None);
}
self.num_events_this_tick = 0;

// Call the ticker again to schedule a wake-up for the next tick.
_ = self.no_data_ticker.poll_tick(cx);
}
Poll::Pending => {}
}

// If we have no current item, poll the receiver for a new trace event.
{
match inner.rx.poll_recv(cx) {
Poll::Ready(Some(event)) => {
self.num_events_this_tick += 1;
inner.current = Some(StreamingTraceEvent {
event,
next: EventStreamState::Header,
});
self.poll_next(cx)
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
Expand All @@ -249,8 +190,6 @@ impl futures_core::stream::Stream for TraceEventStream {
/// A trace event paired with the progress made in streaming it.
struct StreamingTraceEvent {
/// The event itself.
event: TraceEvent,
/// The next part of the event to be sent.
/// Set to `EventStreamState::Header` when the event is received and advanced
/// to `EventStreamState::Data` once the header has been yielded.
next: EventStreamState,
}

impl StreamingTraceEvent {
Expand Down Expand Up @@ -324,10 +263,3 @@ impl StreamingTraceEvent {
])
}
}

/// Represents the piece of data to be sent next.
///
/// Each event is streamed as two chunks: first its header, then its data.
#[derive(Debug, Clone, Copy)]
enum EventStreamState {
/// The header has not been sent yet; `poll_next` yields it and moves to `Data`.
Header,
/// The header has been sent; `poll_next` yields the event data and clears the current event.
Data,
}
Loading