# Openlayer Duplicate Traces Analysis & Solutions

## Problem Summary

Your code is generating duplicate traces in Openlayer because it uses **both** the `@trace()` decorator **and** `trace_async_openai()` at the same time. This creates two separate traces:

1. **Function-level trace** from `@trace()`: captures the async generator object as output (not useful)
2. **OpenAI-level trace** from `trace_async_openai()`: captures only the OpenAI response, without the surrounding function context

## Root Cause Analysis

### Issue 1: Async Generator Handling
The `@trace()` and `trace_async()` decorators don't properly handle async generators: they capture the generator object itself as the output, not the actual streamed content.

```python
# Current behavior in tracer.py (simplified)
output = await func(*func_args, **func_kwargs)  # returns the <async_generator> object
step.log(output=output)  # logs the generator object, not the streamed content
```
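
A generator-aware tracer would have to notice that calling the function produced an async generator rather than a coroutine, and consume the stream before logging. Below is a minimal standalone sketch of that check; it is independent of Openlayer's actual internals, which may differ:

```python
# Standalone sketch: telling async generators apart from plain coroutines
import asyncio
import inspect

async def plain(text: str) -> str:
    return text

async def streaming(text: str):
    for ch in text:
        yield ch

async def main():
    for fn in (plain, streaming):
        result = fn("hi")
        if inspect.isasyncgen(result):
            # Consume the stream first, then log the joined chunks
            chunks = [c async for c in result]
            print(fn.__name__, "->", "".join(chunks))
        else:
            print(fn.__name__, "->", await result)

asyncio.run(main())
```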

### Issue 2: Double Tracing
- `@trace()` creates a user-level trace for your `hi()` function
- `trace_async_openai()` creates an OpenAI-specific trace for the API call
- The two traces are independent and don't coordinate, so every call is recorded twice (see the sketch below)

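The sketch below is an assumed reconstruction of the kind of setup that triggers both traces at once; the class and method names mirror the examples in this document rather than your exact code:

```python
# Sketch of the problematic double-tracing setup (assumed shape, for
# illustration only)
from openai import AsyncOpenAI
from openlayer.lib import trace_async_openai
from openlayer.lib.tracing.tracer import trace_async

class say_hi:
    def __init__(self):
        # Trace 2: client-level trace for every OpenAI call
        self.openai_client = trace_async_openai(AsyncOpenAI())

    @trace_async()  # Trace 1: function-level trace, logs <async_generator>
    async def hi(self, cur_str: str):
        response = await self.openai_client.chat.completions.create(
            model="gpt-3.5-turbo-16k",
            messages=[{"role": "user", "content": cur_str}],
            stream=True,
        )
        async for chunk in response:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
```
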
## Solutions

### Solution 1: Use Only Client-Level Tracing (Recommended)

Remove the `@trace()` decorator and rely solely on `trace_async_openai()`:

```python
from openai import AsyncOpenAI
from openlayer.lib import trace_async_openai

class say_hi:
    def __init__(self):
        # The wrapped client traces every OpenAI call, including streams
        self.openai_client = trace_async_openai(AsyncOpenAI())

    # No @trace() decorator here
    async def hi(self, cur_str: str):
        messages = [
            {"role": "system", "content": "say hi !"},
            {"role": "user", "content": cur_str},
        ]
        response = await self.openai_client.chat.completions.create(
            model="gpt-3.5-turbo-16k",
            messages=messages,
            temperature=0,
            max_tokens=100,
            stream=True,
        )
        # The traced client records the complete response itself, so no
        # manual accumulation is needed here
        async for chunk in response:
            delta = chunk.choices[0].delta
            if hasattr(delta, "content") and delta.content:
                yield delta.content
```
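
A hypothetical driver for consuming the traced stream (the prompt string is arbitrary):

```python
# Hypothetical call site: consuming the traced stream
import asyncio

async def main():
    bot = say_hi()
    async for chunk in bot.hi("hello"):
        print(chunk, end="", flush=True)

asyncio.run(main())
```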

### Solution 2: Use Only Function-Level Tracing

Remove `trace_async_openai()` and use only `@trace_async()` with a non-streaming approach:

```python
from openai import AsyncOpenAI
from openlayer.lib.tracing.tracer import trace_async

class say_hi:
    def __init__(self):
        self.openai_client = AsyncOpenAI()  # no tracing wrapper

    @trace_async()
    async def hi(self, cur_str: str) -> str:  # returns a string, not a generator
        messages = [
            {"role": "system", "content": "say hi !"},
            {"role": "user", "content": cur_str},
        ]
        response = await self.openai_client.chat.completions.create(
            model="gpt-3.5-turbo-16k",
            messages=messages,
            temperature=0,
            max_tokens=100,
            stream=True,  # still stream internally
        )
        complete_answer = ""
        async for chunk in response:
            delta = chunk.choices[0].delta
            if hasattr(delta, "content") and delta.content:
                complete_answer += delta.content
        return complete_answer  # return the complete response
```
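
The trade-off is that callers no longer receive a stream, so call sites change from `async for` to a plain `await`; a sketch:

```python
# Hypothetical call site: the whole answer arrives at once
import asyncio

async def main():
    bot = say_hi()
    answer = await bot.hi("hello")  # one complete string, no chunks
    print(answer)

asyncio.run(main())
```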

### Solution 3: Custom Async Streaming Decorator (Advanced)

Create a specialized decorator that properly handles async generators. The key detail is that the wrapper itself must be an async generator (it contains `yield`): that keeps `async for` working at call sites and keeps the step open until the stream is fully consumed:

```python
import inspect
import time
from functools import wraps

from openai import AsyncOpenAI
from openlayer.lib.tracing.tracer import create_step


def trace_async_streaming(
    *step_args,
    inference_pipeline_id: str = None,
    **step_kwargs,
):
    """Decorator specifically for async streaming (generator) functions."""

    def decorator(func):
        func_signature = inspect.signature(func)

        @wraps(func)
        async def wrapper(*func_args, **func_kwargs):
            # Because this wrapper contains `yield`, calling it returns an
            # async generator, so `async for chunk in obj.hi(...)` still
            # works and the step context stays open until the stream ends.
            if step_kwargs.get("name") is None:
                step_kwargs["name"] = func.__name__

            with create_step(
                *step_args,
                inference_pipeline_id=inference_pipeline_id,
                **step_kwargs,
            ) as step:
                # Bind arguments so inputs can be logged by name
                bound = func_signature.bind(*func_args, **func_kwargs)
                bound.apply_defaults()
                inputs = dict(bound.arguments)
                inputs.pop("self", None)
                inputs.pop("cls", None)

                collected_output = []
                try:
                    # Re-yield each chunk while collecting it for logging
                    async for chunk in func(*func_args, **func_kwargs):
                        collected_output.append(str(chunk))
                        yield chunk
                except Exception as exc:
                    step.log(metadata={"Exceptions": str(exc)})
                    raise
                finally:
                    # Log the complete output once the stream is exhausted
                    # (assumes the step object exposes `start_time`)
                    end_time = time.time()
                    latency = (end_time - step.start_time) * 1000  # ms
                    step.log(
                        inputs=inputs,
                        output="".join(collected_output),
                        end_time=end_time,
                        latency=latency,
                    )

        return wrapper

    return decorator


# Usage:
class say_hi:
    def __init__(self):
        self.openai_client = AsyncOpenAI()  # no trace_async_openai wrapper

    @trace_async_streaming()
    async def hi(self, cur_str: str):
        messages = [
            {"role": "system", "content": "say hi !"},
            {"role": "user", "content": cur_str},
        ]
        response = await self.openai_client.chat.completions.create(
            model="gpt-3.5-turbo-16k",
            messages=messages,
            temperature=0,
            max_tokens=100,
            stream=True,
        )
        async for chunk in response:
            delta = chunk.choices[0].delta
            if hasattr(delta, "content") and delta.content:
                yield delta.content
```
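
One property of this design worth noting: because logging happens in the `finally` block, even a consumer that abandons the stream early should still produce a (partial) log when the generator is closed. A hypothetical check:

```python
# Hypothetical call site: stopping early still logs the collected chunks
import asyncio

async def main():
    bot = say_hi()
    stream = bot.hi("hello")
    async for chunk in stream:
        break  # stop after the first chunk
    await stream.aclose()  # triggers the decorator's finally -> partial log

asyncio.run(main())
```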

## Recommended Approach

**Use Solution 1** (client-level tracing only) because:

1. **Simplest**: just remove the `@trace()` decorator
2. **Most reliable**: `trace_async_openai()` is specifically designed for streaming
3. **Complete data**: captures all OpenAI-specific metrics (tokens, cost, etc.)
4. **Less error-prone**: avoids the complexity of handling async generators

## Why `trace_async()` Doesn't Work Well

The current `trace_async()` implementation has these limitations:

1. **Generator object capture**: it logs `<async_generator>` as the output, not the actual content
2. **Timing issues**: it completes before the generator is fully consumed, so its latency is misleading (see the demo below)
3. **No streaming awareness**: it doesn't understand that the function yields values over time

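The timing problem is easy to reproduce in isolation; a standalone demonstration (independent of Openlayer):

```python
# Standalone demo: a wrapper that only times the call itself finishes long
# before the stream is actually consumed
import asyncio
import time

async def slow_stream():
    for i in range(3):
        await asyncio.sleep(0.5)
        yield i

async def main():
    t0 = time.time()
    gen = slow_stream()  # "returns" instantly; no body has run yet
    print(f"naive latency: ~{(time.time() - t0) * 1000:.1f} ms")
    async for _ in gen:  # the real work happens here
        pass
    print(f"actual stream time: {(time.time() - t0) * 1000:.1f} ms")

asyncio.run(main())
```
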
## Testing Your Fix

After implementing Solution 1, you should see:
- **A single trace** per function call
- **Complete output** showing the full generated response
- **Proper timing** and token counts
- **No duplicate entries** in Openlayer

## Future Improvements

Consider contributing to the Openlayer project by:
1. Improving async generator handling in the decorators
2. Adding detection for double-tracing scenarios
3. Creating specialized decorators for streaming functions