diff --git a/benchmark_serving.py b/benchmark_serving.py
index 43038df..44f0849 100644
--- a/benchmark_serving.py
+++ b/benchmark_serving.py
@@ -25,6 +25,7 @@
 from datetime import datetime
 import json
 import random
+import sys
 import requests
 import time
 from typing import AsyncGenerator, List, Optional, Tuple, Dict
@@ -45,6 +46,7 @@
 MIN_SEQ_LEN = 4
 NEW_TEXT_KEY = "\nOutput:\n"
 PROMETHEUS_PORT = 9090
+CONNECTIONS_LIMIT = 28000
 
 # Prometheus Metrics
 prompt_length_metric = Histogram("LatencyProfileGenerator:prompt_length", "Input prompt length", buckets=[2**i for i in range(1, 16)])
@@ -53,13 +55,24 @@
 tpot_metric = Histogram('LatencyProfileGenerator:time_per_output_token', 'Time per output token per request (excluding first token)')
 ttft_metric = Histogram('LatencyProfileGenerator:time_to_first_token', 'Time to first token per request')
 active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed')
+active_connections_metric = Gauge('LatencyProfileGenerator:active_connections', 'How many active connections')
+
+# Exhausted connections warning should only be printed once per run
+connection_limit_reached = False
 
 # Add trace config for monitoring in flight requests
 async def on_request_start(session, trace_config_ctx, params):
+  global connection_limit_reached
   active_requests_metric.inc()
+  active_connections_metric.set(len(session.connector._acquired))
+  if not connection_limit_reached and len(session.connector._acquired) == CONNECTIONS_LIMIT:
+    print("Warning: Connection limit reached. Omitting server metrics due to inaccuracy")
+    connection_limit_reached = True
+
 
 async def on_request_end(session, trace_config_ctx, params):
   active_requests_metric.dec()
+  active_connections_metric.set(len(session.connector._acquired))
 
 trace_config = aiohttp.TraceConfig()
 trace_config.on_request_start.append(on_request_start)
