Skip to content

Commit

Permalink
chg: [connection] websocket - datatypes - wip
Browse files Browse the repository at this point in the history
  • Loading branch information
gallypette committed Jan 20, 2025
1 parent d85e971 commit 1ed77f1
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 21 deletions.
33 changes: 24 additions & 9 deletions lib/cocktailparty/catalog/dummy_websocket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@ defmodule Cocktailparty.Catalog.DummyWebsocket do
# Tell the connection process to send the packet this way
send(
conn_pid,
{:subscribe, %{name: {:source, source.id}, datatype: source.config["datatype"]}}
{:subscribe,
%{
name: {:source, source.id},
input_datatype: source.config["input_datatype"]
}}
)

Logger.error("SENT SUB")

{:ok,
%{
conn_id: source.connection_id,
source_id: source.id
source_id: source.id,
input_datatype: source.config["input_datatype"],
output_datatype: source.config["output_datatype"]
}}
end
end
Expand All @@ -42,11 +46,22 @@ defmodule Cocktailparty.Catalog.DummyWebsocket do
# Receiving a message from a websocket we are subscribed to.
def handle_info({:new_text_message, message}, state) do
# wrap messages into %Broadcast{} to keep metadata about the payload
broadcast = %Broadcast{
topic: "feed:" <> Integer.to_string(state.source_id),
event: :new_text_message,
payload: message
}
broadcast =
case state.output_datatype do
"text" ->
%Broadcast{
topic: "feed:" <> Integer.to_string(state.source_id),
event: :new_text_message,
payload: message
}

"binary" ->
%Broadcast{
topic: "feed:" <> Integer.to_string(state.source_id),
event: :new_binary_message,
payload: message
}
end

:ok =
Phoenix.PubSub.broadcast(
Expand Down
2 changes: 1 addition & 1 deletion lib/cocktailparty/catalog/source_type.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule Cocktailparty.Catalog.SourceType do
%{
type: "dummy",
module: Cocktailparty.Catalog.DummyWebsocket,
required_fields: [:datatype]
required_fields: [:input_datatype, :output_datatype]
}
]
}
Expand Down
22 changes: 11 additions & 11 deletions lib/cocktailparty/input/websocket_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ defmodule Cocktailparty.Input.WebsocketClient do
defstruct [:subscribed]

# def handle_cast({:subscribe, name = {:source, _}}, state) do
def handle_info({:subscribe, source = %{name: {:source, _}, datatype: _}}, state) do
def handle_info({:subscribe, source = %{name: {:source, _}, input_datatype: _}}, state) do
Logger.error("Received SUB")
# with pid <- :global.whereis_name(source.name) do
# Logger.info("Received SUB from #{:erlang.pid_to_list(pid) |> to_string}")
{:ok, Map.put(state, :subscribed, MapSet.put(state.subscribed, source))}
# end
end

def handle_info({:unsubscribe, source = %{name: {:source, _}, datatype: _}}, state) do
def handle_info({:unsubscribe, source = %{name: {:source, _}, input_datatype: _}}, state) do
with pid <- :global.whereis_name(source.name) do
Logger.info("Received UNSUB from #{:erlang.pid_to_list(pid) |> to_string}")
{:ok, Map.put(state, :subscribed, MapSet.delete(state.subscribed, source))}
Expand All @@ -34,10 +34,10 @@ defmodule Cocktailparty.Input.WebsocketClient do
case :global.whereis_name(source.name) do
:undefined ->
{:source, n} = source.name
Logger.info("Cannot find process #{n}")
Process.exit(self(), "Cannot find process \#{:source, #{n}\}")

pid ->
case source.datatype do
case source.input_datatype do
"text" ->
send(pid, {:new_text_message, content})

Expand All @@ -61,11 +61,11 @@ defmodule Cocktailparty.Input.WebsocketClient do
Enum.each(state.subscribed, fn source ->
case :global.whereis_name(source.name) do
:undefined ->
{:source, n} = source.name
Logger.info("Cannot find process #{n}")
# If the source process cannot be find, we terminate the present process
Process.exit(self(), "Cannot find process #{source.name}")

pid ->
case source.datatype do
case source.input_datatype do
"binary" ->
send(pid, {:new_binary_message, content})

Expand All @@ -86,13 +86,13 @@ defmodule Cocktailparty.Input.WebsocketClient do
{:ok, state}
end

def handle_control({:ping, message}, state) do
IO.puts("Received ping with content: #{message}!")
def handle_control({:ping, _message}, state) do
# IO.puts("Received ping with content: #{message}!")
{:ok, state}
end

def handle_control({:pong, message}, state) do
IO.puts("Received pong with content: #{message}!")
def handle_control({:pong, _message}, state) do
# IO.puts("Received pong with content: #{message}!")
{:ok, state}
end

Expand Down

0 comments on commit 1ed77f1

Please sign in to comment.