@@ -208,15 +208,15 @@ func ExecCollectValues[T directive.Value](
208
208
//
209
209
// errorCb can be nil.
210
210
//
211
- // Returns the directive instance and reference for cleanup. If err != nil, ref == nil.
211
+ // Returns the directive instance and function for cleanup. If err != nil, ref == nil.
212
212
func ExecCollectValuesWatch [T directive.Value ](
213
213
ctx context.Context ,
214
214
bus Bus ,
215
215
dir directive.Directive ,
216
216
waitIdle bool ,
217
217
callback func (resErr []error , vals []T ) error ,
218
218
errorCb func (err error ),
219
- ) (directive.Instance , directive. Reference , error ) {
219
+ ) (directive.Instance , func () , error ) {
220
220
// bcast guards these variables
221
221
var bcast broadcast.Broadcast
222
222
@@ -291,7 +291,7 @@ func ExecCollectValuesWatch[T directive.Value](
291
291
return nil , nil , err
292
292
}
293
293
294
- defer di .AddIdleCallback (func (isIdle bool , errs []error ) {
294
+ relIdleCallback := di .AddIdleCallback (func (isIdle bool , errs []error ) {
295
295
bcast .HoldLock (func (broadcast func (), getWaitCh func () <- chan struct {}) {
296
296
wasIdle := idle
297
297
idle = isIdle
@@ -311,10 +311,12 @@ func ExecCollectValuesWatch[T directive.Value](
311
311
broadcast ()
312
312
}
313
313
})
314
- })()
314
+ })
315
315
316
316
// Goroutine to handle callbacks
317
317
go func () {
318
+ defer relIdleCallback ()
319
+
318
320
for {
319
321
var currVals []T
320
322
var currErr []error
@@ -355,5 +357,8 @@ func ExecCollectValuesWatch[T directive.Value](
355
357
}
356
358
}()
357
359
358
- return di , ref , nil
360
+ return di , func () {
361
+ relIdleCallback ()
362
+ ref .Release ()
363
+ }, nil
359
364
}
0 commit comments