
Commit 734b679

fix(confluent): adapt python tracer to trace-test-suite
Signed-off-by: Cagri Yonca <[email protected]>
1 parent ea9c383 commit 734b679


2 files changed: +196 −75 lines changed


src/instana/instrumentation/kafka/confluent_kafka_python.py

Lines changed: 112 additions & 37 deletions
@@ -1,19 +1,27 @@
 # (c) Copyright IBM Corp. 2025

+
 try:
+    import contextvars
     from typing import Any, Callable, Dict, List, Optional, Tuple

     import confluent_kafka  # noqa: F401
     import wrapt
     from confluent_kafka import Consumer, Producer
+    from opentelemetry import context, trace
     from opentelemetry.trace import SpanKind

     from instana.log import logger
     from instana.propagators.format import Format
+    from instana.singletons import get_tracer
     from instana.util.traceutils import (
         get_tracer_tuple,
         tracing_is_off,
     )
+    from instana.span.span import InstanaSpan
+
+    consumer_token: Dict[str, Any] = {}
+    consumer_span = contextvars.ContextVar("confluent_kafka_consumer_span")

     # As confluent_kafka is a wrapper around the C-developed librdkafka
     # (provided automatically via binary wheels), we have to create new classes
@@ -105,78 +113,142 @@ def create_span(
         headers: Optional[List[Tuple[str, bytes]]] = [],
         exception: Optional[str] = None,
     ) -> None:
-        tracer, parent_span, _ = get_tracer_tuple()
-        parent_context = (
-            parent_span.get_span_context()
-            if parent_span
-            else tracer.extract(
-                Format.KAFKA_HEADERS,
-                headers,
-                disable_w3c_trace_context=True,
+        try:
+            span = consumer_span.get(None)
+            if span is not None:
+                close_consumer_span(span)
+
+            tracer, parent_span, _ = get_tracer_tuple()
+
+            if not tracer:
+                tracer = get_tracer()
+            is_suppressed = False
+
+            if topic:
+                is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
+                    "kafka",
+                    span_type,
+                    topic,
+                )
+
+            if not is_suppressed and headers:
+                for header_name, header_value in headers:
+                    if header_name == "x_instana_l_s" and header_value == b"0":
+                        is_suppressed = True
+                        break
+
+            if is_suppressed:
+                return
+
+            parent_context = (
+                parent_span.get_span_context()
+                if parent_span
+                else (
+                    tracer.extract(
+                        Format.KAFKA_HEADERS,
+                        headers,
+                        disable_w3c_trace_context=True,
+                    )
+                    if tracer.exporter.options.kafka_trace_correlation
+                    else None
+                )
+            )
+            span = tracer.start_span(
+                "kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
             )
-        )
-        with tracer.start_as_current_span(
-            "kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
-        ) as span:
             if topic:
                 span.set_attribute("kafka.service", topic)
             span.set_attribute("kafka.access", span_type)
-
             if exception:
                 span.record_exception(exception)
+            span.end()
+
+            save_consumer_span_into_context(span)
+        except Exception as e:
+            logger.debug(f"Error while creating kafka-consumer span: {e}")
+
+    def save_consumer_span_into_context(span: "InstanaSpan") -> None:
+        ctx = trace.set_span_in_context(span)
+        token = context.attach(ctx)
+        consumer_token["token"] = token
+        consumer_span.set(span)
+
+    def close_consumer_span(span: "InstanaSpan") -> None:
+        if span.is_recording():
+            span.end()
+        consumer_span.set(None)
+        if "token" in consumer_token:
+            context.detach(consumer_token.pop("token", None))
+
+    def clear_context() -> None:
+        context.attach(trace.set_span_in_context(None))
+        consumer_token.clear()
+        consumer_span.set(None)

     def trace_kafka_consume(
         wrapped: Callable[..., InstanaConfluentKafkaConsumer.consume],
         instance: InstanaConfluentKafkaConsumer,
         args: Tuple[int, str, Tuple[Any, ...]],
         kwargs: Dict[str, Any],
     ) -> List[confluent_kafka.Message]:
-        if tracing_is_off():
-            return wrapped(*args, **kwargs)
-
         res = None
         exception = None

         try:
             res = wrapped(*args, **kwargs)
+            for message in res:
+                create_span("consume", message.topic(), message.headers())
+            return res
         except Exception as exc:
             exception = exc
-        finally:
-            if res:
-                for message in res:
-                    create_span("consume", message.topic(), message.headers())
-            else:
-                create_span("consume", exception=exception)
+            create_span("consume", exception=exception)
+
+    def trace_kafka_close(
+        wrapped: Callable[..., InstanaConfluentKafkaConsumer.close],
+        instance: InstanaConfluentKafkaConsumer,
+        args: Tuple[Any, ...],
+        kwargs: Dict[str, Any],
+    ) -> None:
+        try:
+            # Close any existing consumer span before closing the consumer
+            span = consumer_span.get(None)
+            if span is not None:
+                close_consumer_span(span)
+
+            # Execute the actual close operation
+            result = wrapped(*args, **kwargs)

-        return res
+            logger.debug("Kafka consumer closed and spans cleaned up")
+            return result
+
+        except Exception as exc:
+            logger.debug(f"Error while closing Kafka consumer: {exc}")
+            # Still try to clean up the span even if close fails
+            span = consumer_span.get(None)
+            if span is not None:
+                close_consumer_span(span)
+            raise

     def trace_kafka_poll(
         wrapped: Callable[..., InstanaConfluentKafkaConsumer.poll],
         instance: InstanaConfluentKafkaConsumer,
         args: Tuple[int, str, Tuple[Any, ...]],
         kwargs: Dict[str, Any],
     ) -> Optional[confluent_kafka.Message]:
-        if tracing_is_off():
-            return wrapped(*args, **kwargs)
-
         res = None
         exception = None

         try:
             res = wrapped(*args, **kwargs)
+            create_span("poll", res.topic(), res.headers())
+            return res
         except Exception as exc:
             exception = exc
-        finally:
-            if res:
-                create_span("poll", res.topic(), res.headers())
-            else:
-                create_span(
-                    "poll",
-                    next(iter(instance.list_topics().topics)),
-                    exception=exception,
-                )
-
-        return res
+            create_span(
+                "poll",
+                next(iter(instance.list_topics().topics)),
+                exception=exception,
+            )

     # Apply the monkey patch
     confluent_kafka.Producer = InstanaConfluentKafkaProducer
@@ -189,6 +261,9 @@ def trace_kafka_poll(
         InstanaConfluentKafkaConsumer, "consume", trace_kafka_consume
     )
     wrapt.wrap_function_wrapper(InstanaConfluentKafkaConsumer, "poll", trace_kafka_poll)
+    wrapt.wrap_function_wrapper(
+        InstanaConfluentKafkaConsumer, "close", trace_kafka_close
+    )

     logger.debug("Instrumenting Kafka (confluent_kafka)")
 except ImportError:
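
A rough sketch of what the change means on the application side: the monkey-patched Consumer now creates a kafka-consumer span for each message returned by poll()/consume(), records the topic and headers on it, and keeps a reference in the new context variable that the next poll()/consume() call or the newly wrapped close() cleans up. The broker address, group id, and topic below are hypothetical placeholders, not values from this commit.

from confluent_kafka import Consumer

# Hypothetical broker, group id, and topic, used only for illustration.
consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "example-group",
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["example-topic"])

try:
    for _ in range(10):
        msg = consumer.poll(1.0)  # patched poll(): creates a kafka-consumer span for the message
        if msg is None or msg.error():
            continue
        # The span reference stays in the context variable until the next
        # poll()/consume() call (or close()) cleans it up.
        print(msg.topic(), msg.value())
finally:
    consumer.close()  # patched close(): releases any span still held in the context variable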
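
The new suppression check in create_span() skips span creation when a message carries an x_instana_l_s header equal to b"0". As a hedged illustration of how such a header reaches the consumer (broker and topic names are made up), a producer could set it explicitly:

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})  # hypothetical broker
producer.produce(
    "example-topic",  # hypothetical topic
    value=b"payload",
    headers=[("x_instana_l_s", b"0")],  # the value create_span() treats as "suppress tracing"
)
producer.flush()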
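
The save_consumer_span_into_context / close_consumer_span / clear_context helpers pair a contextvars.ContextVar with OpenTelemetry's context.attach / context.detach so that at most one consumer span (and one attach token) is tracked at a time. The sketch below is a simplified, self-contained illustration of that bookkeeping using the plain OpenTelemetry SDK rather than the Instana tracer; the function names and simulated calls are illustrative, and the suppression and trace-correlation logic from create_span() is omitted.

import contextvars

from opentelemetry import context, trace
from opentelemetry.sdk.trace import TracerProvider

tracer = TracerProvider().get_tracer(__name__)

# One slot for the most recent consumer span and its context-attach token.
consumer_span = contextvars.ContextVar("consumer_span", default=None)
consumer_token = {}


def save_span_into_context(span):
    # Make the span the current one and remember the token for a later detach.
    consumer_token["token"] = context.attach(trace.set_span_in_context(span))
    consumer_span.set(span)


def close_span(span):
    # End the span if it is still open, then drop the context attachment.
    if span.is_recording():
        span.end()
    consumer_span.set(None)
    if "token" in consumer_token:
        context.detach(consumer_token.pop("token"))


def on_poll(topic):
    # Simplified mirror of create_span(): clean up the previous span first,
    # then start a new one, annotate it, end it, and park it in the context.
    previous = consumer_span.get()
    if previous is not None:
        close_span(previous)
    span = tracer.start_span("kafka-consumer")
    span.set_attribute("kafka.service", topic)
    span.set_attribute("kafka.access", "poll")
    span.end()
    save_span_into_context(span)


on_poll("example-topic")             # first poll
on_poll("example-topic")             # second poll: detaches the first attachment
close_span(consumer_span.get())      # what the wrapped close() ultimately triggers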
