Skip to content

Add trace ID header to responses #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified dist/singlestore_pulse-0.1-py3-none-any.whl
Binary file not shown.
Binary file modified dist/singlestore_pulse-0.1.tar.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion pulse_otel/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from pulse_otel.main import Pulse, CustomFileSpanExporter, FileLogExporter, pulse_agent, pulse_tool
from pulse_otel.main import Pulse, CustomFileSpanExporter, FileLogExporter, pulse_agent, pulse_tool, healthcheck
1 change: 1 addition & 0 deletions pulse_otel/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
OTEL_COLLECTOR_ENDPOINT = "http://otel-collector-{PROJECTID_PLACEHOLDER}.observability.svc.cluster.local:4317"

HEADER_INCOMING_SESSION_ID = "singlestore-session-id"
TRACEID_RESPONSE_HEADER = "singlestore-trace-id"

# Formatted attribute names
APP_TYPE = "singlestore.nova.app.type"
Expand Down
243 changes: 121 additions & 122 deletions pulse_otel/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import os
from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import agent, tool
from opentelemetry import _logs
from opentelemetry import _logs, trace

from opentelemetry.trace import get_current_span

from opentelemetry.context import attach, set_value
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler, LogData
Expand Down Expand Up @@ -37,6 +39,7 @@
HEADER_INCOMING_SESSION_ID,
PROJECT,
LIVE_LOGS_FILE_PATH,
TRACEID_RESPONSE_HEADER
)
import logging

Expand Down Expand Up @@ -181,58 +184,26 @@ def init_log_provider(self):
logging.root.setLevel(logging.INFO)
logging.root.addHandler(handler)
return log_exporter


def pulse_add_session_id(self, session_id=None, **kwargs):
@staticmethod
def add_traceid_header(result: Response, traceID: str) -> Response:
"""
Decorator to set Traceloop association properties for a function.

Parameters:
- session_id: Optional session_id identifier
- **kwargs: Any additional association properties
Adds a trace ID header to the response.
"""
def decorator(func):
def wrapper(*args, **kwargs_inner):

properties = {}
if session_id:
properties["session_id"] = session_id
properties.update(kwargs)

# Set the association properties
Traceloop.set_association_properties(properties)
return func(*args, **kwargs_inner)
return wrapper
return decorator


def add_traceid_header(self, func: Callable) -> Callable:
@wraps(func)
async def wrapper(request: Request, *args, **kwargs) -> Response:
# Generate unique trace ID
trace_id = str(uuid.uuid4())

# Extract session ID from request headers if present
session_id = request.headers.get("X-SINGLESTORE-AI-SESSION-ID", "N/A")

try:
# Execute the original function
result = await func(request, *args, **kwargs)

# If result is already a Response object
if isinstance(result, Response):
result.headers["X-SINGLESTORE-TRACE-ID"] = trace_id
return result

return JSONResponse(
content=result,
headers={"X-SINGLESTORE-TRACE-ID": trace_id}
)

except Exception as e:
raise e

return wrapper
try:
# If result is already a Response object
if isinstance(result, Response):
result.headers[TRACEID_RESPONSE_HEADER] = traceID
return result
return JSONResponse(
content=result,
headers={TRACEID_RESPONSE_HEADER: traceID}
)

except Exception as e:
print(f"Error adding trace ID header: {e}")
return result

def pulse_tool(_func=None, *, name=None):
"""
Expand Down Expand Up @@ -290,79 +261,107 @@ def add_session_id_to_span_attributes(kwargs):
print("[pulse_agent] No singlestore-session-id found in baggage.")

def pulse_agent(_func=None, *, name=None):
"""
A decorator that integrates with the SingleStore Pulse agent to associate
session IDs with function calls for tracing purposes. It extracts the
session ID from the `baggage` header if available, or generates a random
session ID if not. The session ID is then set as an association property
for tracing.

Args:
_func (callable, optional): The function to be decorated. Defaults to None.
name (str, optional): The name to be used for the agent. If not provided,
it defaults to the function name.

Returns:
callable: The wrapped function with tracing capabilities.

