diff --git a/demo-notebooks/additional-demos/batch-inference/remote_offline_bi.ipynb b/demo-notebooks/additional-demos/batch-inference/remote_offline_bi.ipynb new file mode 100644 index 000000000..02d28b137 --- /dev/null +++ b/demo-notebooks/additional-demos/batch-inference/remote_offline_bi.ipynb @@ -0,0 +1,188 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Remote Offline Batch Inference with Ray Data & vLLM Example\n", + "\n", + "This notebook presumes:\n", + "- You have a Ray Cluster URL given to you to run workloads on\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "from codeflare_sdk import RayJobClient\n", + "\n", + "# Setup Authentication Configuration\n", + "auth_token = \"XXXX\"\n", + "header = {\"Authorization\": f\"Bearer {auth_token}\"}" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "# Gather the dashboard URL (provided by the creator of the RayCluster)\n", + "ray_dashboard = \"XXXX\" # Replace with the Ray dashboard URL\n", + "\n", + "# Initialize the RayJobClient\n", + "client = RayJobClient(address=ray_dashboard, headers=header, verify=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Simple Example Explanation\n", + "\n", + "With the RayJobClient instantiated, lets run some batch inference. The following code is stored in `simple_batch_inf.py`, and is used as the entrypoint for the RayJob.\n", + "\n", + "What this processor configuration does:\n", + "- Set up a vLLM engine with your model\n", + "- Configure some settings for GPU processing\n", + "- Defines batch processing parameters (8 requests per batch, 2 GPU workers)\n", + "\n", + "```python\n", + "import ray\n", + "from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig\n", + "\n", + "processor_config = vLLMEngineProcessorConfig(\n", + " model_source=\"replace-me\",\n", + " engine_kwargs=dict(\n", + " enable_lora=False,\n", + " dtype=\"half\",\n", + " max_model_len=1024,\n", + " ),\n", + " batch_size=8,\n", + " concurrency=2,\n", + ")\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "With the config defined, we can instantiate the processor. This enables batch inference by processing multiple requests through the vLLM engine, with two key steps:\n", + "- **Preprocess**: Converts each row into a structured chat format with system instructions and user queries, preparing the input for the LLM\n", + "- **Postprocess**: Extracts only the generated text from the model response, cleaning up the output\n", + "\n", + "The processor defines the pipeline that will be applied to each row in the dataset, enabling efficient batch processing through Ray Data's distributed execution framework.\n", + "\n", + "```python\n", + "processor = build_llm_processor(\n", + " processor_config,\n", + " preprocess=lambda row: dict(\n", + " messages=[\n", + " {\n", + " \"role\": \"system\",\n", + " \"content\": \"You are a calculator. Please only output the answer \"\n", + " \"of the given equation.\",\n", + " },\n", + " {\"role\": \"user\", \"content\": f\"{row['id']} ** 3 = ?\"},\n", + " ],\n", + " sampling_params=dict(\n", + " temperature=0.3,\n", + " max_tokens=20,\n", + " detokenize=False,\n", + " ),\n", + " ),\n", + " postprocess=lambda row: {\n", + " \"resp\": row[\"generated_text\"],\n", + " },\n", + ")\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we can run the batch inference pipeline on our data, it will:\n", + "- In the background, the processor will download the model into memory where vLLM serves it locally (on Ray Cluster) for use in inference\n", + "- Generate a sample Ray Dataset with 32 rows (0-31) to process\n", + "- Run the LLM processor on the dataset, triggering the preprocessing, inference, and postprocessing steps\n", + "- Execute the lazy pipeline and loads results into memory\n", + "- Iterate through all outputs and print each response \n", + "\n", + "```python\n", + "ds = ray.data.range(30)\n", + "ds = processor(ds)\n", + "ds = ds.materialize()\n", + "\n", + "for out in ds.take_all():\n", + " print(out)\n", + " print(\"==========\")\n", + "```\n", + "\n", + "### Job Submission\n", + "\n", + "Now we can submit this job against the Ray Cluster using the `RayJobClient` from earlier " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "entrypoint_command = \"python simple_batch_inf.py\"\n", + "\n", + "submission_id = client.submit_job(\n", + " entrypoint=entrypoint_command,\n", + " runtime_env={\"working_dir\": \"./\", \"pip\": \"requirements.txt\"},\n", + ")\n", + "\n", + "print(submission_id + \" successfully submitted\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get the job's status\n", + "client.get_job_status(submission_id)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get the job's logs\n", + "client.get_job_logs(submission_id)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/demo-notebooks/additional-demos/batch-inference/requirements.txt b/demo-notebooks/additional-demos/batch-inference/requirements.txt new file mode 100644 index 000000000..d9e8b73b2 --- /dev/null +++ b/demo-notebooks/additional-demos/batch-inference/requirements.txt @@ -0,0 +1,4 @@ +vllm +transformers +triton>=2.0.0 +torch>=2.0.0 diff --git a/demo-notebooks/additional-demos/batch-inference/simple_batch_inf.py b/demo-notebooks/additional-demos/batch-inference/simple_batch_inf.py new file mode 100644 index 000000000..c86ed15b4 --- /dev/null +++ b/demo-notebooks/additional-demos/batch-inference/simple_batch_inf.py @@ -0,0 +1,62 @@ +import ray +from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig + + +# 1. Construct a vLLM processor config. +processor_config = vLLMEngineProcessorConfig( + # The base model. + model_source="unsloth/Llama-3.2-1B-Instruct", + # vLLM engine config. + engine_kwargs=dict( + enable_lora=False, + # # Older GPUs (e.g. T4) don't support bfloat16. You should remove + # # this line if you're using later GPUs. + dtype="half", + # Reduce the model length to fit small GPUs. You should remove + # this line if you're using large GPUs. + max_model_len=1024, + ), + # The batch size used in Ray Data. + batch_size=8, + # Use one GPU in this example. + concurrency=1, + # If you save the LoRA adapter in S3, you can set the following path. + # dynamic_lora_loading_path="s3://your-lora-bucket/", +) + +# 2. Construct a processor using the processor config. +processor = build_llm_processor( + processor_config, + preprocess=lambda row: dict( + # Remove the LoRA model specification + messages=[ + { + "role": "system", + "content": "You are a calculator. Please only output the answer " + "of the given equation.", + }, + {"role": "user", "content": f"{row['id']} ** 3 = ?"}, + ], + sampling_params=dict( + temperature=0.3, + max_tokens=20, + detokenize=False, + ), + ), + postprocess=lambda row: { + "resp": row["generated_text"], + }, +) + +# 3. Synthesize a dataset with 32 rows. +ds = ray.data.range(32) +# 4. Apply the processor to the dataset. Note that this line won't kick off +# anything because processor is execution lazily. +ds = processor(ds) +# Materialization kicks off the pipeline execution. +ds = ds.materialize() + +# 5. Print all outputs. +for out in ds.take_all(): + print(out) + print("==========")