Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 112 additions & 37 deletions src/instana/instrumentation/kafka/confluent_kafka_python.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
# (c) Copyright IBM Corp. 2025


try:
import contextvars
from typing import Any, Callable, Dict, List, Optional, Tuple

import confluent_kafka # noqa: F401
import wrapt
from confluent_kafka import Consumer, Producer
from opentelemetry import context, trace
from opentelemetry.trace import SpanKind

from instana.log import logger
from instana.propagators.format import Format
from instana.singletons import get_tracer
from instana.util.traceutils import (
get_tracer_tuple,
tracing_is_off,
)
from instana.span.span import InstanaSpan

consumer_token: Dict[str, Any] = {}
consumer_span = contextvars.ContextVar("confluent_kafka_consumer_span")

# As confluent_kafka is a wrapper around the C-developed librdkafka
# (provided automatically via binary wheels), we have to create new classes
Expand Down Expand Up @@ -105,78 +113,142 @@ def create_span(
headers: Optional[List[Tuple[str, bytes]]] = [],
exception: Optional[str] = None,
) -> None:
tracer, parent_span, _ = get_tracer_tuple()
parent_context = (
parent_span.get_span_context()
if parent_span
else tracer.extract(
Format.KAFKA_HEADERS,
headers,
disable_w3c_trace_context=True,
try:
span = consumer_span.get(None)
if span is not None:
close_consumer_span(span)

tracer, parent_span, _ = get_tracer_tuple()

if not tracer:
tracer = get_tracer()
is_suppressed = False

if topic:
is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
"kafka",
span_type,
topic,
)

if not is_suppressed and headers:
for header_name, header_value in headers:
if header_name == "x_instana_l_s" and header_value == b"0":
is_suppressed = True
break

if is_suppressed:
return

parent_context = (
parent_span.get_span_context()
if parent_span
else (
tracer.extract(
Format.KAFKA_HEADERS,
headers,
disable_w3c_trace_context=True,
)
if tracer.exporter.options.kafka_trace_correlation
else None
)
)
span = tracer.start_span(
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
)
)
with tracer.start_as_current_span(
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
) as span:
if topic:
span.set_attribute("kafka.service", topic)
span.set_attribute("kafka.access", span_type)

if exception:
span.record_exception(exception)
span.end()

save_consumer_span_into_context(span)
except Exception as e:
logger.debug(f"Error while creating kafka-consumer span: {e}")

def save_consumer_span_into_context(span: "InstanaSpan") -> None:
    """Make *span* the active span and remember it for later cleanup.

    The span is placed into a fresh OTel context which is attached to the
    current execution; the attach token is stashed in the module-level
    ``consumer_token`` dict so ``close_consumer_span`` can detach it, and
    the span itself is recorded in the ``consumer_span`` ContextVar.
    """
    new_ctx = trace.set_span_in_context(span)
    attach_token = context.attach(new_ctx)
    consumer_token["token"] = attach_token
    consumer_span.set(span)

def close_consumer_span(span: "InstanaSpan") -> None:
    """End *span* (if still recording) and undo the context attachment.

    Clears the ``consumer_span`` ContextVar and, when a token was stored by
    ``save_consumer_span_into_context``, detaches it from the OTel context.
    """
    if span.is_recording():
        span.end()
    consumer_span.set(None)
    # Detach only when an attach token was actually recorded.
    if "token" in consumer_token:
        context.detach(consumer_token.pop("token", None))

def clear_context() -> None:
    """Reset all consumer-tracing bookkeeping.

    Attaches a context with no active span, empties the token store, and
    clears the ``consumer_span`` ContextVar.
    """
    empty_ctx = trace.set_span_in_context(None)
    context.attach(empty_ctx)
    consumer_token.clear()
    consumer_span.set(None)

def trace_kafka_consume(
wrapped: Callable[..., InstanaConfluentKafkaConsumer.consume],
instance: InstanaConfluentKafkaConsumer,
args: Tuple[int, str, Tuple[Any, ...]],
kwargs: Dict[str, Any],
) -> List[confluent_kafka.Message]:
if tracing_is_off():
return wrapped(*args, **kwargs)

res = None
exception = None

try:
res = wrapped(*args, **kwargs)
for message in res:
create_span("consume", message.topic(), message.headers())
return res
except Exception as exc:
exception = exc
finally:
if res:
for message in res:
create_span("consume", message.topic(), message.headers())
else:
create_span("consume", exception=exception)
create_span("consume", exception=exception)

def trace_kafka_close(
    wrapped: Callable[..., InstanaConfluentKafkaConsumer.close],
    instance: InstanaConfluentKafkaConsumer,
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any],
) -> None:
    """wrapt wrapper for ``Consumer.close``.

    Finishes any kafka-consumer span that is still open before delegating
    to the real ``close``; on failure, attempts the same cleanup and
    re-raises the original exception.
    """

    def _finish_open_span() -> None:
        # End the span left behind by the most recent poll()/consume().
        active = consumer_span.get(None)
        if active is not None:
            close_consumer_span(active)

    try:
        _finish_open_span()

        # Delegate to the real close operation.
        outcome = wrapped(*args, **kwargs)

        logger.debug("Kafka consumer closed and spans cleaned up")
        return outcome

    except Exception as exc:
        logger.debug(f"Error while closing Kafka consumer: {exc}")
        # Best-effort cleanup even when close() itself failed.
        _finish_open_span()
        raise

def trace_kafka_poll(
wrapped: Callable[..., InstanaConfluentKafkaConsumer.poll],
instance: InstanaConfluentKafkaConsumer,
args: Tuple[int, str, Tuple[Any, ...]],
kwargs: Dict[str, Any],
) -> Optional[confluent_kafka.Message]:
if tracing_is_off():
return wrapped(*args, **kwargs)

res = None
exception = None

try:
res = wrapped(*args, **kwargs)
create_span("poll", res.topic(), res.headers())
return res
except Exception as exc:
exception = exc
finally:
if res:
create_span("poll", res.topic(), res.headers())
else:
create_span(
"poll",
next(iter(instance.list_topics().topics)),
exception=exception,
)

return res
create_span(
"poll",
next(iter(instance.list_topics().topics)),
exception=exception,
)

# Apply the monkey patch
confluent_kafka.Producer = InstanaConfluentKafkaProducer
Expand All @@ -189,6 +261,9 @@ def trace_kafka_poll(
InstanaConfluentKafkaConsumer, "consume", trace_kafka_consume
)
wrapt.wrap_function_wrapper(InstanaConfluentKafkaConsumer, "poll", trace_kafka_poll)
wrapt.wrap_function_wrapper(
InstanaConfluentKafkaConsumer, "close", trace_kafka_close
)

logger.debug("Instrumenting Kafka (confluent_kafka)")
except ImportError:
Expand Down
Loading
Loading