diff --git a/README.md b/README.md
index 7147f40..63f9016 100644
--- a/README.md
+++ b/README.md
@@ -103,6 +103,10 @@ cat docker/example.env
 ```bash
 cd docker && docker-compose --env-file example.env -f docker-compose.yml up
 ```
+If running on a GPU instance, use the GPU docker-compose file
+```bash
+cd docker && docker-compose --env-file example.env -f docker-compose-gpu.yml up
+```
 If you need Triton support (keras/pytorch/onnx etc.), use the triton docker-compose file
 ```bash
 cd docker && docker-compose --env-file example.env -f docker-compose-triton.yml up
diff --git a/clearml_serving/engines/triton/triton_helper.py b/clearml_serving/engines/triton/triton_helper.py
index 19fd241..b4921c5 100644
--- a/clearml_serving/engines/triton/triton_helper.py
+++ b/clearml_serving/engines/triton/triton_helper.py
@@ -561,7 +561,7 @@ def main():
         setattr(args, args_var, type(t)(v) if t is not None else v)
 
     # noinspection PyProtectedMember
-    serving_task = ModelRequestProcessor._get_control_plane_task(task_id=args.inference_task_id)
+    serving_task = ModelRequestProcessor._get_control_plane_task(task_id=args.serving_id)
 
     task = Task.init(
         project_name=args.project or serving_task.get_project_name() or "serving",
diff --git a/clearml_serving/serving/entrypoint.sh b/clearml_serving/serving/entrypoint.sh
index a5efea1..06cb4be 100755
--- a/clearml_serving/serving/entrypoint.sh
+++ b/clearml_serving/serving/entrypoint.sh
@@ -18,7 +18,8 @@ UVICORN_SERVE_LOOP="${UVICORN_SERVE_LOOP:-uvloop}"
 UVICORN_LOG_LEVEL="${UVICORN_LOG_LEVEL:-warning}"
 
 # set default internal serve endpoint (for request pipelining)
-CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/serve}"
+CLEARML_DEFAULT_SERVE_SUFFIX="${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}"
+CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/$CLEARML_DEFAULT_SERVE_SUFFIX}"
 CLEARML_DEFAULT_TRITON_GRPC_ADDR="${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-127.0.0.1:8001}"
 
 # print configuration
@@ -31,6 +32,7 @@ echo GUNICORN_EXTRA_ARGS="$GUNICORN_EXTRA_ARGS"
 echo UVICORN_SERVE_LOOP="$UVICORN_SERVE_LOOP"
 echo UVICORN_EXTRA_ARGS="$UVICORN_EXTRA_ARGS"
 echo UVICORN_LOG_LEVEL="$UVICORN_LOG_LEVEL"
+echo CLEARML_DEFAULT_SERVE_SUFFIX="$CLEARML_DEFAULT_SERVE_SUFFIX"
 echo CLEARML_DEFAULT_BASE_SERVE_URL="$CLEARML_DEFAULT_BASE_SERVE_URL"
 echo CLEARML_DEFAULT_TRITON_GRPC_ADDR="$CLEARML_DEFAULT_TRITON_GRPC_ADDR"
Restarting service", status_code=500, background=task) -router = APIRouter( - prefix="/serve", - tags=["models"], - responses={404: {"description": "Model Serving Endpoint Not found"}}, - route_class=GzipRoute, # mark-out to remove support for GZip content encoding -) - - -# cover all routing options for model version `/{model_id}`, `/{model_id}/123`, `/{model_id}?version=123` -@router.post("/{model_id}/{version}") -@router.post("/{model_id}/") -@router.post("/{model_id}") -async def serve_model(model_id: str, version: Optional[str] = None, request: Union[bytes, Dict[Any, Any]] = None): +async def process_with_exceptions( + base_url: str, + version: Optional[str], + request: Union[bytes, Dict[Any, Any]], + serve_type: str +): try: - return_value = await processor.process_request(base_url=model_id, version=version, request_body=request) + return_value = await processor.process_request( + base_url=base_url, + version=version, + request_body=request, + serve_type=serve_type + ) except EndpointNotFoundException as ex: raise HTTPException(status_code=404, detail="Error processing request, endpoint was not found: {}".format(ex)) except (EndpointModelLoadException, EndpointBackendEngineException) as ex: @@ -171,4 +173,54 @@ async def serve_model(model_id: str, version: Optional[str] = None, request: Uni return return_value +router = APIRouter( + prefix=f"/{os.environ.get('CLEARML_DEFAULT_SERVE_SUFFIX', 'serve')}", + tags=["models"], + responses={404: {"description": "Model Serving Endpoint Not found"}}, + route_class=GzipRoute, # mark-out to remove support for GZip content encoding +) + + +@router.post("/{model_id}/{version}") +@router.post("/{model_id}/") +@router.post("/{model_id}") +async def base_serve_model( + model_id: str, + version: Optional[str] = None, + request: Union[bytes, Dict[Any, Any]] = None +): + return_value = await process_with_exceptions( + base_url=model_id, + version=version, + request_body=request, + serve_type="process" + ) + return return_value + + +async def validate_json_request(raw_request: Request): + content_type = raw_request.headers.get("content-type", "").lower() + media_type = content_type.split(";", maxsplit=1)[0] + if media_type != "application/json": + raise HTTPException( + status_code=HTTPStatus.UNSUPPORTED_MEDIA_TYPE, + detail="Unsupported Media Type: Only 'application/json' is allowed" + ) + +@router.post("/openai/v1/{endpoint_type:path}", dependencies=[Depends(validate_json_request)]) +@router.get("/openai/v1/{endpoint_type:path}", dependencies=[Depends(validate_json_request)]) +async def openai_serve_model( + endpoint_type: str, + request: Union[CompletionRequest, ChatCompletionRequest], + raw_request: Request +): + combined_request = {"request": request, "raw_request": raw_request} + return_value = await process_with_exceptions( + base_url=request.model, + version=None, + request_body=combined_request, + serve_type=endpoint_type + ) + return return_value + app.include_router(router) diff --git a/clearml_serving/serving/model_request_processor.py b/clearml_serving/serving/model_request_processor.py index 2fcdcab..11b6fd4 100644 --- a/clearml_serving/serving/model_request_processor.py +++ b/clearml_serving/serving/model_request_processor.py @@ -159,7 +159,7 @@ def __init__( self._serving_base_url = None self._metric_log_freq = None - async def process_request(self, base_url: str, version: str, request_body: dict) -> dict: + async def process_request(self, base_url: str, version: str, request_body: dict, serve_type: str) -> dict: """ Process request 
+@router.post("/openai/v1/{endpoint_type:path}", dependencies=[Depends(validate_json_request)])
+@router.get("/openai/v1/{endpoint_type:path}", dependencies=[Depends(validate_json_request)])
+async def openai_serve_model(
+    endpoint_type: str,
+    request: Union[CompletionRequest, ChatCompletionRequest],
+    raw_request: Request
+):
+    combined_request = {"request": request, "raw_request": raw_request}
+    return_value = await process_with_exceptions(
+        base_url=request.model,
+        version=None,
+        request_body=combined_request,
+        serve_type=endpoint_type
+    )
+    return return_value
+
 app.include_router(router)
diff --git a/clearml_serving/serving/model_request_processor.py b/clearml_serving/serving/model_request_processor.py
index 2fcdcab..11b6fd4 100644
--- a/clearml_serving/serving/model_request_processor.py
+++ b/clearml_serving/serving/model_request_processor.py
@@ -159,7 +159,7 @@ def __init__(
         self._serving_base_url = None
         self._metric_log_freq = None
 
-    async def process_request(self, base_url: str, version: str, request_body: dict) -> dict:
+    async def process_request(self, base_url: str, version: str, request_body: dict, serve_type: str) -> dict:
         """
         Process request coming in,
         Raise Value error if url does not match existing endpoints
@@ -171,7 +171,12 @@ async def process_request(self, base_url: str, version: str, request_body: dict)
             while self._update_lock_flag:
                 await asyncio.sleep(0.5+random())
             # retry to process
-            return await self.process_request(base_url=base_url, version=version, request_body=request_body)
+            return await self.process_request(
+                base_url=base_url,
+                version=version,
+                request_body=request_body,
+                serve_type=serve_type
+            )
 
         try:
             # normalize url and version
@@ -192,7 +197,12 @@ async def process_request(self, base_url: str, version: str, request_body: dict)
                 processor = processor_cls(model_endpoint=ep, task=self._task)
                 self._engine_processor_lookup[url] = processor
 
-            return_value = await self._process_request(processor=processor, url=url, body=request_body)
+            return_value = await self._process_request(
+                processor=processor,
+                url=url,
+                body=request_body,
+                serve_type=serve_type
+            )
         finally:
             self._request_processing_state.dec()
 
@@ -1193,7 +1203,7 @@ def _deserialize_conf_dict(self, configuration: dict) -> None:
         # update preprocessing classes
         BasePreprocessRequest.set_server_config(self._configuration)
 
-    async def _process_request(self, processor: BasePreprocessRequest, url: str, body: dict) -> dict:
+    async def _process_request(self, processor: BasePreprocessRequest, url: str, body: dict, serve_type: str) -> dict:
         # collect statistics for this request
         stats_collect_fn = None
         collect_stats = False
@@ -1215,10 +1225,11 @@ async def _process_request(self, processor: BasePreprocessRequest, url: str, bod
         preprocessed = await processor.preprocess(body, state, stats_collect_fn) \
             if processor.is_preprocess_async \
             else processor.preprocess(body, state, stats_collect_fn)
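+        # resolve the handler for this request type: "process" for the classic model-serving
+        # route, or e.g. "chat/completions" -> "chat_completions" for the OpenAI-compatible routes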
+        processed_func = getattr(processor, serve_type.replace("/", "_"))
         # noinspection PyUnresolvedReferences
-        processed = await processor.process(preprocessed, state, stats_collect_fn) \
+        processed = await processed_func(preprocessed, state, stats_collect_fn) \
             if processor.is_process_async \
-            else processor.process(preprocessed, state, stats_collect_fn)
+            else processed_func(preprocessed, state, stats_collect_fn)
         # noinspection PyUnresolvedReferences
         return_value = await processor.postprocess(processed, state, stats_collect_fn) \
             if processor.is_postprocess_async \
diff --git a/clearml_serving/serving/preprocess_service.py b/clearml_serving/serving/preprocess_service.py
index a5c069c..c0271d7 100644
--- a/clearml_serving/serving/preprocess_service.py
+++ b/clearml_serving/serving/preprocess_service.py
@@ -1,4 +1,5 @@
 import os
+import json
 import sys
 import threading
 import traceback
@@ -14,6 +15,12 @@ from .endpoints import ModelEndpoint
 
 
+class Singleton(object):
+    _instance = None
+    def __new__(class_, *args, **kwargs):
+        if not isinstance(class_._instance, class_):
+            class_._instance = object.__new__(class_, *args, **kwargs)
+        return class_._instance
 
 class BasePreprocessRequest(object):
     __preprocessing_lookup = {}
@@ -596,3 +603,373 @@ async def _preprocess_send_request(_, endpoint: str, version: str = None, data:
         if not return_value.ok:
             return None
         return return_value.json()
+
+
+class VllmEngine(Singleton):
+    _vllm = None
+    _fastapi = None
+    is_already_loaded = False
+
+    def __init__(self) -> None:
+        # load vLLM Modules
+        if self._vllm is None:
+            from vllm.engine.arg_utils import AsyncEngineArgs
+            from vllm.engine.async_llm_engine import AsyncLLMEngine
+            from vllm.entrypoints.logger import RequestLogger
+            from vllm.entrypoints.openai.serving_engine import OpenAIServing
+            from vllm.entrypoints.openai.serving_models import (
+                OpenAIServingModels,
+                LoRAModulePath,
+                PromptAdapterPath,
+                BaseModelPath
+            )
+            from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
+            from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
+            from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding
+            from vllm.entrypoints.openai.serving_tokenization import OpenAIServingTokenization
+            from vllm.entrypoints.openai.protocol import (
+                ChatCompletionResponse,
+                CompletionResponse,
+                ErrorResponse
+            )
+            from vllm.entrypoints.chat_utils import ChatTemplateContentFormatOption
+            from vllm.usage.usage_lib import UsageContext
+            self._vllm = {
+                "AsyncEngineArgs": AsyncEngineArgs,
+                "AsyncLLMEngine": AsyncLLMEngine,
+                "RequestLogger": RequestLogger,
+                "OpenAIServing": OpenAIServing,
+                "OpenAIServingModels": OpenAIServingModels,
+                "LoRAModulePath": LoRAModulePath,
+                "PromptAdapterPath": PromptAdapterPath,
+                "BaseModelPath": BaseModelPath,
+                "OpenAIServingChat": OpenAIServingChat,
+                "OpenAIServingCompletion": OpenAIServingCompletion,
+                "OpenAIServingEmbedding": OpenAIServingEmbedding,
+                "OpenAIServingTokenization": OpenAIServingTokenization,
+                "ChatCompletionResponse": ChatCompletionResponse,
+                "CompletionResponse": CompletionResponse,
+                "ErrorResponse": ErrorResponse,
+                "ChatTemplateContentFormatOption": ChatTemplateContentFormatOption,
+                "UsageContext": UsageContext
+            }
+
+        if self._fastapi is None:
+            from fastapi.responses import JSONResponse, StreamingResponse
+            self._fastapi = {}
+            self._fastapi["json_response"] = JSONResponse
+            self._fastapi["streaming_response"] = StreamingResponse
+
+        from vllm.logger import init_logger
+        self.logger = init_logger(__name__)
+
+        import socket
+        import prometheus_client
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            if not s.connect_ex(('localhost', 8000)) == 0:
+                prometheus_client.start_http_server(8000)
+
+    def load_engine(
+        self,
+        name: str,
+        model_path: str,
+        vllm_model_config: dict,
+        chat_settings: dict
+    ) -> None:
+        if self.is_already_loaded:
+            self.add_models(name=name, model_path=model_path)
+            return None
+
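+        # VLLM_ENGINE_ARGS is expected to hold a JSON dict of AsyncEngineArgs overrides;
+        # single quotes are stripped so the quoted default from docker-compose still parses as JSON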
+        vllm_engine_config = json.loads(os.environ.get("VLLM_ENGINE_ARGS").replace("'", ""))
+        vllm_engine_config["model"] = model_path
+        vllm_engine_config["served_model_name"] = name
+        engine_args = self._vllm["AsyncEngineArgs"](**vllm_engine_config)
+        async_engine_client = self._vllm["AsyncLLMEngine"].from_engine_args(
+            engine_args,
+            usage_context=self._vllm["UsageContext"].OPENAI_API_SERVER
+        )
+        model_config = async_engine_client.engine.get_model_config()
+        request_logger = self._vllm["RequestLogger"](
+            max_log_len=vllm_model_config["max_log_len"]
+        )
+        self.openai_serving_models = self._vllm["OpenAIServingModels"](
+            async_engine_client,
+            model_config,
+            [
+                self._vllm["BaseModelPath"](
+                    name=name,
+                    model_path=model_path
+                )
+            ],
+            lora_modules=vllm_model_config["lora_modules"],
+            prompt_adapters=vllm_model_config["prompt_adapters"],
+        )
+        # await self.openai_serving_models.init_static_loras()
+        self.openai_serving = self._vllm["OpenAIServing"](
+            async_engine_client,
+            model_config,
+            self.openai_serving_models,
+            request_logger=request_logger,
+            return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"]
+        )
+        self.openai_serving_chat = self._vllm["OpenAIServingChat"](
+            async_engine_client,
+            model_config,
+            self.openai_serving_models,
+            response_role=vllm_model_config["response_role"],
+            request_logger=request_logger,
+            chat_template=vllm_model_config["chat_template"],
+            chat_template_content_format=chat_settings["chat_template_content_format"],
+            return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"],
+            enable_reasoning=chat_settings["enable_reasoning"],
+            reasoning_parser=chat_settings["reasoning_parser"],
+            enable_auto_tools=chat_settings["enable_auto_tools"],
+            tool_parser=chat_settings["tool_parser"],
+            enable_prompt_tokens_details=chat_settings["enable_prompt_tokens_details"]
+        ) if model_config.runner_type == "generate" else None
+        self.openai_serving_completion = self._vllm["OpenAIServingCompletion"](
+            async_engine_client,
+            model_config,
+            self.openai_serving_models,
+            request_logger=request_logger,
+            return_tokens_as_token_ids=vllm_model_config["return_tokens_as_token_ids"]
+        ) if model_config.runner_type == "generate" else None
+        self.openai_serving_embedding = self._vllm["OpenAIServingEmbedding"](
+            async_engine_client,
+            model_config,
+            self.openai_serving_models,
+            request_logger=request_logger,
+            chat_template=vllm_model_config["chat_template"],
+            chat_template_content_format=chat_settings["chat_template_content_format"]
+        ) if model_config.task == "embed" else None
+        self.openai_serving_tokenization = self._vllm["OpenAIServingTokenization"](
+            async_engine_client,
+            model_config,
+            self.openai_serving_models,
+            request_logger=request_logger,
+            chat_template=vllm_model_config["chat_template"],
+            chat_template_content_format=chat_settings["chat_template_content_format"]
+        )
+        self.logger.info("vLLM Engine was successfully initialized")
+        self.is_already_loaded = True
+        return None
+
+    def add_models(self, name: str, model_path: str) -> None:
+        self.openai_serving_models.base_model_paths.append(
+            self._vllm["BaseModelPath"](
+                name=name,
+                model_path=model_path
+            )
+        )
+        self.logger.info("Model {} was added to vllm engine".format(name))
+        return None
+
+    def remove_model(self, name: str) -> None:
+        self.openai_serving_models.base_model_paths = [
+            model for model in self.openai_serving_models.base_model_paths
+            if model.name != name
+        ]
+        self.logger.info("Model {} was removed from vllm engine".format(name))
+        return None
+
+    async def completions(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Any:
+        """
+        The actual processing function.
+        We run the process in this context
+        """
+        request, raw_request = data["request"], data["raw_request"]
+        handler = self.openai_serving_completion
+        if handler is None:
+            return self.openai_serving.create_error_response(
+                message="The model does not support Completions API"
+            )
+        generator = await handler.create_completion(request=request, raw_request=raw_request)
+        if isinstance(generator, self._vllm["ErrorResponse"]):
+            return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code)
+        elif isinstance(generator, self._vllm["CompletionResponse"]):
+            return self._fastapi["json_response"](content=generator.model_dump())
+        return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream")
+
+    async def chat_completions(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Any:
+        """
+        The actual processing function.
+        We run the process in this context
+        """
+        request, raw_request = data["request"], data["raw_request"]
+        handler = self.openai_serving_chat
+        if handler is None:
+            return self.openai_serving.create_error_response(
+                message="The model does not support Chat Completions API"
+            )
+        generator = await handler.create_chat_completion(request=request, raw_request=raw_request)
+        if isinstance(generator, self._vllm["ErrorResponse"]):
+            return self._fastapi["json_response"](content=generator.model_dump(), status_code=generator.code)
+        elif isinstance(generator, self._vllm["ChatCompletionResponse"]):
+            return self._fastapi["json_response"](content=generator.model_dump())
+        return self._fastapi["streaming_response"](content=generator, media_type="text/event-stream")
+
+    async def models(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Any:
+        request, raw_request = data["request"], data["raw_request"]
+        models_ = await self.openai_serving_models.show_available_models()
+        return self._fastapi["json_response"](content=models_.model_dump())
+
+
+@BasePreprocessRequest.register_engine("vllm", modules=["vllm", "fastapi"])
+class VllmPreprocessRequest(BasePreprocessRequest):
+    is_preprocess_async = True
+    is_process_async = True
+    is_postprocess_async = True
+    asyncio_to_thread = None
+    _vllm_engine = None
+
+    def __init__(self, model_endpoint: ModelEndpoint, task: Task = None):
+        super(VllmPreprocessRequest, self).__init__(
+            model_endpoint=model_endpoint, task=task)
+        self._vllm_engine = VllmEngine()
+        self._vllm_engine.load_engine(
+            name=model_endpoint.serving_url,
+            model_path=self._get_local_model_file(),
+            **self._model
+        )
+
+        if VllmPreprocessRequest.asyncio_to_thread is None:
+            from asyncio import to_thread as asyncio_to_thread
+            VllmPreprocessRequest.asyncio_to_thread = asyncio_to_thread
+
+        # override `send_request` method with the async version
+        self._preprocess.__class__.send_request = VllmPreprocessRequest._preprocess_send_request
+
+    async def preprocess(
+        self,
+        request: dict,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None,
+    ) -> Optional[Any]:
+        """
+        Raise exception to report an error
+        Return value will be passed to serving engine
+
+        :param request: dictionary as received from the RestAPI
+        :param state: Use state dict to store data passed to the post-processing function call.
+            Usage example:
+            >>> def preprocess(..., state):
+                    state['preprocess_aux_data'] = [1,2,3]
+            >>> def postprocess(..., state):
+                    print(state['preprocess_aux_data'])
+        :param collect_custom_statistics_fn: Optional, allows to send a custom set of key/values
+            to the statistics collector service
+
+            Usage example:
+            >>> print(request)
+            {"x0": 1, "x1": 2}
+            >>> collect_custom_statistics_fn({"x0": 1, "x1": 2})
+
+        :return: Object to be passed directly to the model inference
+        """
+        if self._preprocess is not None and hasattr(self._preprocess, 'preprocess'):
+            return await self._preprocess.preprocess(request, state, collect_custom_statistics_fn)
+        return request
+
+    async def postprocess(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Optional[dict]:
+        """
+        Raise exception to report an error
+        Return value will be passed to serving engine
+
+        :param data: object as received from the inference model function
+        :param state: Use state dict to store data passed to the post-processing function call.
+            Usage example:
+            >>> def preprocess(..., state):
+                    state['preprocess_aux_data'] = [1,2,3]
+            >>> def postprocess(..., state):
+                    print(state['preprocess_aux_data'])
+        :param collect_custom_statistics_fn: Optional, allows to send a custom set of key/values
+            to the statistics collector service
+
+            Usage example:
+            >>> collect_custom_statistics_fn({"y": 1})
+
+        :return: Dictionary passed directly as the returned result of the RestAPI
+        """
+        if self._preprocess is not None and hasattr(self._preprocess, 'postprocess'):
+            return await self._preprocess.postprocess(data, state, collect_custom_statistics_fn)
+        return data
+
+    async def completions(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Any:
+        """
+        The actual processing function.
+        We run the process in this context
+        """
+        return await self._vllm_engine.completions(
+            data=data,
+            state=state,
+            collect_custom_statistics_fn=collect_custom_statistics_fn
+        )
+
+    async def chat_completions(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Any:
+        """
+        The actual processing function.
+        We run the process in this context
+        """
+        return await self._vllm_engine.chat_completions(
+            data=data,
+            state=state,
+            collect_custom_statistics_fn=collect_custom_statistics_fn
+        )
+
+    async def models(
+        self,
+        data: Any,
+        state: dict,
+        collect_custom_statistics_fn: Callable[[dict], None] = None
+    ) -> Any:
+        """
+        The actual processing function.
+        We run the process in this context
+        """
+        return await self._vllm_engine.models(
+            data=data,
+            state=state,
+            collect_custom_statistics_fn=collect_custom_statistics_fn
+        )
+
+    @staticmethod
+    async def _preprocess_send_request(_, endpoint: str, version: str = None, data: dict = None) -> Optional[dict]:
+        endpoint = "/openai/v1/{}".format(endpoint.strip("/"))
+        base_url = BasePreprocessRequest.get_server_config().get("base_serving_url")
+        base_url = (base_url or BasePreprocessRequest._default_serving_base_url).strip("/")
+        url = "{}/{}".format(base_url, endpoint.strip("/"))
+        return_value = await VllmPreprocessRequest.asyncio_to_thread(
+            request_post, url, json=data, timeout=BasePreprocessRequest._timeout)
+        if not return_value.ok:
+            return None
+        return return_value.json()
diff --git a/clearml_serving/serving/requirements.txt b/clearml_serving/serving/requirements.txt
index da12834..922f8e3 100644
--- a/clearml_serving/serving/requirements.txt
+++ b/clearml_serving/serving/requirements.txt
@@ -18,3 +18,5 @@ lightgbm>=3.3.2,<3.4
 requests>=2.31.0
 kafka-python>=2.0.2,<2.1
 lz4>=4.0.0,<5
+prometheus_client==0.21.1
+vllm==0.7.3
diff --git a/clearml_serving/statistics/entrypoint.sh b/clearml_serving/statistics/entrypoint.sh
index 1af8bef..5b4de93 100755
--- a/clearml_serving/statistics/entrypoint.sh
+++ b/clearml_serving/statistics/entrypoint.sh
@@ -10,7 +10,8 @@ echo CLEARML_DEFAULT_KAFKA_SERVE_URL="$CLEARML_DEFAULT_KAFKA_SERVE_URL"
 SERVING_PORT="${CLEARML_SERVING_PORT:-9999}"
 
 # set default internal serve endpoint (for request pipelining)
-CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/serve}"
+CLEARML_DEFAULT_SERVE_SUFFIX="${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}"
+CLEARML_DEFAULT_BASE_SERVE_URL="${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:$SERVING_PORT/$CLEARML_DEFAULT_SERVE_SUFFIX}"
 CLEARML_DEFAULT_TRITON_GRPC_ADDR="${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-127.0.0.1:8001}"
 
 # print configuration
diff --git a/clearml_serving/version.py b/clearml_serving/version.py
index e398332..5b8f37a 100644
--- a/clearml_serving/version.py
+++ b/clearml_serving/version.py
@@ -1 +1 @@
-__version__ = '1.3.2'
+__version__ = '1.3.5'
diff --git a/docker/docker-compose-gpu.yml b/docker/docker-compose-gpu.yml
new file mode 100644
index 0000000..bfeead7
--- /dev/null
+++ b/docker/docker-compose-gpu.yml
@@ -0,0 +1,147 @@
+version: "3"
+
+services:
+  zookeeper:
+    image: bitnami/zookeeper:3.7.0
+    container_name: clearml-serving-zookeeper
+    # ports:
+    #   - "2181:2181"
+    environment:
+      - ALLOW_ANONYMOUS_LOGIN=yes
+    networks:
+      - clearml-serving-backend
+
+  kafka:
+    image: bitnami/kafka:3.1.1
+    container_name: clearml-serving-kafka
+    # ports:
+    #   - "9092:9092"
+    environment:
+      - KAFKA_BROKER_ID=1
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://clearml-serving-kafka:9092
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://clearml-serving-kafka:9092
+      - KAFKA_CFG_ZOOKEEPER_CONNECT=clearml-serving-zookeeper:2181
+      - ALLOW_PLAINTEXT_LISTENER=yes
+      - KAFKA_CREATE_TOPICS="topic_test:1:1"
+    depends_on:
+      - zookeeper
+    networks:
+      - clearml-serving-backend
+
+  prometheus:
+    image: prom/prometheus:v2.34.0
+    container_name: clearml-serving-prometheus
+    volumes:
+      - ./prometheus.yml:/prometheus.yml
+    command:
+      - '--config.file=/prometheus.yml'
+      - '--storage.tsdb.path=/prometheus'
+      - '--web.console.libraries=/etc/prometheus/console_libraries'
+      - '--web.console.templates=/etc/prometheus/consoles'
+      - '--storage.tsdb.retention.time=200h'
+      - '--web.enable-lifecycle'
+    restart: unless-stopped
+    # ports:
+    #   - "9090:9090"
+    depends_on:
+      - clearml-serving-statistics
+    networks:
+      - clearml-serving-backend
+
+  alertmanager:
+    image: prom/alertmanager:v0.23.0
+    container_name: clearml-serving-alertmanager
+    restart: unless-stopped
+    # ports:
+    #   - "9093:9093"
+    depends_on:
+      - prometheus
+      - grafana
+    networks:
+      - clearml-serving-backend
+
+  grafana:
+    image: grafana/grafana:8.4.4-ubuntu
+    container_name: clearml-serving-grafana
+    volumes:
+      - './datasource.yml:/etc/grafana/provisioning/datasources/datasource.yaml'
+    restart: unless-stopped
+    ports:
+      - "3000:3000"
+    depends_on:
+      - prometheus
+    networks:
+      - clearml-serving-backend
+
+
+  clearml-serving-inference:
+    image: allegroai/clearml-serving-inference:latest
+    container_name: clearml-serving-inference
+    restart: unless-stopped
+    # optimize performance
+    security_opt:
+      - seccomp:unconfined
+    ports:
+      - "8080:8080"
+    environment:
+      CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-https://app.clear.ml}
+      CLEARML_API_HOST: ${CLEARML_API_HOST:-https://api.clear.ml}
+      CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-https://files.clear.ml}
+      CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY}
+      CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY}
+      CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
+      CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080}
+      CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0}
+      CLEARML_DEFAULT_SERVE_SUFFIX: ${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}
+      CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:8080/serve}
+      CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
+      CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-}
+      CLEARML_USE_GUNICORN: ${CLEARML_USE_GUNICORN:-}
+      CLEARML_SERVING_NUM_PROCESS: ${CLEARML_SERVING_NUM_PROCESS:-}
+      CLEARML_EXTRA_PYTHON_PACKAGES: ${CLEARML_EXTRA_PYTHON_PACKAGES:-}
+      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}
+      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-}
+      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION:-}
+      GOOGLE_APPLICATION_CREDENTIALS: ${GOOGLE_APPLICATION_CREDENTIALS:-}
+      AZURE_STORAGE_ACCOUNT: ${AZURE_STORAGE_ACCOUNT:-}
+      AZURE_STORAGE_KEY: ${AZURE_STORAGE_KEY:-}
+      VLLM_ENGINE_ARGS: ${VLLM_ENGINE_ARGS:-'{"disable_log_requests":true,"disable_log_stats":false,"gpu_memory_utilization":0.95,"enforce_eager":true}'}
+    depends_on:
+      - kafka
+    networks:
+      - clearml-serving-backend
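+    # reserve the host GPU for the inference container (requires the NVIDIA Container Toolkit);
+    # adjust device_ids to expose additional GPUs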
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              device_ids: ['0']
+              capabilities: [gpu]
+
+  clearml-serving-statistics:
+    image: allegroai/clearml-serving-statistics:latest
+    container_name: clearml-serving-statistics
+    restart: unless-stopped
+    # optimize performance
+    security_opt:
+      - seccomp:unconfined
+    # ports:
+    #   - "9999:9999"
+    environment:
+      CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-https://app.clear.ml}
+      CLEARML_API_HOST: ${CLEARML_API_HOST:-https://api.clear.ml}
+      CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-https://files.clear.ml}
+      CLEARML_API_ACCESS_KEY: ${CLEARML_API_ACCESS_KEY}
+      CLEARML_API_SECRET_KEY: ${CLEARML_API_SECRET_KEY}
+      CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
+      CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
+      CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0}
+    depends_on:
+      - kafka
+    networks:
+      - clearml-serving-backend
+
+
+networks:
+  clearml-serving-backend:
+    driver: bridge
diff --git a/docker/docker-compose-triton-gpu.yml b/docker/docker-compose-triton-gpu.yml
index 8e54073..a2ec754 100644
--- a/docker/docker-compose-triton-gpu.yml
+++ b/docker/docker-compose-triton-gpu.yml
@@ -92,6 +92,7 @@ services:
       CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
       CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080}
       CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0}
+      CLEARML_DEFAULT_SERVE_SUFFIX: ${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}
       CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:8080/serve}
       CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
       CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-clearml-serving-triton:8001}
diff --git a/docker/docker-compose-triton.yml b/docker/docker-compose-triton.yml
index b815583..edf92a4 100644
--- a/docker/docker-compose-triton.yml
+++ b/docker/docker-compose-triton.yml
@@ -92,6 +92,7 @@ services:
       CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
       CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080}
       CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0}
+      CLEARML_DEFAULT_SERVE_SUFFIX: ${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}
       CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:8080/serve}
       CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
       CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-clearml-serving-triton:8001}
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 24e3b95..e73e184 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -92,6 +92,7 @@ services:
       CLEARML_SERVING_TASK_ID: ${CLEARML_SERVING_TASK_ID:-}
       CLEARML_SERVING_PORT: ${CLEARML_SERVING_PORT:-8080}
      CLEARML_SERVING_POLL_FREQ: ${CLEARML_SERVING_POLL_FREQ:-1.0}
+      CLEARML_DEFAULT_SERVE_SUFFIX: ${CLEARML_DEFAULT_SERVE_SUFFIX:-serve}
       CLEARML_DEFAULT_BASE_SERVE_URL: ${CLEARML_DEFAULT_BASE_SERVE_URL:-http://127.0.0.1:8080/serve}
       CLEARML_DEFAULT_KAFKA_SERVE_URL: ${CLEARML_DEFAULT_KAFKA_SERVE_URL:-clearml-serving-kafka:9092}
       CLEARML_DEFAULT_TRITON_GRPC_ADDR: ${CLEARML_DEFAULT_TRITON_GRPC_ADDR:-}
diff --git a/docker/prometheus.yml b/docker/prometheus.yml
index 469e220..d93387b 100644
--- a/docker/prometheus.yml
+++ b/docker/prometheus.yml
@@ -20,3 +20,10 @@ scrape_configs:
 
     static_configs:
       - targets: ['clearml-serving-statistics:9999']
+
+  - job_name: 'vllm'
+
+    scrape_interval: 5s
+
+    static_configs:
+      - targets: ['clearml-serving-inference:8000']
diff --git a/examples/vllm/preprocess.py b/examples/vllm/preprocess.py
new file mode 100644
index 0000000..87ca1be
--- /dev/null
+++ b/examples/vllm/preprocess.py
@@ -0,0 +1,54 @@
+"""vLLM preprocessing module for ClearML Serving."""
+from typing import Any, Optional, List, Callable, Union
+
+
+class Preprocess:
+    """Processing class will be run by the ClearML inference services before and after each request."""
+
+    def __init__(self):
+        """Set internal state, this will be called only once. (i.e. not per request)."""
+        self.model_endpoint = None
+
+    def load(self, local_file_name: str) -> Optional[Any]:  # noqa
+        vllm_model_config = {
+            "lora_modules": None,  # [LoRAModulePath(name=a, path=b)]
+            "prompt_adapters": None,  # [PromptAdapterPath(name=a, path=b)]
+            "response_role": "assistant",
+            "chat_template": None,
+            "return_tokens_as_token_ids": False,
+            "max_log_len": None
+        }
+        chat_settings = {
+            "enable_reasoning": False,
+            "reasoning_parser": None,
+            "enable_auto_tools": False,
+            "tool_parser": None,
+            "enable_prompt_tokens_details": False,
+            "chat_template_content_format": "auto"
+        }
+        return {
+            "vllm_model_config": vllm_model_config,
+            "chat_settings": chat_settings
+        }
+
+    def remove_extra_system_prompts(self, messages: List) -> List:
+        system_messages_indices = []
+        for i, msg in enumerate(messages):
+            if msg["role"] == "system":
+                system_messages_indices.append(i)
+            else:
+                break
+        if len(system_messages_indices) > 1:
+            last_system_index = system_messages_indices[-1]
+            messages = [msg for i, msg in enumerate(messages) if msg["role"] != "system" or i == last_system_index]
+        return messages
+
+    async def preprocess(
+        self,
+        body: Union[bytes, dict],
+        state: dict,
+        collect_custom_statistics_fn: Optional[Callable[[dict], None]],
+    ) -> Any:  # noqa
+        if getattr(body["request"], "messages", None):
+            body["request"].messages = self.remove_extra_system_prompts(body["request"].messages)
+        return body
diff --git a/examples/vllm/readme.md b/examples/vllm/readme.md
new file mode 100644
index 0000000..48b93be
--- /dev/null
+++ b/examples/vllm/readme.md
@@ -0,0 +1,42 @@
+# Deploy vLLM model
+
+## setting up the serving service
+
+1. Create serving Service: `clearml-serving create --name "serving example"` (write down the service ID)
+
+2. Add vLLM engine parameters in the `VLLM_ENGINE_ARGS` variable, as done in [this file](/docker/docker-compose-gpu.yml#L108). Make sure to add any additional packages required by your custom model to [requirements.txt](/clearml_serving/serving/requirements.txt) or [docker-compose.yml](https://github.com/allegroai/clearml-serving/blob/826f503cf4a9b069b89eb053696d218d1ce26f47/docker/docker-compose.yml#L97) (or as an environment variable on the `clearml-serving-inference` container), for example: `CLEARML_EXTRA_PYTHON_PACKAGES="vllm==0.7.3 prometheus_client==0.21.1"`
+
+3. Create model endpoint:
+
+    ```
+    clearml-serving --id <service_id> model add --model-id <model_id> --engine vllm --endpoint "test_vllm" --preprocess "examples/vllm/preprocess.py"
+    ```
+
+4. If you already have the `clearml-serving` docker-compose running, it might take it a minute or two to sync with the new endpoint. To run docker-compose, see the [docker-compose instructions](/README.md#nail_care-initial-setup), step 8 (use [docker-compose-gpu.yml](/docker/docker-compose-gpu.yml) for vLLM on GPU and [docker-compose.yml](/docker/docker-compose.yml) otherwise)
+
+5. Test the new endpoint (note that the first call triggers the model download, so it might take longer; from here on, it's all in memory):
+
+    ```bash
+    python examples/vllm/test_openai_api.py
+    ```
+
+    **Available routes**:
+
+    + /v1/completions
+    + /v1/chat/completions
+    + /v1/models
+
+    See [test_openai_api.py](test_openai_api.py) for more information.
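+
+    If you prefer not to use the OpenAI client, the same OpenAI-compatible route can be called over
+    plain HTTP. A minimal sketch (assuming the default `serve` suffix, port 8080 and the `test_vllm`
+    endpoint created above):
+
+    ```python
+    import requests
+
+    # non-streaming chat completion against the OpenAI-compatible route
+    response = requests.post(
+        "http://127.0.0.1:8080/serve/openai/v1/chat/completions",
+        json={
+            "model": "test_vllm",
+            "messages": [{"role": "user", "content": "Hi there, goodman!"}],
+            "stream": False
+        },
+        timeout=60
+    )
+    response.raise_for_status()
+    print(response.json()["choices"][0]["message"]["content"])
+    ```
+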
+6. Check metrics using Grafana (select Prometheus as the data source; all vLLM metrics have the "vllm:" prefix). For more information, see [Model monitoring and performance metrics](/README.md#bar_chart-model-monitoring-and-performance-metrics-bell)
+
+NOTE!
+
+If you want to use the `send_request` method, keep in mind that you have to pass "completions" or "chat/completions" as the endpoint (and pass the model as part of the "data" parameter), and that it only works for non-streaming requests:
+
+```python
+prompt = "Hi there, goodman!"
+result = self.send_request(endpoint="chat/completions", version=None, data={"model": "test_vllm", "messages": [{"role": "system", "content": "You are a helpful assistant"}, {"role": "user", "content": prompt}]})
+answer = result["choices"][0]["message"]["content"]
+```
+
+Alternatively, use an OpenAI client instead of `send_request`.
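+
+For streaming responses, a minimal sketch with the OpenAI client (assuming the same `test_vllm` endpoint and the default `serve` suffix) could look like this:
+
+```python
+from openai import OpenAI
+
+client = OpenAI(api_key="-", base_url="http://127.0.0.1:8080/serve/openai/v1")
+# stream the chat completion and print tokens as they arrive
+stream = client.chat.completions.create(
+    model="test_vllm",
+    messages=[{"role": "user", "content": "Hi there, goodman!"}],
+    stream=True
+)
+for chunk in stream:
+    print(chunk.choices[0].delta.content or "", end="", flush=True)
+print()
+```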
diff --git a/examples/vllm/test_openai_api.py b/examples/vllm/test_openai_api.py
new file mode 100644
index 0000000..527f353
--- /dev/null
+++ b/examples/vllm/test_openai_api.py
@@ -0,0 +1,34 @@
+from openai import OpenAI
+
+def main(model_name: str):
+    client = OpenAI(api_key="-")
+    client.base_url = "http://127.0.0.1:8080/serve/openai/v1"
+
+    chat_response = client.chat.completions.create(
+        model=model_name,
+        messages=[
+            {"role": "system", "content": ""},
+            {"role": "user", "content": "Hi there, goodman!"}
+        ],
+        temperature=1.0,
+        max_tokens=1024,
+        top_p=1.0
+    )
+    print(f"\n\nChatCompletion:\n\n {chat_response.choices[0].message.content}")
+
+    comp_response = client.completions.create(
+        model=model_name,
+        prompt="Hi there, goodman!",
+        temperature=1.0,
+        max_tokens=256
+    )
+    print(f"\n\nCompletion:\n\n {comp_response.choices[0].text}")
+
+    fake_body = {"stream": False, "model": model_name, "prompt": "test"}
+    print("\n\nModels:\n")
+    print('\n\n'.join(map(str, client.models.list(extra_body=fake_body).data)))
+
+    return None
+
+if __name__ == '__main__':
+    main(model_name="test_vllm")