@@ -130,15 +130,17 @@ async def run(
130130            futures , queues , stop_event  =  await  self ._start_processes (
131131                manager , executor , scheduling_strategy 
132132            )
133-             run_info , requests_iter  =  self ._run_setup (
133+             run_info , requests_iter ,  times_iter  =  self ._run_setup (
134134                futures , scheduling_strategy , max_number , max_duration 
135135            )
136136
137137            # Add some initial requests to the queue 
138138            requests_iter  =  self ._add_requests (
139139                requests_iter ,
140140                queues .requests ,
141+                 times_iter ,
141142                run_info ,
143+                 loop_limit = run_info .strategy .queued_requests_limit ,
142144            )
143145            # Wait for the test to start 
144146            await  asyncio .sleep (time .time () -  scheduling_strategy .start_time )
@@ -169,6 +171,7 @@ async def run(
169171                    requests_iter  =  self ._add_requests (
170172                        requests_iter ,
171173                        queues .requests ,
174+                         times_iter ,
172175                        run_info ,
173176                    )
174177                    await  asyncio .sleep (0 )  # enable requests to start 
@@ -257,8 +260,9 @@ def _run_setup(
257260        scheduling_strategy : SchedulingStrategy ,
258261        max_number : Optional [int ],
259262        max_duration : Optional [float ],
260-     ) ->  tuple [SchedulerRunInfo , Iterator [Any ]]:
263+     ) ->  tuple [SchedulerRunInfo , Iterator [Any ],  Iterator [ float ] ]:
261264        requests_iter  =  iter (self .request_loader )
265+         times_iter  =  iter (scheduling_strategy .request_times ())
262266        end_time  =  scheduling_strategy .start_time  +  (max_duration  or  math .inf )
263267        end_number  =  max_number  or  math .inf 
264268
@@ -284,28 +288,32 @@ def _run_setup(
284288            strategy = scheduling_strategy ,
285289        )
286290
287-         return  info , requests_iter 
291+         return  info , requests_iter ,  times_iter 
288292
289293    def  _add_requests (
290294        self ,
291295        requests_iter : Optional [Iterator [Any ]],
292296        requests_queue : Queue [WorkerProcessRequest [RequestT , ResponseT ]],
297+         times_iter : Iterator [float ],
293298        run_info : SchedulerRunInfo ,
299+         loop_limit : Optional [int ] =  None ,
294300    ) ->  Optional [Iterator [Any ]]:
295301        if  requests_iter  is  not None :
296302            try :
297303                added_count  =  0 
298304
299-                 if  time .time () >=  run_info .end_time :
300-                     raise  StopIteration 
301- 
302305                while  not  requests_queue .full () and  added_count  <  (
303-                     run_info .strategy .queued_requests_limit 
304-                     or  settings .max_add_requests_per_loop 
306+                     loop_limit  or  settings .max_add_requests_per_loop 
305307                ):
306308                    if  run_info .created_requests  >=  run_info .end_number :
307309                        raise  StopIteration 
308310
311+                     if  (
312+                         next (times_iter ) >=  run_info .end_time 
313+                         or  time .time () >=  run_info .end_time 
314+                     ):
315+                         raise  StopIteration 
316+ 
309317                    work_req  =  WorkerProcessRequest [RequestT , ResponseT ](
310318                        request = next (requests_iter ),
311319                        timeout_time = run_info .end_time ,
0 commit comments