Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

top level span events support #12504

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddtrace/internal/_encoding.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ class MsgpackEncoderBase(BufferedEncoder):

class MsgpackEncoderV04(MsgpackEncoderBase): ...
class MsgpackEncoderV05(MsgpackEncoderBase): ...
class MsgpackEncoderV041(MsgpackEncoderBase): ...

def packb(o: Any, **kwargs) -> bytes: ...
301 changes: 301 additions & 0 deletions ddtrace/internal/_encoding.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,307 @@ cdef class MsgpackEncoderV04(MsgpackEncoderBase):

return ret

cdef class MsgpackEncoderV041(MsgpackEncoderBase):
cpdef flush(self):
with self._lock:
try:
return self.get_bytes(), len(self)
finally:
self._reset_buffer()

cdef void * get_dd_origin_ref(self, str dd_origin):
return string_to_buff(dd_origin)

cdef inline int _pack_span_events(self, list span_events):
ret = msgpack_pack_array(&self.pk, len(span_events))
if ret != 0:
return ret

for event in span_events:
# get dict representation of span event
d = dict(event)
ret = msgpack_pack_map(&self.pk, len(d))
if ret != 0:
return ret

for k, v in d.items():
# pack the name of a span event
ret = pack_text(&self.pk, k)
if ret != 0:
return ret
if isinstance(v, (int, float)):
ret = pack_number(&self.pk, v)
elif isinstance(v, str):
ret = pack_text(&self.pk, v)
elif k == "attributes":
# span events can contain attributes, this is analougous to span tags
attributes = v.items()
ret = msgpack_pack_map(&self.pk, len(attributes))
for attr_k, attr_v in attributes:
ret = pack_text(&self.pk, attr_k)
if ret != 0:
return ret
ret = pack_text(&self.pk, attr_v)
if ret != 0:
return ret
else:
raise ValueError(f"Failed to encode SpanEvent. {k}={v} contains an unsupported type, {type(v)}")
if ret != 0:
return ret
return 0

cdef inline int _pack_links(self, list span_links):
ret = msgpack_pack_array(&self.pk, len(span_links))
if ret != 0:
return ret

for link in span_links:
# SpanLink.to_dict() returns all serializable span link fields
# v0.4 encoding is disabled by default. SpanLinks.to_dict() is optimized for the v0.5 format.
d = link.to_dict()
# Encode 128 bit trace ids usings two 64bit integers
tid = int(d["trace_id"][:16], 16)
if tid > 0:
d["trace_id_high"] = tid
d["trace_id"] = int(d["trace_id"][16:], 16)
# span id should be uint64 in v0.4 (it is hex in v0.5)
d["span_id"] = int(d["span_id"], 16)
if "flags" in d:
# If traceflags set, the high bit (bit 31) should be set to 1 (uint32).
# This helps us distinguish between when the sample decision is zero or not set
d["flags"] = d["flags"] | (1 << 31)

ret = msgpack_pack_map(&self.pk, len(d))
if ret != 0:
return ret

for k, v in d.items():
# pack the name of a span link field (ex: trace_id, span_id, flags, ...)
ret = pack_text(&self.pk, k)
if ret != 0:
return ret
# pack the value of a span link field (values can be number, string or dict)
if isinstance(v, (int, float)):
ret = pack_number(&self.pk, v)
elif isinstance(v, str):
ret = pack_text(&self.pk, v)
elif k == "attributes":
# span links can contain attributes, this is analougous to span tags
# attributes are serialized as a nested dict with string keys and values
attributes = v.items()
ret = msgpack_pack_map(&self.pk, len(attributes))
for attr_k, attr_v in attributes:
ret = pack_text(&self.pk, attr_k)
if ret != 0:
return ret
ret = pack_text(&self.pk, attr_v)
if ret != 0:
return ret
else:
raise ValueError(f"Failed to encode SpanLink. {k}={v} contains an unsupported type, {type(v)}")
if ret != 0:
return ret
return 0

cdef inline int _pack_meta(self, object meta, char *dd_origin) except? -1:
cdef Py_ssize_t L
cdef int ret
cdef dict d

if PyDict_CheckExact(meta):
d = <dict> meta
L = len(d) + (dd_origin is not NULL)
if L > ITEM_LIMIT:
raise ValueError("dict is too large")

