Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
quinna-h committed Feb 25, 2025
1 parent 2387928 commit 6dadbd5
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 26 deletions.
8 changes: 1 addition & 7 deletions ddtrace/_trace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ class Span(object):
"_links",
"_events",
"__weakref__",
"_span_event_formatting",
]

def __init__(
Expand All @@ -146,7 +145,6 @@ def __init__(
on_finish: Optional[List[Callable[["Span"], None]]] = None,
span_api: str = SPAN_API_DATADOG,
links: Optional[List[SpanLink]] = None,
span_event_formatting: Optional[bool] = False,
) -> None:
"""
Create a new span. Call `finish` once the traced operation is over.
Expand Down Expand Up @@ -187,7 +185,6 @@ def __init__(
self._resource = [resource or name]
self.span_type = span_type
self._span_api = span_api
self._span_event_formatting = span_event_formatting

self._meta: _MetaDictType = {}
self.error = 0
Expand Down Expand Up @@ -490,10 +487,7 @@ def _add_event(
self, name: str, attributes: Optional[Dict[str, _JSONType]] = None, timestamp: Optional[int] = None
) -> None:
"""Add an event to the span."""
if self._span_event_formatting is False:
# backwards compatibility: v0.5 and agent versions older than 7.63.0
self._events.append(SpanEvent(name, attributes, timestamp))
# TODO: add top-level span events for v0.4 and agent versions greater than or equal 7.63.0
self._events.append(SpanEvent(name, attributes, timestamp))

def get_metrics(self) -> _MetricDictType:
"""Return all metrics."""
Expand Down
7 changes: 0 additions & 7 deletions ddtrace/_trace/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,6 @@ def current_trace_context(self, *args, **kwargs) -> Optional[Context]:
return active.context
return None

def top_level_span_events_enabled(self) -> bool:
# assert correct writer type
assert isinstance(self._writer, AgentWriter)
return self._writer._agent_top_level_span_events_support

def get_log_correlation_context(self, active: Optional[Union[Context, Span]] = None) -> Dict[str, str]:
"""Retrieves the data used to correlate a log with the current active trace.
Generates a dictionary for custom logging instrumentation including the trace id and
Expand Down Expand Up @@ -758,7 +753,6 @@ def _start_span(
service = config.service_mapping.get(service, service)

links = context._span_links if not parent else []
span_event_support = self.top_level_span_events_enabled()
if trace_id or links or context._baggage:
# child_of a non-empty context, so either a local child span or from a remote context
span = Span(
Expand Down Expand Up @@ -794,7 +788,6 @@ def _start_span(
span_type=span_type,
span_api=span_api,
on_finish=[self._on_span_finish],
span_event_formatting=span_event_support,
)
if config._report_hostname:
span.set_tag_str(_HOSTNAME_KEY, hostname.get_hostname())
Expand Down
1 change: 0 additions & 1 deletion ddtrace/contrib/internal/graphql/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ def _set_span_errors(errors: List[GraphQLError], span: Span) -> None:
if extensions:
for key in extensions:
attributes[f"extensions.{key}"] = extensions[key]
# breakpoint()
span._add_event(
name="dd.graphql.query.error",
attributes=attributes,
Expand Down
1 change: 1 addition & 0 deletions ddtrace/internal/_encoding.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ class MsgpackEncoderBase(BufferedEncoder):

class MsgpackEncoderV04(MsgpackEncoderBase): ...
class MsgpackEncoderV05(MsgpackEncoderBase): ...
class MsgpackEncoderV041(MsgpackEncoderBase): ...

def packb(o: Any, **kwargs) -> bytes: ...
302 changes: 302 additions & 0 deletions ddtrace/internal/_encoding.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,308 @@ cdef class MsgpackEncoderV04(MsgpackEncoderBase):

return ret

cdef class MsgpackEncoderV041(MsgpackEncoderBase):
cpdef flush(self):
with self._lock:
try:
return self.get_bytes(), len(self)
finally:
self._reset_buffer()

cdef void * get_dd_origin_ref(self, str dd_origin):
return string_to_buff(dd_origin)

cdef inline int _pack_span_events(self, list span_events):
ret = msgpack_pack_array(&self.pk, len(span_events))
if ret != 0:
return ret

for event in span_events:
# get dict representation of span event
d = dict(event)
ret = msgpack_pack_map(&self.pk, len(d))
if ret != 0:
return ret

for k, v in d.items():
# pack the name of a span event
ret = pack_text(&self.pk, k)
if ret != 0:
return ret
if isinstance(v, (int, float)):
ret = pack_number(&self.pk, v)
elif isinstance(v, str):
ret = pack_text(&self.pk, v)
elif k == "attributes":
# span links can contain attributes, this is analougous to span tags
# attributes are serialized as a nested dict with string keys and values
attributes = v.items()
ret = msgpack_pack_map(&self.pk, len(attributes))
for attr_k, attr_v in attributes:
ret = pack_text(&self.pk, attr_k)
if ret != 0:
return ret
ret = pack_text(&self.pk, attr_v)
if ret != 0:
return ret
else:
raise ValueError(f"Failed to encode SpanEvent. {k}={v} contains an unsupported type, {type(v)}")
if ret != 0:
return ret
return 0

cdef inline int _pack_links(self, list span_links):
ret = msgpack_pack_array(&self.pk, len(span_links))
if ret != 0:
return ret

for link in span_links:
# SpanLink.to_dict() returns all serializable span link fields
# v0.4 encoding is disabled by default. SpanLinks.to_dict() is optimized for the v0.5 format.
d = link.to_dict()
# Encode 128 bit trace ids usings two 64bit integers
tid = int(d["trace_id"][:16], 16)
if tid > 0:
d["trace_id_high"] = tid
d["trace_id"] = int(d["trace_id"][16:], 16)
# span id should be uint64 in v0.4 (it is hex in v0.5)
d["span_id"] = int(d["span_id"], 16)
if "flags" in d:
# If traceflags set, the high bit (bit 31) should be set to 1 (uint32).
# This helps us distinguish between when the sample decision is zero or not set
d["flags"] = d["flags"] | (1 << 31)

ret = msgpack_pack_map(&self.pk, len(d))
if ret != 0:
return ret

for k, v in d.items():
# pack the name of a span link field (ex: trace_id, span_id, flags, ...)
ret = pack_text(&self.pk, k)
if ret != 0:
return ret
# pack the value of a span link field (values can be number, string or dict)
if isinstance(v, (int, float)):
ret = pack_number(&self.pk, v)
elif isinstance(v, str):
ret = pack_text(&self.pk, v)
elif k == "attributes":
# span links can contain attributes, this is analougous to span tags
# attributes are serialized as a nested dict with string keys and values
attributes = v.items()
ret = msgpack_pack_map(&self.pk, len(attributes))
for attr_k, attr_v in attributes:
ret = pack_text(&self.pk, attr_k)
if ret != 0:
return ret
ret = pack_text(&self.pk, attr_v)
if ret != 0:
return ret
else:
raise ValueError(f"Failed to encode SpanLink. {k}={v} contains an unsupported type, {type(v)}")
if ret != 0:
return ret
return 0

cdef inline int _pack_meta(self, object meta, char *dd_origin) except? -1:
cdef Py_ssize_t L
cdef int ret
cdef dict d

if PyDict_CheckExact(meta):
d = <dict> meta
L = len(d) + (dd_origin is not NULL)
if L > ITEM_LIMIT:
raise ValueError("dict is too large")

ret = msgpack_pack_map(&self.pk, L)
if ret == 0:
for k, v in d.items():
ret = pack_text(&self.pk, k)
if ret != 0:
break
ret = pack_text(&self.pk, v)
if ret != 0:
break
if dd_origin is not NULL:
ret = pack_bytes(&self.pk, _ORIGIN_KEY, _ORIGIN_KEY_LEN)
if ret == 0:
ret = pack_bytes(&self.pk, dd_origin, strlen(dd_origin))
if ret != 0:
return ret
return ret

raise TypeError("Unhandled meta type: %r" % type(meta))

cdef inline int _pack_metrics(self, object metrics) except? -1:
cdef Py_ssize_t L
cdef int ret
cdef dict d

if PyDict_CheckExact(metrics):
d = <dict> metrics
L = len(d)
if L > ITEM_LIMIT:
raise ValueError("dict is too large")

ret = msgpack_pack_map(&self.pk, L)
if ret == 0:
for k, v in d.items():
ret = pack_text(&self.pk, k)
if ret != 0:
break
ret = pack_number(&self.pk, v)
if ret != 0:
break
return ret

raise TypeError("Unhandled metrics type: %r" % type(metrics))

cdef int pack_span(self, object span, void *dd_origin) except? -1:
cdef int ret
cdef Py_ssize_t L
cdef int has_span_type
cdef int has_meta
cdef int has_metrics

has_error = <bint> (span.error != 0)
has_span_type = <bint> (span.span_type is not None)
has_span_events = <bint> (len(span._events) > 0)
has_meta = <bint> (len(span._meta) > 0 or dd_origin is not NULL)
has_metrics = <bint> (len(span._metrics) > 0)
has_parent_id = <bint> (span.parent_id is not None)
has_links = <bint> (len(span._links) > 0)
has_meta_struct = <bint> (len(span._meta_struct) > 0)

L = 7 + has_span_type + has_meta + has_metrics + has_error + has_parent_id + has_links + has_meta_struct

ret = msgpack_pack_map(&self.pk, L)

if ret == 0:
ret = pack_bytes(&self.pk, <char *> b"trace_id", 8)
if ret != 0:
return ret
ret = pack_number(&self.pk, span._trace_id_64bits)
if ret != 0:
return ret

if has_parent_id:
ret = pack_bytes(&self.pk, <char *> b"parent_id", 9)
if ret != 0:
return ret
ret = pack_number(&self.pk, span.parent_id)
if ret != 0:
return ret

ret = pack_bytes(&self.pk, <char *> b"span_id", 7)
if ret != 0:
return ret
ret = pack_number(&self.pk, span.span_id)
if ret != 0:
return ret

ret = pack_bytes(&self.pk, <char *> b"service", 7)
if ret != 0:
return ret
ret = pack_text(&self.pk, span.service)
if ret != 0:
return ret

ret = pack_bytes(&self.pk, <char *> b"resource", 8)
if ret != 0:
return ret
ret = pack_text(&self.pk, span.resource)
if ret != 0:
return ret

ret = pack_bytes(&self.pk, <char *> b"name", 4)
if ret != 0:
return ret
ret = pack_text(&self.pk, span.name)
if ret != 0:
return ret

ret = pack_bytes(&self.pk, <char *> b"start", 5)
if ret != 0:
return ret
ret = pack_number(&self.pk, span.start_ns)
if ret != 0:
return ret

ret = pack_bytes(&self.pk, <char *> b"duration", 8)
if ret != 0:
return ret
ret = pack_number(&self.pk, span.duration_ns)
if ret != 0:
return ret

if has_error:
ret = pack_bytes(&self.pk, <char *> b"error", 5)
if ret != 0:
return ret
ret = msgpack_pack_long(&self.pk, <long> 1)
if ret != 0:
return ret

if has_span_type:
ret = pack_bytes(&self.pk, <char *> b"type", 4)
if ret != 0:
return ret
ret = pack_text(&self.pk, span.span_type)
if ret != 0:
return ret

if has_links:
ret = pack_bytes(&self.pk, <char *> b"span_links", 10)
if ret != 0:
return ret
ret = self._pack_links(span._links)
if ret != 0:
return ret

if has_span_events:
ret = pack_bytes(&self.pk, <char *> b"span_events", 11)
if ret != 0:
return ret
ret = self._pack_span_events(span._events)
if ret != 0:
return ret

if has_meta:
ret = pack_bytes(&self.pk, <char *> b"meta", 4)
if ret != 0:
return ret
if ret != 0:
return ret

if has_meta_struct:
ret = pack_bytes(&self.pk, <char *> b"meta_struct", 11)
if ret != 0:
return ret

ret = msgpack_pack_map(&self.pk, len(span._meta_struct))
if ret != 0:
return ret
for k, v in span._meta_struct.items():
ret = pack_text(&self.pk, k)
if ret != 0:
return ret
value_packed = packb(v)
ret = msgpack_pack_bin(&self.pk, len(value_packed))
if ret == 0:
ret = msgpack_pack_raw_body(&self.pk, <char *> value_packed, len(value_packed))
if ret != 0:
return ret

if has_metrics:
ret = pack_bytes(&self.pk, <char *> b"metrics", 7)
if ret != 0:
return ret
ret = self._pack_metrics(span._metrics)
if ret != 0:
return ret

return ret

cdef class MsgpackEncoderV05(MsgpackEncoderBase):
cdef MsgpackStringTable _st
Expand Down
Loading

0 comments on commit 6dadbd5

Please sign in to comment.