Skip to content

Commit

Permalink
chg: [connections] WIP unsubsribe source
Browse files Browse the repository at this point in the history
  • Loading branch information
gallypette committed Oct 15, 2024
1 parent c0a4d65 commit 54e525d
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 129 deletions.
2 changes: 2 additions & 0 deletions lib/cocktailparty/catalog.ex
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ defmodule Cocktailparty.Catalog do
end
end

# TODO remove
def get_broker(%Source{} = source) do
# locate the reponsible broker process
case GenServer.whereis({:global, "broker_" <> Integer.to_string(source.connection_id)}) do
Expand Down Expand Up @@ -629,6 +630,7 @@ defmodule Cocktailparty.Catalog do
# GenServer.cast(get_broker(source), msg)
# end

# TODO remove
defp notify_monitor(msg) do
GenServer.cast({:global, Cocktailparty.PubSubMonitor}, msg)
end
Expand Down
23 changes: 14 additions & 9 deletions lib/cocktailparty/catalog/source_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,18 @@ defmodule Cocktailparty.Catalog.SourceManager do
Stops a source process given its ID.
"""
def stop_source(source_id) do
with pid_sds <- :global.whereis_name(Cocktailparty.SourcesDynamicSupervisor),
pid <- :global.whereis_name({:source, source_id}) do
DynamicSupervisor.terminate_child(pid_sds, pid)
:ok
else
:undefined ->
{:error, :source_not_running}
pid_sds = :global.whereis_name(Cocktailparty.SourcesDynamicSupervisor)
pid = :global.whereis_name({:source, source_id})

cond do
pid_sds == :undefined ->
:ok

pid == :undefined ->
:ok

true ->
DynamicSupervisor.terminate_child(pid_sds, pid)
end
end

Expand All @@ -58,10 +63,10 @@ defmodule Cocktailparty.Catalog.SourceManager do
"""
def restart_source(id) do
with src <- Cocktailparty.Catalog.get_source!(id),
:ok <- stop_source(src.id) do
:ok <- stop_source(src.id) do
start_source(src)
else
:undefined ->
_ ->
Logger.error("Cannot restart process for source #{id} -- not running")
end
end
Expand Down
14 changes: 2 additions & 12 deletions lib/cocktailparty/catalog/stomp_subscribe.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Cocktailparty.Catalog.StompSubscribe do

alias Phoenix.Socket.Broadcast
alias Cocktailparty.Catalog.SourceType
# alias Cocktailparty.Input.StompPubSub
alias Cocktailparty.Input.StompPubSub
alias Barytherium.Frame
require Logger

Expand All @@ -26,7 +26,7 @@ defmodule Cocktailparty.Catalog.StompSubscribe do
def init(source) do
with conn_pid <- :global.whereis_name({"stomp", source.connection_id}) do
# Subscribe to the STOMP channel
# StompPubSub.subscribe(conn_pid, source.config["destination"], self())
StompPubSub.subscribe(conn_pid, source.config["destination"], {:source, source.id})