@@ -153,6 +166,7 @@ def init_errors_map() -> Dict[str, int]:
 
 async def send_stream_request(
+    clientSession: any,
    backend: str,
    api_url: str,
    prompt: str,
    prompt_len: int,
@@ -198,51 +212,50 @@ async def send_stream_request(
   most_recent_timestamp = st
   output = ""
   timeout = aiohttp.ClientTimeout(total=timeout)
-  async with aiohttp.ClientSession(timeout=timeout,trust_env=True) as session:
-    try:
-      async with session.post(api_url, headers=headers, json=pload, ssl=False) as response:
-        async for chunk_bytes in response.content.iter_chunks():
-          chunk_bytes = chunk_bytes[0].strip()
-          if not chunk_bytes:
-            continue
-          timestamp = time.perf_counter()
-          # First token
-          if ttft == 0.0:
-            ttft = timestamp - st
-          else:
-            itl.append(timestamp - most_recent_timestamp)
-          most_recent_timestamp = timestamp
-          if backend == "vllm":
-            if chunk_bytes.decode("utf-8")[6:] != "[DONE]":
-              output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"]
-          elif backend == "jetstream":
-            if chunk_bytes.decode("utf-8") != "":
-              output += json.loads(chunk_bytes.decode("utf-8"))["text"]
-
-    except aiohttp.client_exceptions.ClientConnectorError as client_err:
-      errors["ClientConnectorError"] += 1
-      print(f"ClientConnectorError: {client_err}")
-      return None, None, None, errors
-    except asyncio.TimeoutError as timeout_err:
-      errors["TimeoutError"] += 1
-      print(f"TimeoutError: {timeout_err}")
-      return None, None, None, errors
-    except aiohttp.client_exceptions.ClientOSError as e:
-      errors["ClientOSError"] += 1
-      print(f"ClientOSError: {e}")
-      return None, None, None, errors
-    except aiohttp.client_exceptions.ContentTypeError as e:
-      print(f"ContentTypeError: {e}, response: {response}")
-      errors["ContentTypeError"] += 1
-      return None, None, None, errors
-    except aiohttp.client_exceptions.ServerDisconnectedError as e:
-      errors["ServerDisconnectedError"] += 1
-      print(f"ServerDisconnectedError: {e}")
-      return None, None, None, errors
-    except Exception as e:
-      print(f"Unknown error {e}")
-      errors["unknown_error"] += 1
-      return None, None, None, errors
+  try:
+    async with clientSession.post(api_url, headers=headers, json=pload, ssl=False) as response:
+      async for chunk_bytes in response.content.iter_chunks():
+        chunk_bytes = chunk_bytes[0].strip()
+        if not chunk_bytes:
+          continue
+        timestamp = time.perf_counter()
+        # First token
+        if ttft == 0.0:
+          ttft = timestamp - st
+        else:
+          itl.append(timestamp - most_recent_timestamp)
+        most_recent_timestamp = timestamp
+        if backend == "vllm":
+          if chunk_bytes.decode("utf-8")[6:] != "[DONE]":
+            output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"]
+        elif backend == "jetstream":
+          if chunk_bytes.decode("utf-8") != "":
+            output += json.loads(chunk_bytes.decode("utf-8"))["text"]
+
+  except aiohttp.client_exceptions.ClientConnectorError as client_err:
+    errors["ClientConnectorError"] += 1
+    print(f"ClientConnectorError: {client_err}")
+    return None, None, None, errors
+  except asyncio.TimeoutError as timeout_err:
+    errors["TimeoutError"] += 1
+    print(f"TimeoutError: {timeout_err}")
+    return None, None, None, errors
+  except aiohttp.client_exceptions.ClientOSError as e:
+    errors["ClientOSError"] += 1
+    print(f"ClientOSError: {e}")
+    return None, None, None, errors
+  except aiohttp.client_exceptions.ContentTypeError as e:
+    print(f"ContentTypeError: {e}, response: {response}")
+    errors["ContentTypeError"] += 1
+    return None, None, None, errors
+  except aiohttp.client_exceptions.ServerDisconnectedError as e:
+    errors["ServerDisconnectedError"] += 1
+    print(f"ServerDisconnectedError: {e}")
+    return None, None, None, errors
+  except Exception as e:
+    print(f"Unknown error {e}")
+    errors["unknown_error"] += 1
+    return None, None, None, errors
   request_end_time = time.time()
   output_token_ids = tokenizer(output).input_ids
   output_len = len(output_token_ids)
@@ -259,6 +272,7 @@
   return request_latency, ttft, itl, None
 
 async def send_request(
+    clientSession: any,
    backend: str,
    api_url: str,
    prompt: str,
@@ -343,41 +357,37 @@
   else:
     raise ValueError(f"Unknown backend: {backend}")
 
-  # Set client timeout to be 3 hrs.
-  timeout = aiohttp.ClientTimeout(total=timeout)
-  async with aiohttp.ClientSession(timeout=timeout,trust_env=True,trace_configs=[trace_config]) as session:
-    while True:
-      try:
-        async with session.post(api_url, headers=headers, json=pload, ssl=False) as response:
-          output = await response.json()
-
-          # Re-send the request if it failed.
-          if "error" not in output:
-            break
-      except aiohttp.client_exceptions.ClientConnectorError as client_err:
-        errors["ClientConnectorError"] += 1
-        print(f"ClientConnectorError: {client_err}")
-        return None, None, None, errors
-      except asyncio.TimeoutError as timeout_err:
-        errors["TimeoutError"] += 1
-        print(f"TimeoutError: {timeout_err}")
-        return None, None, None, errors
-      except aiohttp.client_exceptions.ClientOSError as e:
-        errors["ClientOSError"] += 1
-        print(f"ClientOSError: {e}")
-        return None, None, None, errors
-      except aiohttp.client_exceptions.ContentTypeError as e:
-        print(f"ContentTypeError: {e}, response: {response}")
-        errors["ContentTypeError"] += 1
-        return None, None, None, errors
-      except aiohttp.client_exceptions.ServerDisconnectedError as e:
-        errors["ServerDisconnectedError"] += 1
-        print(f"ServerDisconnectedError: {e}")
-        return None, None, None, errors
-      except Exception as e:
-        print(f"Unknown error {e}")
-        errors["unknown_error"] += 1
-        return None, None, None, errors
+  while True:
+    try:
+      async with clientSession.post(api_url, headers=headers, json=pload, ssl=False, timeout=None) as response:
+        output = await response.json()
+        # Re-send the request if it failed.
+        if "error" not in output:
+          break
+    except aiohttp.client_exceptions.ClientConnectorError as client_err:
+      errors["ClientConnectorError"] += 1
+      print(f"ClientConnectorError: {client_err}")
+      return None, None, None, errors
+    except asyncio.TimeoutError as timeout_err:
+      errors["TimeoutError"] += 1
+      print(f"TimeoutError: {timeout_err}")
+      return None, None, None, errors
+    except aiohttp.client_exceptions.ClientOSError as e:
+      errors["ClientOSError"] += 1
+      print(f"ClientOSError: {e}")
+      return None, None, None, errors
+    except aiohttp.client_exceptions.ContentTypeError as e:
+      print(f"ContentTypeError: {e}, response: {response}")
+      errors["ContentTypeError"] += 1
+      return None, None, None, errors
+    except aiohttp.client_exceptions.ServerDisconnectedError as e:
+      errors["ServerDisconnectedError"] += 1
+      print(f"ServerDisconnectedError: {e}")
+      return None, None, None, errors
+    except Exception as e:
+      print(f"Unknown error {e}")
+      errors["unknown_error"] += 1
+      return None, None, None, errors
 
   request_end_time = time.time()
   # Naive HF transformers generation and TensorRT-LLM generation stops at EOS
@@ -414,15 +424,15 @@
   return request_latency, None, None, None
 
-async def run_single_request(args: argparse.Namespace, api_url: str, tokenizer: PreTrainedTokenizerBase,
+async def run_single_request(args: argparse.Namespace, clientSession: any, api_url: str, tokenizer: PreTrainedTokenizerBase,
                              prompt: str, prompt_len: int, output_len: int, chosen_model: str) -> Tuple[str, Tuple]:
   if args.stream_request:
     result = await send_stream_request(
-      args.backend, api_url, prompt, prompt_len, output_len,
+      clientSession, args.backend, api_url, prompt, prompt_len, output_len,
       args.best_of, args.use_beam_search, args.top_k, tokenizer,
       args.sax_model, chosen_model, args.request_timeout,)
   else:
     result = await send_request(
-      args.backend, api_url, prompt, prompt_len, output_len,
+      clientSession, args.backend, api_url, prompt, prompt_len, output_len,
       args.best_of, args.use_beam_search, args.top_k, tokenizer,
       args.sax_model, chosen_model, args.request_timeout,)
   return chosen_model, result
@@ -456,16 +466,16 @@
   benchmark_start_time = time.time()
   tasks: List[asyncio.Task] = []
   prompts_sent = 0
-  async for request in generate_next_request(input_requests, args.request_rate):
-    if prompts_sent >= args.num_prompts:
-      break
-    prompt, prompt_len, output_len = request
-    chosen_model = random.choices(model_names, weights=model_weights)[0]
-    task = asyncio.create_task(run_single_request(args, api_url, tokenizer, prompt, prompt_len, output_len, chosen_model))
-    tasks.append(task)
-    prompts_sent += 1
-
-  results = await asyncio.gather(*tasks)
+  async with aiohttp.ClientSession(trust_env=False, connector=aiohttp.TCPConnector(keepalive_timeout=30, enable_cleanup_closed=True, limit=CONNECTIONS_LIMIT,),timeout=None, trace_configs=[trace_config]) as clientSession:
+    async for request in generate_next_request(input_requests, args.request_rate):
+      if prompts_sent >= args.num_prompts:
+        break
+      prompt, prompt_len, output_len = request
+      chosen_model = random.choices(model_names, weights=model_weights)[0]
+      task = asyncio.create_task(run_single_request(args, clientSession, api_url, tokenizer, prompt, prompt_len, output_len, chosen_model))
+      tasks.append(task)
+      prompts_sent += 1
+    results = await asyncio.gather(*tasks)
 
   overall_results = {"latencies": [], "ttfts": [], "itls": [], "tpots": [], "errors": init_errors_map()}
   per_model_results: Dict[str, Dict[str, List]] = {}
@@ -830,7 +840,8 @@
   }
 
   server_metrics = {}
-  if args.scrape_server_metrics:
+  global connection_limit_reached
+  if args.scrape_server_metrics and not connection_limit_reached:
     server_metrics = print_metrics(metrics_to_scrape(args.backend), benchmark_duration, args.pm_namespace, args.pm_job)
   if args.save_json_results:
     save_json_results(args, benchmark_result, server_metrics, model, errors)