Skip to content


In progress: fix and clarify the _length-1 queue synchronization_ sem…
Browse files Browse the repository at this point in the history
…antics and API
  • Loading branch information
lukstafi committed May 22, 2024
1 parent 486f925 commit 5a2d4e7
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 100 deletions.
240 changes: 172 additions & 68 deletions arrayjit/lib/
Original file line number Diff line number Diff line change
Expand Up @@ -45,46 +45,96 @@ module type No_device_backend = sig
be backend-specific differences. Only array entries for which [occupancy] returns true are included. *)

val link : context -> code -> routine
(** Returns the routine for the code's procedure, in a new context derived from the given context. See also
[supports_merge_buffers]. *)

val link_batch : context -> code_batch -> routine option array
(** Returns the routines for the procedures included in the code batch. All returned routines share the same
new context. *)
new context. See also [supports_merge_buffers]. *)

val unsafe_cleanup : ?unsafe_shutdown:bool -> unit -> unit
(** Cleans up all work on a backend. If [~unsafe_shutdown:true], releases resources, potentially making the
backend unusable. *)

val from_host : context -> Tnode.t -> bool
(** If the array is both hosted and in-context, copies from host to context and returns true. *)

val to_host : context -> Tnode.t -> bool
(** If the array is both hosted and in-context, copies from context to host and returns true. *)
val from_host_callback : context -> Tnode.t -> option
val to_host_callback : context -> Tnode.t -> option
val merge_callback : Tnode.t -> accum:Ops.binop -> dst:context -> src:context -> option

val merge : ?name_prefix:string -> Tnode.t -> accum:Ops.binop -> src:context -> code option
(** Merges the array from the source context into the destination context: [dst =: dst accum src]. If the
array is hosted, its state on host is undefined after this operation. (A backend may choose to use the
host array as a buffer, if that is beneficial.) [name_prefix] is prepended to the jitted function's
name. Returns [None] if the array is not in the context. *)

val merge_batch :
val compile_merges :
?name_prefixes:string array ->
occupancy:(Tnode.t -> src_n:int -> src:context -> Utils.requirement) ->
occupancy:(Tnode.t -> dst_n:int -> dst:context -> src_n:int -> src:context -> Utils.requirement) ->
Tnode.t list ->
accum:Ops.binop ->
dsts:context array ->
srcs:context array ->
(Tnode.t, code option array) Hashtbl.t
(** [merge_batch] is to [merge] as [compile_batch] is to [compile] -- see above. Includes only the merges
for a node from the given list and [src] from [srcs] for which [occupancy] does not return [Skip], and
the node is present in [src]. *)
(** Compiles the code, if any, required by [merge], as if applying [compile_batch] to [srcs] for each of
[dsts]. Includes only the merges for for which [occupancy] does not return [Skip], and the node is
present in both the destination and the source. *)

val supports_merge_buffers : bool
(** If [false], using [Ops.Merge_buffer_unsafe] in assignments will fail during {!compile} or {!link}, resp.
{!compile_batch} or {!link_batch}. If [true], each device maintains (at most) one merge buffer, that is
grown by calls to [compile_merges], to ensure that the merge buffer can fit the data arriving for
{!merge}, {!merge_unsafe} tasks. *)

module type Backend = sig
include No_device_backend

val from_host : context -> Tnode.t -> bool
(** If the array is both hosted and in-context, copies from host to context and returns true. This function
is synchronous in the sense that when it returns, the host data is no longer needed. Beware that
depending on the backend, calling [from_host] might even synchronize all devices of the backend. *)

val to_host : context -> Tnode.t -> bool
(** If the array is both hosted and in-context, copies from context to host and returns true. This function
is synchronous: returns when fully complete. Beware that depending on the backend, calling [to_host]
might even synchronize all devices of the backend. *)

