@@ -86,6 +86,29 @@ func (q *queue) push(ctx context.Context) (EvictFunc, <-chan core.Listener) {
8686 return q .evictionFunc (listElement ), releaseChan
8787}
8888
89+ var errQueueIsFull = fmt .Errorf ("queue is full" )
90+
91+ func (q * queue ) pushWithCapacity (ctx context.Context , maxCapacity uint64 ) (EvictFunc , <- chan core.Listener , error ) {
92+ q .mu .Lock ()
93+ defer q .mu .Unlock ()
94+
95+ // Restrict backlog size so the queue doesn't grow unbounded during an outage
96+ if uint64 (q .list .Len ()) >= maxCapacity {
97+ return nil , nil , errQueueIsFull
98+ }
99+
100+ releaseChan := make (chan core.Listener )
101+
102+ e := & queueElement {ctx : ctx , releaseChan : releaseChan }
103+
104+ // We always push to the front of the list regardless of
105+ // queue order. As usage of the list will always assume
106+ // Front == newest and Back == Oldest
107+ listElement := q .list .PushFront (e )
108+
109+ return q .evictionFunc (listElement ), releaseChan , nil
110+ }
111+
89112func (q * queue ) pop () * queueElement {
90113 evict , ele := q .peek ()
91114 if evict != nil {
@@ -274,15 +297,13 @@ func (l *QueueBlockingLimiter) tryAcquire(ctx context.Context) core.Listener {
274297 return listener
275298 }
276299
277- // Restrict backlog size so the queue doesn't grow unbounded during an outage
278- if l .backlog .len () >= l .maxBacklogSize {
279- return nil
280- }
281-
282300 // Create a holder for a listener and block until a listener is released by another
283301 // operation. Holders will be unblocked in LIFO or FIFO order depending on whatever
284302 // ordering was configured when backlog was instantiated
285- evict , eventReleaseChan := l .backlog .push (ctx )
303+ evict , eventReleaseChan , err := l .backlog .pushWithCapacity (ctx , l .maxBacklogSize )
304+ if err != nil {
305+ return nil
306+ }
286307
287308 // We're using a nil chan so that we
288309 // can avoid needing to duplicate the
0 commit comments