Notes:
- If a session ID is found in the `baggage` header, it is used for tracing.
- If no session ID is found, a random session ID is generated.
- The `Traceloop.set_association_properties` method is used to set the
session ID as an association property.
- The `agent` function is used to wrap the original function with the
resolved name.

Example:
@pulse_agent(name="my_app")
def my_function(headers):
# Function logic here
pass

@pulse_agent
def my_function(headers):
# Function logic here
pass

# Works with other decorators:
@pulse_agent(name="my_app")
@retry(stop=stop_after_attempt(3))
def my_function(headers):
# Function logic here
pass
"""
def decorator(func):
# Use the provided name or fall back to the function's name
agent_name = name or func.__name__

# Apply the agent decorator to the function
decorated_func = agent(agent_name)(func)

@functools.wraps(func)
def wrapper(*args, **kwargs):
add_session_id_to_span_attributes(kwargs)
return decorated_func(*args, **kwargs)

return wrapper

if _func is None:
# Called as @pulse_agent() or @pulse_agent(name="...")
return decorator
elif isinstance(_func, str):
# Called as @pulse_agent("name") - backward compatibility
def wrapper(func):
agent_name = _func
decorated_func = agent(agent_name)(func)

@functools.wraps(func)
def inner(*args, **kwargs):
add_session_id_to_span_attributes(kwargs)
return decorated_func(*args, **kwargs)
return inner
return wrapper
else:
# Called as @pulse_agent (without parentheses)
return decorator(_func)
"""
A decorator that integrates with the SingleStore Pulse agent to associate
session IDs with function calls for tracing purposes. It extracts the
session ID from the `baggage` header if available, or generates a random
session ID if not. The session ID is then set as an association property
for tracing.

Args:
_func (callable, optional): The function to be decorated. Defaults to None.
name (str, optional): The name to be used for the agent. If not provided,
it defaults to the function name.

Returns:
callable: The wrapped function with tracing capabilities.

Notes:
- If a session ID is found in the `baggage` header, it is used for tracing.
- If no session ID is found, a random session ID is generated.
- The `Traceloop.set_association_properties` method is used to set the
session ID as an association property.
- The `agent` function is used to wrap the original function with the
resolved name.

Example:
@pulse_agent(name="my_app")
def my_function(headers):
# Function logic here
pass

@pulse_agent
def my_function(headers):
# Function logic here
pass

# Works with other decorators:
@pulse_agent(name="my_app")
@retry(stop=stop_after_attempt(3))
def my_function(headers):
# Function logic here
pass
"""

def decorator(func):
# Use the provided name or fall back to the function's name
agent_name = name or func.__name__

# Apply the agent decorator to the function
decorated_func = agent(agent_name)(func)

@functools.wraps(func)
def wrapper(*args, **kwargs):
add_session_id_to_span_attributes(kwargs)
tracer = trace.get_tracer(agent_name)
with tracer.start_as_current_span(agent_name):
result = decorated_func(*args, **kwargs)

span = get_current_span()
trace_id = span.get_span_context().trace_id

# Convert to hex string (OpenTelemetry trace IDs are 16-byte ints)
trace_id_hex = format(trace_id, "032x")
result = Pulse.add_traceid_header(result, trace_id_hex)
return result

return wrapper

if _func is None:
# Called as @pulse_agent() or @pulse_agent(name="...")
return decorator
elif isinstance(_func, str):
# Called as @pulse_agent("name") - backward compatibility
def wrapper(func):
agent_name = _func
decorated_func = agent(agent_name)(func)

@functools.wraps(func)
def inner(*args, **kwargs):
add_session_id_to_span_attributes(kwargs)
tracer = trace.get_tracer(agent_name)

with tracer.start_as_current_span(agent_name):
result = decorated_func(*args, **kwargs)

span = get_current_span()
trace_id = span.get_span_context().trace_id

trace_id_hex = format(trace_id, "032x")
result = Pulse.add_traceid_header(result, trace_id_hex)
return result
return inner
return wrapper
else:
# Called as @pulse_agent (without parentheses)
return decorator(_func)

def healthcheck():
"""
Health check endpoint for the Pulse service.
Returns a JSON response indicating the service is healthy.
"""
return {"status": "healthy"}

class CustomFileSpanExporter(SpanExporter):
def __init__(self, file_name):
Expand Down