Skip to content

Commit a72d810

Browse files
committed
fix(kafka): adapt python tracer to trace-test-suite
Signed-off-by: Cagri Yonca <[email protected]>
1 parent 734b679 commit a72d810

File tree

2 files changed

+217
-76
lines changed

2 files changed

+217
-76
lines changed

src/instana/instrumentation/kafka/kafka_python.py

Lines changed: 105 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,31 @@
11
# (c) Copyright IBM Corp. 2025
22

3+
34
try:
5+
import contextvars
46
import inspect
57
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
68

79
import kafka # noqa: F401
810
import wrapt
11+
from opentelemetry import context, trace
912
from opentelemetry.trace import SpanKind
1013

1114
from instana.log import logger
1215
from instana.propagators.format import Format
16+
from instana.singletons import get_tracer
1317
from instana.util.traceutils import (
1418
get_tracer_tuple,
1519
tracing_is_off,
1620
)
21+
from instana.span.span import InstanaSpan
1722

1823
if TYPE_CHECKING:
1924
from kafka.producer.future import FutureRecordMetadata
2025

26+
consumer_token: Dict[str, Any] = {}
27+
consumer_span = contextvars.ContextVar("kafka_python_consumer_span")
28+
2129
@wrapt.patch_function_wrapper("kafka", "KafkaProducer.send")
2230
def trace_kafka_send(
2331
wrapped: Callable[..., "kafka.KafkaProducer.send"],
@@ -59,35 +67,83 @@ def trace_kafka_send(
5967
kwargs["headers"] = headers
6068
try:
6169
res = wrapped(*args, **kwargs)
70+
return res
6271
except Exception as exc:
6372
span.record_exception(exc)
64-
else:
65-
return res
6673

6774
def create_span(
6875
span_type: str,
6976
topic: Optional[str],
7077
headers: Optional[List[Tuple[str, bytes]]] = [],
71-
exception: Optional[str] = None,
78+
exception: Optional[Exception] = None,
7279
) -> None:
73-
tracer, parent_span, _ = get_tracer_tuple()
74-
parent_context = (
75-
parent_span.get_span_context()
76-
if parent_span
77-
else tracer.extract(
78-
Format.KAFKA_HEADERS,
79-
headers,
80-
disable_w3c_trace_context=True,
80+
try:
81+
span = consumer_span.get(None)
82+
if span is not None:
83+
close_consumer_span(span)
84+
85+
tracer, parent_span, _ = get_tracer_tuple()
86+
87+
if not tracer:
88+
tracer = get_tracer()
89+
90+
is_suppressed = False
91+
if topic:
92+
is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
93+
"kafka",
94+
span_type,
95+
topic,
96+
)
97+
98+
if not is_suppressed and headers:
99+
for header_name, header_value in headers:
100+
if header_name == "x_instana_l_s" and header_value == b"0":
101+
is_suppressed = True
102+
break
103+
104+
if is_suppressed:
105+
return
106+
107+
parent_context = (
108+
parent_span.get_span_context()
109+
if parent_span
110+
else tracer.extract(
111+
Format.KAFKA_HEADERS,
112+
headers,
113+
disable_w3c_trace_context=True,
114+
)
115+
)
116+
span = tracer.start_span(
117+
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
81118
)
82-
)
83-
with tracer.start_as_current_span(
84-
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
85-
) as span:
86119
if topic:
87120
span.set_attribute("kafka.service", topic)
88121
span.set_attribute("kafka.access", span_type)
89122
if exception:
90123
span.record_exception(exception)
124+
span.end()
125+
126+
save_consumer_span_into_context(span)
127+
except Exception as e:
128+
logger.debug(f"Error while creating kafka-consumer span: {e}")
129+
130+
def save_consumer_span_into_context(span: "InstanaSpan") -> None:
131+
ctx = trace.set_span_in_context(span)
132+
token = context.attach(ctx)
133+
consumer_token["token"] = token
134+
consumer_span.set(span)
135+
136+
def close_consumer_span(span: "InstanaSpan") -> None:
137+
if span.is_recording():
138+
span.end()
139+
consumer_span.set(None)
140+
if "token" in consumer_token:
141+
context.detach(consumer_token.pop("token", None))
142+
143+
def clear_context() -> None:
144+
context.attach(trace.set_span_in_context(None))
145+
consumer_token.clear()
146+
consumer_span.set(None)
91147

92148
@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.__next__")
93149
def trace_kafka_consume(
@@ -96,29 +152,39 @@ def trace_kafka_consume(
96152
args: Tuple[int, str, Tuple[Any, ...]],
97153
kwargs: Dict[str, Any],
98154
) -> "FutureRecordMetadata":
99-
if tracing_is_off():
100-
return wrapped(*args, **kwargs)
101-
102155
exception = None
103156
res = None
104157

105158
try:
106159
res = wrapped(*args, **kwargs)
160+
create_span(
161+
"consume",
162+
res.topic if res else list(instance.subscription())[0],
163+
res.headers,
164+
)
165+
return res
166+
except StopIteration:
167+
pass
107168
except Exception as exc:
108169
exception = exc
109-
finally:
110-
if res:
111-
create_span(
112-
"consume",
113-
res.topic if res else list(instance.subscription())[0],
114-
res.headers,
115-
)
116-
else:
117-
create_span(
118-
"consume", list(instance.subscription())[0], exception=exception
119-
)
170+
create_span(
171+
"consume", list(instance.subscription())[0], exception=exception
172+
)
120173

121-
return res
174+
@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.close")
175+
def trace_kafka_close(
176+
wrapped: Callable[..., None],
177+
instance: "kafka.KafkaConsumer",
178+
args: Tuple[Any, ...],
179+
kwargs: Dict[str, Any],
180+
) -> None:
181+
try:
182+
span = consumer_span.get(None)
183+
if span is not None:
184+
close_consumer_span(span)
185+
except Exception as e:
186+
logger.debug(f"Error while closing kafka-consumer span: {e}")
187+
return wrapped(*args, **kwargs)
122188

123189
@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.poll")
124190
def trace_kafka_poll(
@@ -127,9 +193,6 @@ def trace_kafka_poll(
127193
args: Tuple[int, str, Tuple[Any, ...]],
128194
kwargs: Dict[str, Any],
129195
) -> Optional[Dict[str, Any]]:
130-
if tracing_is_off():
131-
return wrapped(*args, **kwargs)
132-
133196
# The KafkaConsumer.consume() from the kafka-python-ng call the
134197
# KafkaConsumer.poll() internally, so we do not consider it here.
135198
if any(
@@ -143,23 +206,17 @@ def trace_kafka_poll(
143206

144207
try:
145208
res = wrapped(*args, **kwargs)
209+
for partition, consumer_records in res.items():
210+
for message in consumer_records:
211+
create_span(
212+
"poll",
213+
partition.topic,
214+
message.headers if hasattr(message, "headers") else [],
215+
)
216+
return res
146217
except Exception as exc:
147218
exception = exc
148-
finally:
149-
if res:
150-
for partition, consumer_records in res.items():
151-
for message in consumer_records:
152-
create_span(
153-
"poll",
154-
partition.topic,
155-
message.headers if hasattr(message, "headers") else [],
156-
)
157-
else:
158-
create_span(
159-
"poll", list(instance.subscription())[0], exception=exception
160-
)
161-
162-
return res
219+
create_span("poll", list(instance.subscription())[0], exception=exception)
163220

164221
logger.debug("Instrumenting Kafka (kafka-python)")
165222
except ImportError:

0 commit comments

Comments
 (0)