From 6dadbd5b84c42b0dd27c2c107eb21608d6fbacad Mon Sep 17 00:00:00 2001 From: quinna-h Date: Tue, 25 Feb 2025 18:07:43 -0500 Subject: [PATCH] wip --- ddtrace/_trace/span.py | 8 +- ddtrace/_trace/tracer.py | 7 - ddtrace/contrib/internal/graphql/patch.py | 1 - ddtrace/internal/_encoding.pyi | 1 + ddtrace/internal/_encoding.pyx | 302 ++++++++++++++++++++++ ddtrace/internal/encoding.py | 4 +- ddtrace/internal/writer/writer.py | 19 +- ddtrace/internal/writer/writer_client.py | 13 + 8 files changed, 329 insertions(+), 26 deletions(-) diff --git a/ddtrace/_trace/span.py b/ddtrace/_trace/span.py index ca820085bdf..4ca5cb34a27 100644 --- a/ddtrace/_trace/span.py +++ b/ddtrace/_trace/span.py @@ -129,7 +129,6 @@ class Span(object): "_links", "_events", "__weakref__", - "_span_event_formatting", ] def __init__( @@ -146,7 +145,6 @@ def __init__( on_finish: Optional[List[Callable[["Span"], None]]] = None, span_api: str = SPAN_API_DATADOG, links: Optional[List[SpanLink]] = None, - span_event_formatting: Optional[bool] = False, ) -> None: """ Create a new span. Call `finish` once the traced operation is over. 
@@ -187,7 +185,6 @@ def __init__( self._resource = [resource or name] self.span_type = span_type self._span_api = span_api - self._span_event_formatting = span_event_formatting self._meta: _MetaDictType = {} self.error = 0 @@ -490,10 +487,7 @@ def _add_event( self, name: str, attributes: Optional[Dict[str, _JSONType]] = None, timestamp: Optional[int] = None ) -> None: """Add an event to the span.""" - if self._span_event_formatting is False: - # backwards compatibility: v0.5 and agent versions older than 7.63.0 - self._events.append(SpanEvent(name, attributes, timestamp)) - # TODO: add top-level span events for v0.4 and agent versions greater than or equal 7.63.0 + self._events.append(SpanEvent(name, attributes, timestamp)) def get_metrics(self) -> _MetricDictType: """Return all metrics.""" diff --git a/ddtrace/_trace/tracer.py b/ddtrace/_trace/tracer.py index d808b9a8e27..07aad1d3c4d 100644 --- a/ddtrace/_trace/tracer.py +++ b/ddtrace/_trace/tracer.py @@ -373,11 +373,6 @@ def current_trace_context(self, *args, **kwargs) -> Optional[Context]: return active.context return None - def top_level_span_events_enabled(self) -> bool: - # assert correct writer type - assert isinstance(self._writer, AgentWriter) - return self._writer._agent_top_level_span_events_support - def get_log_correlation_context(self, active: Optional[Union[Context, Span]] = None) -> Dict[str, str]: """Retrieves the data used to correlate a log with the current active trace. 
Generates a dictionary for custom logging instrumentation including the trace id and @@ -758,7 +753,6 @@ def _start_span( service = config.service_mapping.get(service, service) links = context._span_links if not parent else [] - span_event_support = self.top_level_span_events_enabled() if trace_id or links or context._baggage: # child_of a non-empty context, so either a local child span or from a remote context span = Span( @@ -794,7 +788,6 @@ def _start_span( span_type=span_type, span_api=span_api, on_finish=[self._on_span_finish], - span_event_formatting=span_event_support, ) if config._report_hostname: span.set_tag_str(_HOSTNAME_KEY, hostname.get_hostname()) diff --git a/ddtrace/contrib/internal/graphql/patch.py b/ddtrace/contrib/internal/graphql/patch.py index 29da3389596..ce4be9ec771 100644 --- a/ddtrace/contrib/internal/graphql/patch.py +++ b/ddtrace/contrib/internal/graphql/patch.py @@ -358,7 +358,6 @@ def _set_span_errors(errors: List[GraphQLError], span: Span) -> None: if extensions: for key in extensions: attributes[f"extensions.{key}"] = extensions[key] - # breakpoint() span._add_event( name="dd.graphql.query.error", attributes=attributes, diff --git a/ddtrace/internal/_encoding.pyi b/ddtrace/internal/_encoding.pyi index b9810a8ca4a..54a3c15590e 100644 --- a/ddtrace/internal/_encoding.pyi +++ b/ddtrace/internal/_encoding.pyi @@ -38,5 +38,6 @@ class MsgpackEncoderBase(BufferedEncoder): class MsgpackEncoderV04(MsgpackEncoderBase): ... class MsgpackEncoderV05(MsgpackEncoderBase): ... +class MsgpackEncoderV041(MsgpackEncoderBase): ... def packb(o: Any, **kwargs) -> bytes: ... 
diff --git a/ddtrace/internal/_encoding.pyx b/ddtrace/internal/_encoding.pyx
index 8db5bad7544..17a2fac52a8 100644
--- a/ddtrace/internal/_encoding.pyx
+++ b/ddtrace/internal/_encoding.pyx
@@ -817,6 +817,340 @@ cdef class MsgpackEncoderV04(MsgpackEncoderBase):
         return ret
 
 
+cdef class MsgpackEncoderV041(MsgpackEncoderBase):
+    # v0.4-style msgpack encoder whose ``pack_span`` additionally writes
+    # ``span._events`` under a top-level ``span_events`` key.
+
+    cpdef flush(self):
+        # Return ``(self.get_bytes(), len(self))`` while holding ``self._lock``;
+        # the buffer is reset afterwards, even if building the result raises.
+        with self._lock:
+            try:
+                return self.get_bytes(), len(self)
+            finally:
+                self._reset_buffer()
+
+    cdef void * get_dd_origin_ref(self, str dd_origin):
+        return string_to_buff(dd_origin)
+
+    cdef inline int _pack_span_events(self, list span_events):
+        # Pack ``span_events`` as an array holding one map per event.
+        # Returns 0 on success, otherwise the first non-zero packer status.
+        ret = msgpack_pack_array(&self.pk, len(span_events))
+        if ret != 0:
+            return ret
+
+        for event in span_events:
+            # get dict representation of span event
+            d = dict(event)
+            ret = msgpack_pack_map(&self.pk, len(d))
+            if ret != 0:
+                return ret
+
+            for k, v in d.items():
+                # pack the name of a span event field
+                ret = pack_text(&self.pk, k)
+                if ret != 0:
+                    return ret
+                # pack the value of a span event field (number, string or attributes dict)
+                if isinstance(v, (int, float)):
+                    ret = pack_number(&self.pk, v)
+                elif isinstance(v, str):
+                    ret = pack_text(&self.pk, v)
+                elif k == "attributes":
+                    # span events can contain attributes, this is analogous to span tags
+                    # attributes are serialized as a nested dict with string keys and values
+                    # NOTE(review): values go to pack_text unchanged - confirm non-string values are handled
+                    attributes = v.items()
+                    ret = msgpack_pack_map(&self.pk, len(attributes))
+                    if ret != 0:
+                        # stop before writing entries after a failed map header
+                        return ret
+                    for attr_k, attr_v in attributes:
+                        ret = pack_text(&self.pk, attr_k)
+                        if ret != 0:
+                            return ret
+                        ret = pack_text(&self.pk, attr_v)
+                        if ret != 0:
+                            return ret
+                else:
+                    raise ValueError(f"Failed to encode SpanEvent. {k}={v} contains an unsupported type, {type(v)}")
+                if ret != 0:
+                    return ret
+        return 0
+
+    cdef inline int _pack_links(self, list span_links):
+        # Pack ``span_links`` as an array holding one map per link.
+        # Returns 0 on success, otherwise the first non-zero packer status.
+        ret = msgpack_pack_array(&self.pk, len(span_links))
+        if ret != 0:
+            return ret
+
+        for link in span_links:
+            # SpanLink.to_dict() returns all serializable span link fields
+            # v0.4 encoding is disabled by default. SpanLinks.to_dict() is optimized for the v0.5 format.
+            d = link.to_dict()
+            # Encode 128 bit trace ids using two 64bit integers
+            tid = int(d["trace_id"][:16], 16)
+            if tid > 0:
+                d["trace_id_high"] = tid
+            d["trace_id"] = int(d["trace_id"][16:], 16)
+            # span id should be uint64 in v0.4 (it is hex in v0.5)
+            d["span_id"] = int(d["span_id"], 16)
+            if "flags" in d:
+                # If traceflags set, the high bit (bit 31) should be set to 1 (uint32).
+                # This helps us distinguish between when the sample decision is zero or not set
+                d["flags"] = d["flags"] | (1 << 31)
+
+            ret = msgpack_pack_map(&self.pk, len(d))
+            if ret != 0:
+                return ret
+
+            for k, v in d.items():
+                # pack the name of a span link field (ex: trace_id, span_id, flags, ...)
+                ret = pack_text(&self.pk, k)
+                if ret != 0:
+                    return ret
+                # pack the value of a span link field (values can be number, string or dict)
+                if isinstance(v, (int, float)):
+                    ret = pack_number(&self.pk, v)
+                elif isinstance(v, str):
+                    ret = pack_text(&self.pk, v)
+                elif k == "attributes":
+                    # span links can contain attributes, this is analogous to span tags
+                    # attributes are serialized as a nested dict with string keys and values
+                    attributes = v.items()
+                    ret = msgpack_pack_map(&self.pk, len(attributes))
+                    if ret != 0:
+                        # stop before writing entries after a failed map header
+                        return ret
+                    for attr_k, attr_v in attributes:
+                        ret = pack_text(&self.pk, attr_k)
+                        if ret != 0:
+                            return ret
+                        ret = pack_text(&self.pk, attr_v)
+                        if ret != 0:
+                            return ret
+                else:
+                    raise ValueError(f"Failed to encode SpanLink. {k}={v} contains an unsupported type, {type(v)}")
+                if ret != 0:
+                    return ret
+        return 0
+
+    cdef inline int _pack_meta(self, object meta, char *dd_origin) except? -1:
+        # Pack ``meta`` (must be an exact dict, else TypeError) as a map of text keys to
+        # text values; when ``dd_origin`` is not NULL an ``_ORIGIN_KEY`` entry is appended.
+        cdef Py_ssize_t L
+        cdef int ret
+        cdef dict d
+
+        if PyDict_CheckExact(meta):
+            d = meta
+            L = len(d) + (dd_origin is not NULL)
+            if L > ITEM_LIMIT:
+                raise ValueError("dict is too large")
+
+            ret = msgpack_pack_map(&self.pk, L)
+            if ret == 0:
+                for k, v in d.items():
+                    ret = pack_text(&self.pk, k)
+                    if ret != 0:
+                        break
+                    ret = pack_text(&self.pk, v)
+                    if ret != 0:
+                        break
+                # only append the origin entry if every meta entry packed cleanly, so an earlier error is kept
+                if ret == 0 and dd_origin is not NULL:
+                    ret = pack_bytes(&self.pk, _ORIGIN_KEY, _ORIGIN_KEY_LEN)
+                    if ret == 0:
+                        ret = pack_bytes(&self.pk, dd_origin, strlen(dd_origin))
+                    if ret != 0:
+                        return ret
+            return ret
+
+        raise TypeError("Unhandled meta type: %r" % type(meta))
+
+    cdef inline int _pack_metrics(self, object metrics) except? -1:
+        # Pack ``metrics`` (must be an exact dict, else TypeError) as a map of text keys to numbers.
+        cdef Py_ssize_t L
+        cdef int ret
+        cdef dict d
+
+        if PyDict_CheckExact(metrics):
+            d = metrics
+            L = len(d)
+            if L > ITEM_LIMIT:
+                raise ValueError("dict is too large")
+
+            ret = msgpack_pack_map(&self.pk, L)
+            if ret == 0:
+                for k, v in d.items():
+                    ret = pack_text(&self.pk, k)
+                    if ret != 0:
+                        break
+                    ret = pack_number(&self.pk, v)
+                    if ret != 0:
+                        break
+            return ret
+
+        raise TypeError("Unhandled metrics type: %r" % type(metrics))
+
+    cdef int pack_span(self, object span, void *dd_origin) except? -1:
+        # Pack ``span`` as one msgpack map. The size ``L`` declared in the map
+        # header must equal the number of key/value pairs written below, so
+        # every optional field that gets emitted is counted in ``L``.
+        cdef int ret
+        cdef Py_ssize_t L
+        cdef int has_span_type
+        cdef int has_meta
+        cdef int has_metrics
+
+        has_error = (span.error != 0)
+        has_span_type = (span.span_type is not None)
+        has_span_events = (len(span._events) > 0)
+        has_meta = (len(span._meta) > 0 or dd_origin is not NULL)
+        has_metrics = (len(span._metrics) > 0)
+        has_parent_id = (span.parent_id is not None)
+        has_links = (len(span._links) > 0)
+        has_meta_struct = (len(span._meta_struct) > 0)
+
+        # 7 fields are always written: trace_id, span_id, service, resource, name, start, duration
+        L = (
+            7 + has_span_type + has_meta + has_metrics + has_error + has_parent_id
+            + has_links + has_span_events + has_meta_struct
+        )
+
+        ret = msgpack_pack_map(&self.pk, L)
+
+        if ret == 0:
+            ret = pack_bytes(&self.pk, b"trace_id", 8)
+            if ret != 0:
+                return ret
+            ret = pack_number(&self.pk, span._trace_id_64bits)
+            if ret != 0:
+                return ret
+
+            if has_parent_id:
+                ret = pack_bytes(&self.pk, b"parent_id", 9)
+                if ret != 0:
+                    return ret
+                ret = pack_number(&self.pk, span.parent_id)
+                if ret != 0:
+                    return ret
+
+            ret = pack_bytes(&self.pk, b"span_id", 7)
+            if ret != 0:
+                return ret
+            ret = pack_number(&self.pk, span.span_id)
+            if ret != 0:
+                return ret
+
+            ret = pack_bytes(&self.pk, b"service", 7)
+            if ret != 0:
+                return ret
+            ret = pack_text(&self.pk, span.service)
+            if ret != 0:
+                return ret
+
+            ret = pack_bytes(&self.pk, b"resource", 8)
+            if ret != 0:
+                return ret
+            ret = pack_text(&self.pk, span.resource)
+            if ret != 0:
+                return ret
+
+            ret = pack_bytes(&self.pk, b"name", 4)
+            if ret != 0:
+                return ret
+            ret = pack_text(&self.pk, span.name)
+            if ret != 0:
+                return ret
+
+            ret = pack_bytes(&self.pk, b"start", 5)
+            if ret != 0:
+                return ret
+            ret = pack_number(&self.pk, span.start_ns)
+            if ret != 0:
+                return ret
+
+            ret = pack_bytes(&self.pk, b"duration", 8)
+            if ret != 0:
+                return ret
+            ret = pack_number(&self.pk, span.duration_ns)
+            if ret != 0:
+                return ret
+
+            if has_error:
+                ret = pack_bytes(&self.pk, b"error", 5)
+                if ret != 0:
+                    return ret
+                ret = msgpack_pack_long(&self.pk, 1)
+                if ret != 0:
+                    return ret
+
+            if has_span_type:
+                ret = pack_bytes(&self.pk, b"type", 4)
+                if ret != 0:
+                    return ret
+                ret = pack_text(&self.pk, span.span_type)
+                if ret != 0:
+                    return ret
+
+            if has_links:
+                ret = pack_bytes(&self.pk, b"span_links", 10)
+                if ret != 0:
+                    return ret
+                ret = self._pack_links(span._links)
+                if ret != 0:
+                    return ret
+
+            if has_span_events:
+                ret = pack_bytes(&self.pk, b"span_events", 11)
+                if ret != 0:
+                    return ret
+                ret = self._pack_span_events(span._events)
+                if ret != 0:
+                    return ret
+
+            if has_meta:
+                ret = pack_bytes(&self.pk, b"meta", 4)
+                if ret != 0:
+                    return ret
+                # the "meta" key needs its value; dd_origin is ``void *`` here and _pack_meta takes ``char *``
+                ret = self._pack_meta(span._meta, <char *> dd_origin)
+                if ret != 0:
+                    return ret
+
+            if has_meta_struct:
+                ret = pack_bytes(&self.pk, b"meta_struct", 11)
+                if ret != 0:
+                    return ret
+
+                ret = msgpack_pack_map(&self.pk, len(span._meta_struct))
+                if ret != 0:
+                    return ret
+                for k, v in span._meta_struct.items():
+                    ret = pack_text(&self.pk, k)
+                    if ret != 0:
+                        return ret
+                    value_packed = packb(v)
+                    ret = msgpack_pack_bin(&self.pk, len(value_packed))
+                    if ret == 0:
+                        ret = msgpack_pack_raw_body(&self.pk, value_packed, len(value_packed))
+                    if ret != 0:
+                        return ret
+
+            if has_metrics:
+                ret = pack_bytes(&self.pk, b"metrics", 7)
+                if ret != 0:
+                    return ret
+                ret = self._pack_metrics(span._metrics)
+                if ret != 0:
+                    return ret
+
+        return ret
+
+
 cdef class MsgpackEncoderV05(MsgpackEncoderBase):
     cdef MsgpackStringTable _st
 
diff --git a/ddtrace/internal/encoding.py b/ddtrace/internal/encoding.py
index 0cf31f9f7ed..ca1c8c2c58b 100644
--- a/ddtrace/internal/encoding.py
+++ b/ddtrace/internal/encoding.py
@@ -8,12 +8,13 @@
 
 from ._encoding import ListStringTable
 from ._encoding import MsgpackEncoderV04
+from ._encoding import MsgpackEncoderV041
 from ._encoding import MsgpackEncoderV05
 from .compat import ensure_text
 from .logger import get_logger
 
 
-__all__ = ["MsgpackEncoderV04", "MsgpackEncoderV05", "ListStringTable", "MSGPACK_ENCODERS"]
+__all__ = ["MsgpackEncoderV04", "MsgpackEncoderV041", "MsgpackEncoderV05", "ListStringTable", "MSGPACK_ENCODERS"]
 
 
 if TYPE_CHECKING:  # 
pragma: no cover
@@ -156,4 +157,5 @@ def encode(self, obj):
 MSGPACK_ENCODERS = {
     "v0.4": MsgpackEncoderV04,
     "v0.5": MsgpackEncoderV05,
+    "v0.4.1": MsgpackEncoderV041,
 }
diff --git a/ddtrace/internal/writer/writer.py b/ddtrace/internal/writer/writer.py
index 0ea519fa56f..537772bf10c 100644
--- a/ddtrace/internal/writer/writer.py
+++ b/ddtrace/internal/writer/writer.py
@@ -482,7 +482,10 @@ def __init__(
                 ", ".join(sorted(WRITER_CLIENTS.keys())),
             )
             self._api_version = sorted(WRITER_CLIENTS.keys())[-1]
-        client = WRITER_CLIENTS[self._api_version](buffer_size, max_payload_size)
+        if self._api_version == "v0.4" and self.get_span_event_support_type(agent_url):
+            client = WRITER_CLIENTS["v0.4.1"](buffer_size, max_payload_size)
+        else:
+            client = WRITER_CLIENTS[self._api_version](buffer_size, max_payload_size)
 
         _headers = {
             "Datadog-Meta-Lang": "python",
@@ -513,16 +516,21 @@ def __init__(
             headers=_headers,
             report_metrics=report_metrics,
         )
-        self._agent_top_level_span_events_support = False
-        info = agent.info(self.agent_url)
-        # if not info:
-        #     log.warning("Could not read agent info endpoint")
-        # elif info and not info.get("span_events"):
-        #     log.warning("Top-level span events not supported in this agent version")
-        # else:
-        #     self._agent_top_level_span_events_support = True
-        if info and info.get("span_events"):
-            self._agent_top_level_span_events_support = True
+
+    def get_span_event_support_type(self, agent_url):
+        """Return True if the info reported by the agent at ``agent_url`` has a truthy ``span_events``.
+
+        The lookup is best effort: if ``agent.info`` raises, the failure is
+        logged at debug level and ``False`` is returned.
+        """
+        try:
+            info = agent.info(agent_url)
+        except Exception:
+            # NOTE(review): assumes agent.info can raise when the agent is unreachable - confirm.
+            # This probe runs inside __init__, so a failed lookup must not break writer creation.
+            log.debug("Unable to query agent info at %s", agent_url, exc_info=True)
+            return False
+        return bool(info and info.get("span_events"))
 
     def recreate(self):
         # type: () -> HTTPWriter
diff --git a/ddtrace/internal/writer/writer_client.py b/ddtrace/internal/writer/writer_client.py
index f8cfd51b9f1..03ba35ab3ce 100644
--- a/ddtrace/internal/writer/writer_client.py
+++ b/ddtrace/internal/writer/writer_client.py
@@ -38,7 +38,22 @@ def __init__(self, buffer_size, max_payload_size):
         )
 
 
+class AgentWriterClientV401(WriterClientBase):
+    """Writer client that targets ``v0.4/traces`` and encodes with ``MSGPACK_ENCODERS["v0.4.1"]``."""
+
+    ENDPOINT = "v0.4/traces"
+
+    def __init__(self, buffer_size, max_payload_size):
+        super(AgentWriterClientV401, self).__init__(
+            MSGPACK_ENCODERS["v0.4.1"](
+                max_size=buffer_size,
+                max_item_size=max_payload_size,
+            )
+        )
+
+
 WRITER_CLIENTS = {
     "v0.4": AgentWriterClientV4,
     "v0.5": AgentWriterClientV5,
+    "v0.4.1": AgentWriterClientV401,
 }