diff --git a/lib/_exceptions/network_partition_exception.ex b/lib/_exceptions/network_partition_exception.ex deleted file mode 100644 index 23579a6e..00000000 --- a/lib/_exceptions/network_partition_exception.ex +++ /dev/null @@ -1,11 +0,0 @@ -defmodule Actors.Exceptions.NetworkPartitionException do - @moduledoc """ - Error raised when the Actor already activated on another node. - """ - - defexception plug_status: 409 - - def message(_), - do: - "Unable to initialize the Actor because it is active on another Node or failed to update its status during a previous deactivation." -end diff --git a/lib/actors/actor/caller_consumer.ex b/lib/actors/actor/caller_consumer.ex index 3ee36c70..614dbfbd 100644 --- a/lib/actors/actor/caller_consumer.ex +++ b/lib/actors/actor/caller_consumer.ex @@ -667,7 +667,7 @@ defmodule Actors.Actor.CallerConsumer do Tracer.with_span "actor-lookup" do Tracer.set_attributes([{:actor_fqdn, actor_fqdn}]) - case Spawn.Cluster.Node.Registry.lookup(Actors.Actor.Entity, parent) do + case Spawn.Cluster.Node.Distributor.lookup(system_name, parent) do [{actor_ref, actor_ref_id}] -> Tracer.add_event("actor-status", [{"alive", true}]) Tracer.set_attributes([{"actor-pid", "#{inspect(actor_ref)}"}]) diff --git a/lib/actors/actor/caller_consumer_new.ex b/lib/actors/actor/caller_consumer_new.ex new file mode 100644 index 00000000..e8b25dbb --- /dev/null +++ b/lib/actors/actor/caller_consumer_new.ex @@ -0,0 +1,303 @@ +defmodule Actors.Actor.CallerConsumerNew do + @moduledoc """ + An Elixir module representing a GenStage consumer responsible for handling + events initiated by `CallerProducer` and interacting with actors in the system. + """ + use GenStage + use Retry + + require Logger + require OpenTelemetry.Tracer, as: Tracer + + alias Actors.Actor.CallerProducer + alias Actors.Config.PersistentTermConfig, as: Config + alias Actors.Actor.Entity, as: ActorEntity + alias Actors.Actor.Entity.Supervisor, as: ActorEntitySupervisor + alias Actors.Actor.InvocationScheduler + + alias Eigr.Functions.Protocol.Actors.{ + Actor, + ActorId, + ActorSystem, + Registry + } + + alias Eigr.Functions.Protocol.{ + InvocationRequest, + ProxyInfo, + RegistrationRequest, + RegistrationResponse, + RequestStatus, + ServiceInfo, + SpawnRequest, + SpawnResponse + } + + alias Spawn.Cluster.Node.Distributor + + alias Sidecar.Measurements + + import Spawn.Utils.Common, only: [to_existing_atom_or_new: 1] + + @activate_actors_min_demand 0 + @activate_actors_max_demand 4 + + @erpc_timeout 5_000 + + @doc """ + Registers actors in the system based on the provided registration request. + + Handles registration requests and ensures actors are properly registered. 
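+
+  ## Example
+
+  A hypothetical successful registration; the `registration_request` value
+  below is illustrative, not part of this module:
+
+      iex> {:ok, %RegistrationResponse{status: %RequestStatus{status: :OK}}} =
+      ...>   register(registration_request, [])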
+ """ + def register( + %RegistrationRequest{ + service_info: %ServiceInfo{} = _service_info, + actor_system: + %ActorSystem{name: name, registry: %Registry{actors: actors} = _registry} = + actor_system + } = _registration, + _opts + ) do + size = length(actors) + Logger.info("Registering #{inspect(size)} Actors on ActorSystem #{name}") + + if Sidecar.GracefulShutdown.running?() do + case Distributor.register(actor_system) do + :ok -> + status = %RequestStatus{status: :OK, message: "Accepted"} + {:ok, %RegistrationResponse{proxy_info: get_proxy_info(), status: status}} + + _ -> + status = %RequestStatus{ + status: :ERROR, + message: "Failed to register one or more Actors" + } + + {:error, %RegistrationResponse{proxy_info: get_proxy_info(), status: status}} + end + else + status = %RequestStatus{ + status: :ERROR, + message: "You can't register actors when node is stopping" + } + + {:error, %RegistrationResponse{proxy_info: get_proxy_info(), status: status}} + end + end + + @doc """ + Gets the state of the specified actor. + + This function attempts to retrieve the state of the actor identified by the given + `ActorId`. It uses an exponential backoff strategy for retrying in case of errors + and logs any failures. + + ## Parameters + + - `id` (%ActorId): The unique identifier of the actor. + + ## Returns + + The state of the actor if successful, otherwise an error is raised. + + ## Retry Strategy + + The function utilizes an exponential backoff strategy with randomized delays and + a maximum expiry time of 30,000 milliseconds. + + ## Errors + + The function handles errors such as `:error`, `:exit`, `:noproc`, `:erpc`, + `:noconnection`, and `:timeout`. It also rescues `ErlangError` exceptions and logs + detailed error messages. + + """ + def get_state(%ActorId{name: actor_name, system: system_name} = id) do + end + + @doc """ + Performs a readiness check for a given actor identified by `%ActorId{}`. + + This function uses a retry mechanism with exponential backoff, randomization, and a 30-second expiry to handle errors and failures gracefully. + It attempts to check the readiness of the specified actor, logging any errors encountered during the process. + + ## Parameters + + - `id`: An `%ActorId{}` struct that contains: + - `name`: The name of the actor. + - `system`: The name of the system the actor belongs to. + + ## Returns + + - `{:ok, %HealthCheckReply{}}` if the readiness check is successful. The `HealthCheckReply` struct contains: + - `status`: A `HealthcheckStatus` struct with: + - `status`: A string indicating the status, e.g., "OK". + - `details`: A string providing additional details, e.g., "I'm alive!". + - `updated_at`: A `Google.Protobuf.Timestamp` indicating the last update time. + - An error tuple (e.g., `{:error, :noproc}`) if the readiness check fails after all retry attempts. + + ## Examples + + iex> readiness(%ActorId{name: "actor1", system: "system1"}) + {:ok, + %HealthCheckReply{ + status: %HealthcheckStatus{ + status: "OK", + details: "I'm alive!", + updated_at: %Google.Protobuf.Timestamp{seconds: 1717606730} + } + }} + + iex> readiness(%ActorId{name: "nonexistent_actor", system: "system1"}) + {:error, :noproc} + + ## Notes + + The retry mechanism handles the following cases: `:error`, `:exit`, `:noproc`, `:erpc`, `:noconnection`, and `:timeout`. It rescues only `ErlangError`. + + The readiness check is performed by calling `ActorEntity.readiness/2` on the actor reference obtained through `do_lookup_action/4`. 
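+
+  A minimal sketch of that retry shape, assuming the `Retry` DSL brought in by
+  `use Retry` (the `do_lookup_action/4` arguments and return shape here are
+  illustrative):
+
+      retry with: exponential_backoff() |> randomize() |> expiry(30_000),
+            atoms: [:error, :exit, :noproc, :erpc, :noconnection, :timeout],
+            rescue_only: [ErlangError] do
+        actor_ref = do_lookup_action(system_name, actor_name, nil, id)
+        ActorEntity.readiness(actor_ref, [])
+      after
+        result -> result
+      else
+        error -> error
+      end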
+ + Any errors during the readiness check are logged with a message indicating the actor's name and the error encountered. + """ + @spec readiness(ActorId.t()) :: {:ok, HealthCheckReply.t()} | {:error, any()} + def readiness(%ActorId{name: actor_name, system: system_name} = id) do + end + + @doc """ + Performs a liveness check for a given actor identified by `%ActorId{}`. + + This function uses a retry mechanism with exponential backoff, randomization, and a 30-second expiry to handle errors and failures gracefully. + It attempts to check the liveness of the specified actor, logging any errors encountered during the process. + + ## Parameters + + - `id`: An `%ActorId{}` struct that contains: + - `name`: The name of the actor. + - `system`: The name of the system the actor belongs to. + + ## Returns + + - `{:ok, %HealthCheckReply{}}` if the liveness check is successful. The `HealthCheckReply` struct contains: + - `status`: A `HealthcheckStatus` struct with: + - `status`: A string indicating the status, e.g., "OK". + - `details`: A string providing additional details, e.g., "I'm alive!". + - `updated_at`: A `Google.Protobuf.Timestamp` indicating the last update time. + - An error tuple (e.g., `{:error, :noproc}`) if the liveness check fails after all retry attempts. + + ## Examples + + iex> liveness(%ActorId{name: "actor1", system: "system1"}) + {:ok, + %HealthCheckReply{ + status: %HealthcheckStatus{ + status: "OK", + details: "I'm still alive!", + updated_at: %Google.Protobuf.Timestamp{seconds: 1717606837} + } + }} + + iex> liveness(%ActorId{name: "nonexistent_actor", system: "system1"}) + {:error, :noproc} + + ## Notes + + The retry mechanism handles the following cases: `:error`, `:exit`, `:noproc`, `:erpc`, `:noconnection`, and `:timeout`. It rescues only `ErlangError`. + + The liveness check is performed by calling `ActorEntity.liveness/2` on the actor reference obtained through `do_lookup_action/4`. + + Any errors during the liveness check are logged with a message indicating the actor's name and the error encountered. + """ + @spec liveness(ActorId.t()) :: {:ok, HealthCheckReply.t()} | {:error, any()} + def liveness(%ActorId{name: actor_name, system: system_name} = id) do + end + + @doc """ + Spawns an actor or a group of actors based on the provided `SpawnRequest`. + + This function is responsible for spawning actors based on the specified `SpawnRequest`. + It retrieves the hosts associated with the provided actor IDs and registers the actors. + Additionally, it handles cases where the system is in the process of draining or stopping. + + ## Parameters + + - `spawn` (%SpawnRequest): The request containing information about the actors to spawn. + - `opts` (Keyword.t): Additional options for spawning the actors. Defaults to an empty keyword list. + + ## Returns + + If successful, it returns `{:ok, %SpawnResponse{status: %RequestStatus{status: :OK, message: "Accepted"}}}`. + Otherwise, an error is raised. + + ## Actor Spawning Process + + - Retrieves actor hosts based on actor IDs from the `ActorRegistry`. + - Filters the hosts based on the system's graceful shutdown status. + - Registers the selected hosts in the `ActorRegistry`. + - Returns a success response. + + ## Errors + + - Raises an `ArgumentError` if attempting to spawn an unnamed actor that has not been registered before. + + """ + def spawn_actor(spawn, opts \\ []) + + def spawn_actor(%SpawnRequest{actors: actors} = _spawn, opts) do + end + + @doc """ + Invokes an actor action with distributed tracing using OpenTelemetry. 
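+
+  A hypothetical fire-and-forget call (struct values are illustrative):
+
+      iex> request = %InvocationRequest{actor: actor, system: system,
+      ...>   action_name: "set_language", async: true}
+      iex> invoke_with_span(request, [])
+      {:ok, :async}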
+ + This function performs an actor action invocation, incorporating distributed tracing + with OpenTelemetry. It sets up the tracing context, adds relevant attributes, + and handles asynchronous and synchronous invocations. + + ## Parameters + + - `request` (%InvocationRequest): The request containing information about the invocation. + - `opts` (Keyword.t): Additional options for the invocation. Defaults to an empty keyword list. + + ## Returns + + A tuple containing the status and the result of the invocation. + If the invocation is asynchronous, it returns `{:ok, :async}`. + + ## Tracing Context + + The function sets up the tracing context and adds attributes related to the invocation. + It uses OpenTelemetry to trace the client invoke with the kind set to `:client`. + + ## Retry Mechanism + + The function incorporates a retry mechanism with backoff, randomization, and timeout + to handle potential errors during the invocation. + + ## Error Handling + + In case of errors during the invocation, appropriate logging and tracing events are added, + and the error is re-raised with a stack trace. + + """ + def invoke_with_span( + %InvocationRequest{ + actor: %Actor{id: %ActorId{name: _name, system: _actor_id_system} = actor_id} = actor, + system: %ActorSystem{} = system, + action_name: action_name, + async: async?, + metadata: metadata, + caller: caller, + pooled: pooled? + } = request, + opts + ) do + end + + defp get_proxy_info() do + %ProxyInfo{ + protocol_major_version: 1, + protocol_minor_version: 2, + proxy_name: "spawn", + proxy_version: "1.4.1" + } + end +end diff --git a/lib/actors/actor/entity/entity.ex b/lib/actors/actor/entity/entity.ex index a42e850d..48409731 100644 --- a/lib/actors/actor/entity/entity.ex +++ b/lib/actors/actor/entity/entity.ex @@ -494,6 +494,6 @@ defmodule Actors.Actor.Entity do defp reply_to_noreply({:noreply, _response, state, opts}), do: {:noreply, state, opts} defp via(name) do - {:via, Horde.Registry, {Spawn.Cluster.Node.Registry, {__MODULE__, name}}} + {:via, Horde.Registry, {Spawn.Cluster.Node.Distributor, {__MODULE__, name}}} end end diff --git a/lib/actors/actor/entity/lifecycle.ex b/lib/actors/actor/entity/lifecycle.ex index fe743049..f458bcf0 100644 --- a/lib/actors/actor/entity/lifecycle.ex +++ b/lib/actors/actor/entity/lifecycle.ex @@ -7,7 +7,6 @@ defmodule Actors.Actor.Entity.Lifecycle do alias Actors.Actor.{Entity.EntityState, Entity.Invocation, StateManager} alias Actors.Actor.Pubsub - alias Actors.Exceptions.NetworkPartitionException alias Eigr.Functions.Protocol.Actors.{ Actor, @@ -39,8 +38,7 @@ defmodule Actors.Actor.Entity.Lifecycle do %ActorSettings{ stateful: stateful?, snapshot_strategy: snapshot_strategy, - deactivation_strategy: deactivation_strategy, - kind: kind + deactivation_strategy: deactivation_strategy } = _settings, timer_actions: timer_actions } @@ -48,72 +46,31 @@ defmodule Actors.Actor.Entity.Lifecycle do ) do Process.flag(:trap_exit, true) - split_brain_detector_mod = - Application.get_env(:spawn, :split_brain_detector, Actors.Node.DefaultSplitBrainDetector) - Logger.notice( "Activating Actor #{inspect(name)} with Parent #{inspect(parent)} in Node #{inspect(Node.self())}. Persistence #{inspect(stateful?)}." 
    )

-    actor_name_key =
-      if kind == :POOLED do
-        parent
-      else
-        name
-      end
-
     :ok = handle_metadata(name, system, metadata)
     :ok = Invocation.handle_timers(timer_actions, system, state.actor)

-    :ok =
-      Spawn.Cluster.Node.Registry.update_entry_value(
-        Actors.Actor.Entity,
-        actor_name_key,
-        self(),
-        state.actor.id
-      )
-
     schedule_deactivate(deactivation_strategy, get_jitter())

     state =
       case maybe_schedule_snapshot_advance(snapshot_strategy) do
         {:ok, timer} ->
-          %EntityState{
-            state
-            | opts:
-                Keyword.merge(state.opts,
-                  timer: timer,
-                  split_brain_detector: split_brain_detector_mod
-                )
-          }
+          %EntityState{state | opts: Keyword.merge(state.opts, timer: timer)}

         _ ->
-          %EntityState{
-            state
-            | opts:
-                Keyword.merge(state.opts,
-                  split_brain_detector: split_brain_detector_mod
-                )
-          }
+          state
       end

     {:ok, state, {:continue, :load_state}}
   end

-  def load_state(%EntityState{actor: actor, revision: revision, opts: opts} = state) do
+  def load_state(%EntityState{actor: actor, revision: revision, opts: _opts} = state) do
     case get_state(actor.id, revision) do
-      {:ok, current_state, current_revision, status, node} ->
-        split_brain_detector =
-          Keyword.get(opts, :split_brain_detector, Actors.Node.DefaultSplitBrainDetector)
-
-        case check_partition(actor.id, status, node, split_brain_detector) do
-          :continue ->
-            {:noreply, updated_state(state, current_state, current_revision),
-             {:continue, :call_init_action}}
-
-          {:network_partition_detected, error} ->
-            handle_network_partition(actor.id, error)
-        end
+      {:ok, current_state, current_revision, _status, _node} ->
+        {:noreply, updated_state(state, current_state, current_revision),
+         {:continue, :call_init_action}}

       {:not_found, %{}, _current_revision} ->
         Logger.debug("Not found state on statestore for Actor #{inspect(actor.id)}.")
@@ -284,27 +241,6 @@ defmodule Actors.Actor.Entity.Lifecycle do
     %EntityState{state | actor: %Actor{actor | state: actual_state}, revision: revision}
   end

-  defp check_partition(id, status, node, split_brain_detector) do
-    case split_brain_detector.check_network_partition(id, status, node) do
-      {:ok, :continue} ->
-        :continue
-
-      {:error, :network_partition_detected} ->
-        {:network_partition_detected, :network_partition_detected}
-
-      error ->
-        {:network_partition_detected, error}
-    end
-  end
-
-  defp handle_network_partition(id, error) do
-    Logger.warning(
-      "We have detected a possible network partition issue for Actor #{inspect(id)}. This actor will not start. Details: #{inspect(error)}"
-    )
-
-    raise NetworkPartitionException
-  end
-
   defp handle_load_state_error(id, state, error) do
     Logger.error("Error on load state for Actor #{inspect(id)}.
Error: #{inspect(error)}") {:noreply, state, {:continue, :call_init_action}} diff --git a/lib/actors/actor/invocation_scheduler.ex b/lib/actors/actor/invocation_scheduler.ex index 2cc7595d..6b599bb4 100644 --- a/lib/actors/actor/invocation_scheduler.ex +++ b/lib/actors/actor/invocation_scheduler.ex @@ -11,7 +11,6 @@ defmodule Actors.Actor.InvocationScheduler do require Logger - alias Spawn.Cluster.StateHandoff.InvocationSchedulerState alias Eigr.Functions.Protocol.InvocationRequest @hibernate_delay 20_000 @@ -28,8 +27,8 @@ defmodule Actors.Actor.InvocationScheduler do def handle_continue(:init_invocations, state) do schedule_hibernate() - InvocationSchedulerState.all() - |> Enum.each(&call_invoke/1) + # InvocationSchedulerState.all() + # |> Enum.each(&call_invoke/1) {:noreply, state} end @@ -42,7 +41,8 @@ defmodule Actors.Actor.InvocationScheduler do @impl true def handle_info({:invoke, decoded_request, scheduled_to, repeat_in}, state) do if is_nil(repeat_in) do - InvocationSchedulerState.remove(InvocationRequest.encode(decoded_request)) + # InvocationSchedulerState.remove(InvocationRequest.encode(decoded_request)) + nil else scheduled_to = DateTime.add(scheduled_to, repeat_in, :millisecond) @@ -62,10 +62,10 @@ defmodule Actors.Actor.InvocationScheduler do @impl true def handle_cast({:schedule, request, scheduled_to, repeat_in}, state) do - encoded_request = + _encoded_request = InvocationRequest.encode(%InvocationRequest{request | scheduled_to: 0, async: true}) - InvocationSchedulerState.put(encoded_request, scheduled_to, repeat_in) + # InvocationSchedulerState.put(encoded_request, scheduled_to, repeat_in) call_invoke({request, {scheduled_to, repeat_in}}) @@ -73,21 +73,21 @@ defmodule Actors.Actor.InvocationScheduler do end @impl true - def handle_cast({:schedule_fixed, requests}, state) do - requests = - Enum.reduce(requests, [], fn {request, scheduled_to, repeat_in}, acc -> - encoded_request = InvocationRequest.encode(request) + def handle_cast({:schedule_fixed, _requests}, state) do + # requests = + # Enum.reduce(requests, [], fn {request, scheduled_to, repeat_in}, acc -> + # encoded_request = InvocationRequest.encode(request) - if is_nil(InvocationSchedulerState.get(encoded_request)) do - call_invoke({request, {scheduled_to, repeat_in}}) + # if is_nil(InvocationSchedulerState.get(encoded_request)) do + # call_invoke({request, {scheduled_to, repeat_in}}) - acc ++ [{encoded_request, scheduled_to, repeat_in}] - else - acc - end - end) + # acc ++ [{encoded_request, scheduled_to, repeat_in}] + # else + # acc + # end + # end) - InvocationSchedulerState.put_many(requests) + # InvocationSchedulerState.put_many(requests) {:noreply, state} end diff --git a/lib/actors/split_brain_detector.ex b/lib/actors/split_brain_detector.ex deleted file mode 100644 index a6bfdf7d..00000000 --- a/lib/actors/split_brain_detector.ex +++ /dev/null @@ -1,14 +0,0 @@ -defmodule Actors.SplitBrainDetector do - @moduledoc """ - - """ - @type status :: String.t() - @type node_id :: String.t() - @type actor_id :: Eigr.Functions.Protocol.Actors.ActorId.t() - - @callback check_network_partition(actor_id(), status(), node_id()) :: - {:ok, :continue} | {:error, :network_partition_detected} - - @callback check_network_partition!(actor_id(), status(), node_id()) :: - {:ok, :continue} | Exception.t() -end diff --git a/lib/spawn/cluster/node/distributor.ex b/lib/spawn/cluster/node/distributor.ex new file mode 100644 index 00000000..a72ec99b --- /dev/null +++ b/lib/spawn/cluster/node/distributor.ex @@ -0,0 +1,198 @@ +defmodule 
Spawn.Cluster.Node.Distributor do
+  @moduledoc """
+  Defines a distributed registry for all processes.
+  """
+  require Logger
+
+  alias Actors.Config.PersistentTermConfig, as: Config
+
+  alias ProcessHub.Service.ProcessRegistry
+
+  alias Eigr.Functions.Protocol.Actors.Actor
+  alias Eigr.Functions.Protocol.Actors.ActorId
+  alias Eigr.Functions.Protocol.Actors.ActorSystem
+  alias Eigr.Functions.Protocol.Actors.Registry
+
+  @actor_table :actor_registry
+
+  def init() do
+    :ets.new(@actor_table, [:named_table, :set, :public])
+  end
+
+  def nodes(system), do: ProcessHub.nodes(system)
+
+  def child_specs() do
+    [
+      registry_actors_distributor_process_hub(),
+      init_registry_process(),
+      normal_actors_distributor_process_hub(),
+      pooled_actors_distributor_process_hub()
+    ]
+  end
+
+  @doc """
+  Registers the `Actors` of an `ActorSystem` in the registry.
+  """
+  def register(%ActorSystem{name: name, registry: %Registry{actors: actors}} = system) do
+    nodes = nodes("registry_#{name}")
+    {_results, _failed_nodes} = :rpc.multicall(nodes, __MODULE__, :register_local, [system])
+    :ets.insert(@actor_table, {name, actors})
+    Logger.info("Registered actor system with name: #{name}")
+    :ok
+  end
+
+  @doc """
+  Searches for an actor by system name and actor_id in the local ETS table.
+  """
+  def find_actor(system_name, %ActorId{} = actor_id) do
+    case :ets.lookup(@actor_table, system_name) do
+      [] ->
+        {:error, :not_found}
+
+      [{^system_name, actors}] ->
+        case Enum.find(actors, fn %Actor{id: %ActorId{name: name} = _id} ->
+               name == actor_id.name
+             end) do
+          nil ->
+            {:error, :actor_not_found}
+
+          actor ->
+            {:ok, actor}
+        end
+    end
+  end
+
+  @doc """
+  Gets the process registered under `actor_name` in `system`, if it is alive.
+  """
+  def lookup(system, actor_name) do
+    case ProcessRegistry.lookup(system, actor_name) do
+      nil ->
+        :not_found
+
+      {%{id: ^actor_name, start: {_module, _fun, _args}}, lookups} ->
+        pid =
+          lookups
+          |> Enum.map(fn {_node, pid} -> pid end)
+          |> List.first()
+
+        {:ok, pid}
+
+      error ->
+        {:error, error}
+    end
+  end
+
+  @doc """
+  Checks whether the process is alive.
+  """
+  def is_alive?(_mod, _actor_name) do
+  end
+
+  @doc """
+  Lists all alive PIDs for the given registry module.
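+
+  Example (output is illustrative):
+
+      iex> list_actor_pids(Actors.Actor.Entity)
+      [#PID<0.245.0>, #PID<0.310.0>]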
+  """
+  @spec list_actor_pids(module()) :: list(pid())
+  def list_actor_pids(_mod) do
+  end
+
+  defp init_registry_process() do
+    %{
+      id: :initializer_registry_process,
+      start:
+        {Task, :start,
+         [
+           fn ->
+             Process.flag(:trap_exit, true)
+
+             ProcessHub.start_children(
+               "registry_#{Config.get(:actor_system_name)}",
+               [
+                 %{
+                   id: Spawn.Cluster.Node.GlobalRegistry,
+                   start: {Spawn.Cluster.Node.GlobalRegistry, :start_link, [%{}]}
+                 }
+               ],
+               child_mapping: %{
+                 self: [Node.self()]
+               }
+             )
+
+             receive do
+               {:EXIT, _pid, reason} ->
+                 Logger.info(
+                   "[SUPERVISOR] Initializer Registry Process #{inspect(self())} shut down with reason #{inspect(reason)}"
+                 )
+
+                 :ok
+             end
+           end
+         ]}
+    }
+  end
+
+  defp registry_actors_distributor_process_hub() do
+    {ProcessHub,
+     %ProcessHub{
+       hub_id: "registry_#{Config.get(:actor_system_name)}",
+       redundancy_strategy: %ProcessHub.Strategy.Redundancy.Replication{
+         # TODO get from config
+         replication_factor: 2,
+         replication_model: :active_active,
+         redundancy_signal: :none
+       },
+       synchronization_strategy: %ProcessHub.Strategy.Synchronization.PubSub{
+         # TODO get from config
+         sync_interval: 10000
+       },
+       distribution_strategy: %ProcessHub.Strategy.Distribution.Guided{}
+     }}
+  end
+
+  defp normal_actors_distributor_process_hub() do
+    {ProcessHub,
+     %ProcessHub{
+       hub_id: Config.get(:actor_system_name),
+       redundancy_strategy: %ProcessHub.Strategy.Redundancy.Replication{
+         # TODO get from config
+         replication_factor: 2,
+         replication_model: :active_passive,
+         redundancy_signal: :none
+       },
+       migration_strategy: %ProcessHub.Strategy.Migration.HotSwap{
+         # TODO get from config
+         retention: 2000,
+         handover: true
+       },
+       synchronization_strategy: %ProcessHub.Strategy.Synchronization.Gossip{
+         # TODO get from config
+         sync_interval: 10000,
+         # TODO get from config
+         recipients: 2
+       },
+       partition_tolerance_strategy: %ProcessHub.Strategy.PartitionTolerance.DynamicQuorum{
+         # TODO get from config
+         quorum_size: 2
+       },
+       distribution_strategy: %ProcessHub.Strategy.Distribution.Guided{}
+     }}
+  end
+
+  defp pooled_actors_distributor_process_hub() do
+    {ProcessHub,
+     %ProcessHub{
+       hub_id: "pooled_#{Config.get(:actor_system_name)}",
+       redundancy_strategy: %ProcessHub.Strategy.Redundancy.Replication{
+         replication_model: :active_active,
+         redundancy_signal: :none
+       },
+       synchronization_strategy: %ProcessHub.Strategy.Synchronization.PubSub{
+         sync_interval: 10000
+       },
+       partition_tolerance_strategy: %ProcessHub.Strategy.PartitionTolerance.DynamicQuorum{
+         quorum_size: 2
+       },
+       distribution_strategy: %ProcessHub.Strategy.Distribution.Guided{}
+     }}
+  end
+end
diff --git a/lib/spawn/cluster/node/global_registry.ex b/lib/spawn/cluster/node/global_registry.ex
new file mode 100644
index 00000000..f965d41e
--- /dev/null
+++ b/lib/spawn/cluster/node/global_registry.ex
@@ -0,0 +1,14 @@
+defmodule Spawn.Cluster.Node.GlobalRegistry do
+  @moduledoc """
+  Minimal GenServer started on each node through the registry `ProcessHub` hub.
+  """
+  use GenServer
+
+  def start_link(state) do
+    GenServer.start_link(__MODULE__, state)
+  end
+
+  def init(state) do
+    {:ok, state}
+  end
+end
diff --git a/lib/spawn/cluster/node/registry.ex b/lib/spawn/cluster/node/registry.ex
deleted file mode 100644
index 18389d56..00000000
--- a/lib/spawn/cluster/node/registry.ex
+++ /dev/null
@@ -1,61 +0,0 @@
-defmodule Spawn.Cluster.Node.Registry do
-  @moduledoc """
-  Defines a distributed registry for all process
-  """
-  use Horde.Registry
-
-  def child_spec() do
-    %{
-      id: __MODULE__,
-      start: {__MODULE__, :start_link, [%{}]}
-    }
-  end
-
-  @doc false
-  def start_link(_) do
-    
Horde.Registry.start_link(__MODULE__, [keys: :unique, members: :auto], name: __MODULE__) - end - - @impl true - def init(args) do - Horde.Registry.init(args) - end - - @doc """ - Get Process if this is alive. - """ - def lookup(mod, actor_name) do - Horde.Registry.lookup(__MODULE__, {mod, actor_name}) - end - - @doc """ - Update value meta inside registry - """ - def update_entry_value(mod, actor_name, pid, value) do - GenServer.call(__MODULE__, {:update_value, {mod, actor_name}, pid, value}) - end - - @doc """ - Check if Process is alive. - """ - def is_alive?(mod, actor_name) do - case Horde.Registry.lookup(__MODULE__, {mod, actor_name}) do - [] -> false - _ -> true - end - end - - @doc """ - List all alive PIDs from given registry module. - """ - @spec list_actor_pids(module()) :: list(pid()) - def list_actor_pids(mod) do - Horde.Registry.select(__MODULE__, [{{:"$1", :_, :_}, [], [:"$1"]}]) - |> Stream.filter(fn {mod_name, _} -> mod_name == mod end) - # |> Stream.map(&Horde.Registry.match(__MODULE__, &1, :_)) - # |> Stream.scan([], fn [{pid, _}], acc -> [pid | acc] end) - # |> Stream.flat_map(& &1) - # |> Stream.uniq() - |> Enum.to_list() - end -end diff --git a/lib/spawn/cluster/state_handoff/controller_behaviour.ex b/lib/spawn/cluster/state_handoff/controller_behaviour.ex deleted file mode 100644 index 95c9064e..00000000 --- a/lib/spawn/cluster/state_handoff/controller_behaviour.ex +++ /dev/null @@ -1,67 +0,0 @@ -defmodule Spawn.Cluster.StateHandoff.ControllerBehaviour do - @moduledoc """ - Behavior for managing the state of clustered processes a.k.a lookups. - """ - @type node_type :: term() - - @type opts :: Keyword.t() - - @type data :: any() - - @type new_data :: data() - - @type id :: Eigr.Functions.Protocol.Actors.ActorId.t() - - @type host :: Actors.Registry.HostActor.t() - - @type hosts :: list(Actors.Registry.HostActor.t()) - - @type timer :: {atom(), integer()} - - @doc """ - Cleanup action. - """ - @callback clean(node(), data()) :: any() - - @doc """ - Fetch the ActorHost process reference by id. - In case `id` is the hash of the ActorId but here the struct ActorId is passed as a parameter. - An implementations must handle this. - """ - @callback get_by_id(id(), data()) :: {new_data(), hosts()} - - @doc """ - Any initialization code required by implementations of this behavior. - Must return the state to be added in the StateHandoffManager. - """ - @callback handle_init(opts()) :: new_data() | {new_data(), timer()} - - @doc """ - Any procedure to be executed after the StateHandoffManager is initialized. - Executed during callback call to handle_continue. - """ - @callback handle_after_init(data()) :: new_data() - - @doc """ - Perform any necessary cleanups during StateHandoffManager termination. - Generally excluding references to all processes owned by the terminating node. - """ - @callback handle_terminate(node(), data()) :: new_data() - - @callback handle_timer(any(), data()) :: new_data() | {new_data(), timer()} - - @doc """ - If necessary any procedure to be executed during a nodeup event - """ - @callback handle_nodeup_event(node(), node_type(), data()) :: new_data() - - @doc """ - If necessary any procedure to be executed during a nodedown event - """ - @callback handle_nodedown_event(node(), node_type(), data()) :: new_data() - - @doc """ - Adds a reference to an ActorHost process. 
- """ - @callback set(id(), node(), host(), data) :: new_data() -end diff --git a/lib/spawn/cluster/state_handoff/controllers/crdt_controller.ex b/lib/spawn/cluster/state_handoff/controllers/crdt_controller.ex deleted file mode 100644 index 5cb69e51..00000000 --- a/lib/spawn/cluster/state_handoff/controllers/crdt_controller.ex +++ /dev/null @@ -1,218 +0,0 @@ -defmodule Spawn.Cluster.StateHandoff.Controllers.CrdtController do - @moduledoc """ - This handles state handoff in a cluster. - - It uses the DeltaCrdt library to handle a distributed state, which is an eventually consistent replicated data type. - The module starts a GenServer that monitors nodes in the cluster, and when a new node comes up it sends a "set_neighbours" - message to that node's GenServer process with its own DeltaCrdt process ID. This is done to ensure that changes in either node's - state are reflected across both. - - The module also handles other messages like "handoff" and "get" to put and retrieve data from the DeltaCrdt state, respectively. - """ - require Iter - require Logger - - alias Actors.Config.PersistentTermConfig, as: Config - alias Eigr.Functions.Protocol.Actors.Actor - - import Spawn.Utils.Common, only: [generate_key: 1, actor_host_hash: 0] - - @behaviour Spawn.Cluster.StateHandoff.ControllerBehaviour - - @type node_type :: term() - - @type opts :: Keyword.t() - - @type data :: any() - - @type new_data :: data() - - @type id :: Eigr.Functions.Protocol.Actors.ActorId.t() - - @type host :: Actors.Registry.HostActor.t() - - @type hosts :: list(Actors.Registry.HostActor.t()) - - @type timer :: {atom(), integer()} - - @call_timeout 15_000 - - def get_crdt_pid do - :persistent_term.get(__MODULE__, {:error, Node.self()}) - end - - @doc """ - Cluster HostActor cleanup - """ - @impl true - def clean(node, %{crdt_pid: crdt_pid} = data) do - Logger.debug("Received cleanup action from Node #{inspect(node)}") - - keys = - crdt_pid - |> DeltaCrdt.to_map() - |> Iter.filter(fn {_key, [host]} -> host.node == node end) - |> Iter.map(fn {key, _value} -> key end) - - DeltaCrdt.drop(crdt_pid, keys) - - Logger.debug("Hosts (#{Enum.count(keys)}) cleaned for node #{inspect(node)}") - - data - end - - @impl true - @spec get_by_id(id(), data()) :: {new_data(), hosts()} - def get_by_id(id, %{crdt_pid: crdt_pid} = data) do - key = generate_key(id) - - hosts = - case DeltaCrdt.get(crdt_pid, key, :infinity) do - [host] -> - [%{host | actor: Actor.decode(host.actor)}] - - nil -> - [] - end - - {data, hosts} - end - - @impl true - @spec handle_init(opts()) :: new_data() | {new_data(), timer()} - def handle_init(_opts) do - pooling_interval = Config.get(:neighbours_sync_interval) - - {:ok, crdt_pid} = - DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, - sync_interval: Config.get(:sync_interval), - ship_interval: Config.get(:ship_interval), - ship_debounce: Config.get(:ship_debounce) - ) - - :persistent_term.put(__MODULE__, crdt_pid) - - { - %{crdt_pid: crdt_pid, neighbours_sync_interval: pooling_interval}, - {:set_neighbours_sync, pooling_interval} - } - end - - @impl true - @spec handle_after_init(data()) :: new_data() - def handle_after_init(%{crdt_pid: crdt_pid} = data) do - do_set_neighbours(crdt_pid) - data - end - - @impl true - @spec handle_terminate(node(), data()) :: new_data() - def handle_terminate(_node, %{crdt_pid: crdt_pid} = _data) do - Logger.debug("#{inspect(__MODULE__)} Handling StateHandoff terminate...") - :persistent_term.erase(__MODULE__) - - %{crdt_pid: crdt_pid} - end - - def handle_terminate(node, data) do - 
Logger.warning("Invalid terminate state for Node #{inspect(node)}. State: #{inspect(data)}") - end - - @impl true - @spec handle_timer(any(), data()) :: new_data() | {new_data(), timer()} - def handle_timer( - :set_neighbours_sync, - %{crdt_pid: crdt_pid, neighbours_sync_interval: pooling_interval} = data - ) do - if Sidecar.GracefulShutdown.running?() do - do_set_neighbours(crdt_pid) - end - - {data, {:set_neighbours_sync, pooling_interval}} - end - - def handle_timer(_event, data), do: data - - @impl true - @spec handle_nodeup_event(node(), node_type(), data()) :: new_data() - def handle_nodeup_event(_node, _node_type, %{crdt_pid: crdt_pid} = _data) do - if Sidecar.GracefulShutdown.running?() do - do_set_neighbours(crdt_pid) - end - - %{crdt_pid: crdt_pid} - end - - @impl true - @spec handle_nodedown_event(node(), node_type(), data()) :: new_data() - def handle_nodedown_event(node, _node_type, %{crdt_pid: crdt_pid} = _data) do - if Sidecar.GracefulShutdown.running?() do - take_ownership(node, crdt_pid) - end - - do_set_neighbours(crdt_pid) - %{crdt_pid: crdt_pid} - end - - @impl true - @spec set(id(), node(), host(), data) :: new_data() - def set(id, _node, host, %{crdt_pid: crdt_pid} = data) do - key = generate_key(id) - - host = %{host | actor: Actor.encode(host.actor)} - - DeltaCrdt.put(crdt_pid, key, [host], :infinity) - - data - end - - defp take_ownership(node, crdt_pid) do - Logger.debug(" #{inspect(node)}") - - registers = - crdt_pid - |> DeltaCrdt.to_map() - |> Iter.filter(fn {_key, [host]} -> - host.node == node and Keyword.get(host.opts, :hash) == actor_host_hash() - end) - |> Iter.map(fn {key, [value]} -> {key, [%{value | node: Node.self()}]} end) - |> Iter.into(%{}) - - DeltaCrdt.merge(crdt_pid, registers) - - Logger.debug( - "Took ownership of (#{Enum.count(registers)}) registers from node #{inspect(node)}" - ) - end - - defp do_set_neighbours(this_crdt_pid) do - nodes = Node.list() - - Logger.notice("Sending :set_neighbours to #{inspect(nodes)} for #{inspect(this_crdt_pid)}") - - neighbours = - :erpc.multicall(nodes, __MODULE__, :get_crdt_pid, [], @call_timeout) - |> Enum.map(fn - {:ok, {:error, node}} -> - Logger.warning("The node failed to retrieve DeltaCrdt pid -> #{inspect(node)}") - - nil - - {:ok, crdt_pid} -> - crdt_pid - - error -> - Logger.warning( - "Couldn't reach one of the nodes when calling for neighbors -> #{inspect(error)}" - ) - - nil - end) - |> Enum.reject(&is_nil/1) - - # add other_node's crdt_pid as a neighbour - # we are not adding both ways and letting them sync with eachother - # based on current Node.list() of each node - DeltaCrdt.set_neighbours(this_crdt_pid, neighbours) - end -end diff --git a/lib/spawn/cluster/state_handoff/controllers/persistent_controller.ex b/lib/spawn/cluster/state_handoff/controllers/persistent_controller.ex deleted file mode 100644 index 7b43425e..00000000 --- a/lib/spawn/cluster/state_handoff/controllers/persistent_controller.ex +++ /dev/null @@ -1,92 +0,0 @@ -defmodule Spawn.Cluster.StateHandoff.Controllers.PersistentController do - @moduledoc """ - `StateHandoffPersistentController` is a StateHandoff Controller basead on `Statestore` mechanism. 
- """ - use Nebulex.Caching - require Logger - - @behaviour Spawn.Cluster.StateHandoff.ControllerBehaviour - - alias Spawn.Cache.LookupCache, as: Cache - - @type node_type :: term() - - @type opts :: Keyword.t() - - @type data :: any() - - @type new_data :: data() - - @type id :: Eigr.Functions.Protocol.Actors.ActorId.t() - - @type host :: Actors.Registry.HostActor.t() - - @type hosts :: list(Actors.Registry.HostActor.t()) - - @type timer :: {atom(), integer()} - - @otp_app :spawn - - @ttl :timer.minutes(10) - - @impl true - @spec clean(node(), data()) :: data() - def clean(node, data), do: handle_terminate(node, data) - - @impl true - @spec get_by_id(id(), data()) :: {new_data(), hosts()} - @decorate cacheable(cache: Cache, keys: [id], opts: [ttl: @ttl]) - def get_by_id(id, %{backend_adapter: backend} = data) do - {:ok, lookups} = backend.get_by_id(id) - - hosts = - Enum.map(lookups, fn data = _lookup -> - :erlang.binary_to_term(data) - end) - - {data, hosts} - end - - @impl true - @spec handle_init(opts()) :: new_data() | {new_data(), timer()} - def handle_init(_opts) do - backend = Application.get_env(@otp_app, :state_handoff_controller_persistent_backend) - %{backend_adapter: backend} - end - - @impl true - @spec handle_after_init(data()) :: new_data() - def handle_after_init(data), do: data - - @impl true - @spec handle_terminate(node(), data()) :: new_data() - @decorate cache_evict(cache: Cache, key: node) - def handle_terminate(node, %{backend_adapter: backend} = data) do - backend.clean(node) - data - end - - def handle_terminate(node, data) do - Logger.warning("Invalid terminate state for Node #{inspect(node)}. State: #{inspect(data)}") - end - - @impl true - @spec handle_timer(any(), data()) :: new_data() | {new_data(), timer()} - def handle_timer(_event, data), do: data - - @impl true - @spec handle_nodeup_event(node(), node_type(), data()) :: new_data() - def handle_nodeup_event(_node, _node_type, data), do: data - - @impl true - @spec handle_nodedown_event(node(), node_type(), data()) :: new_data() - def handle_nodedown_event(_node, _node_type, data), do: data - - @impl true - @spec set(id(), node(), host(), data) :: new_data() - def set(id, node, host, %{backend_adapter: backend} = data) do - bytes = :erlang.term_to_binary(host) - backend.set(id, node, bytes) - data - end -end diff --git a/lib/spawn/cluster/state_handoff/invocation_scheduler_state.ex b/lib/spawn/cluster/state_handoff/invocation_scheduler_state.ex deleted file mode 100644 index c63983d0..00000000 --- a/lib/spawn/cluster/state_handoff/invocation_scheduler_state.ex +++ /dev/null @@ -1,160 +0,0 @@ -defmodule Spawn.Cluster.StateHandoff.InvocationSchedulerState do - @moduledoc """ - This handles invocation scheduler stream - - It uses the DeltaCrdt library to handle a distributed state, which is an eventually consistent replicated data type. - The module starts a GenServer that monitors nodes in the cluster, and when a new node comes up it sends a "set_neighbours" - message to that node's GenServer process with its own DeltaCrdt process ID. This is done to ensure that changes in either node's - state are reflected across both. 
- """ - use GenServer - - require Iter - require Logger - - alias Actors.Config.PersistentTermConfig, as: Config - - @call_timeout 15_000 - - def child_spec(opts) do - %{ - id: __MODULE__, - start: {__MODULE__, :start_link, [opts]}, - restart: :permanent - } - end - - def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) - end - - @impl true - def init(_opts) do - Process.flag(:trap_exit, true) - Process.flag(:message_queue_data, :off_heap) - :net_kernel.monitor_nodes(true, node_type: :visible) - - pooling_interval = Config.get(:neighbours_sync_interval) - - {:ok, crdt_pid} = - DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, - sync_interval: Config.get(:sync_interval), - ship_interval: Config.get(:ship_interval), - ship_debounce: Config.get(:ship_debounce) - ) - - :persistent_term.put(__MODULE__, crdt_pid) - - Process.send_after(self(), :sync, pooling_interval) - - {:ok, crdt_pid, {:continue, :after_init}} - end - - @impl true - def handle_continue(:after_init, crdt_pid) do - do_set_neighbours(crdt_pid) - - {:noreply, crdt_pid} - end - - def get_crdt_pid do - :persistent_term.get(__MODULE__, {:error, Node.self()}) - end - - @spec all() :: map() - def all() do - DeltaCrdt.to_map(get_crdt_pid()) || %{} - end - - def put_many([]), do: :ok - - def put_many(invocations) do - new_data = - Enum.reduce(invocations, %{}, fn {invocation, scheduled_to, cycle_in}, acc -> - Map.put(acc, invocation, {scheduled_to, cycle_in}) - end) - - DeltaCrdt.merge(get_crdt_pid(), new_data, :infinity) - end - - def put(invocation, scheduled_to, repeat_in) do - put_many([{invocation, scheduled_to, repeat_in}]) - end - - def get(invocation) do - DeltaCrdt.get(get_crdt_pid(), invocation, :infinity) - end - - def remove(key) do - DeltaCrdt.delete(get_crdt_pid(), key, :infinity) - end - - @impl true - def terminate(_reason, crdt_pid) do - Logger.debug("#{inspect(__MODULE__)} Handling InvocationSchedulerState terminate...") - :persistent_term.erase(__MODULE__) - - {:ok, crdt_pid} - end - - @impl true - def handle_info(:sync, crdt_pid) do - if Sidecar.GracefulShutdown.running?() do - do_set_neighbours(crdt_pid) - end - - Process.send_after(self(), :sync, Config.get(:neighbours_sync_interval)) - {:noreply, crdt_pid} - end - - def handle_info({:nodeup, node, _node_type}, crdt_pid) do - Logger.debug("InvocationSchedulerState :nodeup event from #{inspect(node)}") - - if Sidecar.GracefulShutdown.running?() do - do_set_neighbours(crdt_pid) - end - - {:noreply, crdt_pid} - end - - def handle_info({:nodedown, node, _node_type}, crdt_pid) do - Logger.debug("InvocationSchedulerState :nodedown event from #{inspect(node)}") - - if Sidecar.GracefulShutdown.running?() do - do_set_neighbours(crdt_pid) - end - - {:noreply, crdt_pid} - end - - defp do_set_neighbours(this_crdt_pid) do - nodes = Node.list() - - Logger.notice("Sending :set_neighbours to #{inspect(nodes)} for #{inspect(this_crdt_pid)}") - - neighbours = - :erpc.multicall(nodes, __MODULE__, :get_crdt_pid, [], @call_timeout) - |> Enum.map(fn - {:ok, {:error, node}} -> - Logger.warning("The node failed to retrieve DeltaCrdt pid -> #{inspect(node)}") - - nil - - {:ok, crdt_pid} -> - crdt_pid - - error -> - Logger.warning( - "Couldn't reach one of the nodes when calling for neighbors -> #{inspect(error)}" - ) - - nil - end) - |> Enum.reject(&is_nil/1) - - # add other_node's crdt_pid as a neighbour - # we are not adding both ways and letting them sync with eachother - # based on current Node.list() of each node - DeltaCrdt.set_neighbours(this_crdt_pid, 
neighbours) - end -end diff --git a/lib/spawn/cluster/state_handoff/manager.ex b/lib/spawn/cluster/state_handoff/manager.ex deleted file mode 100644 index 4327fbce..00000000 --- a/lib/spawn/cluster/state_handoff/manager.ex +++ /dev/null @@ -1,157 +0,0 @@ -defmodule Spawn.Cluster.StateHandoff.Manager do - @moduledoc """ - This handles state handoff in a cluster. - - This module monitors node up and down events as well as node terminate events and triggers `Spawn.Cluster.StateHandoff.ControllerBehaviour` implementations to handle these events. - """ - use GenServer - require Logger - - def child_spec(id, opts \\ []) do - %{ - id: id, - start: {__MODULE__, :start_link, [opts]}, - restart: :permanent - } - end - - defmodule State do - defstruct data: nil, controller: nil, timer: nil - end - - @impl true - def init(opts) do - controller = - Application.get_env( - :spawn, - :state_handoff_controller_adapter, - Spawn.Cluster.StateHandoff.Controllers.PersistentController - ) - - do_init(opts) - - case controller.handle_init(opts) do - {initial_state, {evt, delay} = _scheduler} -> - timer = Process.send_after(self(), {:timer, evt}, delay) - - {:ok, %State{controller: controller, data: initial_state, timer: timer}, - {:continue, :after_init}} - - initial_state -> - {:ok, %State{controller: controller, data: initial_state}, {:continue, :after_init}} - end - end - - @impl true - def handle_continue(:after_init, %State{controller: controller, data: data} = state) do - new_data = controller.handle_after_init(data) - - {:noreply, %State{state | data: new_data}} - end - - @impl true - def terminate(_reason, %State{controller: controller, data: data} = state) do - Logger.debug("Calling StateHandoff Manager terminate for controller #{inspect(controller)}") - - node = Node.self() - new_data = controller.handle_terminate(node, data) - - {:ok, %State{state | data: new_data}} - end - - @impl true - def handle_call({:set, actor_id, host}, from, state) do - new_data = state.controller.set(actor_id, Node.self(), host, state.data) - - {:reply, from, %State{state | data: new_data}} - end - - @impl true - def handle_call({:get_actor_hosts_by_actor_id, actor_id}, _from, state) do - {_new_data, hosts} = state.controller.get_by_id(actor_id, state.data) - - {:reply, hosts, state} - end - - def handle_call({:clean, node}, _from, state) do - new_data = state.controller.clean(node, state.data) - - {:reply, new_data, %State{state | data: new_data}} - end - - @impl true - def handle_info( - {:timer, event}, - %State{controller: controller, data: data, timer: timer} = state - ) do - if !is_nil(timer) do - Process.cancel_timer(timer) - end - - case controller.handle_timer(event, data) do - {new_data, {evt, delay} = _timer} -> - new_timer = Process.send_after(self(), {:timer, evt}, delay) - {:noreply, %State{state | data: new_data, timer: new_timer}} - - new_data -> - {:noreply, %State{state | data: new_data}} - end - end - - def handle_info({:nodeup, node, node_type}, %State{controller: controller, data: data} = state) do - Logger.debug("Received :nodeup event from #{inspect(node)}") - new_data = controller.handle_nodeup_event(node, node_type, data) - - {:noreply, %State{state | data: new_data}} - end - - def handle_info( - {:nodedown, node, node_type}, - %State{controller: controller, data: data} = state - ) do - Logger.debug("Received :nodedown event from #{inspect(node)}") - new_data = controller.handle_nodedown_event(node, node_type, data) - - {:noreply, %State{state | data: new_data}} - end - - def handle_info(event, 
state) do - Logger.debug("Received handle_info event #{inspect(event)}") - - {:noreply, state} - end - - # Client API - def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) - end - - @doc """ - Store a actor and entity in the lookup store - """ - def set(actor_id, host), do: GenServer.call(__MODULE__, {:set, actor_id, host}, :infinity) - - @doc """ - Pickup the stored entity data for a actor - """ - def get(actor_id), - do: GenServer.call(__MODULE__, {:get_actor_hosts_by_actor_id, actor_id}, :infinity) - - @doc """ - Cluster HostActor cleanup - """ - def clean(node) do - Logger.debug("Received cleanup action from Node #{inspect(node)}") - - GenServer.call(__MODULE__, {:clean, node}) - - Logger.debug("Hosts cleaned for node #{inspect(node)}") - end - - # Private functions - defp do_init(_config) do - Process.flag(:trap_exit, true) - Process.flag(:message_queue_data, :off_heap) - :net_kernel.monitor_nodes(true, node_type: :visible) - end -end diff --git a/lib/spawn/cluster/state_handoff/manager_supervisor.ex b/lib/spawn/cluster/state_handoff/manager_supervisor.ex deleted file mode 100644 index 68265678..00000000 --- a/lib/spawn/cluster/state_handoff/manager_supervisor.ex +++ /dev/null @@ -1,34 +0,0 @@ -defmodule Spawn.Cluster.StateHandoff.ManagerSupervisor do - @moduledoc false - use Supervisor - require Logger - import Spawn.Utils.Common, only: [supervisor_process_logger: 1] - - alias Actors.Config.PersistentTermConfig, as: Config - - def start_link(state \\ []) do - Supervisor.start_link(__MODULE__, state, name: __MODULE__) - end - - def child_spec(opts) do - %{ - id: __MODULE__, - start: {__MODULE__, :start_link, [opts]} - } - end - - @impl true - def init(opts) do - children = [ - supervisor_process_logger(__MODULE__), - Spawn.Cluster.StateHandoff.Manager.child_spec(:state_handoff_manager, opts), - Spawn.Cluster.StateHandoff.InvocationSchedulerState.child_spec(opts) - ] - - Supervisor.init(children, - strategy: :one_for_one, - max_restarts: Config.get(:state_handoff_max_restarts), - max_seconds: Config.get(:state_handoff_max_seconds) - ) - end -end diff --git a/lib/spawn/supervisor.ex b/lib/spawn/supervisor.ex index 97b496cb..91abd177 100644 --- a/lib/spawn/supervisor.ex +++ b/lib/spawn/supervisor.ex @@ -7,6 +7,8 @@ defmodule Spawn.Supervisor do alias Actors.Config.PersistentTermConfig, as: Config + alias Spawn.Cluster.Node.Distributor + @shutdown_timeout_ms 330_000 def start_link(opts) do @@ -24,13 +26,11 @@ defmodule Spawn.Supervisor do @impl true def init(opts) do children = - [ - supervisor_process_logger(__MODULE__), - {Spawn.Cache.LookupCache, []}, - Spawn.Cluster.StateHandoff.ManagerSupervisor.child_spec(opts), - {Spawn.Cluster.ClusterSupervisor, []}, - Spawn.Cluster.Node.Registry.child_spec() - ] + ([ + supervisor_process_logger(__MODULE__), + {Spawn.Cache.LookupCache, []}, + {Spawn.Cluster.ClusterSupervisor, []} + ] ++ Distributor.child_specs()) |> maybe_start_internal_nats(opts) Supervisor.init(children, strategy: :one_for_one) diff --git a/mix.exs b/mix.exs index 8940f6fb..9abb8c7a 100644 --- a/mix.exs +++ b/mix.exs @@ -83,8 +83,9 @@ defmodule Spawn.MixProject do {:retry, "~> 0.17"}, {:flow, "~> 1.2"}, {:libcluster, "~> 3.3"}, - {:horde, "~> 0.9"}, + #{:horde, "~> 0.9"}, {:highlander, "~> 0.2.1"}, + {:process_hub, "~> 0.2"}, {:phoenix_pubsub, "~> 2.1"}, {:phoenix_pubsub_nats, "~> 0.2"}, {:jason, "~> 1.3"}, diff --git a/mix.lock b/mix.lock index cc94196b..bad1addf 100644 --- a/mix.lock +++ b/mix.lock @@ -2,7 +2,9 @@ "acceptor_pool": {:hex, 
:acceptor_pool, "1.0.0", "43c20d2acae35f0c2bcd64f9d2bde267e459f0f3fd23dab26485bf518c281b21", [:rebar3], [], "hexpm", "0cbcd83fdc8b9ad2eee2067ef8b91a14858a5883cb7cd800e6fcd5803e158788"}, "bakeware": {:hex, :bakeware, "0.2.4", "0aaf49b34f4bab2aa433f9ff1485d9401e421603160abd6d269c469fc7b65212", [:make, :mix], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "7b97bcf6fbeee53bb32441d6c495bf478d26f9575633cfef6831e421e86ada6d"}, "bandit": {:hex, :bandit, "1.5.2", "ed0a41c43a9e529c670d0fd48371db4027e7b80d43b1942893e17deb8bed0540", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "35ddbdce7e8a2a3c6b5093f7299d70832a43ed2f4a1852885a61d334cab1b4ad"}, + "blockade": {:hex, :blockade, "0.2.1", "2a91de67a337ae146f368cfc0bfc850a0cf9b5390408b323f263f8608ba33361", [:rebar3], [], "hexpm", "d319b643c9bd20b4499e0a2fe254c37ec9e3682ae62ec211a0cabca62ca5f245"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, + "cachex": {:hex, :cachex, "3.6.0", "14a1bfbeee060dd9bec25a5b6f4e4691e3670ebda28c8ba2884b12fe30b36bf8", [:mix], [{:eternal, "~> 1.2", [hex: :eternal, repo: "hexpm", optional: false]}, {:jumper, "~> 1.0", [hex: :jumper, repo: "hexpm", optional: false]}, {:sleeplocks, "~> 1.1", [hex: :sleeplocks, repo: "hexpm", optional: false]}, {:unsafe, "~> 1.0", [hex: :unsafe, repo: "hexpm", optional: false]}], "hexpm", "ebf24e373883bc8e0c8d894a63bbe102ae13d918f790121f5cfe6e485cc8e2e2"}, "castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"}, "cc_precompiler": {:hex, :cc_precompiler, "0.1.9", "e8d3364f310da6ce6463c3dd20cf90ae7bbecbf6c5203b98bf9b48035592649b", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "9dcab3d0f3038621f1601f13539e7a9ee99843862e66ad62827b0c42b2f58a54"}, "chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"}, @@ -23,6 +25,7 @@ "ecto_sqlite3": {:hex, :ecto_sqlite3, "0.13.0", "0c3dc8ff24f378ef108619fd5c18bbbea43cb86dc8733c1c596bd7e0a5bb9e28", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.11", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:exqlite, "~> 0.9", [hex: :exqlite, repo: "hexpm", optional: false]}], "hexpm", "8ab7d8bf6663b811b80c9fa8730780f7077106c40a3fdbae384fe8f82315b257"}, "ed25519": {:hex, :ed25519, "1.4.1", "479fb83c3e31987c9cad780e6aeb8f2015fb5a482618cdf2a825c9aff809afc4", [:mix], [], "hexpm", "0dacb84f3faa3d8148e81019ca35f9d8dcee13232c32c9db5c2fb8ff48c80ec7"}, "elixir_make": {:hex, :elixir_make, "0.7.8", "505026f266552ee5aabca0b9f9c229cbb496c689537c9f922f3eb5431157efc7", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: 
"hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "7a71945b913d37ea89b06966e1342c85cfe549b15e6d6d081e8081c493062c07"}, + "eternal": {:hex, :eternal, "1.2.2", "d1641c86368de99375b98d183042dd6c2b234262b8d08dfd72b9eeaafc2a1abd", [:mix], [], "hexpm", "2c9fe32b9c3726703ba5e1d43a1d255a4f3f2d8f8f9bc19f094c7cb1a7a9e782"}, "ex_doc": {:hex, :ex_doc, "0.34.1", "9751a0419bc15bc7580c73fde506b17b07f6402a1e5243be9e0f05a68c723368", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d441f1a86a235f59088978eff870de2e815e290e44a8bd976fe5d64470a4c9d2"}, "exqlite": {:hex, :exqlite, "0.19.0", "0f3ee29e35bed38552dd0ed59600aa81c78f867f5b5ff0e17d330148e0465483", [:make, :mix], [{:cc_precompiler, "~> 0.1", [hex: :cc_precompiler, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "55a8fbb0443f03d4a256e3458bd1203eff5037a6624b76460eaaa9080f462b06"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, @@ -36,12 +39,14 @@ "grpc_reflection": {:hex, :grpc_reflection, "0.1.4", "0b3542801ff7078e53dc5dee3d79205f3b8d1fec391c6e890926a182f83d09e0", [:mix], [{:google_protos, "~> 0.4.0", [hex: :google_protos, repo: "hexpm", optional: false]}, {:grpc, "~> 0.7", [hex: :grpc, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.11", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "74a6148335305926b166a58c8e040f64501410d744be55ebe343a704003e994a"}, "grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"}, "gun": {:hex, :gun, "2.1.0", "b4e4cbbf3026d21981c447e9e7ca856766046eff693720ba43114d7f5de36e87", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "52fc7fc246bfc3b00e01aea1c2854c70a366348574ab50c57dfe796d24a0101d"}, + "hash_ring": {:hex, :hash_ring, "0.4.2", "65d7e84ee911c2f07e53193711013ec9b9761c5eb180720e31224b776c98f502", [:rebar3], [], "hexpm", "c327d99b1e2af41b50e5c4803a9bee7932db2c0f39ad7854a9fdad7e42444066"}, "highlander": {:hex, :highlander, "0.2.1", "e59b459f857e89daf73f2598bf2b2c0479a435481e6101ea389fd3625919b052", [:mix], [], "hexpm", "5ba19a18358803d82a923511acec8ee85fac30731c5ca056f2f934bc3d3afd9a"}, "horde": {:hex, :horde, "0.9.0", "522342bd7149aeed453c97692a8bca9cf7c9368c5a489afd802e575dc8df54a6", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: 
:libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "fae11e5bc9c980038607d0c3338cdf7f97124a5d5382fd4b6fb6beaab8e214fe"}, "hpack": {:hex, :hpack_erl, "0.3.0", "2461899cc4ab6a0ef8e970c1661c5fc6a52d3c25580bc6dd204f84ce94669926", [:rebar3], [], "hexpm", "d6137d7079169d8c485c6962dfe261af5b9ef60fbc557344511c1e65e3d95fb0"}, "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "iter": {:hex, :iter, "0.1.2", "bd5dbba48ba67e0f134889a4a29f2b377db6cdcee0661f3c29439e7b649e317a", [:mix], [], "hexpm", "e79f53ed36105ae72582fd3ef224ca2539ccc00cdc27e6e7fe69c49119c4e39b"}, "jason": {:hex, :jason, "1.4.3", "d3f984eeb96fe53b85d20e0b049f03e57d075b5acda3ac8d465c969a2536c17b", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "9a90e868927f7c777689baa16d86f4d0e086d968db5c05d917ccff6d443e58a3"}, + "jumper": {:hex, :jumper, "1.0.2", "68cdcd84472a00ac596b4e6459a41b3062d4427cbd4f1e8c8793c5b54f1406a7", [:mix], [], "hexpm", "9b7782409021e01ab3c08270e26f36eb62976a38c1aa64b2eaf6348422f165e1"}, "k8s": {:hex, :k8s, "2.6.1", "ef949e268a65fc45e4481e1071b96c8aa7eae028f0f451052c301d72aae649a2", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}, {:mint_web_socket, "~> 1.0", [hex: :mint_web_socket, repo: "hexpm", optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.8", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "a95bde29b60ea94b2ba341969f911e94ae38fc635a332549c39e726ba76ebe10"}, "k8s_webhoox": {:hex, :k8s_webhoox, "0.2.0", "5ef0968a426a0e5d168dd54db7075e0ee222dddfa5da2cf29f25f01a7d02ffd0", [:mix], [{:k8s, "~> 2.0", [hex: :k8s, repo: "hexpm", optional: false]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}, {:pluggable, "~> 1.0", [hex: :pluggable, repo: "hexpm", optional: false]}, {:x509, "~> 0.8.5", [hex: :x509, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.0", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "4917e1bf43bcbae3c2fa53fa4206f444cc029e757dc4e2b7d550cb0ae8752543"}, "libcluster": {:hex, :libcluster, "3.3.3", "a4f17721a19004cfc4467268e17cff8b1f951befe428975dd4f6f7b84d927fe0", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7c0a2275a0bb83c07acd17dab3c3bfb4897b145106750eeccc62d302e3bdfee5"}, @@ -72,12 +77,14 @@ "pluggable": {:hex, :pluggable, "1.1.0", "7eba3bc70c0caf4d9056c63c882df8862f7534f0145da7ab3a47ca73e4adb1e4", [:mix], [], "hexpm", "d12eb00ea47b21e92cd2700d6fbe3737f04b64e71b63aad1c0accde87c751637"}, "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, "postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", 
optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"}, + "process_hub": {:hex, :process_hub, "0.2.5-alpha", "339c40e824f38a36735c3b85ed60f799e42bfb63e2ca93da7adf4db1a042688a", [:mix], [{:blockade, "~> 0.2.1", [hex: :blockade, repo: "hexpm", optional: false]}, {:cachex, "~> 3.6", [hex: :cachex, repo: "hexpm", optional: false]}, {:hash_ring, "~> 0.4.2", [hex: :hash_ring, repo: "hexpm", optional: false]}], "hexpm", "98000583721dfa177ee2e02838968bd058acb9655d35cb15de69e162241ba35b"}, "protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"}, "protobuf_generate": {:hex, :protobuf_generate, "0.1.2", "45b9a9ae8606333cdea993ceaaecd799d206cdfe23348d37c06207eac76cbee6", [:mix], [{:protobuf, "~> 0.12", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "55b0ff8385703317ca90e1bd30a2ece99e80ae0c73e6ebcfb374e84e57870d61"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, "recon": {:hex, :recon, "2.5.4", "05dd52a119ee4059fa9daa1ab7ce81bc7a8161a2f12e9d42e9d551ffd2ba901c", [:mix, :rebar3], [], "hexpm", "e9ab01ac7fc8572e41eb59385efeb3fb0ff5bf02103816535bacaedf327d0263"}, "retry": {:hex, :retry, "0.18.0", "dc58ebe22c95aa00bc2459f9e0c5400e6005541cf8539925af0aa027dc860543", [:mix], [], "hexpm", "9483959cc7bf69c9e576d9dfb2b678b71c045d3e6f39ab7c9aa1489df4492d73"}, "shards": {:hex, :shards, "1.1.1", "8b42323457d185b26b15d05187784ce6c5d1e181b35c46fca36c45f661defe02", [:make, :rebar3], [], "hexpm", "169a045dae6668cda15fbf86d31bf433d0dbbaec42c8c23ca4f8f2d405ea8eda"}, + "sleeplocks": {:hex, :sleeplocks, "1.1.3", "96a86460cc33b435c7310dbd27ec82ca2c1f24ae38e34f8edde97f756503441a", [:rebar3], [], "hexpm", "d3b3958552e6eb16f463921e70ae7c767519ef8f5be46d7696cc1ed649421321"}, "sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "tds": {:hex, :tds, "2.3.5", "fedfb96d53206f01eac62ead859e47e1541a62e1553e9eb7a8801c7dca59eae8", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "52e350f5dd5584bbcff9859e331be144d290b41bd4c749b936014a17660662f2"}, @@ -87,6 +94,7 @@ "telemetry_poller": {:hex, :telemetry_poller, "1.1.0", "58fa7c216257291caaf8d05678c8d01bd45f4bdbc1286838a28c4bb62ef32999", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9eb9d9cbfd81cbd7cdd24682f8711b6e2b691289a0de6826e58452f28c103c8f"}, "thousand_island": {:hex, :thousand_island, "1.3.5", 
"6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"}, "tls_certificate_check": {:hex, :tls_certificate_check, "1.22.1", "0f450cc1568a67a65ce5e15df53c53f9a098c3da081c5f126199a72505858dc1", [:rebar3], [{:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "3092be0babdc0e14c2e900542351e066c0fa5a9cf4b3597559ad1e67f07938c0"}, + "unsafe": {:hex, :unsafe, "1.0.2", "23c6be12f6c1605364801f4b47007c0c159497d0446ad378b5cf05f1855c0581", [:mix], [], "hexpm", "b485231683c3ab01a9cd44cb4a79f152c6f3bb87358439c6f68791b85c2df675"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, "x509": {:hex, :x509, "0.8.8", "aaf5e58b19a36a8e2c5c5cff0ad30f64eef5d9225f0fd98fb07912ee23f7aba3", [:mix], [], "hexpm", "ccc3bff61406e5bb6a63f06d549f3dba3a1bbb456d84517efaaa210d8a33750f"}, "yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"}, diff --git a/spawn_operator/spawn_operator/lib/spawn_operator/k8s/proxy/deployment.ex b/spawn_operator/spawn_operator/lib/spawn_operator/k8s/proxy/deployment.ex index 2b9b7234..04791018 100644 --- a/spawn_operator/spawn_operator/lib/spawn_operator/k8s/proxy/deployment.ex +++ b/spawn_operator/spawn_operator/lib/spawn_operator/k8s/proxy/deployment.ex @@ -36,11 +36,11 @@ defmodule SpawnOperator.K8s.Proxy.Deployment do } ] - @default_actor_host_function_replicas 1 + @default_actor_host_function_replicas 2 @default_actor_host_resources %{ "requests" => %{ - "cpu" => "100m", + "cpu" => "10m", "memory" => "80Mi", "ephemeral-storage" => "1M" } @@ -48,7 +48,7 @@ defmodule SpawnOperator.K8s.Proxy.Deployment do @default_proxy_resources %{ "requests" => %{ - "cpu" => "50m", + "cpu" => "10m", "memory" => "80Mi", "ephemeral-storage" => "1M" } @@ -249,7 +249,7 @@ defmodule SpawnOperator.K8s.Proxy.Deployment do proxy_container = %{ - "name" => "sidecar", + "name" => "spawn", "image" => "#{annotations.proxy_image_tag}", "imagePullPolicy" => "Always", "env" => @default_actor_host_function_env, diff --git a/spawn_sdk/spawn_sdk_example/lib/spawn_sdk_example/actors/example.pb.ex b/spawn_sdk/spawn_sdk_example/lib/spawn_sdk_example/actors/example.pb.ex index b5f5234b..e6ae3f1a 100644 --- a/spawn_sdk/spawn_sdk_example/lib/spawn_sdk_example/actors/example.pb.ex +++ b/spawn_sdk/spawn_sdk_example/lib/spawn_sdk_example/actors/example.pb.ex @@ -34,7 +34,7 @@ defmodule Io.Eigr.Spawn.Example.MyState do } end - field :value, 1, type: :int32 + field(:value, 1, type: :int32) end defmodule Io.Eigr.Spawn.Example.MyBusinessMessage do @@ -73,7 +73,7 @@ defmodule Io.Eigr.Spawn.Example.MyBusinessMessage do } end - field :value, 1, type: :int32 + field(:value, 1, type: :int32) end defmodule Io.Eigr.Spawn.Example.Joe.Service do @@ -571,4 +571,4 @@ defmodule Io.Eigr.Spawn.Example.Joe.Stub do @moduledoc false use GRPC.Stub, service: Io.Eigr.Spawn.Example.Joe.Service -end \ No newline at end of file +end diff --git a/spawn_sdk/spawn_sdk_example/lib/spawn_sdk_example/actors/inside_folder/clock.pb.ex b/spawn_sdk/spawn_sdk_example/lib/spawn_sdk_example/actors/inside_folder/clock.pb.ex index b0bffbf7..46fbdd16 100644 --- 
a/spawn_sdk/spawn_sdk_example/lib/spawn_sdk_example/actors/inside_folder/clock.pb.ex +++ b/spawn_sdk/spawn_sdk_example/lib/spawn_sdk_example/actors/inside_folder/clock.pb.ex @@ -340,4 +340,4 @@ defmodule Io.Eigr.Spawn.Example.ClockActor.Stub do @moduledoc false use GRPC.Stub, service: Io.Eigr.Spawn.Example.ClockActor.Service -end \ No newline at end of file +end