Skip to content

ref: properly type Span enrichment #92970

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ module = [
"sentry.snuba.metrics.extraction",
"sentry.snuba.metrics.naming_layer.*",
"sentry.snuba.query_subscriptions.*",
"sentry.spans.consumers.process_segments.*",
"sentry.spans.grouping.*",
"sentry.stacktraces.platform",
"sentry.tasks.beacon",
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/spans/consumers/process_segments/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
TraceItem,
)

from sentry.spans.consumers.process_segments.types import Span
from sentry.spans.consumers.process_segments.enrichment import Span

I64_MAX = 2**63 - 1

Expand Down
239 changes: 148 additions & 91 deletions src/sentry/spans/consumers/process_segments/enrichment.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from collections import defaultdict
from typing import Any, cast
from typing import Any, NotRequired

from sentry.models.project import Project
from sentry.spans.consumers.process_segments.types import MeasurementValue, Span
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import MeasurementValue, SegmentSpan

# Keys in `sentry_tags` that are shared across all spans in a segment. This list
# is taken from `extract_shared_tags` in Relay.
Expand Down Expand Up @@ -42,98 +41,100 @@
DEFAULT_SPAN_OP = "default"


def match_schemas(spans) -> None:
class Span(SegmentSpan, total=True):
"""
Creates attributes for EAP spans that are required by logic shared with the
event pipeline.

Spans in the transaction event protocol had a slightly different schema
compared to raw spans on the EAP topic. This function adds the missing
attributes to the spans to make them compatible with the event pipeline
logic.
"""

for span in spans:
sentry_tags = span.setdefault("sentry_tags", {})
span["op"] = sentry_tags.get("op") or DEFAULT_SPAN_OP


def set_shared_tags(segment: Span, spans: list[Span]) -> None:
Enriched version of the incoming span payload that has additional attributes
extracted.
"""
Extracts tags from the segment span and materializes them into all spans.
"""

# Assume that Relay has extracted the shared tags into `sentry_tags` on the
# root span. Once `sentry_tags` is removed, the logic from
# `extract_shared_tags` should be moved here.
segment_tags = segment.get("sentry_tags", {})
shared_tags = {k: v for k, v in segment_tags.items() if k in SHARED_TAG_KEYS}

is_mobile = segment_tags.get("mobile") == "true"
mobile_start_type = _get_mobile_start_type(segment)
ttid_ts = _timestamp_by_op(spans, "ui.load.initial_display")
ttfd_ts = _timestamp_by_op(spans, "ui.load.full_display")
# Added in enrichment
exclusive_time: float
exclusive_time_ms: float
op: str

for span in spans:
span_tags = cast(dict[str, Any], span["sentry_tags"])
sentry_tags: dict[str, Any] # type: ignore[misc] # XXX: fix w/ TypedDict extra_items once available

if is_mobile:
# NOTE: Like in Relay's implementation, shared tags are added at the
# very end. This does not have access to the shared tag value. We
# keep behavior consistent, although this should be revisited.
if span_tags.get("thread.name") == MOBILE_MAIN_THREAD_NAME:
span_tags["main_thread"] = "true"
if not span_tags.get("app_start_type") and mobile_start_type:
span_tags["app_start_type"] = mobile_start_type
# XXX: unclear where this comes from as it's not from enrichment!
hash: NotRequired[str]

if ttid_ts is not None and span["end_timestamp_precise"] <= ttid_ts:
span_tags["ttid"] = "ttid"
if ttfd_ts is not None and span["end_timestamp_precise"] <= ttfd_ts:
span_tags["ttfd"] = "ttfd"

for key, value in shared_tags.items():
if span_tags.get(key) is None:
span_tags[key] = value
def _get_span_op(span: SegmentSpan) -> str:
return span.get("sentry_tags", {}).get("op") or DEFAULT_SPAN_OP


def _get_mobile_start_type(segment: Span) -> str | None:
"""
Check the measurements on the span to determine what kind of start type the
event is.
def _find_segment_span(spans: list[SegmentSpan]) -> SegmentSpan | None:
"""
measurements = segment.get("measurements") or {}
Finds the span in the list that has ``is_segment`` set to
``True``.

if "app_start_cold" in measurements:
return "cold"
if "app_start_warm" in measurements:
return "warm"
At most one span in the list can be marked as segment span. If more than one
span is marked, the function does not have defined behavior.

return None
If there is no segment span, the function returns ``None``.
"""

# Iterate backwards since we usually expect the segment span to be at the end.
for span in reversed(spans):
if span.get("is_segment"):
return span

def _timestamp_by_op(spans: list[Span], op: str) -> float | None:
for span in spans:
if span["op"] == op:
return span["end_timestamp_precise"]
return None


def set_exclusive_time(spans: list[Span]) -> None:
"""
Sets the exclusive time on all spans in the list.

The exclusive time is the time spent in a span's own code. This is the sum
of all time intervals where no child span was active.
"""

span_map: dict[str, list[tuple[int, int]]] = {}
for span in spans:
if parent_span_id := span.get("parent_span_id"):
interval = _span_interval(span)
span_map.setdefault(parent_span_id, []).append(interval)

for span in spans:
intervals = span_map.get(span["span_id"], [])
class Enricher:
def __init__(self, spans: list[SegmentSpan]) -> None:
    # Everything that depends on the full list of spans is computed once
    # up front, so that individual spans can be enriched independently.
    self._segment_span = _find_segment_span(spans)

    # End timestamps of the display spans, used to derive the `ttid` and
    # `ttfd` tags.
    self._ttid_ts = _timestamp_by_op(spans, "ui.load.initial_display")
    self._ttfd_ts = _timestamp_by_op(spans, "ui.load.full_display")

    # Maps a span ID to the intervals of all spans that name it as parent.
    self._span_map: dict[str, list[tuple[int, int]]] = {}
    for child in spans:
        parent_id = child.get("parent_span_id")
        if not parent_id:
            continue
        child_interval = _span_interval(child)
        self._span_map.setdefault(parent_id, []).append(child_interval)

def _sentry_tags(self, span: SegmentSpan) -> dict[str, Any]:
ret = {**span.get("sentry_tags", {})}
if self._segment_span is not None:
# Assume that Relay has extracted the shared tags into `sentry_tags` on the
# root span. Once `sentry_tags` is removed, the logic from
# `extract_shared_tags` should be moved here.
segment_tags = self._segment_span.get("sentry_tags", {})
shared_tags = {k: v for k, v in segment_tags.items() if k in SHARED_TAG_KEYS}

is_mobile = segment_tags.get("mobile") == "true"
mobile_start_type = _get_mobile_start_type(self._segment_span)

if is_mobile:
# NOTE: Like in Relay's implementation, shared tags are added at the
# very end. This does not have access to the shared tag value. We
# keep behavior consistent, although this should be revisited.
if ret.get("thread.name") == MOBILE_MAIN_THREAD_NAME:
ret["main_thread"] = "true"
if not ret.get("app_start_type") and mobile_start_type:
ret["app_start_type"] = mobile_start_type

if self._ttid_ts is not None and span["end_timestamp_precise"] <= self._ttid_ts:
ret["ttid"] = "ttid"
if self._ttfd_ts is not None and span["end_timestamp_precise"] <= self._ttfd_ts:
ret["ttfd"] = "ttfd"

for key, value in shared_tags.items():
if ret.get(key) is None:
ret[key] = value

return ret

def _exclusive_time(self, span: SegmentSpan) -> float:
"""
Computes the exclusive time of the given span in milliseconds.

The exclusive time is the time spent in a span's own code. This is the sum
of all time intervals where no child span was active.
"""

intervals = self._span_map.get(span["span_id"], [])
# Sort by start ASC, end DESC to skip over nested intervals efficiently
intervals.sort(key=lambda x: (x[0], -x[1]))

Expand All @@ -151,13 +152,65 @@ def set_exclusive_time(spans: list[Span]) -> None:
# Add any remaining time not covered by children
exclusive_time_us += max(end - start, 0)

# Note: Event protocol spans expect `exclusive_time` while EAP expects
# `exclusive_time_ms`. Both are the same value in milliseconds
span["exclusive_time"] = exclusive_time_us / 1_000
span["exclusive_time_ms"] = exclusive_time_us / 1_000
return exclusive_time_us / 1_000

def enrich_span(self, span: SegmentSpan) -> Span:
    """
    Returns a new dict containing all keys of ``span`` plus the enriched
    attributes ``sentry_tags``, ``op``, ``exclusive_time`` and
    ``exclusive_time_ms``.
    """
    own_time_ms = self._exclusive_time(span)
    tags = self._sentry_tags(span)
    # Spans in the transaction event protocol had a slightly different
    # schema compared to raw spans on the EAP topic. A top-level `op` is
    # added so that the spans are compatible with logic shared with the
    # event pipeline.
    op = _get_span_op(span)

    return {
        **span,
        "sentry_tags": tags,
        "op": op,
        # Note: Event protocol spans expect `exclusive_time` while EAP
        # expects `exclusive_time_ms`. Both carry the same value in
        # milliseconds.
        "exclusive_time": own_time_ms,
        "exclusive_time_ms": own_time_ms,
    }

@classmethod
def enrich_spans(cls, spans: list[SegmentSpan]) -> tuple[Span | None, list[Span]]:
    """
    Enriches every span in the list.

    Returns a tuple of the enriched segment span (``None`` if no segment
    span was found in the list) and the list of all enriched spans in input
    order. The segment span is also contained in that list.
    """
    enricher = cls(spans)
    enriched_spans: list[Span] = []
    enriched_segment: Span | None = None

    for original in spans:
        result = enricher.enrich_span(original)
        enriched_spans.append(result)
        # Identity comparison: the segment span is one of the input objects.
        if original is enricher._segment_span:
            enriched_segment = result

    return enriched_segment, enriched_spans


def _get_mobile_start_type(segment: SegmentSpan) -> str | None:
"""
Check the measurements on the span to determine what kind of start type the
event is.
"""
measurements = segment.get("measurements") or {}

if "app_start_cold" in measurements:
return "cold"
if "app_start_warm" in measurements:
return "warm"

return None

def _span_interval(span: Span) -> tuple[int, int]:

def _timestamp_by_op(spans: list[SegmentSpan], op: str) -> float | None:
for span in spans:
if _get_span_op(span) == op:
return span["end_timestamp_precise"]
return None


def _span_interval(span: SegmentSpan) -> tuple[int, int]:
    """Returns the ``(start, end)`` timestamps of a span in microseconds."""
    start_us = _us(span["start_timestamp_precise"])
    end_us = _us(span["end_timestamp_precise"])
    return start_us, end_us

Expand All @@ -168,7 +221,11 @@ def _us(timestamp: float) -> int:
return int(timestamp * 1_000_000)


def compute_breakdowns(segment: Span, spans: list[Span], project: Project) -> None:
def segment_span_measurement_updates(
segment: Span,
spans: list[SegmentSpan],
breakdowns_config: dict[str, dict[str, Any]],
) -> dict[str, MeasurementValue]:
"""
Computes breakdowns from all spans and returns them as measurement updates for the segment span.

Expand All @@ -177,29 +234,29 @@ def compute_breakdowns(segment: Span, spans: list[Span], project: Project) -> No
are converted into attributes on the span trace item.
"""

config = project.get_option("sentry:breakdowns")

for breakdown_name, breakdown_config in config.items():
ret = {}
for breakdown_name, breakdown_config in breakdowns_config.items():
ty = breakdown_config.get("type")

if ty == "spanOperations":
breakdowns = _compute_span_ops(spans, breakdown_config)
else:
continue

measurements = segment.setdefault("measurements", {})
for key, value in breakdowns.items():
measurements[f"{breakdown_name}.{key}"] = value
ret[f"{breakdown_name}.{key}"] = value

return ret


def _compute_span_ops(spans: list[Span], config: Any) -> dict[str, MeasurementValue]:
def _compute_span_ops(spans: list[SegmentSpan], config: Any) -> dict[str, MeasurementValue]:
matches = config.get("matches")
if not matches:
return {}

intervals_by_op = defaultdict(list)
for span in spans:
op = span.get("sentry_tags", {}).get("op", "")
op = _get_span_op(span)
if operation_name := next(filter(lambda m: op.startswith(m), matches), None):
intervals_by_op[operation_name].append(_span_interval(span))

Expand Down
6 changes: 3 additions & 3 deletions src/sentry/spans/consumers/process_segments/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.spans.consumers.process_segments.convert import convert_span_to_item
from sentry.spans.consumers.process_segments.enrichment import Span
from sentry.spans.consumers.process_segments.message import process_segment
from sentry.spans.consumers.process_segments.types import Span
from sentry.utils.arroyo import MultiprocessingPool, run_task_with_multiprocessing
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

Expand Down Expand Up @@ -98,7 +98,7 @@ def create_with_partitions(
output_block_size=self.output_block_size,
)

def shutdown(self):
def shutdown(self) -> None:
self.pool.close()


Expand Down Expand Up @@ -133,5 +133,5 @@ def _serialize_payload(span: Span, timestamp: datetime | None) -> Value[KafkaPay
)


def _unfold_segment(spans: list[Value[KafkaPayload]]):
def _unfold_segment(spans: list[Value[KafkaPayload]]) -> list[Value[KafkaPayload]]:
return [span for span in spans if span is not None]
Loading
Loading