Skip to content

Commit

Permalink
chg: [connection] websocket - datatypes - wip
Browse files Browse the repository at this point in the history
  • Loading branch information
gallypette committed Jan 21, 2025
1 parent 1ed77f1 commit 37c596c
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 90 deletions.
29 changes: 2 additions & 27 deletions lib/cocktailparty/catalog/dummy_websocket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ defmodule Cocktailparty.Catalog.DummyWebsocket do
conn_pid,
{:subscribe,
%{
name: {:source, source.id},
input_datatype: source.config["input_datatype"]
name: {:source, source.id}
}}
)

Expand All @@ -44,7 +43,7 @@ defmodule Cocktailparty.Catalog.DummyWebsocket do

@impl GenServer
# Receiving a message from a websocket we are subscribed to.
def handle_info({:new_text_message, message}, state) do
def handle_info({_message_type_atom, message}, state) do
# wrap messages into %Broadcast{} to keep metadata about the payload
broadcast =
case state.output_datatype do
Expand Down Expand Up @@ -77,30 +76,6 @@ defmodule Cocktailparty.Catalog.DummyWebsocket do
{:noreply, state}
end

@impl GenServer
# Receiving a message from a websocket we are subscribed to.
def handle_info({:new_binary_message, message}, state) do
# wrap messages into %Broadcast{} to keep metadata about the payload
broadcast = %Broadcast{
topic: "feed:" <> Integer.to_string(state.source_id),
event: :new_binary_message,
payload: message
}

:ok =
Phoenix.PubSub.broadcast(
Cocktailparty.PubSub,
"feed:" <> Integer.to_string(state.source_id),
broadcast
)

:telemetry.execute([:cocktailparty, :broker], %{count: 1}, %{
feed: "feed:" <> Integer.to_string(state.source_id)
})

{:noreply, state}
end

@impl GenServer
def terminate(reason, state) do
Logger.info("Dummy websocket source #{state.source_id} terminating because #{reason}")
Expand Down
2 changes: 1 addition & 1 deletion lib/cocktailparty/catalog/source_type.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule Cocktailparty.Catalog.SourceType do
%{
type: "dummy",
module: Cocktailparty.Catalog.DummyWebsocket,
required_fields: [:input_datatype, :output_datatype]
required_fields: [:output_datatype]
}
]
}
Expand Down
2 changes: 1 addition & 1 deletion lib/cocktailparty/input/connection_type.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule Cocktailparty.Input.ConnectionTypes do
"websocket" => %{
name: "WebSocket",
module: Cocktailparty.Input.WebSocket,
required_fields: [:uri],
required_fields: [:uri, :input_datatype],
fullduplex: false
}
# Add other connection types here
Expand Down
10 changes: 9 additions & 1 deletion lib/cocktailparty/input/websocket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@ defmodule Cocktailparty.Input.WebSocket do
def start_link(connection) do
Logger.info("Supervisor Starting #{connection.name} websocket")

# TODO have proper validation on YAML fields
input_datatype = case connection.config["input_datatype"] do
"text" -> :text
"binary" -> :binary
end

specs =
{Cocktailparty.Input.WebsocketClient,
uri: connection.config["uri"], state: %{subscribed: MapSet.new()}, opts: [name: {:global, {connection.type, connection.id}}]}
uri: connection.config["uri"],
state: %{subscribed: MapSet.new(), input_datatype: input_datatype},
opts: [name: {:global, {connection.type, connection.id}}]}

# Add to the ConnectionDynamicSupervisor children
case :global.whereis_name(Cocktailparty.ConnectionsDynamicSupervisor) do
Expand Down
93 changes: 33 additions & 60 deletions lib/cocktailparty/input/websocket_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@ defmodule Cocktailparty.Input.WebsocketClient do
use Fresh

# TODO binary data etc.
defstruct [:subscribed]
defstruct [:subscribed, :input_datatype]

# def handle_cast({:subscribe, name = {:source, _}}, state) do
def handle_info({:subscribe, source = %{name: {:source, _}, input_datatype: _}}, state) do
def handle_info({:subscribe, source = %{name: {:source, _}}}, state) do
Logger.error("Received SUB")
# with pid <- :global.whereis_name(source.name) do
# Logger.info("Received SUB from #{:erlang.pid_to_list(pid) |> to_string}")
{:ok, Map.put(state, :subscribed, MapSet.put(state.subscribed, source))}
{:ok,
Map.put(
state,
:subscribed,
MapSet.put(state.subscribed, source) |> Map.put(:input_datatype, state.input_datatype)
)}

# end
end

def handle_info({:unsubscribe, source = %{name: {:source, _}, input_datatype: _}}, state) do
def handle_info({:unsubscribe, source = %{name: {:source, _}}}, state) do
with pid <- :global.whereis_name(source.name) do
Logger.info("Received UNSUB from #{:erlang.pid_to_list(pid) |> to_string}")
{:ok, Map.put(state, :subscribed, MapSet.delete(state.subscribed, source))}
Expand All @@ -26,66 +31,34 @@ defmodule Cocktailparty.Input.WebsocketClient do
{:ok, state}
end

def handle_in({:text, content}, state) do
# IO.puts("Received state: #{inspect(content)}")

if state.subscribed != MapSet.new() do
Enum.each(state.subscribed, fn source ->
case :global.whereis_name(source.name) do
:undefined ->
{:source, n} = source.name
Process.exit(self(), "Cannot find process \#{:source, #{n}\}")

pid ->
case source.input_datatype do
"text" ->
send(pid, {:new_text_message, content})

"both" ->
send(pid, {:new_text_message, content})

_ ->
{:ok, state}
end
end
end)
def handle_in({datatype, content}, state) do
if state.input_datatype == datatype do
if state.subscribed != MapSet.new() do
Enum.each(state.subscribed, fn source ->
case :global.whereis_name(source.name) do
:undefined ->
{:source, n} = source.name
Process.exit(self(), "Cannot find process \#{:source, #{n}\}")

pid ->
case datatype do
:text ->
send(pid, {:new_text_message, content})

:binary ->
send(pid, {:new_binary_message, content})

_ ->
{:ok, state}
end
end
end)
end
end

{:ok, state}
end

def handle_in({:binary, content}, state) do
# IO.puts("Received state: #{inspect(content)}")

if state.subscribed != MapSet.new() do
Enum.each(state.subscribed, fn source ->
case :global.whereis_name(source.name) do
:undefined ->
# If the source process cannot be find, we terminate the present process
Process.exit(self(), "Cannot find process #{source.name}")

pid ->
case source.input_datatype do
"binary" ->
send(pid, {:new_binary_message, content})

"both" ->
send(pid, {:new_binary_message, content})

_ ->
{:ok, state}
end
end
end)
end

{:ok, state}
end

def handle_in({_, _}, state) do
{:ok, state}
end

def handle_control({:ping, _message}, state) do
# IO.puts("Received ping with content: #{message}!")
{:ok, state}
Expand Down

0 comments on commit 37c596c

Please sign in to comment.