 active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed')
 total_request_count = Counter('LatencyProfileGenerator:request_count', 'How many total requests have been sent')

+# Singleton class to track requests for QPS counting and calculation.
+class AsyncRequestCounter:
+  _instance = None
+  _lock = asyncio.Lock()
+
+  async def __new__(cls, target_requests=None, *args, **kwargs):
+    async with cls._lock:
+      if not cls._instance:
+        cls._instance = super().__new__(cls)
+        cls._instance._count = 0
+        cls._instance._start_time = time.time()
+        cls._instance._target_requests = target_requests
+      return cls._instance
+
+  async def increment(self):
+    async with self._lock:
+      self._count += 1
+      if self._count == self._target_requests:
+        self._end_time = time.time()
+
+  async def get_qps(self):
+    return self._count / (self._end_time - self._start_time)
+
+
 # Add trace config for monitoring in flight requests
 async def on_request_start(session, trace_config_ctx, params):
   active_requests_metric.inc()
   total_request_count.inc()
+  counter = await AsyncRequestCounter()
+  await counter.increment()

 async def on_request_end(session, trace_config_ctx, params):
   active_requests_metric.dec()
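
Because `__new__` is declared `async`, calling `AsyncRequestCounter(...)` returns a coroutine rather than an instance (so `__init__` is never invoked), and callers must `await` the class itself. A minimal sketch of the intended usage, assuming the class above is in scope:

```python
import asyncio

async def demo():
    # First await constructs the singleton and pins target_requests.
    counter = await AsyncRequestCounter(target_requests=100)

    # Later awaits return the same instance; new constructor args are ignored.
    same = await AsyncRequestCounter(target_requests=999)
    assert counter is same and counter._target_requests == 100

    await counter.increment()  # serialized by the shared asyncio.Lock

asyncio.run(demo())
```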
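`on_request_start` and `on_request_end` match aiohttp's tracing-hook signature `(session, trace_config_ctx, params)`. The session wiring is outside this diff; a hedged sketch of how such hooks are typically attached, with a hypothetical endpoint URL:

```python
import asyncio
import aiohttp

trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_request_start)
trace_config.on_request_end.append(on_request_end)

async def send_one():
    # Hooks fire around every request made through this session.
    async with aiohttp.ClientSession(trace_configs=[trace_config]) as session:
        async with session.get("http://localhost:8000/health") as resp:  # hypothetical endpoint
            await resp.text()

asyncio.run(send_one())
```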
@@ -462,6 +488,8 @@ async def benchmark(
   model_weights = list(models_dict.values())

   benchmark_start_time_sec = time.time()
+  # Initialize the counter with target prompts
+  await AsyncRequestCounter(args.num_prompts)
   tasks: List[asyncio.Task] = []
   prompts_sent = 0
   async for request in generate_next_request(input_requests, args.request_rate):
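
Initializing the singleton here, before any request fires, matters: otherwise the first `on_request_start` would construct it with `target_requests=None`, `_end_time` would never be set, and `get_qps()` would fail. A small sketch of that failure mode, assuming the class from this diff:

```python
async def broken_flow():
    # Without the explicit init above, the first trace hook creates the
    # singleton with target_requests=None...
    counter = await AsyncRequestCounter()
    await counter.increment()  # _count == None is never true, so _end_time stays unset
    await counter.get_qps()    # raises AttributeError: '_end_time'
```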
@@ -503,12 +531,12 @@ async def benchmark(

   benchmark_duration_sec = time.time() - benchmark_start_time_sec

-  print_and_save_result(args, benchmark_duration_sec, prompts_sent, "weighted",
+  await print_and_save_result(args, benchmark_duration_sec, prompts_sent, "weighted",
                         overall_results["latencies"], overall_results["ttfts"],
                         overall_results["itls"], overall_results["tpots"],
                         overall_results["errors"])
   for model, data in per_model_results.items():
-    print_and_save_result(args, benchmark_duration_sec, len(data["latencies"]), model,
+    await print_and_save_result(args, benchmark_duration_sec, len(data["latencies"]), model,
                           data["latencies"], data["ttfts"], data["itls"],
                           data["tpots"], data["errors"])

@@ -524,6 +552,7 @@ def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics
524552 "num_prompts_attempted" : benchmark_result ['num_prompts_attempted' ],
525553 "num_prompts_succeeded" : benchmark_result ['num_prompts_succeeded' ],
526554 "request_rate" : args .request_rate ,
555+ "queries_per_second" : benchmark_result ['queries_per_second' ],
527556 'server_metrics' : {
528557 ** server_metrics
529558 },
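
Note that `increment()` runs in `on_request_start`, so `queries_per_second` reports the rate at which requests were actually dispatched, alongside the configured `request_rate`. An illustrative shape of the saved record (field values hypothetical):

```python
example_record = {
    "num_prompts_attempted": 100,
    "num_prompts_succeeded": 98,
    "request_rate": 5.0,           # configured offered load
    "queries_per_second": 4.87,    # measured dispatch rate from AsyncRequestCounter
}
```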
@@ -762,15 +791,6 @@ def print_metrics(metrics: List[str], duration_sec: float, namespace: str, job:
       logger.debug("HTTP Error: %s" % (response))
       continue
     server_metrics[metric] = metric_results
-
-
-  url = 'https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus/api/v1/query' % (project_id)
-  headers_api = {'Authorization': 'Bearer ' + credentials.token}
-  params = {'query': f'rate(LatencyProfileGenerator:request_count_total[{duration}s])'}
-  logger.debug(f"Finding {query_name} {metric} with the following query: {query}")
-  request_post = requests.get(url=url, headers=headers_api, params=params)
-  response = request_post.json()
-  print(f"Got response for benchmarking prom metrics: {response}")

   return server_metrics

@@ -795,14 +815,18 @@ def get_stats_for_set(name, description, points):
     f'p99_{name}': p99,
   }

-def print_and_save_result(args: argparse.Namespace, benchmark_duration_sec, total_requests, model, request_latencies, ttfts, itls, tpots, errors):
+async def print_and_save_result(args: argparse.Namespace, benchmark_duration_sec, total_requests, model, request_latencies, ttfts, itls, tpots, errors):
   benchmark_result = {}

   print(f"====Result for Model: {model}====")
   print(f"Errors: {errors}")
   print(f"Total time (seconds): {benchmark_duration_sec:.2f}s")
   print(f"Successful/total requests: {len(request_latencies)}/{total_requests}")
   print(f"Requests/sec: {total_requests / benchmark_duration_sec:.2f}")
+  counter = await AsyncRequestCounter()
+  queries_per_second = await counter.get_qps()
+  print(f"Queries/sec: {queries_per_second:.2f}")
+  benchmark_result['queries_per_second'] = queries_per_second
   benchmark_result["num_prompts_attempted"] = total_requests
   benchmark_result["num_prompts_succeeded"] = len(request_latencies)
   benchmark_result['benchmark_time'] = benchmark_duration_sec
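
`Requests/sec` divides attempts by the full benchmark wall time, while `Queries/sec` uses the counter's own window, from its creation to the moment the target count of requests was started. A worked contrast with hypothetical numbers:

```python
total_requests = 100
benchmark_duration_sec = 25.0  # full wall time, including drain after the last send
counter_window_sec = 20.0      # _end_time - _start_time inside AsyncRequestCounter

print(f"Requests/sec: {total_requests / benchmark_duration_sec:.2f}")  # 4.00
print(f"Queries/sec: {total_requests / counter_window_sec:.2f}")       # 5.00
```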