@@ -960,25 +960,42 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
960960 return ;
961961 }
962962
963- // Process one message from each selected queue
963+ // Process messages from each selected tenant
964+ // For fairness, process up to available concurrency slots per tenant
964965 for ( const { tenantId, queues } of tenantQueues ) {
965- for ( const queueId of queues ) {
966- // Check cooloff
967- if ( this . cooloffEnabled && this . #isInCooloff( queueId ) ) {
968- continue ;
969- }
966+ // Get available concurrency for this tenant
967+ let availableSlots = 1 ; // Default to 1 for backwards compatibility
968+ if ( this . concurrencyManager ) {
969+ const [ current , limit ] = await Promise . all ( [
970+ this . concurrencyManager . getCurrentConcurrency ( "tenant" , tenantId ) ,
971+ this . concurrencyManager . getConcurrencyLimit ( "tenant" , tenantId ) ,
972+ ] ) ;
973+ availableSlots = Math . max ( 1 , limit - current ) ;
974+ }
970975
971- const processed = await this . #processOneMessage( loopId , queueId , tenantId , shardId ) ;
976+ // Process up to availableSlots messages from this tenant's queues
977+ let slotsUsed = 0 ;
978+ queueLoop: for ( const queueId of queues ) {
979+ while ( slotsUsed < availableSlots ) {
980+ // Check cooloff
981+ if ( this . cooloffEnabled && this . #isInCooloff( queueId ) ) {
982+ break ; // Try next queue
983+ }
972984
973- if ( processed ) {
974- await this . scheduler . recordProcessed ?.( tenantId , queueId ) ;
975- this . #resetCooloff( queueId ) ;
976- } else {
977- this . #incrementCooloff( queueId ) ;
978- }
985+ const processed = await this . #processOneMessage( loopId , queueId , tenantId , shardId ) ;
979986
980- // Only process one message per queue per iteration for fairness
981- break ;
987+ if ( processed ) {
988+ await this . scheduler . recordProcessed ?.( tenantId , queueId ) ;
989+ this . #resetCooloff( queueId ) ;
990+ slotsUsed ++ ;
991+ } else {
992+ this . #incrementCooloff( queueId ) ;
993+ break ; // Queue empty or blocked, try next queue
994+ }
995+ }
996+ if ( slotsUsed >= availableSlots ) {
997+ break queueLoop;
998+ }
982999 }
9831000 }
9841001 }
0 commit comments