ret = msgpack_pack_map(&self.pk, L)
if ret == 0:
for k, v in d.items():
ret = pack_text(&self.pk, k)
if ret != 0:
break
ret = pack_text(&self.pk, v)
if ret != 0:
break
if dd_origin is not NULL:
ret = pack_bytes(&self.pk, _ORIGIN_KEY, _ORIGIN_KEY_LEN)
if ret == 0:
ret = pack_bytes(&self.pk, dd_origin, strlen(dd_origin))
if ret != 0:
return ret
return ret

raise TypeError("Unhandled meta type: %r" % type(meta))

cdef inline int _pack_metrics(self, object metrics) except? -1:
cdef Py_ssize_t L
cdef int ret
cdef dict d

if PyDict_CheckExact(metrics):
d = <dict> metrics
L = len(d)
if L > ITEM_LIMIT:
raise ValueError("dict is too large")

ret = msgpack_pack_map(&self.pk, L)
if ret == 0:
for k, v in d.items():
ret = pack_text(&self.pk, k)
if ret != 0:
break
ret = pack_number(&self.pk, v)
if ret != 0:
break
return ret

raise TypeError("Unhandled metrics type: %r" % type(metrics))

cdef int pack_span(self, object span, void *dd_origin) except? -1:
cdef int ret
cdef Py_ssize_t L
cdef int has_span_type
cdef int has_meta
cdef int has_metrics

has_error = <bint> (span.error != 0)
has_span_type = <bint> (span.span_type is not None)
has_span_events = <bint> (len(span._events) > 0)
has_meta = <bint> (len(span._meta) > 0 or dd_origin is not NULL)
has_metrics = <bint> (len(span._metrics) > 0)
has_parent_id = <bint> (span.parent_id is not None)
has_links = <bint> (len(span._links) > 0)
has_meta_struct = <bint> (len(span._meta_struct) > 0)

L = 7 + has_span_type + has_meta + has_metrics + has_error + has_parent_id + has_links + has_meta_struct

ret = msgpack_pack_map(&self.pk, L)

if ret == 0:
ret = pack_bytes(&self.pk, <char *> b"trace_id", 8)
if ret != 0:
return ret
ret = pack_number(&self.pk, span._trace_id_64bits)
if ret != 0:
return ret

if has_parent_id:
ret = pack_bytes(&self.pk, <char *> b"parent_id", 9)
if ret != 0:
return ret
ret = pack_number(&self.pk, span.parent_id)
if ret != 0:
return ret

ret = pack_bytes(&self.pk, <char *> b"span_id", 7)
if ret != 0:
return ret
ret = pack_number(&self.pk, span.span_id)
if ret != 0:
return ret

ret = pack_bytes(&self.pk, <char *> b"service", 7)
if ret != 0:
return ret
ret = pack_text(&self.pk, span.service)
if ret != 0:
return ret

ret = pack_bytes(&self.pk, <char *> b"resource", 8)
if ret != 0:
return ret
ret = pack_text(&self.pk, span.resource)
if ret != 0:
return ret

ret = pack_bytes(&self.pk, <char *> b"name", 4)
if ret != 0:
return ret
ret = pack_text(&self.pk, span.name)
if ret != 0:
return ret

ret = pack_bytes(&self.pk, <char *> b"start", 5)
if ret != 0:
return ret
ret = pack_number(&self.pk, span.start_ns)
if ret != 0:
return ret

ret = pack_bytes(&self.pk, <char *> b"duration", 8)
if ret != 0:
return ret
ret = pack_number(&self.pk, span.duration_ns)
if ret != 0:
return ret

if has_error:
ret = pack_bytes(&self.pk, <char *> b"error", 5)
if ret != 0:
return ret
ret = msgpack_pack_long(&self.pk, <long> 1)
if ret != 0:
return ret

if has_span_type:
ret = pack_bytes(&self.pk, <char *> b"type", 4)
if ret != 0:
return ret
ret = pack_text(&self.pk, span.span_type)
if ret != 0:
return ret

if has_links:
ret = pack_bytes(&self.pk, <char *> b"span_links", 10)
if ret != 0:
return ret
ret = self._pack_links(span._links)
if ret != 0:
return ret