{:ok,
%{
Expand Down Expand Up @@ -83,16 +83,6 @@ defmodule Cocktailparty.Catalog.StompSubscribe do
{:noreply, state}
end

# @impl GenServer
# TODO when terminating we notify the STOMP pubsub driver so it unsubscribes
# from the server if there is no other consumer
# def terminate(reason, state) do
# Logger.info(
# "Redis PubSub source #{state.source_id} unsubscribing from #{state.channel} because #{reason}"
# )

# Redix.PubSub.unsubscribe(state.conn_pid, state.source_id, self())
# end
defp decompress_body(<<31, 139, 8, _::binary>> = body), do: :zlib.gunzip(body)
defp decompress_body(<<_::binary>> = body), do: body
end
189 changes: 84 additions & 105 deletions lib/cocktailparty/input/stomp_pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Cocktailparty.Input.StompPubSub do
alias Barytherium.Network
alias Barytherium.Network.Sender
alias Cocktailparty.Input
import Cocktailparty.Util
alias Cocktailparty.Util

@run_interval 10_000

Expand All @@ -19,6 +19,7 @@ defmodule Cocktailparty.Input.StompPubSub do
virtual_host: "/",
ssl: false,
subscriptions: %{},
subscribing: %{},
network_pid: nil,
connection_id: nil

Expand All @@ -27,14 +28,12 @@ defmodule Cocktailparty.Input.StompPubSub do
GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
end

# TODO we should subscribe by name, not by pid
def subscribe(pubsub, destination, pid) do
GenServer.call(pubsub, {:subscribe, destination, pid})
def subscribe(pubsub, destination, name) do
GenServer.cast(pubsub, {:subscribe, destination, name})
end

# TODO we should unsubscribe by name, not by pid
def unsubscribe(pubsub, destination, pid) do
GenServer.call(pubsub, {:unsubscribe, destination, pid})
def unsubscribe(pubsub, destination, name) do
GenServer.cast(pubsub, {:unsubscribe, destination, name})
end

## GenServer Callbacks
Expand All @@ -48,6 +47,7 @@ defmodule Cocktailparty.Input.StompPubSub do
passcode: Keyword.get(opts, :passcode),
opts: [secure: Keyword.get(opts, :ssl, false)],
subscriptions: %{},
ready: false,
network_pid: nil,
connection_id: Keyword.get(opts, :connection_id)
}
Expand All @@ -63,33 +63,66 @@ defmodule Cocktailparty.Input.StompPubSub do
end

@impl true
def handle_call({:subscribe, destination, name}, _from, state) do
def handle_info(:reconnect, state) do
Logger.info("Reconnecting to #{state.host}:#{state.port}")
# Kill the disconnected process
if state.network_pid do
Process.exit(state.network_pid, :kill)
end

{:ok, network_pid} = connect(state)
{:noreply, Map.put(state, :network_pid, network_pid)}
end

@impl true
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
# do the translation between pid and source name
name = Util.get_global_name(pid)
# TODO haha cannot get it's name it is already dead.
Logger.info("Process #{name} has been terminated")
# Remove the dead pid from all subscriptions
new_subscriptions = remove_process_from_subscriptions(state.subscriptions, name)
{:noreply, %{state | subscriptions: new_subscriptions}}
end

def handle_cast({:subscribe, destination, name}, state) do
with pid <- :global.whereis_name(name) do
# We monitor the source process
Process.monitor(pid)
Logger.info("Received SUB from #{:erlang.pid_to_list(pid) |> to_string}")

# If the connection is ready we can start subscribing right away
if state.ready do
Logger.info("Connection is ready, SUBSCRIBING")
subscribers = Map.get(state.subscriptions, destination, MapSet.new())
new_subscribers = MapSet.put(subscribers, name)
new_subscriptions = Map.put(state.subscriptions, destination, new_subscribers)

# Send SUBSCRIBE frame if this is the first subscriber to the destination
if MapSet.size(subscribers) == 0 do
send_subscribe_frame(state.sender_pid, destination)
end

subscribers = Map.get(state.subscriptions, destination, MapSet.new())
new_subscribers = MapSet.put(subscribers, name)
new_subscriptions = Map.put(state.subscriptions, destination, new_subscribers)
# We monitor the process to handle subscriptions
Process.monitor(pid)

# Send SUBSCRIBE frame if this is the first subscriber to the destination
if MapSet.size(subscribers) == 0 do
send_subscribe_frame(state, destination)
{:noreply, Map.put(state, :subscriptions, new_subscriptions)}
else
Logger.info("Connection is not ready, keeping for later")
# Otherwise we don't do anything, the driver will reconnect all the source anyway
end

# {:reply, :ok, %{state | subscriptions: new_subscriptions}}
{:reply, :ok, Map.put(state, :subscriptions, new_subscriptions)}
else
:undefined ->
Logger.info("Cannot find process #{name}")
end
end

@impl true
def handle_call({:unsubscribe, destination, pid}, _from, state) do
def handle_cast({:unsubscribe, destination, name}, state) do
# We remove the process from the list of processes that receive the frame to this destination
# if this is the last one, we send an unsubscribe frame
# if the network is not ready there is nothing to do
subscribers = Map.get(state.subscriptions, destination, MapSet.new())
new_subscribers = MapSet.delete(subscribers, pid)
new_subscribers = MapSet.delete(subscribers, name)

# TODO this is broken ATM
new_subscriptions =
if MapSet.size(new_subscribers) == 0 do
# Send UNSUBSCRIBE frame if no subscribers left
Expand All @@ -102,27 +135,6 @@ defmodule Cocktailparty.Input.StompPubSub do
{:reply, :ok, Map.put(state, :subscriptions, new_subscriptions)}
end

# @impl true
# TODO when a source process sends DOWN, we unsubscribe from the STOMP Network ?
# Or do we manage this externally?
# def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
# # Remove the dead pid from all subscriptions
# new_subscriptions = remove_pid_from_subscriptions(state.subscriptions, pid)
# {:noreply, %{state | subscriptions: new_subscriptions}}
# end

@impl true
def handle_info(:reconnect, state) do
Logger.info("Reconnecting to #{state.host}:#{state.port}")
# Kill the disconnected process
if state.network_pid do
Process.exit(state.network_pid, :kill)
end

{:ok, network_pid} = connect(state)
{:noreply, Map.put(state, :network_pid, network_pid)}
end

# Handle messages from Barytherium
# CONNECT
@impl true
Expand Down Expand Up @@ -162,7 +174,6 @@ defmodule Cocktailparty.Input.StompPubSub do
Logger.error("Stomp Connection to #{host}:#{port} failed, error: #{error}")
Process.send_after(self(), :reconnect, @run_interval)
{:noreply, state}
# {:stop, :connect_disconnected, state}
end

# CONNECTED SET DESTINATIONS
Expand All @@ -188,11 +199,9 @@ defmodule Cocktailparty.Input.StompPubSub do
Logger.info("Cannot find process #{{:source, source.id}}")
acc

# TODO do we monitor sources, or handle this case from the source_dynamic_supervisor?
pid ->
# We monitor the process
# we monitor the process to handle subscriptions
Process.monitor(pid)

subscribers = Map.get(acc, source.config["destination"], MapSet.new())
new_subscribers = MapSet.put(subscribers, {:source, source.id})

Expand All @@ -216,7 +225,7 @@ defmodule Cocktailparty.Input.StompPubSub do
end
)

