From 0ab6a5169a0d4d17362a844ec480aab9936afc87 Mon Sep 17 00:00:00 2001
From: ming1753
Date: Tue, 2 Dec 2025 19:14:50 +0800
Subject: [PATCH 1/4] [Feature] support audio TTS output

---
 fastdeploy/engine/request.py                  |  1 +
 fastdeploy/entrypoints/openai/protocol.py     |  2 +
 .../entrypoints/openai/response_processors.py | 53 +++++++++++++++----
 fastdeploy/entrypoints/openai/serving_chat.py | 10 ++++
 4 files changed, 57 insertions(+), 9 deletions(-)

diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py
index 888364bb672..34935a51b4d 100644
--- a/fastdeploy/engine/request.py
+++ b/fastdeploy/engine/request.py
@@ -416,6 +416,7 @@ def __repr__(self) -> str:
             f"send_idx={self.send_idx}, "
             f"text={self.text!r}, "
             f"token_ids={self.token_ids}, "
+            f"decode_type={self.decode_type}, "
             f"draft_token_ids={self.draft_token_ids}, "
             f"reasoning_content={self.reasoning_content!r}, "
             f"logprobs={self.logprobs}, "
diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py
index 0bb35a284dc..4587c48b578 100644
--- a/fastdeploy/entrypoints/openai/protocol.py
+++ b/fastdeploy/entrypoints/openai/protocol.py
@@ -210,6 +210,7 @@ class ChatMessage(BaseModel):
     content: Optional[str] = None
     multimodal_content: Optional[List[Any]] = None
     reasoning_content: Optional[str] = None
+    audio_content: Optional[str] = None
     tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None
     prompt_token_ids: Optional[List[int]] = None
     completion_token_ids: Optional[List[int]] = None
@@ -272,6 +273,7 @@ class DeltaMessage(BaseModel):
     role: Optional[str] = None
     content: Optional[str] = None
     multimodal_content: Optional[List[Any]] = None
+    audio_content: Optional[str] = None
     prompt_token_ids: Optional[List[int]] = None
     completion_token_ids: Optional[List[int]] = None
     reasoning_content: Optional[str] = None
diff --git a/fastdeploy/entrypoints/openai/response_processors.py b/fastdeploy/entrypoints/openai/response_processors.py
index 95a5e3ec404..4334acfc461 100644
--- a/fastdeploy/entrypoints/openai/response_processors.py
+++ b/fastdeploy/entrypoints/openai/response_processors.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 """

-from typing import Any, List, Optional
+import inspect
+from typing import Any, Dict, List, Optional

 from fastdeploy.entrypoints.openai.usage_calculator import count_tokens
 from fastdeploy.input.tokenzier_client import AsyncTokenizerClient, ImageDecodeRequest
@@ -34,12 +35,14 @@ def __init__(
         data_processor,
         enable_mm_output: Optional[bool] = False,
         eoi_token_id: Optional[int] = 101032,
+        eoa_token_id: Optional[int] = 2048,
         eos_token_id: Optional[int] = 2,
         decoder_base_url: Optional[str] = None,
     ):
         self.data_processor = data_processor
         self.enable_mm_output = enable_mm_output
         self.eoi_token_id = eoi_token_id
+        self.eoa_token_id = eoa_token_id
         self.eos_token_id = eos_token_id
         if decoder_base_url is not None:
             self.decoder_client = AsyncTokenizerClient(base_url=decoder_base_url)
@@ -47,6 +50,7 @@ def __init__(
             self.decoder_client = None
         self._mm_buffer: List[Any] = []  # Buffer for accumulating image token_ids
         self._end_image_code_request_output: Optional[Any] = None
+        self._audio_buffer: Dict[Any, List[Any]] = {}  # Buffer for accumulating audio token_ids per request
         self._multipart_buffer = []

 def enable_multimodal_content(self):
@@ -80,16 +84,47 @@ async def process_response_chat(self, request_outputs, stream, enable_thinking,
         for request_output in request_outputs:
             api_server_logger.debug(f"request_output {request_output}")
             if not self.enable_mm_output:
-                yield self.data_processor.process_response_dict(
-                    response_dict=request_output,
-                    stream=stream,
-                    enable_thinking=enable_thinking,
-                    include_stop_str_in_output=include_stop_str_in_output,
-                )
+                decode_type = request_output["outputs"].get("decode_type", 0) or 0
+                req_id = request_output["request_id"]
+                if decode_type == 0:  # text
+                    token_ids = request_output["outputs"]["token_ids"]
+                    tts = req_id in self._audio_buffer
+                    if token_ids[-1] == self.eos_token_id:
+                        all_audio_tokens = self._audio_buffer.pop(req_id, [])
+                    else:
+                        all_audio_tokens = None
+
+                    if inspect.iscoroutinefunction(self.data_processor.process_response_dict):
+                        response = await self.data_processor.process_response_dict(
+                            response_dict=request_output,
+                            stream=stream,
+                            enable_thinking=enable_thinking,
+                            include_stop_str_in_output=include_stop_str_in_output,
+                            audio_tokens=all_audio_tokens,
+                            tts=tts,
+                        )
+                    else:
+                        response = self.data_processor.process_response_dict(
+                            response_dict=request_output,
+                            stream=stream,
+                            enable_thinking=enable_thinking,
+                            include_stop_str_in_output=include_stop_str_in_output,
+                            audio_tokens=all_audio_tokens,
+                            tts=tts,
+                        )
+                    yield response
+                elif decode_type == 2:  # audio
+                    token_ids = request_output["outputs"]["token_ids"]
+                    if self.eoa_token_id is not None and self.eoa_token_id in token_ids:
+                        continue
+                    if req_id in self._audio_buffer:
+                        self._audio_buffer[req_id].append(token_ids)
+                    else:
+                        self._audio_buffer[req_id] = [token_ids]
             elif stream:
                 decode_type = request_output["outputs"].get("decode_type", 0)
                 token_ids = request_output["outputs"]["token_ids"]
-                if decode_type == 0:
+                if decode_type == 0:  # text
                     if self.eoi_token_id and self.eoi_token_id in token_ids:
                         if self._mm_buffer:
                             all_tokens = self._mm_buffer
@@ -118,7 +153,7 @@ async def process_response_chat(self, request_outputs, stream, enable_thinking,
                         request_output["outputs"]["multipart"] = [text]

                     yield request_output
-                elif decode_type == 1:
+                elif decode_type == 1:  # image
                     self._mm_buffer.append(token_ids)
                     self._end_image_code_request_output = request_output
                 else:
diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py
index 9bb15f90942..aa8cf8dd7b0 100644
--- a/fastdeploy/entrypoints/openai/serving_chat.py
+++ b/fastdeploy/entrypoints/openai/serving_chat.py
@@ -329,6 +329,9 @@ async def chat_completion_stream_generator(
                 else:
                     choice.delta.content = ""

+                if res["outputs"].get("audio_content", None) is not None:
+                    choice.delta.audio_content = res["outputs"]["audio_content"]
+
                 if request.return_token_ids:
                     choice.delta.prompt_token_ids = list(prompt_token_ids)
                     choice.delta.prompt_tokens = prompt_tokens
@@ -389,6 +392,10 @@ async def chat_completion_stream_generator(
                     delta_message.multimodal_content = output["multipart"]
                 else:
                     delta_message.content = output["text"]
+
+                if output.get("audio_content", None) is not None:
+                    delta_message.audio_content = output["audio_content"]
+
                 if not res["finished"] and "delta_message" in output:
                     delta_message_output = output["delta_message"]
                     if delta_message_output is None:
@@ -689,6 +696,9 @@ async def _create_chat_completion_choice(
             else:
                 message.content = output["text"]

+            if output.get("audio_content", None) is not None:
+                message.audio_content = output["audio_content"]
+
             logprobs_full_res = None
             draft_logprobs_full_res = None
             prompt_logprobs_full_res = None
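
For consumers, the net effect of this first patch is an extra audio_content field on streaming deltas and final messages. A minimal client sketch follows; the port, the model name, and the assumption that audio_content carries base64-encoded audio bytes are illustrative, not mandated by the patch:

    import base64

    from openai import OpenAI  # any OpenAI-compatible client works the same way

    client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

    audio_chunks = []
    stream = client.chat.completions.create(
        model="default",  # placeholder model name
        messages=[{"role": "user", "content": "Read this sentence aloud."}],
        stream=True,
    )
    for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        if delta.content:  # text tokens stream exactly as before
            print(delta.content, end="", flush=True)
        audio = getattr(delta, "audio_content", None)  # new field added by this patch
        if audio is not None:
            audio_chunks.append(base64.b64decode(audio))

    with open("reply.wav", "wb") as f:  # container format depends on the server-side decoder
        f.write(b"".join(audio_chunks))
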
""" -from typing import Any, List, Optional +import inspect +from typing import Any, Dict, List, Optional from fastdeploy.entrypoints.openai.usage_calculator import count_tokens from fastdeploy.input.tokenzier_client import AsyncTokenizerClient, ImageDecodeRequest @@ -34,12 +35,14 @@ def __init__( data_processor, enable_mm_output: Optional[bool] = False, eoi_token_id: Optional[int] = 101032, + eoa_token_id: Optional[int] = 2048, eos_token_id: Optional[int] = 2, decoder_base_url: Optional[str] = None, ): self.data_processor = data_processor self.enable_mm_output = enable_mm_output self.eoi_token_id = eoi_token_id + self.eoa_token_id = eoa_token_id self.eos_token_id = eos_token_id if decoder_base_url is not None: self.decoder_client = AsyncTokenizerClient(base_url=decoder_base_url) @@ -47,6 +50,7 @@ def __init__( self.decoder_client = None self._mm_buffer: List[Any] = [] # Buffer for accumulating image token_ids self._end_image_code_request_output: Optional[Any] = None + self._audio_buffer: Dict[Any] = {} self._multipart_buffer = [] def enable_multimodal_content(self): @@ -80,16 +84,47 @@ async def process_response_chat(self, request_outputs, stream, enable_thinking, for request_output in request_outputs: api_server_logger.debug(f"request_output {request_output}") if not self.enable_mm_output: - yield self.data_processor.process_response_dict( - response_dict=request_output, - stream=stream, - enable_thinking=enable_thinking, - include_stop_str_in_output=include_stop_str_in_output, - ) + decode_type = request_output["outputs"].get("decode_type", 0) or 0 + req_id = request_output["request_id"] + if decode_type == 0: # text + token_ids = request_output["outputs"]["token_ids"] + tts = req_id in self._audio_buffer + if token_ids[-1] == self.eos_token_id: + all_audio_tokens = self._audio_buffer.pop(req_id, []) + else: + all_audio_tokens = None + + if inspect.iscoroutinefunction(self.data_processor.process_response_dict): + response = await self.data_processor.process_response_dict( + response_dict=request_output, + stream=stream, + enable_thinking=enable_thinking, + include_stop_str_in_output=include_stop_str_in_output, + audio_tokens=all_audio_tokens, + tts=tts, + ) + else: + response = self.data_processor.process_response_dict( + response_dict=request_output, + stream=stream, + enable_thinking=enable_thinking, + include_stop_str_in_output=include_stop_str_in_output, + audio_tokens=all_audio_tokens, + tts=tts, + ) + yield response + elif decode_type == 2: # audio + token_ids = request_output["outputs"]["token_ids"] + if self.eoa_token_id is not None and self.eoa_token_id in token_ids: + continue + if req_id in self._audio_buffer: + self._audio_buffer[req_id].append(token_ids) + else: + self._audio_buffer[req_id] = [token_ids] elif stream: decode_type = request_output["outputs"].get("decode_type", 0) token_ids = request_output["outputs"]["token_ids"] - if decode_type == 0: + if decode_type == 0: # text if self.eoi_token_id and self.eoi_token_id in token_ids: if self._mm_buffer: all_tokens = self._mm_buffer @@ -118,7 +153,7 @@ async def process_response_chat(self, request_outputs, stream, enable_thinking, request_output["outputs"]["multipart"] = [text] yield request_output - elif decode_type == 1: + elif decode_type == 1: # image self._mm_buffer.append(token_ids) self._end_image_code_request_output = request_output else: diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index 9bb15f90942..aa8cf8dd7b0 100644 --- 
From 7af8eaeeb193f84845c562c86534f6bcab17326d Mon Sep 17 00:00:00 2001
From: ming1753
Date: Wed, 3 Dec 2025 18:35:26 +0800
Subject: [PATCH 3/4] guard missing output fields and add audio TTS test

---
 fastdeploy/engine/common_engine.py            |  1 +
 .../entrypoints/openai/response_processors.py | 76 +++++++++----------
 .../openai/test_response_processors.py        | 22 ++++++
 3 files changed, 61 insertions(+), 38 deletions(-)

diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index 8dd697308c9..58939ed2067 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -1002,6 +1002,7 @@ def _zmq_send_generated_tokens(self):
                 if len(results) == 0:
                     time.sleep(0.005)
                     continue
+                self.llm_logger.info(f"results: {results}")
                 if envs.FD_ENABLE_INTERNAL_ADAPTER:
                     new_contents = []
                     for step_batch_results in results:
diff --git a/fastdeploy/entrypoints/openai/response_processors.py b/fastdeploy/entrypoints/openai/response_processors.py
index 30b73cfbc05..0ce67a00b97 100644
--- a/fastdeploy/entrypoints/openai/response_processors.py
+++ b/fastdeploy/entrypoints/openai/response_processors.py
@@ -85,49 +85,49 @@ async def process_response_chat(self, request_outputs, stream, enable_thinking,
             api_server_logger.debug(f"request_output {request_output}")
             if not self.enable_mm_output:
                 outputs = request_output.get("outputs", None)
-                if outputs is None:
-                    decode_type = 0
-                else:
+                token_ids = outputs.get("token_ids", None) if outputs is not None else None
+                req_id = request_output.get("request_id", None)
+                if outputs is not None and token_ids is not None and req_id is not None:
                     decode_type = request_output["outputs"].get("decode_type", 0) or 0
-                req_id = request_output["request_id"]
-                if decode_type == 0:  # text
-                    tts = req_id in self._audio_buffer
-                    if outputs is None:
-                        all_audio_tokens = None
-                    else:
-                        token_ids = request_output["outputs"]["token_ids"]
+                    if decode_type == 0:  # text
+                        tts = req_id in self._audio_buffer
                         if token_ids[-1] == self.eos_token_id:
                             all_audio_tokens = self._audio_buffer.pop(req_id, [])
                         else:
                             all_audio_tokens = None
-
-                    if inspect.iscoroutinefunction(self.data_processor.process_response_dict):
-                        response = await self.data_processor.process_response_dict(
-                            response_dict=request_output,
-                            stream=stream,
-                            enable_thinking=enable_thinking,
-                            include_stop_str_in_output=include_stop_str_in_output,
-                            audio_tokens=all_audio_tokens,
-                            tts=tts,
-                        )
-                    else:
-                        response = self.data_processor.process_response_dict(
-                            response_dict=request_output,
-                            stream=stream,
-                            enable_thinking=enable_thinking,
-                            include_stop_str_in_output=include_stop_str_in_output,
-                            audio_tokens=all_audio_tokens,
-                            tts=tts,
-                        )
-                    yield response
-                elif decode_type == 2:  # audio
-                    token_ids = request_output["outputs"]["token_ids"]
-                    if self.eoa_token_id is not None and self.eoa_token_id in token_ids:
-                        continue
-                    if req_id in self._audio_buffer:
-                        self._audio_buffer[req_id].append(token_ids)
-                    else:
-                        self._audio_buffer[req_id] = [token_ids]
+                        if inspect.iscoroutinefunction(self.data_processor.process_response_dict):
+                            response = await self.data_processor.process_response_dict(
+                                response_dict=request_output,
+                                stream=stream,
+                                enable_thinking=enable_thinking,
+                                include_stop_str_in_output=include_stop_str_in_output,
+                                audio_tokens=all_audio_tokens,
+                                tts=tts,
+                            )
+                        else:
+                            response = self.data_processor.process_response_dict(
+                                response_dict=request_output,
+                                stream=stream,
+                                enable_thinking=enable_thinking,
+                                include_stop_str_in_output=include_stop_str_in_output,
+                                audio_tokens=all_audio_tokens,
+                                tts=tts,
+                            )
+                        yield response
+                    elif decode_type == 2:  # audio
+                        if self.eoa_token_id is not None and self.eoa_token_id in token_ids:
+                            continue
+                        if req_id in self._audio_buffer:
+                            self._audio_buffer[req_id].append(token_ids)
+                        else:
+                            self._audio_buffer[req_id] = [token_ids]
+                else:
+                    yield self.data_processor.process_response_dict(
+                        response_dict=request_output,
+                        stream=stream,
+                        enable_thinking=enable_thinking,
+                        include_stop_str_in_output=include_stop_str_in_output,
+                    )
             elif stream:
                 decode_type = request_output["outputs"].get("decode_type", 0)
                 token_ids = request_output["outputs"]["token_ids"]
diff --git a/tests/entrypoints/openai/test_response_processors.py b/tests/entrypoints/openai/test_response_processors.py
index afab163b97e..bfdc0010da7 100644
--- a/tests/entrypoints/openai/test_response_processors.py
+++ b/tests/entrypoints/openai/test_response_processors.py
@@ -56,6 +56,28 @@ async def test_text_only_mode(self):
         self.assertEqual(results[0]["processed"], True)
         self.assertEqual(results[0]["raw"]["outputs"]["text"], "hello")

+    async def test_audio_tts(self):
+        """Multimodal output disabled: audio chunks are buffered, text goes through data_processor"""
+        processor = ChatResponseProcessor(self.mock_data_processor)
+        request_outputs = [
+            {"request_id": "req1", "outputs": {"decode_type": 2, "token_ids": [[11, 22]]}},
+            {"request_id": "req1", "outputs": {"decode_type": 0, "token_ids": [1]}},
+            {"request_id": "req1", "outputs": {"decode_type": 2, "token_ids": [[11, 22]]}},
+            {"request_id": "req1", "outputs": {"decode_type": 0, "token_ids": [2]}},
+        ]
+
+        results = [
+            r
+            async for r in processor.process_response_chat(
+                request_outputs, stream=True, enable_thinking=False, include_stop_str_in_output=False
+            )
+        ]
+
+        self.assertEqual(results[0]["processed"], True)
+        self.assertEqual(results[0]["raw"]["outputs"]["token_ids"], [1])
+        self.assertEqual(results[1]["processed"], True)
+        self.assertEqual(results[1]["raw"]["outputs"]["token_ids"], [2])
+
     async def test_streaming_text_and_image(self):
         """Streaming mode: text → image → text"""
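
The new test pins down the buffering contract of process_response_chat. Tracing _audio_buffer for req1 through the four outputs (with the constructor defaults eos_token_id=2 and eoa_token_id=2048):

    # 1. decode_type=2, token_ids=[[11, 22]]: 2048 is not in the chunk, so it
    #    is buffered -> _audio_buffer == {"req1": [[[11, 22]]]}
    # 2. decode_type=0, token_ids=[1]: tts=True, but 1 != eos_token_id, so the
    #    text is yielded with audio_tokens=None and the buffer is kept
    # 3. decode_type=2, token_ids=[[11, 22]]: appended -> two buffered chunks
    # 4. decode_type=0, token_ids=[2]: 2 == eos_token_id, so the buffer is
    #    popped and both chunks reach the data processor as audio_tokens

Only the two text outputs are yielded, which is exactly what the assertions on results[0] and results[1] verify; the audio outputs never surface directly.
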
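From 410b502fffaceabf80e7b3fe37b173d1c9c8f9f4 Mon Sep 17 00:00:00 2001
From: ming1753
Date: Wed, 3 Dec 2025 18:37:59 +0800
Subject: [PATCH 4/4] remove leftover debug logging

---
 fastdeploy/engine/common_engine.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index 58939ed2067..8dd697308c9 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -1002,7 +1002,6 @@ def _zmq_send_generated_tokens(self):
                 if len(results) == 0:
                     time.sleep(0.005)
                     continue
-                self.llm_logger.info(f"results: {results}")
                 if envs.FD_ENABLE_INTERNAL_ADAPTER:
                     new_contents = []
                     for step_batch_results in results:

Taken together, the series threads the engine's decode_type flag through to the API layer. Wiring the processor up only needs the new constructor argument; a sketch with a stand-in data processor (argument defaults as declared in PATCH 1):

    processor = ChatResponseProcessor(
        data_processor=TTSDataProcessor(),  # e.g. the schematic class sketched after PATCH 2
        enable_mm_output=False,  # the audio/TTS path lives on the non-multimodal branch
        eoa_token_id=2048,  # end-of-audio marker; chunks containing it are skipped
        eos_token_id=2,  # end-of-sequence; flushes the per-request audio buffer
    )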