diff --git a/dist/singlestore_pulse-0.1-py3-none-any.whl b/dist/singlestore_pulse-0.1-py3-none-any.whl index e90048f..b581a93 100644 Binary files a/dist/singlestore_pulse-0.1-py3-none-any.whl and b/dist/singlestore_pulse-0.1-py3-none-any.whl differ diff --git a/dist/singlestore_pulse-0.1.tar.gz b/dist/singlestore_pulse-0.1.tar.gz index 1088733..20fd85a 100644 Binary files a/dist/singlestore_pulse-0.1.tar.gz and b/dist/singlestore_pulse-0.1.tar.gz differ diff --git a/pulse_otel/examples/time_agent.py b/pulse_otel/examples/time_agent.py index 75c812f..dc2a781 100644 --- a/pulse_otel/examples/time_agent.py +++ b/pulse_otel/examples/time_agent.py @@ -74,7 +74,7 @@ def get_current_time(): return datetime.datetime.now().strftime("%H:%M:%S") # Define a new tool: a function to get the current date -@pulse_tool("toolA") +@pulse_tool() def get_current_date(): logger.critical("CRITICAL LOGS of get_current_date") logger.debug("DEBUG LOGS of get_current_date") @@ -99,7 +99,7 @@ def get_funny_timestamp_phrase(funny_timestamp): return f"Here is a funny timestamp: {funny_timestamp}" # Simple agent function to process user input and decide on tool use -@pulse_agent("Myagent") +@pulse_agent() def agent_run(prompt): messages = [{"role": "user", "content": prompt}] diff --git a/pulse_otel/main.py b/pulse_otel/main.py index 51e7aa0..313844e 100644 --- a/pulse_otel/main.py +++ b/pulse_otel/main.py @@ -1,4 +1,5 @@ import functools +import json import os from traceloop.sdk import Traceloop from traceloop.sdk.decorators import agent, tool @@ -105,7 +106,7 @@ def __init__(self, write_to_file: bool = False, write_to_traceloop: bool = False """ A LoggingHandler is then created, configured to capture logs at the DEBUG level and to use the custom logger provider. The Python logging system is configured via logging.basicConfig to use this handler and to set the root logger’s level to INFO. This means all logs at INFO level or higher will be processed and sent to the OTLP collector, while the handler itself is capable of handling DEBUG logs if needed. """ - handler = LoggingHandler(level=logging.DEBUG, logger_provider=log_provider) + otel_handler = LoggingHandler(level=logging.DEBUG, logger_provider=log_provider) """ In Python logging, both the logger and the handler have their own log levels, and both levels must be satisfied for a log record to be processed and exported. @@ -116,7 +117,21 @@ def __init__(self, write_to_file: bool = False, write_to_traceloop: bool = False 2. Root Logger Level (logging.basicConfig(level=logging.INFO, ...)): This sets the minimum level for the root logger. Only log records at INFO level and above will be passed from the logger to the handler. """ - logging.basicConfig(level=logging.INFO, handlers=[handler]) + # logging.basicConfig(level=logging.INFO, handlers=[handler]) + # Attach to vllm logger + vllm_logger = logging.getLogger("vllm") + vllm_logger.addHandler(otel_handler) + vllm_logger.setLevel(logging.DEBUG) + + + # Register your custom handler with the logging system + # logging.root.handlers = [otlp_handler] + # logging._handlerList.append(otlp_handler) + # logging._handlers["otlp_handler"] = otlp_handler + + with open("vllm_logging_config.json", "r") as f: + config = json.load(f) + logging.config.dictConfig(config) Traceloop.init( disable_batch=True, @@ -212,74 +227,125 @@ async def wrapper(request: Request, *args, **kwargs) -> Response: raise e return wrapper + # Decorator to register a function as a tool. Can be used as @pulse_tool or @pulse_tool("name"). + # If no argument is passed, uses the function name as the tool name. - -def pulse_tool(func): - """ - A decorator that wraps a given function with a third-party `tool` decorator - while preserving the original function's metadata. - +def pulse_tool(_func=None, name=None): + """ + Decorator to register a function as a tool. Can be used as @pulse_tool, @pulse_tool("name"), or @pulse_tool(name="name"). + If no argument is passed, uses the function name as the tool name. Args: - func (Callable): The function to be wrapped. - + _func: The function to be decorated. + tool_name: Optional name for the tool. If not provided, the function name is used. Returns: - Callable: The wrapped function with preserved metadata. + A decorator that registers the function as a tool with the specified name. + + Usage: + @pulse_tool("my_tool") + def my_function(): + # Function implementation + + @pulse_tool + def my_function(): + # Function implementation + """ + def decorator(func): + tool_name = name or func.__name__ + decorated_func = tool(tool_name)(func) + @functools.wraps(func) + def wrapper(*args, **kwargs): + return decorated_func(*args, **kwargs) + return wrapper + + # Handle @pulse_tool("name") + if isinstance(_func, str): + return decorator_with_name(_func) + # Handle @pulse_tool or @pulse_tool(name="name") + if _func is None: + return decorator + else: + return decorator(_func) + +def decorator_with_name(tool_name): + def wrapper(func): + decorated_func = tool(tool_name)(func) + @functools.wraps(func) + def inner(*args, **kwargs): + return decorated_func(*args, **kwargs) + return inner + return wrapper + + +def pulse_agent(_func=None, *, name=None): """ - # Wrap the original function with the third-party decorator - decorated_func = tool(func) + A decorator that integrates with the SingleStore Pulse agent to associate + session IDs with function calls for tracing purposes. It extracts the + session ID from the `baggage` header if available, or generates a random + session ID if not. The session ID is then set as an association property + for tracing. - # Preserve metadata and return - @functools.wraps(func) - def wrapper(*args, **kwargs): - return decorated_func(*args, **kwargs) - - return wrapper + Args: + _func (callable, optional): The function to be decorated. Defaults to None. + name (str, optional): The name to be used for the agent. If not provided, + it defaults to the value of the `SINGLESTOREDB_APP_NAME` environment variable. -def pulse_agent(func): + Returns: + callable: The wrapped function with tracing capabilities. + + Notes: + - If a session ID is found in the `baggage` header, it is used for tracing. + - If no session ID is found, a random session ID is generated. + - The `Traceloop.set_association_properties` method is used to set the + session ID as an association property. + - The `agent` function is used to wrap the original function with the + resolved name. + + Example: + @pulse_agent(name="my_app") + def my_function(headers): + # Function logic here + pass + + @pulse_agent + def my_function(headers): + # Function logic here + pass """ - A decorator that wraps a function to extract a `singlestore-session-id` from the - `baggage` header in the keyword arguments (if present) and associates it with - Traceloop properties. + def decorator(func): + @functools.wraps(func) + def wrapped(*args, **kwargs): + session_id = None + if 'headers' in kwargs: + headers = kwargs['headers'] + baggage = headers.get('baggage') + if baggage: + parts = [item.strip() for item in baggage.split(',')] + for part in parts: + if '=' in part: + key, value = part.split('=', 1) + if key.strip() == HEADER_INCOMING_SESSION_ID: + session_id = value.strip() + break + + if session_id: + properties = {SESSION_ID: session_id} + Traceloop.set_association_properties(properties) + print(f"[pulse_agent] singlestore-session-id: {session_id}") + else: + random_session_id = random.randint(10**15, 10**16 - 1) + properties = {SESSION_ID: str(random_session_id)} + Traceloop.set_association_properties(properties) + print("[pulse_agent] No singlestore-session-id found in baggage.") - The decorated function is then wrapped with the `agent` decorator. + resolved_name = name or os.getenv("SINGLESTOREDB_APP_NAME", "MY_APP_NAME") + return agent(resolved_name)(func)(*args, **kwargs) - Args: - func (Callable): The function to be decorated. + return wrapped - Returns: - Callable: The wrapped function with additional functionality for handling - `singlestore-session-id` and associating it with Traceloop properties. - """ - @functools.wraps(func) - def wrapped(*args, **kwargs): - session_id = None - if 'headers' in kwargs: - headers = kwargs['headers'] - baggage = headers.get('baggage') - if baggage: - # baggage header is a comma-separated string of key=value pairs - # example: baggage: key1=value1;property1;property2, key2 = value2, key3=value3; propertyKey=propertyValue - parts = [item.strip() for item in baggage.split(',')] - for part in parts: - if '=' in part: - key, value = part.split('=', 1) - if key.strip() == HEADER_INCOMING_SESSION_ID: - session_id = value.strip() - break - - if session_id: - properties = {SESSION_ID: session_id} - Traceloop.set_association_properties(properties) - print(f"[pulse_agent] singlestore-session-id: {session_id}") - else: - random_session_id = random.randint(10**15, 10**16 - 1) - properties = {SESSION_ID: str(random_session_id)} - Traceloop.set_association_properties(properties) - print("[pulse_agent] No singlestore-session-id found in baggage.") - - return agent(func)(*args, **kwargs) - - return wrapped + if _func is None: + return decorator + else: + return decorator(_func) class CustomFileSpanExporter(SpanExporter): diff --git a/pulse_otel/vllm_logging_config.json b/pulse_otel/vllm_logging_config.json new file mode 100644 index 0000000..696e2cb --- /dev/null +++ b/pulse_otel/vllm_logging_config.json @@ -0,0 +1,21 @@ +{ + "version": 1, + "disable_existing_loggers": false, + "handlers": { + "otel": { + "class": "logging.NullHandler" + } + }, + "root": { + "level": "INFO", + "handlers": ["otel"] + }, + "loggers": { + "vllm": { + "level": "DEBUG", + "handlers": ["otel"], + "propagate": false + } + } + } + \ No newline at end of file