Skip to content

Commit

Permalink
add: [source] phoenix - wip
Browse files Browse the repository at this point in the history
  • Loading branch information
gallypette committed Jan 24, 2025
1 parent 699794c commit 805e13f
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 17 deletions.
7 changes: 7 additions & 0 deletions lib/cocktailparty/catalog/source_type.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ defmodule Cocktailparty.Catalog.SourceType do
module: Cocktailparty.Catalog.DummyWebsocket,
required_fields: [:output_datatype]
}
],
"phoenix" => [
%{
type: "phoenix_downlink",
module: Cocktailparty.Catalog.PhoenixDownlink,
required_fields: [:output_datatype, :destination]
}
]
}

Expand Down
75 changes: 59 additions & 16 deletions lib/cocktailparty/input/phoenix_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,41 @@ defmodule Cocktailparty.Input.PhoenixClient do
{:ok, socket}
end

# @impl Slipstream
# def handle_connect(socket) do
# socket =
# socket.assigns.topics
# |> Enum.reduce(socket, fn topic, socket ->
# case rejoin(socket, topic) do
# {:ok, socket} -> socket
# {:error, _reason} -> socket
# end
# end)

# {:ok, socket}
# end
@impl Slipstream
def handle_connect(socket) do
socket =
socket
|> update(
:subscriptions,
&Enum.reduce(&1, %{}, fn destination, acc ->
acc
|> Map.put(
destination,
MapSet.union(
socket.assigns.subscriptions[destination],
socket.assigns.subscribing[destination]
)
)
end)
)

socket =
socket.assigns.subscriptions
|> Enum.reduce(socket, fn topic, socket ->
case rejoin(socket, topic) do
{:ok, socket} -> socket
{:error, _reason} -> socket
end
end)

{:ok, socket}
end

@impl Slipstream
def handle_cast({:subscribe, destination, name = {:source, _srcid}}, socket) do
def handle_cast(
{:subscribe, %{destination: destination, name: {:source, _srcid} = name}},
socket
) do
subscribers = Map.get(socket.assigns.subscriptions, destination, MapSet.new())
subscribers_subscribing = Map.get(socket.assigns.subscribing, destination, MapSet.new())
new_subsribers = MapSet.put(subscribers, name)
Expand All @@ -56,6 +75,32 @@ defmodule Cocktailparty.Input.PhoenixClient do
{:noreply, socket}
end

# def handle_cast({:unsubscribe, destination, _name = {:source, _srcid}}, socket) do
def handle_cast({:unsubscribe, %{destination: destination, name: {:source, _srcid}}}, socket) do
subscribers = Map.get(socket.assigns.subscriptions, destination, MapSet.new())

socket =
case joined?(socket, destination) do
true ->
# if that was the last subscriber we send leave
if MapSet.size(subscribers) == 1 do
socket
|> leave(destination)
|> update(:subscriptions, &Map.delete(&1, destination))
else
socket
|> update(:subscriptions, &Map.delete(&1, destination))
end

false ->
# we remove the sub from the set
socket
|> update(:subscriptions, &Map.delete(&1, destination))
end

{:noreply, socket}
end

@impl Slipstream
def handle_join(destination, join_response, socket) do
Logger.info("#{destination} #{inspect(join_response)}")
Expand All @@ -78,8 +123,6 @@ defmodule Cocktailparty.Input.PhoenixClient do
@impl Slipstream
def handle_message(destination, event, message, socket) do
# Here we will push to subscribed sources
Logger.info("Got message on #{destination}/#{event}: #{inspect(message)}")

subscribers = Map.get(socket.assigns.subscriptions, destination, MapSet.new())

Enum.each(subscribers, fn name ->
Expand Down
2 changes: 1 addition & 1 deletion lib/cocktailparty/input/stomp_pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ defmodule Cocktailparty.Input.StompPubSub do
case :global.whereis_name(name) do
:undefined ->
{:source, n} = name
Logger.info("Cannot find process #{n}")
Logger.error("Cannot find process #{n}")

pid ->
send(pid, {:new_stomp_message, frame})
Expand Down

0 comments on commit 805e13f

Please sign in to comment.