codeflash-ai bot commented on Oct 22, 2025

📄 33% (0.33x) speedup for run_async in cognee/infrastructure/utils/run_async.py

⏱️ Runtime: 2.76 seconds → 2.07 seconds (best of 90 runs)

📝 Explanation and details

The optimization introduces **signature caching** to eliminate the expensive `inspect.signature(func)` calls that were consuming 37.4% of the original runtime.

**Key changes:**

- Added a global `_signature_cache` dictionary that maps function IDs to their cached signatures
- Created a `_has_loop_param()` helper function that checks the cache before calling `inspect.signature()`
- Replaced the inline signature inspection with the cached version (sketched below)

**Why this is faster:**
`inspect.signature()` is computationally expensive because it performs deep introspection of function metadata. The line profiler shows this call took 28ms per hit in the original code. By caching signatures based on function `id()`, subsequent calls to the same function avoid this expensive operation entirely.
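
A minimal sketch of the cached approach, assuming a typical `run_in_executor`-based `run_async` body (the `_signature_cache` and `_has_loop_param` names come from this description; the rest is illustrative, not the exact cognee implementation):

```python
import asyncio
import inspect
from functools import partial

_signature_cache = {}  # maps id(func) -> its inspect.Signature


def _has_loop_param(func):
    # Check the cache first; only pay for the expensive inspect.signature()
    # call the first time a given function is seen.
    sig = _signature_cache.get(id(func))
    if sig is None:
        sig = inspect.signature(func)
        _signature_cache[id(func)] = sig
    return "loop" in sig.parameters


async def run_async(func, *args, **kwargs):
    loop = asyncio.get_running_loop()
    if _has_loop_param(func):
        kwargs["loop"] = loop  # only pass the loop to functions that accept it
    return await loop.run_in_executor(None, partial(func, *args, **kwargs))
```

One caveat of keying on `id()`: CPython can reuse an id after a function is garbage collected, so a stale entry is possible for short-lived functions; the long-lived, repeatedly-called functions this optimization targets are unaffected in practice.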

The optimization reduced the signature check time from 37.4% to just 7% of total runtime, resulting in a **33% overall speedup** (2.76s → 2.07s).

**Best suited for:**

- **High-volume concurrent scenarios** (like the 100+ concurrent call tests) where the same functions are executed repeatedly
- **Sustained execution patterns** where functions are called multiple times in sequence
- Any workload with function reuse, as the cache eliminates redundant signature inspections

The throughput improvement is modest (0.3%) because the executor overhead dominates in concurrent scenarios, but the runtime improvement is substantial for CPU-bound signature inspection bottlenecks.
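
As a rough illustration of that bottleneck (a hypothetical micro-benchmark, not taken from this PR), the gap between a fresh signature inspection and a cache hit is easy to measure:

```python
import inspect
import timeit


def sample(a, b, loop=None):
    return a + b


# Warm a one-entry cache, mirroring the optimization above.
_signature_cache = {id(sample): inspect.signature(sample)}

uncached = timeit.timeit(lambda: "loop" in inspect.signature(sample).parameters, number=10_000)
cached = timeit.timeit(lambda: "loop" in _signature_cache[id(sample)].parameters, number=10_000)
print(f"uncached: {uncached:.3f}s  cached: {cached:.3f}s")  # expect a gap of one to two orders of magnitude
```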

Correctness verification report:

| Test | Status |
|------|--------|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 953 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 77.8% |
🌀 Generated Regression Tests and Runtime
```python
import asyncio  # used to run async functions
import threading  # used to check thread identity in edge cases
import time  # used for timing throughput tests

import pytest  # used for our unit tests
from cognee.infrastructure.utils.run_async import run_async

# =========================
# Basic Test Cases
# =========================

@pytest.mark.asyncio
async def test_run_async_basic_return_value():
    # Test that run_async returns the correct value for a simple function
    def add(a, b):
        return a + b
    result = await run_async(add, 2, 3)
    assert result == 5

@pytest.mark.asyncio
async def test_run_async_basic_kwargs():
    # Test that run_async correctly passes kwargs
    def greet(name, greeting="Hello"):
        return f"{greeting}, {name}!"
    result = await run_async(greet, "Alice", greeting="Hi")
    assert result == "Hi, Alice!"

@pytest.mark.asyncio
async def test_run_async_basic_no_args():
    # Test that run_async works with a function that takes no arguments
    def get_number():
        return 42
    result = await run_async(get_number)
    assert result == 42

@pytest.mark.asyncio
async def test_run_async_basic_with_loop_param():
    # Test that run_async passes the loop argument if required
    def func_with_loop(x, loop):
        return x * 2
    result = await run_async(func_with_loop, 7)
    assert result == 14


# =========================
# Edge Test Cases
# =========================

@pytest.mark.asyncio
async def test_run_async_edge_exception_propagation():
    # Test that exceptions raised in the function are propagated
    def fail_func():
        raise ValueError("Intentional error")
    with pytest.raises(ValueError, match="Intentional error"):
        await run_async(fail_func)

@pytest.mark.asyncio
async def test_run_async_edge_concurrent_execution():
    # Test that concurrent run_async calls work correctly
    def multiply(x, y):
        return x * y
    coros = [run_async(multiply, i, i+1) for i in range(5)]
    results = await asyncio.gather(*coros)
    assert results == [i * (i + 1) for i in range(5)]

@pytest.mark.asyncio
async def test_run_async_edge_executor_thread():
    # Test that run_async actually runs the function in a different thread (not main thread)
    def get_thread_id():
        return threading.get_ident()
    main_thread_id = threading.get_ident()
    result_thread_id = await run_async(get_thread_id)
    assert result_thread_id != main_thread_id

@pytest.mark.asyncio
async def test_run_async_edge_function_with_kwargs_and_loop():
    # Test a function that requires both kwargs and loop
    def func(x, y=10, loop=None):
        return x + y
    result = await run_async(func, 5, y=15)
    assert result == 20

@pytest.mark.asyncio
async def test_run_async_edge_function_with_many_args():
    # Test a function with many positional arguments
    def sum_many(*args):
        return sum(args)
    nums = list(range(10))
    result = await run_async(sum_many, *nums)
    assert result == 45


# =========================
# Large Scale Test Cases
# =========================

@pytest.mark.asyncio
async def test_run_async_large_scale_many_concurrent_calls():
    # Test run_async with many concurrent calls (up to 100)
    def double(x):
        return x * 2
    coros = [run_async(double, i) for i in range(100)]
    results = await asyncio.gather(*coros)
    assert results == [i * 2 for i in range(100)]

@pytest.mark.asyncio
async def test_run_async_large_scale_large_data_structure():
    # Test run_async with a function that processes a large list
    def sum_list(lst):
        return sum(lst)
    data = list(range(500))
    result = await run_async(sum_list, data)
    assert result == sum(range(500))

@pytest.mark.asyncio
async def test_run_async_throughput_small_load():
    # Test throughput with a small number of fast calls
    def fast_func(x):
        return x + 1
    coros = [run_async(fast_func, i) for i in range(10)]
    start = time.perf_counter()
    results = await asyncio.gather(*coros)
    duration = time.perf_counter() - start
    assert results == [i + 1 for i in range(10)]

@pytest.mark.asyncio
async def test_run_async_throughput_medium_load():
    # Test throughput with a medium number of calls
    def fast_func(x):
        return x * x
    coros = [run_async(fast_func, i) for i in range(50)]
    start = time.perf_counter()
    results = await asyncio.gather(*coros)
    duration = time.perf_counter() - start
    assert results == [i * i for i in range(50)]

@pytest.mark.asyncio
async def test_run_async_throughput_high_volume():
    # Test throughput with a high volume of concurrent calls (up to 200)
    def fast_func(x):
        return x - 1
    coros = [run_async(fast_func, i) for i in range(200)]
    start = time.perf_counter()
    results = await asyncio.gather(*coros)
    duration = time.perf_counter() - start
    assert results == [i - 1 for i in range(200)]

@pytest.mark.asyncio
async def test_run_async_throughput_sustained_execution_pattern():
    # Test sustained execution pattern with sequential batches
    def increment(x):
        return x + 1
    total = 0
    start = time.perf_counter()
    for batch in range(5):
        coros = [run_async(increment, i + batch * 20) for i in range(20)]
        results = await asyncio.gather(*coros)
        total += sum(results)
    duration = time.perf_counter() - start
    assert total == 5050  # sum of 1..100 across the five batches
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
```
```python
import asyncio  # used to run async functions
import time  # used for timing throughput tests

import pytest  # used for our unit tests
from cognee.infrastructure.utils.run_async import run_async

# ------------------ BASIC TEST CASES ----------------------

@pytest.mark.asyncio
async def test_run_async_basic_return_value():
    # Test that run_async returns the correct value for a simple function
    def add(a, b):
        return a + b
    result = await run_async(add, 2, 3)
    assert result == 5

@pytest.mark.asyncio
async def test_run_async_basic_kwargs():
    # Test that run_async works with keyword arguments
    def concat(a, b, sep=' '):
        return f"{a}{sep}{b}"
    result = await run_async(concat, "Hello", "World", sep=", ")
    assert result == "Hello, World"

@pytest.mark.asyncio
async def test_run_async_basic_no_args():
    # Test a function with no arguments
    def give_five():
        return 5
    result = await run_async(give_five)
    assert result == 5

@pytest.mark.asyncio
async def test_run_async_basic_with_loop_param():
    # Test a function that expects a 'loop' argument
    def loop_echo(x, loop=None):
        # Just return the loop object type for verification
        return (x, type(loop))
    result = await run_async(loop_echo, "test")
    assert result[0] == "test"

# ------------------ EDGE TEST CASES -----------------------

@pytest.mark.asyncio
async def test_run_async_concurrent_execution():
    # Test concurrent execution of multiple functions
    def multiply(x, y):
        return x * y
    def add(x, y):
        return x + y
    results = await asyncio.gather(
        run_async(multiply, 2, 5),
        run_async(add, 10, 15)
    )
    assert results == [10, 25]

@pytest.mark.asyncio
async def test_run_async_exception_handling():
    # Test that exceptions raised in the function are propagated
    def raise_error():
        raise ValueError("Intentional error")
    with pytest.raises(ValueError, match="Intentional error"):
        await run_async(raise_error)

@pytest.mark.asyncio
async def test_run_async_function_with_various_types():
    # Test with different types of arguments
    def types_demo(a, b, c=None):
        return (type(a), type(b), type(c))
    result = await run_async(types_demo, 1, "str", c=[1,2,3])
    assert result == (int, str, list)

@pytest.mark.asyncio
async def test_run_async_function_with_large_kwargs():
    # Test with a large number of keyword arguments
    def sum_kwargs(**kwargs):
        return sum(kwargs.values())
    kwargs = {f'k{i}': i for i in range(10)}
    result = await run_async(sum_kwargs, **kwargs)
    assert result == 45

@pytest.mark.asyncio
async def test_run_async_large_scale_concurrent_calls():
    # Test run_async with many concurrent calls
    def square(x):
        return x * x
    tasks = [run_async(square, i) for i in range(100)]
    results = await asyncio.gather(*tasks)
    assert results == [i * i for i in range(100)]

@pytest.mark.asyncio
async def test_run_async_large_scale_mixed_functions():
    # Test run_async with different functions concurrently
    def inc(x): return x + 1
    def dec(x): return x - 1
    def mul(x): return x * 2
    tasks = []
    for i in range(30):
        tasks.append(run_async(inc, i))
        tasks.append(run_async(dec, i))
        tasks.append(run_async(mul, i))
    results = await asyncio.gather(*tasks)
    for i in range(30):
        # Tasks were appended in (inc, dec, mul) order for each i.
        assert results[3 * i] == i + 1
        assert results[3 * i + 1] == i - 1
        assert results[3 * i + 2] == i * 2

# ------------------ THROUGHPUT TEST CASES -----------------

@pytest.mark.asyncio
async def test_run_async_throughput_small_load():
    # Throughput test: small load
    def add(a, b): return a + b
    tasks = [run_async(add, i, i+1) for i in range(10)]
    start = time.time()
    results = await asyncio.gather(*tasks)
    duration = time.time() - start
    assert results == [2 * i + 1 for i in range(10)]

@pytest.mark.asyncio
async def test_run_async_throughput_medium_load():
    # Throughput test: medium load
    def mul(a, b): return a * b
    tasks = [run_async(mul, i, i+2) for i in range(50)]
    start = time.time()
    results = await asyncio.gather(*tasks)
    duration = time.time() - start
    assert results == [i * (i + 2) for i in range(50)]

@pytest.mark.asyncio
async def test_run_async_throughput_high_volume():
    # Throughput test: high volume, but bounded to 200 tasks
    def sub(a, b): return a - b
    tasks = [run_async(sub, i, i//2) for i in range(200)]
    start = time.time()
    results = await asyncio.gather(*tasks)
    duration = time.time() - start
    assert results == [i - i // 2 for i in range(200)]

@pytest.mark.asyncio
async def test_run_async_throughput_sustained_execution():
    # Throughput test: sustained execution pattern, repeated calls
    def echo(x): return x
    total = 0
    start = time.time()
    for i in range(20):
        result = await run_async(echo, i)
        total += result
    duration = time.time() - start
    assert total == 190  # sum of 0..19
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
```

To edit these changes, run `git checkout codeflash/optimize-run_async-mh2k85js` and push.

codeflash-ai bot requested a review from mashraf-222 on Oct 22, 2025 at 22:23
codeflash-ai bot added the ⚡️ codeflash label (Optimization PR opened by Codeflash AI) on Oct 22, 2025