Limit concurrent requests to 28000 #19


Open · wants to merge 29 commits into base: main
Changes from 22 commits
189 changes: 95 additions & 94 deletions benchmark_serving.py
@@ -25,6 +25,7 @@
from datetime import datetime
import json
import random
import sys
import requests
import time
from typing import AsyncGenerator, List, Optional, Tuple, Dict
@@ -53,13 +54,16 @@
tpot_metric = Histogram('LatencyProfileGenerator:time_per_output_token', 'Time per output token per request (excluding first token)')
ttft_metric = Histogram('LatencyProfileGenerator:time_to_first_token', 'Time to first token per request')
active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed')
active_connections_metric = Gauge('LatencyProfileGenerator:active_connections', 'How many active connections')

# Add trace config for monitoring in flight requests
async def on_request_start(session, trace_config_ctx, params):
active_requests_metric.inc()
active_connections_metric.set(len(session.connector._acquired))

async def on_request_end(session, trace_config_ctx, params):
active_requests_metric.dec()
active_connections_metric.set(len(session.connector._acquired))

trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_request_start)
@@ -153,6 +157,7 @@ def init_errors_map() -> Dict[str, int]:

async def send_stream_request(
backend: str,
clientSession: any,
api_url: str,
prompt: str,
prompt_len: int,
@@ -198,51 +203,50 @@ async def send_stream_request(
most_recent_timestamp = st
output = ""
timeout = aiohttp.ClientTimeout(total=timeout)
async with aiohttp.ClientSession(timeout=timeout,trust_env=True) as session:
try:
async with session.post(api_url, headers=headers, json=pload, ssl=False) as response:
async for chunk_bytes in response.content.iter_chunks():
chunk_bytes = chunk_bytes[0].strip()
if not chunk_bytes:
continue
timestamp = time.perf_counter()
# First token
if ttft == 0.0:
ttft = timestamp - st
else:
itl.append(timestamp - most_recent_timestamp)
most_recent_timestamp = timestamp
if backend == "vllm":
if chunk_bytes.decode("utf-8")[6:] != "[DONE]":
output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"]
elif backend == "jetstream":
if chunk_bytes.decode("utf-8") != "":
output += json.loads(chunk_bytes.decode("utf-8"))["text"]

except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
return None, None, None, errors
try:
async with clientSession.post(api_url, headers=headers, json=pload, ssl=False) as response:
async for chunk_bytes in response.content.iter_chunks():
chunk_bytes = chunk_bytes[0].strip()
if not chunk_bytes:
continue
timestamp = time.perf_counter()
# First token
if ttft == 0.0:
ttft = timestamp - st
else:
itl.append(timestamp - most_recent_timestamp)
most_recent_timestamp = timestamp
if backend == "vllm":
if chunk_bytes.decode("utf-8")[6:] != "[DONE]":
output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"]
elif backend == "jetstream":
if chunk_bytes.decode("utf-8") != "":
output += json.loads(chunk_bytes.decode("utf-8"))["text"]

except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
return None, None, None, errors
request_end_time = time.time()
output_token_ids = tokenizer(output).input_ids
output_len = len(output_token_ids)
@@ -259,6 +263,7 @@ async def send_stream_request(
return request_latency, ttft, itl, None

async def send_request(
clientSession: any,
backend: str,
api_url: str,
prompt: str,
@@ -287,7 +292,7 @@ async def send_request(
"temperature": 0.0 if use_beam_search else 1.0,
"top_p": 1.0,
"max_tokens": output_len,
"ignore_eos": False,
"ignore_eos": True,
"stream": False,
}
elif backend == "tgi":
@@ -343,41 +348,37 @@ async def send_request(
else:
raise ValueError(f"Unknown backend: {backend}")

# Set client timeout to be 3 hrs.
timeout = aiohttp.ClientTimeout(total=timeout)
async with aiohttp.ClientSession(timeout=timeout,trust_env=True,trace_configs=[trace_config]) as session:
while True:
try:
async with session.post(api_url, headers=headers, json=pload, ssl=False) as response:
output = await response.json()

# Re-send the request if it failed.
if "error" not in output:
break
except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
return None, None, None, errors
while True:
try:
async with clientSession.post(api_url, headers=headers, json=pload, ssl=False, timeout=None) as response:
output = await response.json()
# Re-send the request if it failed.
if "error" not in output:
break
except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
return None, None, None, errors

request_end_time = time.time()
# Naive HF transformers generation and TensorRT-LLM generation stops at EOS
@@ -414,15 +415,15 @@ async def send_request(
return request_latency, None, None, None


async def run_single_request(args: argparse.Namespace, api_url: str, tokenizer: PreTrainedTokenizerBase,
async def run_single_request(args: argparse.Namespace, clientSession: any, api_url: str, tokenizer: PreTrainedTokenizerBase,
prompt: str, prompt_len: int, output_len: int, chosen_model: str) -> Tuple[str, Tuple]:
if args.stream_request:
result = await send_stream_request(
args.backend, api_url, prompt, prompt_len, output_len,
clientSession, args.backend, api_url, prompt, prompt_len, output_len,
args.best_of, args.use_beam_search, args.top_k, tokenizer, args.sax_model, chosen_model, args.request_timeout,)
else:
result = await send_request(
args.backend, api_url, prompt, prompt_len, output_len,
clientSession, args.backend, api_url, prompt, prompt_len, output_len,
args.best_of, args.use_beam_search, args.top_k, tokenizer, args.sax_model, chosen_model, args.request_timeout,)
return chosen_model, result

@@ -456,16 +457,16 @@ async def benchmark(
benchmark_start_time = time.time()
tasks: List[asyncio.Task] = []
prompts_sent = 0
async for request in generate_next_request(input_requests, args.request_rate):
if prompts_sent >= args.num_prompts:
break
prompt, prompt_len, output_len = request
chosen_model = random.choices(model_names, weights=model_weights)[0]
task = asyncio.create_task(run_single_request(args, api_url, tokenizer, prompt, prompt_len, output_len, chosen_model))
tasks.append(task)
prompts_sent += 1

results = await asyncio.gather(*tasks)
async with aiohttp.ClientSession(trust_env=False, connector=aiohttp.TCPConnector(keepalive_timeout=30, enable_cleanup_closed=True, limit=28000,),timeout=None, trace_configs=[trace_config]) as clientSession:
Collaborator:
I am less inclined to hard-code 28k here; I would rather set the limit to 0 (no limit) and catch the appropriate error, log it, and retry.

The added metric and logging will help observability. The retry is effectively the same outcome (a QPS slowdown).
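A minimal sketch of that alternative (not this PR's code): an unlimited connector (`limit=0`) plus a catch/log/retry loop, assuming ephemeral port exhaustion surfaces as `ClientConnectorError`/`ClientOSError` carrying `EADDRNOTAVAIL`; `post_with_retry` and the URL are illustrative names only.

```python
import asyncio
import errno

import aiohttp


async def post_with_retry(session: aiohttp.ClientSession, url: str, payload: dict,
                          max_retries: int = 5) -> dict:
    """POST with simple backoff; logs port exhaustion instead of capping connections."""
    for attempt in range(max_retries):
        try:
            async with session.post(url, json=payload, ssl=False) as resp:
                return await resp.json()
        except (aiohttp.ClientConnectorError, aiohttp.ClientOSError) as e:
            if getattr(e, "errno", None) == errno.EADDRNOTAVAIL:
                # Ephemeral port exhaustion: log it so the run can be flagged as suspect.
                print(f"Port exhaustion on attempt {attempt + 1}: {e}")
            else:
                print(f"Connection error on attempt {attempt + 1}: {e}")
            await asyncio.sleep(2 ** attempt)  # exponential backoff before retrying
    raise RuntimeError(f"Request to {url} failed after {max_retries} retries")


async def main() -> None:
    # limit=0 removes aiohttp's connection cap entirely instead of hard-coding 28000.
    connector = aiohttp.TCPConnector(limit=0, keepalive_timeout=30)
    async with aiohttp.ClientSession(connector=connector, trust_env=False) as session:
        await post_with_retry(session, "http://localhost:8000/v1/completions",
                              {"prompt": "hi", "max_tokens": 8})


if __name__ == "__main__":
    asyncio.run(main())
```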

Collaborator:

Yes, if we can log failures due to ephemeral port exhaustion, so that we know the experiment is not valid and that the user needs to reduce the QPS or num_prompts.

Collaborator Author:

+1 to logging when we exhaust ports. Added that, and also excluded server metrics when ports are exhausted, since the wait time invalidates them. Non-server metrics could still be valuable, because the measured e2e latency includes the time spent waiting to send the request; but if no requests are ever queued on any model server and the bottleneck is this tool, then the experiment data would certainly be invalid.
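For illustration, the logging and invalidation could look something like the sketch below; `port_exhaustion_errors`, `record_port_exhaustion`, and `should_report_server_metrics` are hypothetical names, not necessarily what this PR adds.

```python
import errno

# Hypothetical module-level counter, incremented from the request error handlers.
port_exhaustion_errors = 0


def record_port_exhaustion(exc: OSError) -> None:
    """Count errors caused by running out of ephemeral ports and warn the user."""
    global port_exhaustion_errors
    if getattr(exc, "errno", None) == errno.EADDRNOTAVAIL:
        port_exhaustion_errors += 1
        print(f"Ephemeral port exhaustion detected ({port_exhaustion_errors} so far); "
              "reduce the request rate or num_prompts for a valid experiment.")


def should_report_server_metrics() -> bool:
    # Skip server-side metrics if ports were ever exhausted: time spent waiting for a
    # local port inflates queueing and invalidates them. Client-side e2e latency is
    # still reported, since it intentionally includes the time waiting to send.
    return port_exhaustion_errors == 0
```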

async for request in generate_next_request(input_requests, args.request_rate):
if prompts_sent >= args.num_prompts:
break
prompt, prompt_len, output_len = request
chosen_model = random.choices(model_names, weights=model_weights)[0]
task = asyncio.create_task(run_single_request(args, clientSession, api_url, tokenizer, prompt, prompt_len, output_len, chosen_model))
tasks.append(task)
prompts_sent += 1
results = await asyncio.gather(*tasks)

overall_results = {"latencies": [], "ttfts": [], "itls": [], "tpots": [], "errors": init_errors_map()}
per_model_results: Dict[str, Dict[str, List]] = {}