From f6a26a6bce7fa9cbcddd565bb347bd1c5c937a9f Mon Sep 17 00:00:00 2001 From: Carine Morel Date: Thu, 23 Jan 2025 13:58:11 +0100 Subject: [PATCH 1/5] Replace with_store by open/close_store --- src/frontend/ocamlmerlin/new/new_merlin.ml | 12 +++++++----- src/kernel/mocaml.mli | 2 +- src/kernel/mpipeline.ml | 2 ++ src/kernel/mpipeline.mli | 6 ++++++ src/ocaml/utils/local_store.ml | 9 +++++++++ src/ocaml/utils/local_store.mli | 4 ++++ 6 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/frontend/ocamlmerlin/new/new_merlin.ml b/src/frontend/ocamlmerlin/new/new_merlin.ml index 78e13d9c34..c43c2baa99 100644 --- a/src/frontend/ocamlmerlin/new/new_merlin.ml +++ b/src/frontend/ocamlmerlin/new/new_merlin.ml @@ -109,15 +109,16 @@ let run = (float_of_int (60 * Mconfig.(config.merlin.cache_lifespan))) (); File_id.with_cache @@ fun () -> + let store = Mpipeline.Cache.get config in + Local_store.open_store store; let source = Msource.make (Misc.string_of_file stdin) in - let pipeline = Mpipeline.make config source in + let pipeline = + Mpipeline.get ~state:(Mpipeline.Cache.get config) config source + in let json = let class_, message = Printexc.record_backtrace true; - match - Mpipeline.with_pipeline pipeline @@ fun () -> - command_action pipeline command_args - with + match command_action pipeline command_args with | result -> ("return", result) | exception Failure str -> let trace = Printexc.get_backtrace () in @@ -133,6 +134,7 @@ let run = Location.print_main Format.str_formatter err; ("error", `String (Format.flush_str_formatter ()))) in + Local_store.close_store store; let cpu_time = Misc.time_spent () -. start_cpu in let gc_stats = Gc.quick_stat () in let heap_mbytes = diff --git a/src/kernel/mocaml.mli b/src/kernel/mocaml.mli index 62b45e5525..2e9c9fee87 100644 --- a/src/kernel/mocaml.mli +++ b/src/kernel/mocaml.mli @@ -1,5 +1,5 @@ (* An instance of load path, environment cache & btype unification log *) -type typer_state +type typer_state = Local_store.store val new_state : unit -> typer_state val with_state : typer_state -> (unit -> 'a) -> 'a diff --git a/src/kernel/mpipeline.ml b/src/kernel/mpipeline.ml index 2180675a74..d49a93ce47 100644 --- a/src/kernel/mpipeline.ml +++ b/src/kernel/mpipeline.ml @@ -329,6 +329,8 @@ let process ?state ?(pp_time = ref 0.0) ?(reader_time = ref 0.0) let make config source = process (Mconfig.normalize config) source +let get ?state config source = process ?state (Mconfig.normalize config) source + let for_completion position { config; state; diff --git a/src/kernel/mpipeline.mli b/src/kernel/mpipeline.mli index f6f1d21df6..c7e9ba80b4 100644 --- a/src/kernel/mpipeline.mli +++ b/src/kernel/mpipeline.mli @@ -1,5 +1,7 @@ type t val make : Mconfig.t -> Msource.t -> t +val get : ?state:Mocaml.typer_state -> Mconfig.t -> Msource.t -> t + val with_pipeline : t -> (unit -> 'a) -> 'a val for_completion : Msource.position -> t -> t @@ -27,3 +29,7 @@ val typer_errors : t -> exn list val timing_information : t -> (string * float) list val cache_information : t -> Std.json + +module Cache : sig + val get : Mconfig.t -> Mocaml.typer_state +end diff --git a/src/ocaml/utils/local_store.ml b/src/ocaml/utils/local_store.ml index b6d117ea3b..a44d02fee4 100644 --- a/src/ocaml/utils/local_store.ml +++ b/src/ocaml/utils/local_store.ml @@ -57,3 +57,12 @@ let with_store slots f = List.iter (fun (Slot s) -> s.value <- !(s.ref)) slots; global_bindings.is_bound <- false; ) + + let open_store slots = + assert (not global_bindings.is_bound); + global_bindings.is_bound <- true; + List.iter (fun (Slot {ref;value}) -> ref := value) slots + + let close_store slots = + List.iter (fun (Slot s) -> s.value <- !(s.ref)) slots; + global_bindings.is_bound <- false \ No newline at end of file diff --git a/src/ocaml/utils/local_store.mli b/src/ocaml/utils/local_store.mli index 545cf71e02..f949c54ac0 100644 --- a/src/ocaml/utils/local_store.mli +++ b/src/ocaml/utils/local_store.mli @@ -65,3 +65,7 @@ val reset : unit -> unit val is_bound : unit -> bool (** Returns [true] when a store is active (i.e. when called from the callback passed to {!with_store}), [false] otherwise. *) + +val open_store : store -> unit + +val close_store : store -> unit From 32ca533f9f0987655867c0227100e6ec7360ad38 Mon Sep 17 00:00:00 2001 From: Carine Morel Date: Thu, 23 Jan 2025 14:33:10 +0100 Subject: [PATCH 2/5] Remove lazyness in Mpipeline --- src/kernel/mpipeline.ml | 166 +++++++++++++++++++--------------------- 1 file changed, 79 insertions(+), 87 deletions(-) diff --git a/src/kernel/mpipeline.ml b/src/kernel/mpipeline.ml index d49a93ce47..953ef47cfd 100644 --- a/src/kernel/mpipeline.ml +++ b/src/kernel/mpipeline.ml @@ -4,23 +4,22 @@ let { Logger.log } = Logger.for_section "Pipeline" let time_shift = ref 0.0 -let timed_lazy r x = - lazy - (let start = Misc.time_spent () in - let time_shift0 = !time_shift in - let update () = - let delta = Misc.time_spent () -. start in - let shift = !time_shift -. time_shift0 in - time_shift := time_shift0 +. delta; - r := !r +. delta -. shift - in - match Lazy.force x with - | x -> - update (); - x - | exception exn -> - update (); - Std.reraise exn) +let timed r x = + let start = Misc.time_spent () in + let time_shift0 = !time_shift in + let update () = + let delta = Misc.time_spent () -. start in + let shift = !time_shift -. time_shift0 in + time_shift := time_shift0 +. delta; + r := !r +. delta -. shift + in + match x () with + | x -> + update (); + x + | exception exn -> + update (); + Std.reraise exn module Cache = struct let cache = ref [] @@ -65,7 +64,7 @@ module Cache = struct end module Typer = struct - type t = { errors : exn list lazy_t; result : Mtyper.result } + type t = { errors : exn list; result : Mtyper.result } end module Ppx = struct @@ -82,10 +81,10 @@ type t = { config : Mconfig.t; state : Mocaml.typer_state; raw_source : Msource.t; - source : (Msource.t * Mreader.parsetree option) lazy_t; - reader : Reader.t lazy_t; - ppx : Ppx.t lazy_t; - typer : Typer.t lazy_t; + source : Msource.t * Mreader.parsetree option; + reader : Reader.t; + ppx : Ppx.t; + typer : Typer.t; pp_time : float ref; reader_time : float ref; ppx_time : float ref; @@ -99,7 +98,7 @@ type t = let raw_source t = t.raw_source let input_config t = t.config -let input_source t = fst (Lazy.force t.source) +let input_source t = fst t.source let with_pipeline t f = Mocaml.with_state t.state @@ fun () -> @@ -110,10 +109,10 @@ let get_lexing_pos t pos = ~filename:(Mconfig.filename t.config) pos -let reader t = Lazy.force t.reader +let reader t = t.reader -let ppx t = Lazy.force t.ppx -let typer t = Lazy.force t.typer +let ppx t = t.ppx +let typer t = t.typer let reader_config t = (reader t).config let reader_parsetree t = (reader t).result.Mreader.parsetree @@ -131,7 +130,7 @@ let ppx_errors t = (ppx t).Ppx.errors let final_config t = (ppx t).Ppx.config let typer_result t = (typer t).Typer.result -let typer_errors t = Lazy.force (typer t).Typer.errors +let typer_errors t = (typer t).Typer.errors module Reader_phase = struct type t = @@ -230,9 +229,8 @@ let process ?state ?(pp_time = ref 0.0) ?(reader_time = ref 0.0) | Some state -> state in let source = - timed_lazy pp_time - (lazy - (match Mconfig.(config.ocaml.pp) with + timed pp_time (fun () -> + match Mconfig.(config.ocaml.pp) with | None -> (raw_source, None) | Some { workdir; workval } -> ( let source = Msource.text raw_source in @@ -242,73 +240,67 @@ let process ?state ?(pp_time = ref 0.0) ?(reader_time = ref 0.0) ~source ~pp:workval with | `Source source -> (Msource.make source, None) - | (`Interface _ | `Implementation _) as ast -> (raw_source, Some ast)))) + | (`Interface _ | `Implementation _) as ast -> (raw_source, Some ast))) in let reader = - timed_lazy reader_time - (lazy - (let (lazy ((_, pp_result) as source)) = source in - let config = Mconfig.normalize config in - Mocaml.setup_reader_config config; - let cache_disabling = - match (config.merlin.use_ppx_cache, pp_result) with - | false, _ -> Some "configuration" - | true, Some _ -> - (* The cache could be refined in the future to also act on the + timed reader_time (fun () -> + let ((_, pp_result) as source) = source in + let config = Mconfig.normalize config in + Mocaml.setup_reader_config config; + let cache_disabling = + match (config.merlin.use_ppx_cache, pp_result) with + | false, _ -> Some "configuration" + | true, Some _ -> + (* The cache could be refined in the future to also act on the PP phase. For now, let's disable the whole cache when there's a PP. *) - Some "source preprocessor usage" - | true, None -> None - in - let { Reader_with_cache.output = { result; cache_version }; - cache_was_hit - } = - Reader_with_cache.apply ~cache_disabling - { source; for_completion; config } - in - reader_cache_hit := cache_was_hit; - let cache_version = - if Option.is_some cache_disabling then None else Some cache_version - in - { Reader.result; config; cache_version })) + Some "source preprocessor usage" + | true, None -> None + in + let { Reader_with_cache.output = { result; cache_version }; + cache_was_hit + } = + Reader_with_cache.apply ~cache_disabling + { source; for_completion; config } + in + reader_cache_hit := cache_was_hit; + let cache_version = + if Option.is_some cache_disabling then None else Some cache_version + in + { Reader.result; config; cache_version }) in let ppx = - timed_lazy ppx_time - (lazy - (let (lazy - { Reader.result = { Mreader.parsetree; _ }; - config; - cache_version - }) = - reader - in - let caught = ref [] in - Msupport.catch_errors Mconfig.(config.ocaml.warnings) caught - @@ fun () -> - (* Currently the cache is invalidated even for source changes that don't + timed ppx_time (fun () -> + let { Reader.result = { Mreader.parsetree; _ }; config; cache_version } + = + reader + in + let caught = ref [] in + Msupport.catch_errors Mconfig.(config.ocaml.warnings) caught + @@ fun () -> + (* Currently the cache is invalidated even for source changes that don't change the parsetree. To avoid that, we'd have to digest the parsetree in the cache. *) - let cache_disabling, reader_cache = - match cache_version with - | Some v -> (None, Ppx_phase.Version v) - | None -> (Some "reader cache is disabled", Off) - in - let { Ppx_with_cache.output = parsetree; cache_was_hit } = - Ppx_with_cache.apply ~cache_disabling - { parsetree; config; reader_cache } - in - ppx_cache_hit := cache_was_hit; - { Ppx.config; parsetree; errors = !caught })) + let cache_disabling, reader_cache = + match cache_version with + | Some v -> (None, Ppx_phase.Version v) + | None -> (Some "reader cache is disabled", Off) + in + let { Ppx_with_cache.output = parsetree; cache_was_hit } = + Ppx_with_cache.apply ~cache_disabling + { parsetree; config; reader_cache } + in + ppx_cache_hit := cache_was_hit; + { Ppx.config; parsetree; errors = !caught }) in let typer = - timed_lazy typer_time - (lazy - (let (lazy { Ppx.config; parsetree; _ }) = ppx in - Mocaml.setup_typer_config config; - let result = Mtyper.run config parsetree in - let errors = timed_lazy error_time (lazy (Mtyper.get_errors result)) in - typer_cache_stats := Mtyper.get_cache_stat result; - { Typer.errors; result })) + timed typer_time (fun () -> + let { Ppx.config; parsetree; _ } = ppx in + Mocaml.setup_typer_config config; + let result = Mtyper.run config parsetree in + let errors = timed error_time (fun () -> Mtyper.get_errors result) in + typer_cache_stats := Mtyper.get_cache_stat result; + { Typer.errors; result }) in { config; state; From 54d216af5a62f33d2c55cd5d1b0cc318d74909ba Mon Sep 17 00:00:00 2001 From: Carine Morel Date: Thu, 23 Jan 2025 18:14:03 +0100 Subject: [PATCH 3/5] First try with a typer domain --- src/frontend/ocamlmerlin/new/new_merlin.ml | 4 +- .../ocamlmerlin/ocamlmerlin_server.ml | 11 ++++- src/kernel/mpipeline.ml | 44 ++++++++++++++++++- src/kernel/mpipeline.mli | 6 ++- 4 files changed, 58 insertions(+), 7 deletions(-) diff --git a/src/frontend/ocamlmerlin/new/new_merlin.ml b/src/frontend/ocamlmerlin/new/new_merlin.ml index c43c2baa99..374ddc0bc3 100644 --- a/src/frontend/ocamlmerlin/new/new_merlin.ml +++ b/src/frontend/ocamlmerlin/new/new_merlin.ml @@ -112,9 +112,7 @@ let run = let store = Mpipeline.Cache.get config in Local_store.open_store store; let source = Msource.make (Misc.string_of_file stdin) in - let pipeline = - Mpipeline.get ~state:(Mpipeline.Cache.get config) config source - in + let pipeline = Mpipeline.get config source in let json = let class_, message = Printexc.record_backtrace true; diff --git a/src/frontend/ocamlmerlin/ocamlmerlin_server.ml b/src/frontend/ocamlmerlin/ocamlmerlin_server.ml index 35ca8a3a64..6641b0af1e 100644 --- a/src/frontend/ocamlmerlin/ocamlmerlin_server.ml +++ b/src/frontend/ocamlmerlin/ocamlmerlin_server.ml @@ -56,8 +56,12 @@ module Server = struct | None -> Logger.log ~section:"server" ~title:"cannot setup listener" "" | Some server -> (* If the client closes its connection, don't let it kill us with a SIGPIPE. *) + let domain_typer = Domain.spawn Mpipeline.domain_typer in if Sys.unix then Sys.set_signal Sys.sigpipe Sys.Signal_ignore; loop (File_id.get Sys.executable_name) server; + + Atomic.set Mpipeline.close_typer `True; + Domain.join domain_typer; Os_ipc.server_close server end @@ -65,7 +69,12 @@ let main () = (* Setup env for extensions *) Unix.putenv "__MERLIN_MASTER_PID" (string_of_int (Unix.getpid ())); match List.tl (Array.to_list Sys.argv) with - | "single" :: args -> exit (New_merlin.run ~new_env:None None args) + | "single" :: args -> + let domain_typer = Domain.spawn Mpipeline.domain_typer in + let vexit = New_merlin.run ~new_env:None None args in + Atomic.set Mpipeline.close_typer `True; + Domain.join domain_typer; + exit vexit | "old-protocol" :: args -> Old_merlin.run args | [ "server"; socket_path; socket_fd ] -> Server.start socket_path socket_fd | ("-help" | "--help" | "-h" | "server") :: _ -> diff --git a/src/kernel/mpipeline.ml b/src/kernel/mpipeline.ml index 953ef47cfd..79e285d974 100644 --- a/src/kernel/mpipeline.ml +++ b/src/kernel/mpipeline.ml @@ -321,8 +321,6 @@ let process ?state ?(pp_time = ref 0.0) ?(reader_time = ref 0.0) let make config source = process (Mconfig.normalize config) source -let get ?state config source = process ?state (Mconfig.normalize config) source - let for_completion position { config; state; @@ -367,3 +365,45 @@ let cache_information t = ("cmt", cmt); ("cmi", cmi) ] + +let shared_config = Atomic.make None +let shared_pipeline = Atomic.make None + +let close_typer = Atomic.make `False + +let domain_typer () = + let rec loop () = + if Atomic.get close_typer = `True then () + else + match Atomic.get shared_config with + | None -> + Domain.cpu_relax (); + loop () + | Some (config, source) as curr -> ( + try + let pipeline = make config source in + if Atomic.compare_and_set shared_config curr None then + Atomic.set shared_pipeline (Some pipeline); + loop () + with exn -> Atomic.set close_typer (`Exn exn)) + in + loop () + +let get config source = + Atomic.set shared_config (Some (config, source)); + + let rec loop count = + match Atomic.get shared_pipeline with + | None -> begin + match Atomic.get close_typer with + | `Exn exn -> raise exn + | `True -> assert false + | _ -> + Domain.cpu_relax (); + loop (count + 1) + end + | Some pipeline -> + Atomic.set shared_pipeline None; + pipeline + in + loop 0 diff --git a/src/kernel/mpipeline.mli b/src/kernel/mpipeline.mli index c7e9ba80b4..2586132ff1 100644 --- a/src/kernel/mpipeline.mli +++ b/src/kernel/mpipeline.mli @@ -1,6 +1,6 @@ type t val make : Mconfig.t -> Msource.t -> t -val get : ?state:Mocaml.typer_state -> Mconfig.t -> Msource.t -> t +val get : Mconfig.t -> Msource.t -> t val with_pipeline : t -> (unit -> 'a) -> 'a val for_completion : Msource.position -> t -> t @@ -33,3 +33,7 @@ val cache_information : t -> Std.json module Cache : sig val get : Mconfig.t -> Mocaml.typer_state end + +val close_typer : [ `True | `False | `Exn of exn ] Atomic.t + +val domain_typer : unit -> unit From 358ae277cbc1672987a60b163ee06f837388ec12 Mon Sep 17 00:00:00 2001 From: Carine Morel Date: Mon, 27 Jan 2025 17:19:38 +0100 Subject: [PATCH 4/5] Version of the parallel typer with locks. --- src/frontend/ocamlmerlin/new/new_merlin.ml | 10 +- .../ocamlmerlin/ocamlmerlin_server.ml | 32 +++-- src/kernel/mpipeline.ml | 112 ++++++++++++++---- src/kernel/mpipeline.mli | 16 ++- src/kernel/shared.ml | 65 ++++++++++ src/kernel/shared.mli | 9 ++ 6 files changed, 199 insertions(+), 45 deletions(-) create mode 100644 src/kernel/shared.ml create mode 100644 src/kernel/shared.mli diff --git a/src/frontend/ocamlmerlin/new/new_merlin.ml b/src/frontend/ocamlmerlin/new/new_merlin.ml index 374ddc0bc3..0a5feabc58 100644 --- a/src/frontend/ocamlmerlin/new/new_merlin.ml +++ b/src/frontend/ocamlmerlin/new/new_merlin.ml @@ -47,7 +47,7 @@ let commands_help () = print_endline doc) New_commands.all_commands -let run = +let run shared = let query_num = ref (-1) in function | [] -> @@ -112,7 +112,7 @@ let run = let store = Mpipeline.Cache.get config in Local_store.open_store store; let source = Msource.make (Misc.string_of_file stdin) in - let pipeline = Mpipeline.get config source in + let pipeline = Mpipeline.get shared config source in let json = let class_, message = Printexc.record_backtrace true; @@ -186,7 +186,7 @@ let with_wd ~wd ~old_wd f args = old_wd; f args -let run ~new_env wd args = +let run ~new_env shared wd args = begin match new_env with | Some env -> @@ -197,10 +197,10 @@ let run ~new_env wd args = let old_wd = Sys.getcwd () in let run args () = match wd with - | Some wd -> with_wd ~wd ~old_wd run args + | Some wd -> with_wd ~wd ~old_wd (run shared) args | None -> log ~title:"run" "No working directory specified (old wd: %S)" old_wd; - run args + run shared args in let `Log_file_path log_file, `Log_sections sections = Log_info.get () in Logger.with_log_file log_file ~sections @@ run args diff --git a/src/frontend/ocamlmerlin/ocamlmerlin_server.ml b/src/frontend/ocamlmerlin/ocamlmerlin_server.ml index 6641b0af1e..04c8f85113 100644 --- a/src/frontend/ocamlmerlin/ocamlmerlin_server.ml +++ b/src/frontend/ocamlmerlin/ocamlmerlin_server.ml @@ -2,19 +2,19 @@ let merlin_timeout = try float_of_string (Sys.getenv "MERLIN_TIMEOUT") with _ -> 600.0 module Server = struct - let process_request { Os_ipc.wd; environ; argv; context = _ } = + let process_request { Os_ipc.wd; environ; argv; context = _ } shared = match Array.to_list argv with | "stop-server" :: _ -> raise Exit - | args -> New_merlin.run ~new_env:(Some environ) (Some wd) args + | args -> New_merlin.run ~new_env:(Some environ) shared (Some wd) args - let process_client client = + let process_client client shared = let context = client.Os_ipc.context in Os_ipc.context_setup context; let close_with return_code = flush_all (); Os_ipc.context_close context ~return_code in - match process_request client with + match process_request client shared with | code -> close_with code | exception Exit -> close_with (-1); @@ -38,29 +38,32 @@ module Server = struct | Some _ as result -> result | None -> loop 1.0 - let rec loop merlinid server = + let rec loop merlinid server shared = match server_accept merlinid server with | None -> (* Timeout *) () | Some client -> let continue = - match process_client client with + match process_client client shared with | exception Exit -> false | () -> true in - if continue then loop merlinid server + if continue then loop merlinid server shared let start socket_path socket_fd = match Os_ipc.server_setup socket_path socket_fd with | None -> Logger.log ~section:"server" ~title:"cannot setup listener" "" | Some server -> (* If the client closes its connection, don't let it kill us with a SIGPIPE. *) - let domain_typer = Domain.spawn Mpipeline.domain_typer in + let shared = Mpipeline.create_shared () in + let domain_typer = Domain.spawn @@ Mpipeline.domain_typer shared in if Sys.unix then Sys.set_signal Sys.sigpipe Sys.Signal_ignore; - loop (File_id.get Sys.executable_name) server; + loop (File_id.get Sys.executable_name) server shared; + + Atomic.set Mpipeline.(shared.closed) `True; + Shared.signal shared.curr_config; - Atomic.set Mpipeline.close_typer `True; Domain.join domain_typer; Os_ipc.server_close server end @@ -70,9 +73,12 @@ let main () = Unix.putenv "__MERLIN_MASTER_PID" (string_of_int (Unix.getpid ())); match List.tl (Array.to_list Sys.argv) with | "single" :: args -> - let domain_typer = Domain.spawn Mpipeline.domain_typer in - let vexit = New_merlin.run ~new_env:None None args in - Atomic.set Mpipeline.close_typer `True; + let shared = Mpipeline.create_shared () in + let domain_typer = Domain.spawn @@ Mpipeline.domain_typer shared in + let vexit = New_merlin.run ~new_env:None shared None args in + Atomic.set Mpipeline.(shared.closed) `True; + (* to unlock the typer domain *) + Shared.signal shared.curr_config; Domain.join domain_typer; exit vexit | "old-protocol" :: args -> Old_merlin.run args diff --git a/src/kernel/mpipeline.ml b/src/kernel/mpipeline.ml index 79e285d974..b473271aae 100644 --- a/src/kernel/mpipeline.ml +++ b/src/kernel/mpipeline.ml @@ -366,44 +366,108 @@ let cache_information t = ("cmi", cmi) ] -let shared_config = Atomic.make None -let shared_pipeline = Atomic.make None +(* ****************************************************************** *) +(* ********************** Parallel stuff **************************** *) +(* ****************************************************************** *) + +(** About closed : +Main domain writes: +- `True when closing +- `Closed to ack the reception of an exception + +Typer domain writes: +- `Exn exn when an catching an exception is found +- `Closed to ack the reception of `True +*) +type shared = + { closed : [ `True | `False | `Exn of exn | `Closed ] Atomic.t; + curr_config : (Mconfig.t * Msource.t) option Shared.t; + partial_result : t option Shared.t; + complete_result : t option Shared.t + } -let close_typer = Atomic.make `False +let create_shared () = + { closed = Atomic.make `False; + curr_config = Shared.create None; + partial_result = Shared.create None; + complete_result = Shared.create None + } -let domain_typer () = +let rec share_exn (shared : shared) exn = + match Atomic.get shared.closed with + | `False as prev -> + if Atomic.compare_and_set shared.closed prev (`Exn exn) then + while Atomic.get shared.closed <> `Closed do + Shared.signal shared.partial_result + done + else share_exn shared exn + | `True -> () + | _ -> assert false + +let domain_typer shared () = let rec loop () = - if Atomic.get close_typer = `True then () - else - match Atomic.get shared_config with + if Atomic.get shared.closed = `True then Atomic.set shared.closed `Closed + else begin + match Shared.get shared.curr_config with | None -> - Domain.cpu_relax (); + Shared.wait shared.curr_config; loop () - | Some (config, source) as curr -> ( + | Some (config, source) -> ( try let pipeline = make config source in - if Atomic.compare_and_set shared_config curr None then - Atomic.set shared_pipeline (Some pipeline); + Shared.set shared.curr_config None; + Shared.locking_set shared.partial_result (Some pipeline); loop () - with exn -> Atomic.set close_typer (`Exn exn)) + with exn -> + Atomic.set shared.closed (`Exn exn); + shared.curr_config.value <- None; + loop ()) + end in - loop () + Shared.protect shared.curr_config @@ fun () -> loop () -let get config source = - Atomic.set shared_config (Some (config, source)); +let get { closed; curr_config; partial_result; _ } config source = + Shared.set curr_config (Some (config, source)); - let rec loop count = - match Atomic.get shared_pipeline with + let rec loop () = + match Shared.get partial_result with | None -> begin - match Atomic.get close_typer with - | `Exn exn -> raise exn - | `True -> assert false + match Atomic.get closed with + | `True | `Closed -> assert false + | `Exn exn -> + Atomic.set closed `Closed; + raise exn | _ -> - Domain.cpu_relax (); - loop (count + 1) + Shared.wait partial_result; + loop () end | Some pipeline -> - Atomic.set shared_pipeline None; + Shared.set partial_result None; pipeline in - loop 0 + Shared.protect partial_result @@ fun () -> loop () + +(* The exchange of message on [shared.closed] is inevitable to avoid some bad + interleavings. In particular, the following implementation of [closing] + + {[ + let closing shared = + Atomic.set shared.closed `True; + Shared.signal shared.curr_config + ]} + + could lead to the following interleaving: +- the typer domain read `closed` as `False +- the main domain change the value of close and call signal +- the typer domain wait forever. +*) +let rec closing shared = + match Atomic.get shared.closed with + | `False -> + if Atomic.compare_and_set shared.closed `False `True then + while Atomic.get shared.closed = `Closed do + Shared.signal shared.curr_config + done + else closing shared + | `Exn exn -> raise exn + | _ -> assert false diff --git a/src/kernel/mpipeline.mli b/src/kernel/mpipeline.mli index 2586132ff1..3ba7e37d37 100644 --- a/src/kernel/mpipeline.mli +++ b/src/kernel/mpipeline.mli @@ -1,6 +1,5 @@ type t val make : Mconfig.t -> Msource.t -> t -val get : Mconfig.t -> Msource.t -> t val with_pipeline : t -> (unit -> 'a) -> 'a val for_completion : Msource.position -> t -> t @@ -34,6 +33,17 @@ module Cache : sig val get : Mconfig.t -> Mocaml.typer_state end -val close_typer : [ `True | `False | `Exn of exn ] Atomic.t +type shared = + { closed : [ `True | `False | `Closed | `Exn of exn ] Atomic.t; + curr_config : (Mconfig.t * Msource.t) option Shared.t; + partial_result : t option Shared.t; + complete_result : t option Shared.t + } -val domain_typer : unit -> unit +val create_shared : unit -> shared + +val domain_typer : shared -> unit -> unit + +val get : shared -> Mconfig.t -> Msource.t -> t + +val closing : shared -> unit diff --git a/src/kernel/shared.ml b/src/kernel/shared.ml new file mode 100644 index 0000000000..93b47eab70 --- /dev/null +++ b/src/kernel/shared.ml @@ -0,0 +1,65 @@ +type 'a t = { mutex : Mutex.t; cond : Condition.t; mutable value : 'a } + +let locking_set t a = + Mutex.protect t.mutex @@ fun () -> + t.value <- a; + Condition.signal t.cond + +let set t a = + t.value <- a; + Condition.signal t.cond +let locking_get t = Mutex.protect t.mutex @@ fun () -> t.value + +let get t = t.value + +let create a = + { mutex = Mutex.create (); cond = Condition.create (); value = a } + +let wait a = Condition.wait a.cond a.mutex +let signal a = Condition.signal a.cond +let protect a = Mutex.protect a.mutex + +(* + Design question : + + here some value are only read by one side and write by the other. The side + reading has to wait for a new value to be provided thus the use of the + condition module. However locking to do thought seems quite useless. + + + Current use : + + + let d1_work shared = + ... + Shared.set shared new_value + ... + + let d2_work shared = + match Shared.get shared with + | None -> Condition.wait shared.cond shared.mutex + | Some new_value -> do something from new_value + + + Better use : + - Shared.get could wait until a new value is written + + let await_get (t : 'a option t) = + Mutex.protect t.mutex @@ fun () -> + let rec loop () = + match t.value with + | None -> + Condition.wait t.cond t.mutex; + loop () + | Some v -> v + in + loop () + + Bad idea -> the value can then be changed as the mutex is unlock. + + + + Issue : + + + *) diff --git a/src/kernel/shared.mli b/src/kernel/shared.mli new file mode 100644 index 0000000000..88288a8d1a --- /dev/null +++ b/src/kernel/shared.mli @@ -0,0 +1,9 @@ +type 'a t = { mutex : Mutex.t; cond : Condition.t; mutable value : 'a } +val set : 'a t -> 'a -> unit +val locking_set : 'a t -> 'a -> unit +val get : 'a t -> 'a +val locking_get : 'a t -> 'a +val create : 'a -> 'a t +val signal : 'a t -> unit +val wait : 'a t -> unit +val protect : 'a t -> (unit -> 'b) -> 'b From c07b017c73f3594b95c9c7afd94697a14ffcad90 Mon Sep 17 00:00:00 2001 From: Carine Morel Date: Thu, 20 Feb 2025 15:55:35 +0100 Subject: [PATCH 5/5] Debug lock and begin moving the computation of mpipeline closer to the point of computation (in New_commands.run) --- src/commands/new_commands.ml | 174 ++++++++++++--------- src/commands/new_commands.mli | 6 +- src/frontend/ocamlmerlin/new/new_merlin.ml | 27 ++-- src/kernel/mpipeline.ml | 69 ++++---- 4 files changed, 164 insertions(+), 112 deletions(-) diff --git a/src/commands/new_commands.ml b/src/commands/new_commands.ml index ebf1aee4ae..3b9eaeb4dc 100644 --- a/src/commands/new_commands.ml +++ b/src/commands/new_commands.ml @@ -35,7 +35,11 @@ type command = * Marg.docstring * ([ `Mandatory | `Optional | `Many ] * 'args Marg.spec) list * 'args - * (Mpipeline.t -> 'args -> json) + * (Mpipeline.shared -> + Mconfig.t -> + Msource.t -> + 'args -> + json * Mpipeline.t option) -> command let command name ?(doc = "") ~spec ~default f = @@ -92,12 +96,13 @@ let find_command name = List.find ~f:(command_is ~name) let find_command_opt name = List.find_opt ~f:(command_is ~name) -let run pipeline query = +let run shared config source query = + let pipeline = Mpipeline.get shared config source in Logger.log ~section:"New_commands" ~title:"run(query)" "%a" Logger.json (fun () -> Query_json.dump query); let result = Query_commands.dispatch pipeline query in let json = Query_json.json_of_response query result in - json + (json, Some pipeline) let all_commands = [ command "case-analysis" @@ -119,17 +124,20 @@ let all_commands = position}, content]`, where content is string.\n" ~default:(`Offset (-1), `Offset (-1)) begin - fun buffer -> function + fun shared config source -> function | `Offset -1, _ -> failwith "-start is mandatory" | _, `Offset -1 -> failwith "-end is mandatory" | startp, endp -> - run buffer (Query_protocol.Case_analysis (startp, endp)) + (* let pos = .. in *) + run shared config source + (Query_protocol.Case_analysis (startp, endp)) end; command "holes" ~spec:[] ~doc:"Returns the list of the positions of all the holes in the file." ~default:() begin - fun buffer () -> run buffer Query_protocol.Holes + fun shared config source () -> + run shared config source Query_protocol.Holes end; command "construct" ~spec: @@ -160,11 +168,12 @@ let all_commands = inferior depth will not be returned." ~default:(`Offset (-1), None, None) begin - fun buffer (pos, with_values, max_depth) -> + fun shared config source (pos, with_values, max_depth) -> match pos with | `Offset -1 -> failwith "-position is mandatory" | pos -> - run buffer (Query_protocol.Construct (pos, with_values, max_depth)) + run shared config source + (Query_protocol.Construct (pos, with_values, max_depth)) end; command "complete-prefix" ~spec: @@ -217,11 +226,11 @@ let all_commands = like signatures for modules or documentation string." ~default:("", `None, [], false, true) begin - fun buffer (txt, pos, kinds, doc, typ) -> + fun shared config source (txt, pos, kinds, doc, typ) -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> - run buffer + run shared config source (Query_protocol.Complete_prefix (txt, pos, List.rev kinds, doc, typ)) end; @@ -240,11 +249,11 @@ let all_commands = ] ~default:(None, `None) begin - fun buffer (ident, pos) -> + fun shared config source (ident, pos) -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> - run buffer (Query_protocol.Document (ident, pos)) + run shared config source (Query_protocol.Document (ident, pos)) end; command "syntax-document" ~doc: @@ -255,11 +264,11 @@ let all_commands = ] ~default:`None begin - fun buffer pos -> + fun shared config source pos -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> - run buffer (Query_protocol.Syntax_document pos) + run shared config source (Query_protocol.Syntax_document pos) end; command "expand-ppx" ~doc:"Returns the generated code of a PPX." ~spec: @@ -268,11 +277,11 @@ let all_commands = ] ~default:`None begin - fun buffer pos -> + fun shared config source pos -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> - run buffer (Query_protocol.Expand_ppx pos) + run shared config source (Query_protocol.Expand_ppx pos) end; command "enclosing" ~spec: @@ -286,11 +295,11 @@ let all_commands = the cursor.)" ~default:`None begin - fun buffer pos -> + fun shared config source pos -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> - run buffer (Query_protocol.Enclosing pos) + run shared config source (Query_protocol.Enclosing pos) end; command "errors" ~spec: @@ -323,8 +332,9 @@ let all_commands = `message` is the error description to be shown to the user." ~default:(true, true, true) begin - fun buffer (lexing, parsing, typing) -> - run buffer (Query_protocol.Errors { lexing; parsing; typing }) + fun shared config source (lexing, parsing, typing) -> + run shared config source + (Query_protocol.Errors { lexing; parsing; typing }) end; command "expand-prefix" ~doc: @@ -356,11 +366,11 @@ let all_commands = ] ~default:("", `None, [], false) begin - fun buffer (txt, pos, kinds, typ) -> + fun shared config source (txt, pos, kinds, typ) -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> - run buffer + run shared config source (Query_protocol.Expand_prefix (txt, pos, List.rev kinds, typ)) end; command "extension-list" @@ -379,13 +389,15 @@ let all_commands = a list of strings." ~default:`All begin - fun buffer status -> run buffer (Query_protocol.Extension_list status) + fun shared config source status -> + run shared config source (Query_protocol.Extension_list status) end; command "findlib-list" ~doc:"Returns all known findlib packages as a list of string." ~spec:[] ~default:() begin - fun buffer () -> run buffer Query_protocol.Findlib_list + fun shared config source () -> + run shared config source Query_protocol.Findlib_list end; command "flags-list" ~spec:[] ~doc: @@ -393,8 +405,9 @@ let all_commands = implement interactive completion of compiler settings in an IDE." ~default:() begin - fun _ () -> - `List (List.map ~f:Json.string (Mconfig.flags_for_completion ())) + fun _ _ _ () -> + ( `List (List.map ~f:Json.string (Mconfig.flags_for_completion ())), + None ) end; command "jump" ~spec: @@ -411,11 +424,11 @@ let all_commands = module or match expression that contains the cursor\n" ~default:("", `None) begin - fun buffer (target, pos) -> + fun shared config source (target, pos) -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> - run buffer (Query_protocol.Jump (target, pos)) + run shared config source (Query_protocol.Jump (target, pos)) end; command "phrase" ~spec: @@ -433,11 +446,11 @@ let all_commands = definition or module definition)." ~default:(`Next, `None) begin - fun buffer (target, pos) -> + fun shared config source (target, pos) -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> - run buffer (Query_protocol.Phrase (target, pos)) + run shared config source (Query_protocol.Phrase (target, pos)) end; command "list-modules" ~spec: @@ -449,8 +462,9 @@ let all_commands = and prints the corresponding module name." ~default:[] begin - fun buffer extensions -> - run buffer (Query_protocol.List_modules (List.rev extensions)) + fun shared config source extensions -> + run shared config source + (Query_protocol.List_modules (List.rev extensions)) end; command "locate" ~spec: @@ -481,11 +495,12 @@ let all_commands = different file." ~default:(None, `None, `MLI) begin - fun buffer (prefix, pos, lookfor) -> + fun shared config source (prefix, pos, lookfor) -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> - run buffer (Query_protocol.Locate (prefix, lookfor, pos)) + run shared config source + (Query_protocol.Locate (prefix, lookfor, pos)) end; command "locate-type" ~spec: @@ -494,11 +509,11 @@ let all_commands = ] ~doc:"Locate the declaration of the type of the expression" ~default:`None begin - fun buffer pos -> + fun shared config source pos -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> - run buffer (Query_protocol.Locate_type pos) + run shared config source (Query_protocol.Locate_type pos) end; command "occurrences" ~spec: @@ -518,10 +533,11 @@ let all_commands = position." ~default:(`None, `Buffer) begin - fun buffer -> function + fun shared config source -> function | `None, _ -> failwith "-identifier-at is mandatory" | `Ident_at pos, scope -> - run buffer (Query_protocol.Occurrences (`Ident_at pos, scope)) + run shared config source + (Query_protocol.Occurrences (`Ident_at pos, scope)) end; command "outline" ~spec:[] ~doc: @@ -530,7 +546,8 @@ let all_commands = content of the buffer." ~default:() begin - fun buffer () -> run buffer Query_protocol.Outline + fun shared config source () -> + run shared config source Query_protocol.Outline end; command "path-of-source" ~doc: @@ -542,8 +559,9 @@ let all_commands = ] ~default:[] begin - fun buffer filenames -> - run buffer (Query_protocol.Path_of_source (List.rev filenames)) + fun shared config source filenames -> + run shared config source + (Query_protocol.Path_of_source (List.rev filenames)) end; command "refactor-open" ~doc:"refactor-open -position pos -action \n\tTODO" @@ -559,11 +577,12 @@ let all_commands = ] ~default:(None, `None) begin - fun buffer -> function + fun shared config source -> function | None, _ -> failwith "-action is mandatory" | _, `None -> failwith "-position is mandatory" | Some action, (#Msource.position as pos) -> - run buffer (Query_protocol.Refactor_open (action, pos)) + run shared config source + (Query_protocol.Refactor_open (action, pos)) end; command "search-by-polarity" ~doc:"search-by-polarity -position pos -query ident\n\tTODO" @@ -579,11 +598,12 @@ let all_commands = ] ~default:("", `None) begin - fun buffer (query, pos) -> + fun shared config source (query, pos) -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> - run buffer (Query_protocol.Polarity_search (query, pos)) + run shared config source + (Query_protocol.Polarity_search (query, pos)) end; command "search-by-type" ~doc:"return a list of values that match a query" ~spec: @@ -603,14 +623,14 @@ let all_commands = ] ~default:(None, `None, 100, false) begin - fun buffer (query, pos, limit, with_doc) -> + fun shared config source (query, pos, limit, with_doc) -> match (query, pos) with | None, `None -> failwith "-position and -query are mandatory" | None, _ -> failwith "-query is mandatory" | _, `None -> failwith "-position is mandatory" | Some query, (#Msource.position as pos) -> - run buffer + run shared config source (Query_protocol.Type_search (query, pos, limit, with_doc)) end; command "inlay-hints" @@ -645,14 +665,18 @@ let all_commands = ] ~default:(`None, `None, false, false, true) begin - fun buffer (start, stop, let_binding, pattern_binding, avoid_ghost) -> + fun shared + config + source + (start, stop, let_binding, pattern_binding, avoid_ghost) + -> match (start, stop) with | `None, `None -> failwith "-start and -end are mandatory" | `None, _ -> failwith "-start is mandatory" | _, `None -> failwith "-end is mandatory" | (#Msource.position, #Msource.position) as position -> let start, stop = position in - run buffer + run shared config source (Query_protocol.Inlay_hints (start, stop, let_binding, pattern_binding, avoid_ghost)) end; @@ -676,9 +700,10 @@ let all_commands = ] ~default:`None begin - fun buffer -> function + fun shared config source -> function | `None -> failwith "-position is mandatory" - | #Msource.position as pos -> run buffer (Query_protocol.Shape pos) + | #Msource.position as pos -> + run shared config source (Query_protocol.Shape pos) end; command "type-enclosing" ~doc: @@ -725,7 +750,7 @@ let all_commands = ] ~default:("", -1, `None, None) begin - fun buffer (expr, cursor, pos, index) -> + fun shared config source (expr, cursor, pos, index) -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> @@ -737,7 +762,8 @@ let all_commands = in Some (expr, cursor) in - run buffer (Query_protocol.Type_enclosing (expr, pos, index)) + run shared config source + (Query_protocol.Type_enclosing (expr, pos, index)) end; command "type-expression" ~doc: @@ -751,11 +777,11 @@ let all_commands = ] ~default:("", `None) begin - fun buffer (expr, pos) -> + fun shared config source (expr, pos) -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as pos -> - run buffer (Query_protocol.Type_expr (expr, pos)) + run shared config source (Query_protocol.Type_expr (expr, pos)) end; (* Implemented without support from Query_protocol. This command might be refactored if it proves useful for old protocol too. *) @@ -771,25 +797,28 @@ let all_commands = ```" ~default:() begin - fun pipeline () -> + fun shared config source () -> + let pipeline = Mpipeline.get shared config source in let config = Mpipeline.final_config pipeline in - `Assoc - [ (* TODO Remove support for multiple configuration files + ( `Assoc + [ (* TODO Remove support for multiple configuration files The protocol could be changed to: 'config_file': path_to_dot_merlin_or_dune For now, if the configurator is dune, the field 'dot_merlins' will contain the path to the dune file (or jbuild, or dune-project) *) - ( "dot_merlins", - `List - (match Mconfig.(config.merlin.config_path) with - | Some path -> [ Json.string path ] - | None -> []) ); - ( "failures", - `List (List.map ~f:Json.string Mconfig.(config.merlin.failures)) - ) - ] + ( "dot_merlins", + `List + (match Mconfig.(config.merlin.config_path) with + | Some path -> [ Json.string path ] + | None -> []) ); + ( "failures", + `List + (List.map ~f:Json.string Mconfig.(config.merlin.failures)) + ) + ], + Some pipeline ) end; command "signature-help" ~doc:"Returns LSP Signature Help response" ~spec: @@ -798,7 +827,7 @@ let all_commands = ] ~default:("", `None) begin - fun buffer (_, pos) -> + fun shared config source (_, pos) -> match pos with | `None -> failwith "-position is mandatory" | #Msource.position as position -> @@ -809,7 +838,7 @@ let all_commands = active_signature_help = None } in - run buffer (Query_protocol.Signature_help sh) + run shared config source (Query_protocol.Signature_help sh) end; (* Used only for testing *) command "dump" @@ -821,12 +850,15 @@ let all_commands = ] ~default:"" ~doc:"Not for the casual user, used for debugging merlin" begin - fun pipeline what -> run pipeline (Query_protocol.Dump [ `String what ]) + fun shared config source what -> + run shared config source (Query_protocol.Dump [ `String what ]) end; (* Used only for testing *) command "dump-configuration" ~spec:[] ~default:() ~doc:"Not for the casual user, used for merlin tests" begin - fun pipeline () -> Mconfig.dump (Mpipeline.final_config pipeline) + fun shared config source () -> + let pipeline = Mpipeline.get shared config source in + (Mconfig.dump (Mpipeline.final_config pipeline), Some pipeline) end ] diff --git a/src/commands/new_commands.mli b/src/commands/new_commands.mli index 0cb3ad5b24..7890a16f95 100644 --- a/src/commands/new_commands.mli +++ b/src/commands/new_commands.mli @@ -35,7 +35,11 @@ type command = * Marg.docstring * ([ `Mandatory | `Optional | `Many ] * 'args Marg.spec) list * 'args - * (Mpipeline.t -> 'args -> json) + * (Mpipeline.shared -> + Mconfig.t -> + Msource.t -> + 'args -> + json * Mpipeline.t option) -> command val all_commands : command list diff --git a/src/frontend/ocamlmerlin/new/new_merlin.ml b/src/frontend/ocamlmerlin/new/new_merlin.ml index 0a5feabc58..4aa240d368 100644 --- a/src/frontend/ocamlmerlin/new/new_merlin.ml +++ b/src/frontend/ocamlmerlin/new/new_merlin.ml @@ -112,26 +112,29 @@ let run shared = let store = Mpipeline.Cache.get config in Local_store.open_store store; let source = Msource.make (Misc.string_of_file stdin) in - let pipeline = Mpipeline.get shared config source in + (* let pipeline = Mpipeline.get shared config source in *) let json = - let class_, message = + let class_, message, pipeline = Printexc.record_backtrace true; - match command_action pipeline command_args with - | result -> ("return", result) + match command_action shared config source command_args with + | result, pipeline -> ("return", result, pipeline) | exception Failure str -> let trace = Printexc.get_backtrace () in log ~title:"run" "Command error backtrace: %s" trace; - ("failure", `String str) + ("failure", `String str, None) | exception exn -> ( let trace = Printexc.get_backtrace () in log ~title:"run" "Command error backtrace: %s" trace; match Location.error_of_exn exn with | None | Some `Already_displayed -> - ("exception", `String (Printexc.to_string exn ^ "\n" ^ trace)) + ( "exception", + `String (Printexc.to_string exn ^ "\n" ^ trace), + None ) | Some (`Ok err) -> Location.print_main Format.str_formatter err; - ("error", `String (Format.flush_str_formatter ()))) + ("error", `String (Format.flush_str_formatter ()), None)) in + Local_store.close_store store; let cpu_time = Misc.time_spent () -. start_cpu in let gc_stats = Gc.quick_stat () in @@ -139,7 +142,9 @@ let run shared = gc_stats.heap_words * (Sys.word_size / 8) / 1_000_000 in let clock_time = (Unix.gettimeofday () *. 1000.) -. start_clock in - let timing = Mpipeline.timing_information pipeline in + let timing = + Option.fold ~none:[] ~some:Mpipeline.timing_information pipeline + in let pipeline_time = List.fold_left (fun acc (_, k) -> k +. acc) 0.0 timing in @@ -152,13 +157,17 @@ let run shared = `String (Printf.sprintf "%s: %s" section msg) in let format_timing (k, v) = (k, `Int (int_of_float (0.5 +. v))) in + let cache = + Option.fold ~none:(`Assoc []) ~some:Mpipeline.cache_information + pipeline + in `Assoc [ ("class", `String class_); ("value", message); ("notifications", `List (List.rev_map notify !notifications)); ("timing", `Assoc (List.map format_timing timing)); ("heap_mbytes", `Int heap_mbytes); - ("cache", Mpipeline.cache_information pipeline); + ("cache", cache); ("query_num", `Int !query_num) ] in diff --git a/src/kernel/mpipeline.ml b/src/kernel/mpipeline.ml index b473271aae..e291d4e7fb 100644 --- a/src/kernel/mpipeline.ml +++ b/src/kernel/mpipeline.ml @@ -393,11 +393,44 @@ let create_shared () = complete_result = Shared.create None } +(* The exchange of message on [shared.closed] is inevitable to avoid some bad + interleavings. In particular, the following implementation of [closing] + + {[ + let closing shared = + Atomic.set shared.closed `True; + Shared.signal shared.curr_config + ]} + + could lead to the following interleaving: +- the typer domain read `closed` as `False +- the main domain change the value of close and call signal +- the typer domain wait forever. +*) +let rec closing shared = + match Atomic.get shared.closed with + | `False -> + if Atomic.compare_and_set shared.closed `False `True then + while Atomic.get shared.closed == `True do + Shared.signal shared.curr_config + done + else closing shared + | `Exn exn as prev -> + if Atomic.compare_and_set shared.closed prev `True then ( + while Atomic.get shared.closed == `True do + Shared.signal shared.curr_config + done; + raise exn) + else closing shared + | `True -> failwith "Closing: `True" + | `Closed -> failwith "Closing: `Closed" + let rec share_exn (shared : shared) exn = match Atomic.get shared.closed with - | `False as prev -> - if Atomic.compare_and_set shared.closed prev (`Exn exn) then - while Atomic.get shared.closed <> `Closed do + | `False -> + let exn_v = `Exn exn in + if Atomic.compare_and_set shared.closed `False exn_v then + while Atomic.get shared.closed == exn_v do Shared.signal shared.partial_result done else share_exn shared exn @@ -419,8 +452,7 @@ let domain_typer shared () = Shared.locking_set shared.partial_result (Some pipeline); loop () with exn -> - Atomic.set shared.closed (`Exn exn); - shared.curr_config.value <- None; + share_exn shared exn; loop ()) end in @@ -435,7 +467,7 @@ let get { closed; curr_config; partial_result; _ } config source = match Atomic.get closed with | `True | `Closed -> assert false | `Exn exn -> - Atomic.set closed `Closed; + Atomic.set closed `False; raise exn | _ -> Shared.wait partial_result; @@ -446,28 +478,3 @@ let get { closed; curr_config; partial_result; _ } config source = pipeline in Shared.protect partial_result @@ fun () -> loop () - -(* The exchange of message on [shared.closed] is inevitable to avoid some bad - interleavings. In particular, the following implementation of [closing] - - {[ - let closing shared = - Atomic.set shared.closed `True; - Shared.signal shared.curr_config - ]} - - could lead to the following interleaving: -- the typer domain read `closed` as `False -- the main domain change the value of close and call signal -- the typer domain wait forever. -*) -let rec closing shared = - match Atomic.get shared.closed with - | `False -> - if Atomic.compare_and_set shared.closed `False `True then - while Atomic.get shared.closed = `Closed do - Shared.signal shared.curr_config - done - else closing shared - | `Exn exn -> raise exn - | _ -> assert false