if has_span_events:
ret = pack_bytes(&self.pk, <char *> b"span_events", 11)
if ret != 0:
return ret
ret = self._pack_span_events(span._events)
if ret != 0:
return ret

if has_meta:
ret = pack_bytes(&self.pk, <char *> b"meta", 4)
if ret != 0:
return ret
if ret != 0:
return ret

if has_meta_struct:
ret = pack_bytes(&self.pk, <char *> b"meta_struct", 11)
if ret != 0:
return ret

ret = msgpack_pack_map(&self.pk, len(span._meta_struct))
if ret != 0:
return ret
for k, v in span._meta_struct.items():
ret = pack_text(&self.pk, k)
if ret != 0:
return ret
value_packed = packb(v)
ret = msgpack_pack_bin(&self.pk, len(value_packed))
if ret == 0:
ret = msgpack_pack_raw_body(&self.pk, <char *> value_packed, len(value_packed))
if ret != 0:
return ret

if has_metrics:
ret = pack_bytes(&self.pk, <char *> b"metrics", 7)
if ret != 0:
return ret
ret = self._pack_metrics(span._metrics)
if ret != 0:
return ret

return ret

cdef class MsgpackEncoderV05(MsgpackEncoderBase):
cdef MsgpackStringTable _st
Expand Down
4 changes: 3 additions & 1 deletion ddtrace/internal/encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@

from ._encoding import ListStringTable
from ._encoding import MsgpackEncoderV04
from ._encoding import MsgpackEncoderV041
from ._encoding import MsgpackEncoderV05
from .compat import ensure_text
from .logger import get_logger


__all__ = ["MsgpackEncoderV04", "MsgpackEncoderV05", "ListStringTable", "MSGPACK_ENCODERS"]
__all__ = ["MsgpackEncoderV04", "MsgpackEncoderV041", "MsgpackEncoderV05", "ListStringTable", "MSGPACK_ENCODERS"]


if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -156,4 +157,5 @@ def encode(self, obj):
MSGPACK_ENCODERS = {
"v0.4": MsgpackEncoderV04,
"v0.5": MsgpackEncoderV05,
"v0.4.1": MsgpackEncoderV041,
}
12 changes: 11 additions & 1 deletion ddtrace/internal/writer/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import ddtrace
from ddtrace import config
from ddtrace.internal import agent
from ddtrace.internal.utils.retry import fibonacci_backoff_with_jitter
from ddtrace.settings.asm import config as asm_config
from ddtrace.vendor.dogstatsd import DogStatsd
Expand Down Expand Up @@ -481,7 +482,10 @@ def __init__(
", ".join(sorted(WRITER_CLIENTS.keys())),
)
self._api_version = sorted(WRITER_CLIENTS.keys())[-1]
client = WRITER_CLIENTS[self._api_version](buffer_size, max_payload_size)
if self._api_version == "v0.4" and self.get_span_event_support_type(agent_url):
client = WRITER_CLIENTS["v0.4.1"](buffer_size, max_payload_size)
else:
client = WRITER_CLIENTS[self._api_version](buffer_size, max_payload_size)

_headers = {
"Datadog-Meta-Lang": "python",
Expand Down Expand Up @@ -513,6 +517,12 @@ def __init__(
report_metrics=report_metrics,
)

def get_span_event_support_type(self, agent_url):
info = agent.info(agent_url)
if info and info.get("span_events"):
return True
return False

def recreate(self):
# type: () -> HTTPWriter
return self.__class__(
Expand Down
13 changes: 13 additions & 0 deletions ddtrace/internal/writer/writer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,20 @@ def __init__(self, buffer_size, max_payload_size):
)


class AgentWriterClientV401(WriterClientBase):
ENDPOINT = "v0.4/traces"

def __init__(self, buffer_size, max_payload_size):
super(AgentWriterClientV401, self).__init__(
MSGPACK_ENCODERS["v0.4.1"](
max_size=buffer_size,
max_item_size=max_payload_size,
)
)


WRITER_CLIENTS = {
"v0.4": AgentWriterClientV4,
"v0.5": AgentWriterClientV5,
"v0.4.1": AgentWriterClientV401,
}
Loading