Skip to content

Number of workers can be exceeded + Unit Test #154

@ievgennaida

Description

@ievgennaida

The library function 'BusyWorkers' can sometimes report more busy workers than the configured pool size.
I also suspect that one extra task (+1) can actually be picked up and executed at the same time.

Please check the unit test below:
it observes 11 workers instead of the expected 10.

// TestShouldShowCorrectNumberOfBusyWorkers submits maxTasks short-lived tasks
// to a pool created with maxBusyWorkers workers, one task every 10ms, while a
// monitor goroutine samples pool.BusyWorkers() and the number of task bodies
// that are executing at that moment. Neither value is expected to exceed
// maxBusyWorkers. Once every task has finished, the submitted/success counters
// must equal maxTasks and the busy-worker counter must be back at zero.
func TestShouldShowCorrectNumberOfBusyWorkers(t *testing.T) {
	const (
		maxTasks             = 100
		maxBusyWorkers int64 = 10
	)

	// running counts task bodies that are currently executing; it is measured
	// independently of the pool's own busy-worker metric.
	var running atomic.Int32

	var executed sync.WaitGroup
	executed.Add(maxTasks)

	tasks := make([]job.TaskFunc, 0, maxTasks)
	for i := 0; i < maxTasks; i++ {
		tasks = append(tasks, func(context.Context) error {
			running.Add(1)
			defer running.Add(-1)
			defer executed.Done()
			time.Sleep(100 * time.Millisecond)
			return nil
		})
	}

	// Keep the pointer returned by NewPool. Dereferencing it copies the pool
	// struct, and the copy would carry its own duplicate of whatever state
	// (locks, flags, counters) the struct holds by value.
	pool := queue.NewPool(maxBusyWorkers)
	// NOTE(review): NewPool may already start the pool. If so, this call
	// launches a second scheduler loop, which could by itself explain an
	// 11th busy worker. Confirm against the library source before treating
	// the failure as a library bug.
	pool.Start()
	defer pool.Shutdown()

	// Submit one task per tick. Each submission runs in its own goroutine so
	// that a slow QueueTask call cannot delay the following ticks.
	var added sync.WaitGroup
	added.Add(maxTasks)
	go func() {
		tick := time.NewTicker(10 * time.Millisecond)
		defer tick.Stop()

		for _, task := range tasks {
			<-tick.C
			go func(task job.TaskFunc) {
				defer added.Done()
				if err := pool.QueueTask(task); err != nil {
					panic(err)
				}
			}(task)
		}
	}()

	// Monitor busy workers and running tasks. The monitor runs outside the
	// test goroutine, so it must not use require.* (which ends in t.FailNow);
	// t.Errorf is safe to call from any goroutine. The deferred stop makes
	// sure the goroutine has exited, and therefore no longer touches t,
	// before the test function returns.
	stopMonitor := make(chan struct{})
	monitorDone := make(chan struct{})
	defer func() {
		close(stopMonitor)
		<-monitorDone
	}()
	go func() {
		defer close(monitorDone)
		tick := time.NewTicker(10 * time.Millisecond)
		defer tick.Stop()

		for {
			select {
			case <-stopMonitor:
				return
			case <-tick.C:
			}

			// -> This is the actual test failure reported in the issue.
			if busy := int64(pool.BusyWorkers()); busy > maxBusyWorkers {
				t.Errorf("busy workers should be less or equal to max busy workers: got %d, max %d", busy, maxBusyWorkers)
				return // report the first violation only
			}
			if active := int64(running.Load()); active > maxBusyWorkers {
				t.Errorf("running tasks should be less or equal to max busy workers: got %d, max %d", active, maxBusyWorkers)
				return
			}
		}
	}()

	// Wait until all tasks have been added.
	added.Wait()

	require.Equal(t, uint64(maxTasks), pool.SubmittedTasks(), "all tasks are submitted")

	executed.Wait()
	// Leave the pool some time to update its counters after the last task
	// body has returned.
	time.Sleep(1 * time.Second)

	require.Equal(t, uint64(maxTasks), pool.SubmittedTasks(), "all tasks are submitted")
	require.Equal(t, uint64(maxTasks), pool.SuccessTasks(), "all tasks are successful")
	require.Equal(t, int64(0), pool.BusyWorkers(), "no busy workers, all tasks are finished")
}


github.com/golang-queue/queue v0.4.0
github.com/stretchr/testify v1.11.1

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions