Skip to content

Commit 9a57299

Browse files
authored
Made workers' requests channel non-blocking, changed stress test (#239)
Signed-off-by: irar2 <[email protected]>
1 parent 5ca355b commit 9a57299

File tree

2 files changed

+18
-21
lines changed

2 files changed

+18
-21
lines changed

pkg/llm-d-inference-sim/simulator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ func (s *VllmSimulator) initializeSim(ctx context.Context) error {
345345
ctx: ctx,
346346
logger: s.logger,
347347
finishedChan: s.workerFinished,
348-
reqChan: make(chan *openaiserverapi.CompletionReqCtx),
348+
reqChan: make(chan *openaiserverapi.CompletionReqCtx, 1),
349349
processor: s,
350350
}
351351
go worker.waitForRequests()
@@ -402,8 +402,8 @@ func (s *VllmSimulator) processing(ctx context.Context) {
402402
s.logger.Info("Request processing done")
403403
return
404404
case completedReq := <-s.workerFinished:
405-
s.logger.V(4).Info("Worker finished")
406405
worker := completedReq.worker
406+
s.logger.V(4).Info("Worker finished", "worker", worker.id)
407407
s.decrementLora(completedReq.model)
408408
// there is a free worker - find a request for it and send this request for
409409
// processing with this worker

pkg/llm-d-inference-sim/worker_test.go

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,15 @@ var _ = Describe("Simulator requests scheduling", Ordered, func() {
344344
option.WithBaseURL(baseURL),
345345
option.WithHTTPClient(client))
346346

347+
var wg sync.WaitGroup
348+
347349
// Run 2000 requests for 2 loras simultaneously
348350
numberOfRequests := 2000
351+
wg.Add(numberOfRequests)
352+
349353
for i := range numberOfRequests {
350354
go func() {
355+
defer wg.Done()
351356
defer GinkgoRecover()
352357
params := openai.ChatCompletionNewParams{
353358
Messages: []openai.ChatCompletionMessageParamUnion{
@@ -385,8 +390,11 @@ var _ = Describe("Simulator requests scheduling", Ordered, func() {
385390

386391
// After about 2 secs (the mean ttft), send 500 more requests
387392
numberOfRequests = 500
393+
wg.Add(numberOfRequests)
394+
388395
for i := range numberOfRequests {
389396
go func() {
397+
defer wg.Done()
390398
defer GinkgoRecover()
391399
params := openai.ChatCompletionNewParams{
392400
Messages: []openai.ChatCompletionMessageParamUnion{
@@ -408,15 +416,8 @@ var _ = Describe("Simulator requests scheduling", Ordered, func() {
408416
metrics = strings.Split(string(data), "\n")
409417

410418
// We sent 2500 requests, after about 2.5 seconds
411-
// number of running requests should be 1000
419+
// number of running requests should be about 1000
412420
// and the number of waiting requests should be less than 1000.
413-
// Since we are in the middle of requests scheduling,
414-
// the number of running requests can be 999.
415-
runningStr = findMetric(metrics, runningMetric)
416-
Expect(runningStr).NotTo(Equal(""))
417-
running, err = strconv.Atoi(runningStr)
418-
Expect(err).NotTo(HaveOccurred())
419-
Expect(running).To(Or(Equal(1000), Equal(999)))
420421
waitingStr = findMetric(metrics, waitingMetric)
421422
waiting, err = strconv.Atoi(waitingStr)
422423
Expect(err).NotTo(HaveOccurred())
@@ -431,19 +432,15 @@ var _ = Describe("Simulator requests scheduling", Ordered, func() {
431432
Expect(err).NotTo(HaveOccurred())
432433
metrics = strings.Split(string(data), "\n")
433434

434-
// The number of running requests should be 1000
435-
// and the number of waiting requests should be less than 1000.
436-
// Since we are in the middle of requests scheduling,
437-
// the number of running requests can be 999.
438-
runningStr = findMetric(metrics, runningMetric)
439-
Expect(runningStr).NotTo(Equal(""))
440-
running, err = strconv.Atoi(runningStr)
441-
Expect(err).NotTo(HaveOccurred())
442-
Expect(running).To(Or(Equal(1000), Equal(999)))
435+
// The number of running requests should be about 1000
436+
// and the number of waiting requests should be less than the
437+
// previous number of waiting requests.
443438
waitingStr = findMetric(metrics, waitingMetric)
444-
waiting, err = strconv.Atoi(waitingStr)
439+
newWaiting, err := strconv.Atoi(waitingStr)
445440
Expect(err).NotTo(HaveOccurred())
446-
Expect(waiting).To(BeNumerically("<", 1000))
441+
Expect(newWaiting).To(BeNumerically("<=", waiting))
442+
443+
wg.Wait()
447444
})
448445
})
449446
})

0 commit comments

Comments
 (0)