{:noreply, Map.put(state, :subscriptions, new_subscriptions)}
{:noreply, Map.put(state, :subscriptions, new_subscriptions) |> Map.put(:ready, true)}
end

# # RECEIVING DATA
Expand Down Expand Up @@ -251,10 +260,10 @@ defmodule Cocktailparty.Input.StompPubSub do
@impl true
def handle_cast({:barytherium, :disconnect, reason}, state) do
Logger.info("STOMP disconnected because #{reason}")
# Remove the dead pid from all subscriptions
new_subscriptions = remove_pid_from_subscriptions(state.subscriptions, state.network_pid)
# We destroy the subscription map, we have to reconnect anyway
Process.send_after(self(), :reconnect, @run_interval)
{:noreply, %{state | subscriptions: new_subscriptions}}
# {:noreply, %{state | subscriptions: new_subscriptions}}
{:noreply, Map.put(state, :subscriptions, %{}) |> Map.put(:ready, false)}
end

@impl true
Expand All @@ -277,42 +286,36 @@ defmodule Cocktailparty.Input.StompPubSub do
Network.start_link(self(), state.host, state.port, state.opts)
end

defp send_subscribe_frame(state, destination) do
frame = %Frame{
command: "SUBSCRIBE",
headers: %{
"destination" => destination,
"id" => destination,
"ack" => "auto"
},
body: ""
}

Logger.info("sending SUB to #{pid_to_string(state.sender_pid)}")

send_frame(state.sender_pid, frame)
end

defp send_unsubscribe_frame(state, destination) do
frame = %Frame{
command: "UNSUBSCRIBE",
headers: %{
"id" => destination
},
body: ""
}

send_frame(state.sender_pid, frame)
defp send_subscribe_frame(sender_pid, destination) do
Sender.write(sender_pid, [
%Barytherium.Frame{
command: :subscribe,
headers: [
{"id", destination},
{"destination", destination},
{"ack", "client"}
]
}
])
end

defp send_frame(sender_pid, frame) do
GenServer.call(sender_pid, {:write, frame})
defp send_unsubscribe_frame(sender_pid, destination) do
Sender.write(sender_pid, [
%Barytherium.Frame{
command: :unsubscribe,
headers: [
{"id", destination},
{"destination", destination},
{"ack", "client"}
]
}
])
end

defp remove_pid_from_subscriptions(subscriptions, pid) do
defp remove_process_from_subscriptions(subscriptions, name) do
subscriptions
|> Enum.reduce(%{}, fn {destination, subscribers}, acc ->
new_subscribers = MapSet.delete(subscribers, pid)
new_subscribers = MapSet.delete(subscribers, name)

if MapSet.size(new_subscribers) == 0 do
# Send UNSUBSCRIBE frame if no subscribers left
Expand All @@ -330,28 +333,4 @@ defmodule Cocktailparty.Input.StompPubSub do
ack_id -> Sender.write(sender_pid, [%Frame{command: :ack, headers: [{"id", ack_id}]}])
end
end

## Public API

# @doc """
# Subscribes the given `pid` to the specified STOMP `destination`.

# ## Examples

# StompPubSub.subscribe(pubsub, "destination_name", self())
# """
# def subscribe(pubsub, destination, pid \\ self()) when is_pid(pid) do
# subscribe(pubsub, destination, pid)
# end

# @doc """
# Unsubscribes the given `pid` from the specified STOMP `destination`.

# ## Examples

# StompPubSub.unsubscribe(pubsub, "destination_name", self())
# """
# def unsubscribe(pubsub, destination, pid \\ self()) when is_pid(pid) do
# unsubscribe(pubsub, destination, pid)
# end
end
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ defmodule CocktailpartyWeb.Admin.ConnectionController do

def create(conn, %{"connection" => connection_params}) do
{:ok, config_str} = Map.fetch(connection_params, "config")
# replace with function from Util
# TODO handle exception
config = yaml_to_map!(config_str)

case Input.create_connection(Map.put(connection_params, "config", config)) do
{:ok, connection} ->
# TODO: handle errors
# Cocktailparty.Input.Connection.start(connection)

conn
|> put_flash(:info, "Connection created successfully.")
Expand Down
Loading

0 comments on commit 54e525d

Please sign in to comment.