val from_host_async : context -> Tnode.t -> bool
(** If the array is both hosted and in-context, copies from host to context and returns true. NOTE: this
function returns once it books (schedules) the copying task and it's the caller's responsibility to
synchronize the device before the host's data is overwritten. *)

val to_host_async : context -> Tnode.t -> bool
(** If the array is both hosted and in-context, copies from context to host and returns true. NOTE: this
function returns once it books (schedules) the copying task and it's the caller's responsibility to
synchronize the device before the host's data is read. *)

val merge : Tnode.t -> accum:Ops.binop -> dst:context -> src:context -> bool
(** Merges the array from the source context into the destination context: [dst =: dst accum src]. If the
array is hosted, its state on host is undefined after this operation. (A backend may choose to use the
host array as a buffer, if that is beneficial.) The corresponding tuple of [tnode, accum, dst, src] must
be in the scope of an earlier {!compile_merges} call; otherwise, [merge] will fail, even if it would
otherwise return [false]. Returns [false] if the array is not in both the [dst] and [src] contexts.
Otherwise, it synchronizes the [src] device: calls [Backend.await src.device], {i then} it schedules the
merge task on the [dst] device, then it returns [true]. NOTE: [merge] is asynchronous, it's the caller's
responsibility to not overwrite source before the merge completes. *)

val merge_unsafe : Tnode.t -> accum:Ops.binop -> dst:context -> src:context -> bool
(** Merges the array from the source context into the destination context: see {!merge} for details.
[merge_unsafe] starts with a [Backend.acknowledge src.device] call (blocking until the latest task on
[src] starts), {i then} it schedules the merge task on the [dst] device. NOTE: [merge_unsafe] is likely
to cause {i read before intended write} bugs. *)

type device

val init : device -> context

val await : device -> unit
(** Blocks till the device becomes idle, i.e. synchronizes the device. *)

val acknowledge : device -> unit
(** Blocks till the device becomes not booked (its queue becomes empty). *)

val is_idle : device -> bool
(** The device is currently waiting for work. See also {!is_booked}: if a device is idle, then it's not
booked; but a device might be both not idle and not booked. *)

val is_booked : device -> bool
(* The device queue is not empty: the next task is waiting to start execution. See also {!is_idle}. *)

val sexp_of_device : device -> Sexp.t
val num_devices : unit -> int
val get_device : ordinal:int -> device
Expand All @@ -100,10 +150,12 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
module Domain = Domain [@warning "-3"]

type device = {
next_task : ( option ref[@sexp.opaque]);
is_idle : bool ref;
next_task : ([@sexp.opaque]) option ref;
keep_spinning : bool ref;
wait_for_device : (Utils.waiter[@sexp.opaque]);
wait_for_ackn : (Utils.waiter[@sexp.opaque]);
wait_for_work : (Utils.waiter[@sexp.opaque]);
wait_for_idle : (Utils.waiter[@sexp.opaque]);
ordinal : int;
domain : (unit Domain.t[@sexp.opaque]);
Expand All @@ -118,11 +170,20 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
let init device = { device; ctx = Backend.init ~label:(name ^ " " ^ Int.to_string device.ordinal) }
let initialize = Backend.initialize
let is_initialized = Backend.is_initialized
let supports_merge_buffers = Backend.supports_merge_buffers
let is_idle device = !(device.is_idle)
let is_booked device = Option.is_some !(device.next_task)

let await device =
assert (Domain.is_main_domain ());
while !(device.keep_spinning) && not !(device.is_idle) do
device.wait_for_idle.await ()

let acknowledge device =
assert (Domain.is_main_domain ());
while !(device.keep_spinning) && Option.is_some !(device.next_task) do
device.wait_for_device.await ()
device.wait_for_ackn.await ()

let finalize { device; ctx } =
Expand All @@ -132,18 +193,20 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
let compile = Backend.compile
let compile_batch = Backend.compile_batch

let make_work device task =
let%diagn_rt_sexp work () =
acknowledge device;
if not !(device.keep_spinning) then invalid_arg "Multicore_backend: device not available";
device.next_task := Some task;
device.wait_for_work.release ()
Tnode.Work work

let link { ctx; device } code =
let task = ctx code in
let%diagn_sexp schedule () =
[%log_result "Scheduling",];
let task = task.schedule () in
let%diagn_rt_sexp work () =
await device;
if not !(device.keep_spinning) then invalid_arg "Multicore_backend.jit: device not available";
device.next_task := Some task;
device.wait_for_work.release ()
Tnode.Work work
make_work device @@ task.schedule ()
{ task with context = { ctx = task.context; device }; schedule }

Expand All @@ -152,52 +215,86 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
( ~f:(fun task ->
let%diagn_sexp schedule () =
[%log_result "Scheduling",];
let task = task.schedule () in
let%diagn_rt_sexp work () =
await device;
if not !(device.keep_spinning) then invalid_arg "Multicore_backend.jit: device not available";
device.next_task := Some task;
device.wait_for_work.release ()
Tnode.Work work
[%log_result "Scheduling from batch",];
make_work device @@ task.schedule ()
{ task with context = { ctx = task.context; device }; schedule }))

let from_host { device; ctx } =
await device;
Backend.from_host ctx
let from_host { device; ctx } tn =
match Backend.from_host_callback ctx tn with
| None -> false
| Some task ->
await device; (module Debug_runtime) task;

let to_host { device; ctx } =
await device;
Backend.to_host ctx

let merge ?name_prefix tn ~accum ~src =
let name_prefix : string = Option.value ~default:"" @@ name_prefix ~f:(fun s -> s ^ "_") in
let ord ctx = ctx.device.ordinal in
let name_prefix : string = [%string "%{name_prefix}from_dev_%{ord src#Int}"] in
Backend.merge ~name_prefix tn ~accum ~src:src.ctx

let merge_batch ?name_prefixes ~occupancy tns ~accum ~srcs =
let ord ctx = ctx.device.ordinal in
let name_prefixes =
let f prefix src = [%string "%{prefix}from_dev_%{ord src#Int}"] in
match name_prefixes with
| None -> srcs ~f:(f "")
| Some prefixes -> Array.map2_exn prefixes srcs ~f:(fun p -> f @@ p ^ "_")
let to_host { device; ctx } tn =
match Backend.to_host_callback ctx tn with
| None -> false
| Some task ->
await device; (module Debug_runtime) task;

let from_host_callback { device; ctx } tn = (Backend.from_host_callback ctx tn) ~f:(fun task -> make_work device task)

let to_host_callback { device; ctx } tn = (Backend.to_host_callback ctx tn) ~f:(fun task -> make_work device task)

let from_host_async context tn =
match from_host_callback context tn with
| None -> false
| Some task -> (module Debug_runtime) task;

let to_host_async context tn =
match to_host_callback context tn with
| None -> false
| Some task -> (module Debug_runtime) task;

let merge_callback tn ~accum ~dst ~src = (Backend.merge_callback tn ~accum ~dst:dst.ctx ~src:src.ctx) ~f:(fun task ->
make_work dst.device task)

let merge tn ~accum ~dst ~src =
match merge_callback tn ~accum ~dst ~src with
| None -> false
| Some task ->
await src.device; (module Debug_runtime) task;

let merge_unsafe tn ~accum ~dst ~src =
match merge_callback tn ~accum ~dst ~src with
| None -> false
| Some task ->
acknowledge src.device; (module Debug_runtime) task;

let compile_merges ?name_prefixes ~occupancy tns ~accum ~dsts ~srcs =
let ctxs = ~f:(fun ctx -> ctx.ctx) in
let occupancy tn ~dst_n ~dst:_ ~src_n ~src:_ =
let dst = dsts.(dst_n) in
let src = srcs.(src_n) in
occupancy tn ~dst_n ~dst ~src_n ~src
let devices, srcs = Array.unzip ( srcs ~f:(fun s -> (s.device, s.ctx))) in
let occupancy tn ~src_n ~src = occupancy tn ~src_n ~src:{ device = devices.(src_n); ctx = src } in
Backend.merge_batch ~name_prefixes ~occupancy tns ~accum ~srcs
Backend.compile_merges ?name_prefixes ~occupancy tns ~accum ~dsts:(ctxs dsts) ~srcs:(ctxs srcs)

let num_devices () = Domain.recommended_domain_count () - 1
let global_run_no = ref 0

let%debug_sexp spinup_device ~(ordinal : int) : device =
Int.incr global_run_no;
let next_task = ref None in
let is_idle = ref true in
let keep_spinning = ref true in
let wait_for_device = Utils.waiter () in
let wait_for_idle = Utils.waiter () in
let wait_for_ackn = Utils.waiter () in
let wait_for_work = Utils.waiter () in
let run_no = !global_run_no in
let debug_runtime = Utils.get_debug ("dev-" ^ Int.to_string ordinal ^ "-run-" ^ Int.to_string run_no) in
Expand All @@ -209,15 +306,21 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
wait_for_work.await ()
if !keep_spinning then ( debug_runtime @@ Option.value_exn !next_task;
let task = Option.value_exn !next_task in
next_task := None;
wait_for_device.release ())
wait_for_ackn.release ();
is_idle := false; debug_runtime task;
is_idle := true;
wait_for_idle.release ())
(* For detailed debugging of a worker operation, uncomment: *)
Expand All @@ -236,7 +339,8 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
(* Important to release after setting to not keep spinning. *)
device.wait_for_work.release ();
Domain.join device.domain;
device.wait_for_device.finalize ();
device.wait_for_idle.finalize ();
device.wait_for_ackn.finalize ();
device.wait_for_work.finalize ();
if not unsafe_shutdown then devices.(ordinal) <- spinup_device ~ordinal
Expand Down Expand Up @@ -323,8 +427,8 @@ module type Simple_backend = sig
val init : label:string -> context
val finalize : context -> unit
val unsafe_cleanup : ?unsafe_shutdown:bool -> unit -> unit
val from_host : context -> Tnode.t -> bool
val to_host : context -> Tnode.t -> bool
val from_host_callback : context -> Tnode.t -> option
val to_host_callback : context -> Tnode.t -> option

module Simple_no_device_backend (Backend : Simple_backend) : No_device_backend = struct
Expand Down

0 comments on commit 5a2d4e7

Please sign in to comment.