
implementation of python async engine, which decouples receiving requests and sending responses in the python engine #2758

Open · wants to merge 1 commit into base: master
Conversation

@siddvenk (Contributor) commented on Mar 11, 2025

Description

This PR implements an async/decoupled Python engine. This will allow for asynchronous code in the Python handler and enable integrations with LLM engines at a higher level than we do today. It's not exclusive to LLM integration, but that is the main purpose.

The goal of this implementation is to integrate more cleanly with LLM inference engines, which have largely adopted the async/await paradigm in their public APIs. It will ideally replace the RollingBatch implementation we have today. Initial tests with vLLM 0.7.1 show a 5-7% performance improvement, all of which can be attributed to overhead in our current implementation.

When the frontend receives a request, it sends it to the Python backend and does not block waiting for a response. When the Python backend completes a response, it sends it back to the frontend. The frontend maintains a mapping that allows it to associate each response from Python with the originating client request, so it can send the response to the appropriate connection.
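As an illustration of that flow, here is a simplified sketch (these are not the PR's actual classes; the queue-based plumbing and the names generate, handle_request, and engine_loop are assumptions made for illustration):

import asyncio

async def generate(request):
    # stand-in for an async LLM engine call that streams tokens
    for token in request["prompt"].split():
        await asyncio.sleep(0)  # yield control, as a real engine would while decoding
        yield token

async def handle_request(request_id, request, responses: asyncio.Queue):
    # run one request to completion; every chunk is tagged with its request id
    async for token in generate(request):
        await responses.put((request_id, token, False))
    await responses.put((request_id, None, True))  # final marker for this request

async def engine_loop(requests: asyncio.Queue, responses: asyncio.Queue):
    # accept new requests without waiting for earlier ones to finish;
    # responses flow back independently and are correlated by request id
    while True:
        request_id, request = await requests.get()
        asyncio.create_task(handle_request(request_id, request, responses))

On the frontend side, the request-id-to-connection mapping is what lets responses that arrive in any order reach the right caller.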

This has been tested via a unit test and a sample vLLM handler.

This is the initial PR, and improvements will be tackled in subsequent PRs. At the moment, this supports:

  • concurrent streaming and non-streaming requests.

Follow-up PRs will address:

  • Solid error handling
  • Refactoring to support streaming responses on the Python side with a cleaner API
  • Improved vLLM handler to support:
    • A context manager for the vLLM engine to support graceful termination on engine failures (a rough sketch follows this list)
    • LMI output schema support
    • Refactoring some utilities to be shared with other engines
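For the context-manager item above, here is a minimal sketch of the general shape, assuming hypothetical start_engine/stop_engine callables rather than vLLM's actual API:

import contextlib

@contextlib.asynccontextmanager
async def engine_session(start_engine, stop_engine):
    # start_engine/stop_engine are placeholders, not vLLM APIs
    engine = await start_engine()
    try:
        yield engine
    finally:
        # runs on normal exit and on unrecoverable engine errors alike,
        # so GPU memory and worker processes are always released
        await stop_engine(engine)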

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

Checklist:

  • Please add the link to the Integration Tests Executor run with related tests.
  • Have you manually built the docker image and verified the change?
  • Have you run related tests? Check how to set up the test environment here; one example would be pytest tests.py -k "TestCorrectnessLmiDist" -m "lmi_dist"
  • Have you added tests that prove your fix is effective or that this feature works?
  • Has code been commented, particularly in hard-to-understand areas?
  • Have you made corresponding changes to the documentation?

Feature/Issue validation/testing

Please describe the Unit or Integration tests that you ran to verify your changes and relevant result summary. Provide instructions so it can be reproduced.
Please also list any relevant details for your test configuration.

  • Test A
    Logs for Test A

  • Test B
    Logs for Test B

@siddvenk siddvenk force-pushed the py-continuous-batching branch from 77f36db to 1af9265 Compare March 11, 2025 19:57
Comment on lines 165 to 172
args[0] = "mpirun";
args[1] = "-np";
args[2] = String.valueOf(worldSize);
Contributor

I wonder if we can also make this a list instead of hard-coding the indices from 0 to 51.

Contributor Author

yes, I think we should take this up in a follow-up PR. I'm trying to keep refactoring in this PR to a minimum so that non-essential changes don't distract from the main change.

@siddvenk siddvenk force-pushed the py-continuous-batching branch 17 times, most recently from 10b9eb0 to 2d6aff2 Compare March 17, 2025 18:38
@siddvenk siddvenk marked this pull request as ready for review March 17, 2025 18:47
@siddvenk siddvenk requested review from zachgk and a team as code owners March 17, 2025 18:47
@siddvenk siddvenk force-pushed the py-continuous-batching branch from 2d6aff2 to 1bcd17b Compare March 17, 2025 22:30
if isinstance(response, ErrorResponse):
    response_dict["code"] = response.code
    response_dict["error"] = response.message
for k, v in inputs.get_properties().items():
Contributor

nit, reuse var:

Suggested change
- for k, v in inputs.get_properties().items():
+ for k, v in properties.items():

Contributor Author

nice catch

await asyncio.get_event_loop().run_in_executor(
    executor, check_threads)

asyncio.get_event_loop().run_until_complete(main())
Contributor

I think we can just do asyncio.run() since there is no existing event loop at this point

Contributor Author

good call, I'll update
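For reference, a minimal sketch of the suggested change (check_threads and main are stand-ins for the functions in this PR, not their exact signatures):

import asyncio
from concurrent.futures import ThreadPoolExecutor

def check_threads():
    # placeholder for the blocking thread-health check that runs in the executor
    pass

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as executor:
        # run the blocking check in the executor so the event loop stays responsive
        await loop.run_in_executor(executor, check_threads)

# asyncio.run() creates, runs, and closes its own event loop, so it can replace
# asyncio.get_event_loop().run_until_complete(main()) when no loop exists yet
asyncio.run(main())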

        self.loop = None
        # Todo: for async mode we should maybe consider

    def receive_requests(self):
Contributor

I'm wondering if we should add a graceful shutdown mechanism to the receive_requests and send_responses threads; right now it is unclear how these threads would behave if the main process is abruptly shut down.

Contributor Author

yes, definitely. We need graceful shutdown in a few places:

  1. Python Engine - if the Python process crashes or is terminated, the frontend should receive that signal and send a response for all incomplete outstanding requests. I'd probably put this logic in AsyncRequestManager, similar to the shutdown mechanism we have for RollingBatch.
  2. The vllm_handler - in local testing, I've triggered a few unrecoverable engine errors that at least trigger a Python restart, but don't clean up the memory/resources used by vLLM. It would also be great to have some sort of internal health check.

I think I can tackle 1 in this PR, but 2 may need some more thought.
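For point 1 on the Python side, a rough sketch of one possible shutdown pattern for the receive/send threads, assuming a threading.Event stop flag (the class and method bodies are illustrative, not this PR's actual implementation):

import queue
import threading

class EngineIOThreads:
    # illustrative wrapper showing one way to stop the I/O threads cleanly

    def __init__(self):
        self.stop_event = threading.Event()
        self.requests = queue.Queue()
        # daemon threads are a fallback so an abrupt exit does not hang on them;
        # the explicit shutdown() path below is the graceful route
        self.receiver = threading.Thread(target=self.receive_requests, daemon=True)
        self.sender = threading.Thread(target=self.send_responses, daemon=True)

    def start(self):
        self.receiver.start()
        self.sender.start()

    def receive_requests(self):
        while not self.stop_event.is_set():
            try:
                # poll with a timeout so the thread can notice the stop flag
                request = self.requests.get(timeout=0.1)
            except queue.Empty:
                continue
            # ... dispatch the request to the async engine ...

    def send_responses(self):
        while not self.stop_event.is_set():
            # ... drain completed responses and write them back to the frontend ...
            self.stop_event.wait(0.1)

    def shutdown(self):
        # signal both threads, then wait briefly for them to exit
        self.stop_event.set()
        self.receiver.join(timeout=5)
        self.sender.join(timeout=5)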
