Skip to content

vllm logs instrumentation #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified dist/singlestore_pulse-0.1-py3-none-any.whl
Binary file not shown.
Binary file modified dist/singlestore_pulse-0.1.tar.gz
Binary file not shown.
4 changes: 2 additions & 2 deletions pulse_otel/examples/time_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def get_current_time():
return datetime.datetime.now().strftime("%H:%M:%S")

# Define a new tool: a function to get the current date
@pulse_tool("toolA")
@pulse_tool()
def get_current_date():
logger.critical("CRITICAL LOGS of get_current_date")
logger.debug("DEBUG LOGS of get_current_date")
Expand All @@ -99,7 +99,7 @@ def get_funny_timestamp_phrase(funny_timestamp):
return f"Here is a funny timestamp: {funny_timestamp}"

# Simple agent function to process user input and decide on tool use
@pulse_agent("Myagent")
@pulse_agent()
def agent_run(prompt):
messages = [{"role": "user", "content": prompt}]

Expand Down
186 changes: 126 additions & 60 deletions pulse_otel/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import functools
import json
import os
from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import agent, tool
Expand Down Expand Up @@ -105,7 +106,7 @@ def __init__(self, write_to_file: bool = False, write_to_traceloop: bool = False
"""
A LoggingHandler is then created, configured to capture logs at the DEBUG level and to use the custom logger provider. The Python logging system is configured via logging.basicConfig to use this handler and to set the root logger’s level to INFO. This means all logs at INFO level or higher will be processed and sent to the OTLP collector, while the handler itself is capable of handling DEBUG logs if needed.
"""
handler = LoggingHandler(level=logging.DEBUG, logger_provider=log_provider)
otel_handler = LoggingHandler(level=logging.DEBUG, logger_provider=log_provider)

"""
In Python logging, both the logger and the handler have their own log levels, and both levels must be satisfied for a log record to be processed and exported.
Expand All @@ -116,7 +117,21 @@ def __init__(self, write_to_file: bool = False, write_to_traceloop: bool = False
2. Root Logger Level (logging.basicConfig(level=logging.INFO, ...)):
This sets the minimum level for the root logger. Only log records at INFO level and above will be passed from the logger to the handler.
"""
logging.basicConfig(level=logging.INFO, handlers=[handler])
# logging.basicConfig(level=logging.INFO, handlers=[handler])
# Attach to vllm logger
vllm_logger = logging.getLogger("vllm")
vllm_logger.addHandler(otel_handler)
vllm_logger.setLevel(logging.DEBUG)


# Register your custom handler with the logging system
# logging.root.handlers = [otlp_handler]
# logging._handlerList.append(otlp_handler)
# logging._handlers["otlp_handler"] = otlp_handler

with open("vllm_logging_config.json", "r") as f:
config = json.load(f)
logging.config.dictConfig(config)

Traceloop.init(
disable_batch=True,
Expand Down Expand Up @@ -212,74 +227,125 @@ async def wrapper(request: Request, *args, **kwargs) -> Response:
raise e

return wrapper
# Decorator to register a function as a tool. Can be used as @pulse_tool or @pulse_tool("name").
# If no argument is passed, uses the function name as the tool name.


def pulse_tool(func):
"""
A decorator that wraps a given function with a third-party `tool` decorator
while preserving the original function's metadata.

def pulse_tool(_func=None, name=None):
"""
Decorator to register a function as a tool. Can be used as @pulse_tool, @pulse_tool("name"), or @pulse_tool(name="name").
If no argument is passed, uses the function name as the tool name.
Args:
func (Callable): The function to be wrapped.

_func: The function to be decorated.
tool_name: Optional name for the tool. If not provided, the function name is used.
Returns:
Callable: The wrapped function with preserved metadata.
A decorator that registers the function as a tool with the specified name.

Usage:
@pulse_tool("my_tool")
def my_function():
# Function implementation

@pulse_tool
def my_function():
# Function implementation
"""
def decorator(func):
tool_name = name or func.__name__
decorated_func = tool(tool_name)(func)
@functools.wraps(func)
def wrapper(*args, **kwargs):
return decorated_func(*args, **kwargs)
return wrapper

# Handle @pulse_tool("name")
if isinstance(_func, str):
return decorator_with_name(_func)
# Handle @pulse_tool or @pulse_tool(name="name")
if _func is None:
return decorator
else:
return decorator(_func)

def decorator_with_name(tool_name):
def wrapper(func):
decorated_func = tool(tool_name)(func)
@functools.wraps(func)
def inner(*args, **kwargs):
return decorated_func(*args, **kwargs)
return inner
return wrapper


def pulse_agent(_func=None, *, name=None):
"""
# Wrap the original function with the third-party decorator
decorated_func = tool(func)
A decorator that integrates with the SingleStore Pulse agent to associate
session IDs with function calls for tracing purposes. It extracts the
session ID from the `baggage` header if available, or generates a random
session ID if not. The session ID is then set as an association property
for tracing.

# Preserve metadata and return
@functools.wraps(func)
def wrapper(*args, **kwargs):
return decorated_func(*args, **kwargs)

return wrapper
Args:
_func (callable, optional): The function to be decorated. Defaults to None.
name (str, optional): The name to be used for the agent. If not provided,
it defaults to the value of the `SINGLESTOREDB_APP_NAME` environment variable.

def pulse_agent(func):
Returns:
callable: The wrapped function with tracing capabilities.

Notes:
- If a session ID is found in the `baggage` header, it is used for tracing.
- If no session ID is found, a random session ID is generated.
- The `Traceloop.set_association_properties` method is used to set the
session ID as an association property.
- The `agent` function is used to wrap the original function with the
resolved name.

Example:
@pulse_agent(name="my_app")
def my_function(headers):
# Function logic here
pass

@pulse_agent
def my_function(headers):
# Function logic here
pass
"""
A decorator that wraps a function to extract a `singlestore-session-id` from the
`baggage` header in the keyword arguments (if present) and associates it with
Traceloop properties.
def decorator(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
session_id = None
if 'headers' in kwargs:
headers = kwargs['headers']
baggage = headers.get('baggage')
if baggage:
parts = [item.strip() for item in baggage.split(',')]
for part in parts:
if '=' in part:
key, value = part.split('=', 1)
if key.strip() == HEADER_INCOMING_SESSION_ID:
session_id = value.strip()
break

if session_id:
properties = {SESSION_ID: session_id}
Traceloop.set_association_properties(properties)
print(f"[pulse_agent] singlestore-session-id: {session_id}")
else:
random_session_id = random.randint(10**15, 10**16 - 1)
properties = {SESSION_ID: str(random_session_id)}
Traceloop.set_association_properties(properties)
print("[pulse_agent] No singlestore-session-id found in baggage.")

The decorated function is then wrapped with the `agent` decorator.
resolved_name = name or os.getenv("SINGLESTOREDB_APP_NAME", "MY_APP_NAME")
return agent(resolved_name)(func)(*args, **kwargs)

Args:
func (Callable): The function to be decorated.
return wrapped

Returns:
Callable: The wrapped function with additional functionality for handling
`singlestore-session-id` and associating it with Traceloop properties.
"""
@functools.wraps(func)
def wrapped(*args, **kwargs):
session_id = None
if 'headers' in kwargs:
headers = kwargs['headers']
baggage = headers.get('baggage')
if baggage:
# baggage header is a comma-separated string of key=value pairs
# example: baggage: key1=value1;property1;property2, key2 = value2, key3=value3; propertyKey=propertyValue
parts = [item.strip() for item in baggage.split(',')]
for part in parts:
if '=' in part:
key, value = part.split('=', 1)
if key.strip() == HEADER_INCOMING_SESSION_ID:
session_id = value.strip()
break

if session_id:
properties = {SESSION_ID: session_id}
Traceloop.set_association_properties(properties)
print(f"[pulse_agent] singlestore-session-id: {session_id}")
else:
random_session_id = random.randint(10**15, 10**16 - 1)
properties = {SESSION_ID: str(random_session_id)}
Traceloop.set_association_properties(properties)
print("[pulse_agent] No singlestore-session-id found in baggage.")

return agent(func)(*args, **kwargs)

return wrapped
if _func is None:
return decorator
else:
return decorator(_func)


class CustomFileSpanExporter(SpanExporter):
Expand Down
21 changes: 21 additions & 0 deletions pulse_otel/vllm_logging_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"version": 1,
"disable_existing_loggers": false,
"handlers": {
"otel": {
"class": "logging.NullHandler"
}
},
"root": {
"level": "INFO",
"handlers": ["otel"]
},
"loggers": {
"vllm": {
"level": "DEBUG",
"handlers": ["otel"],
"propagate": false
}
}
}