@@ -205,6 +205,7 @@ let _sync ?(pending = false) x =
205
205
206
206
let sync c = _sync (Unifier. deref c)
207
207
let cleanup_source s = try s#force_sleep with _ -> ()
208
+ let pending_clocks = WeakQueue. create ()
208
209
let clocks = Queue. create ()
209
210
210
211
let rec _cleanup ~clock { outputs; passive_sources; active_sources } =
@@ -425,8 +426,7 @@ and _tick ~clock x =
425
426
Atomic. incr x.ticks;
426
427
check_stopped () ;
427
428
_after_tick ~clock x;
428
- check_stopped () ;
429
- Queue. iter clocks start
429
+ check_stopped ()
430
430
431
431
and _clock_thread ~clock x =
432
432
let has_sources_to_process () =
@@ -516,6 +516,22 @@ and start ?force c =
516
516
| `True sync -> _start ?force ~sync clock
517
517
| `False -> ()
518
518
519
+ let add_pending_clock =
520
+ (* Make sure that we're not collecting clocks between
521
+ the time they have sources attached to them and before
522
+ we get a chance to call [start_pending]. *)
523
+ let finalise c =
524
+ let clock = Unifier. deref c in
525
+ match _can_start clock with
526
+ | `True sync when sync <> `Passive ->
527
+ _start ~sync clock;
528
+ Queue. push clocks c
529
+ | _ -> ()
530
+ in
531
+ fun c ->
532
+ Gc. finalise finalise c;
533
+ WeakQueue. push pending_clocks c
534
+
519
535
let create ?(stack = [] ) ?on_error ?id ?(sub_ids = [] ) ?(sync = `Automatic ) () =
520
536
let on_error_queue = Queue. create () in
521
537
(match on_error with None -> () | Some fn -> Queue. push on_error_queue fn);
@@ -531,7 +547,7 @@ let create ?(stack = []) ?on_error ?id ?(sub_ids = []) ?(sync = `Automatic) () =
531
547
on_error = on_error_queue;
532
548
}
533
549
in
534
- if sync <> `Passive then Queue. push clocks c;
550
+ if sync <> `Passive then add_pending_clock c;
535
551
c
536
552
537
553
let time c =
@@ -540,18 +556,20 @@ let time c =
540
556
Time .to_float (_time c )
541
557
542
558
let start_pending () =
543
- let c = Queue . flush_elements clocks in
559
+ let c = WeakQueue . flush_elements pending_clocks in
544
560
let c = List. map (fun c -> (c, Unifier. deref c)) c in
545
561
let c = List. sort_uniq (fun (_ , c ) (_ , c' ) -> Stdlib. compare c c') c in
546
562
List. iter
547
563
(fun (c , clock ) ->
548
- ( match Atomic. get clock.state with
564
+ match Atomic. get clock.state with
549
565
| `Stopped _ -> (
550
566
match _can_start clock with
551
- | `True sync -> _start ~sync clock
552
- | `False -> () )
553
- | _ -> () );
554
- Queue. push clocks c)
567
+ | `True `Passive -> ()
568
+ | `True sync ->
569
+ _start ~sync clock;
570
+ Queue. push clocks c
571
+ | `False -> WeakQueue. push pending_clocks c)
572
+ | _ -> () )
555
573
c
556
574
557
575
let () =
@@ -598,4 +616,4 @@ let create ?stack ?on_error ?id ?sync () = create ?stack ?on_error ?id ?sync ()
598
616
let clocks () =
599
617
List. sort_uniq
600
618
(fun c c' -> Stdlib. compare (Unifier. deref c) (Unifier. deref c'))
601
- (Queue. elements clocks)
619
+ (WeakQueue. elements pending_clocks @ Queue. elements clocks)
0 commit comments