Skip to content

Commit

Permalink
chg: [sinks] wip
Browse files Browse the repository at this point in the history
  • Loading branch information
gallypette committed Nov 15, 2024
1 parent 0a66caa commit bc434ad
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 15 deletions.
13 changes: 12 additions & 1 deletion lib/cocktailparty/sink_catalog.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ defmodule Cocktailparty.SinkCatalog do
import Ecto.Query, warn: false
import Ecto.Changeset
alias Cocktailparty.UserManagement
alias Cocktailparty.Input.Connection
alias Cocktailparty.Input
alias Cocktailparty.Repo
alias Cocktailparty.SinkCatalog.Sink
alias Cocktailparty.SinkCatalog.SinkType
alias Cocktailparty.Accounts.User
alias CocktailpartyWeb.Endpoint

Expand All @@ -30,6 +31,16 @@ defmodule Cocktailparty.SinkCatalog do
|> Repo.preload(:connection)
end

@doc """
Returns the list of available sink types for a given connection.
"""
def get_available_sink_types(connection_id) do
connection = Input.get_connection!(connection_id)
SinkType.get_sink_types_for_connection(connection.type)
end



@doc """
Returns the list of sinks / joined with its use
Expand Down
62 changes: 62 additions & 0 deletions lib/cocktailparty/sink_catalog/redis_channel_sink.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
defmodule Cocktailparty.SinkCatalog.RedisChannelSink do
use Cocktailparty.SinkCatalog.SinkBehaviour
use GenServer

require Logger

alias Redix

## Required Fields

@impl Cocktailparty.SinkCatalog.SinkBehaviour
def required_fields do
[:channel]
end

## Public API
@impl Cocktailparty.SinkCatalog.SinkBehaviour
def publish(pid, message) do
GenServer.call(pid, {:publish, message})
end

## GenServer Callbacks
def start_link(%Cocktailparty.SinkCatalog.Sink{} = sink) do
GenServer.start_link(__MODULE__, sink, name: {:global, {:sink, sink.id}})
end

@impl GenServer
def init(sink) do
# We just check whether the connection process exists
with conn_pid <- :global.whereis_name({"redis_pub", sink.connection_id}) do
# Subscribe to the Redis channel
{:ok,
%{
conn_pid: conn_pid,
channel: sink.config["channel"],
source_id: sink.id
}}
else
:undefined -> {:stop, {:connection_not_found, sink.connection_id}}
end
end

#TODO

@impl GenServer
def handle_call({:publish, message}, _from, %{redix_conn: redix_conn, channel: channel} = state) do
case Redix.command(redix_conn, ["PUBLISH", channel, message]) do
{:ok, _count} ->
{:reply, :ok, state}
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end

@impl GenServer
def terminate(_reason, %{redix_conn: redix_conn}) do
if Process.alive?(redix_conn) do
Redix.stop(redix_conn)
end
:ok
end
end
1 change: 1 addition & 0 deletions lib/cocktailparty/sink_catalog/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Cocktailparty.SinkCatalog.Sink do
field :name, :string
field :type, :string
field :config, :map
field :config_yaml, :string, virtual: true
field :description, :string

belongs_to :user, Cocktailparty.Accounts.User
Expand Down
21 changes: 21 additions & 0 deletions lib/cocktailparty/sink_catalog/sink_behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule Cocktailparty.SinkCatalog.SinkBehaviour do
@moduledoc """
Defines the behavior that all sink modules must implement.
"""

@callback required_fields() :: [atom()]
@doc"""
Takes a sink process PID and a message (of any type) and returns :ok or an error tuple.
"""
@callback publish(pid(), term()) :: :ok | {:error, term()}

defmacro __using__(_) do
quote do
@behaviour Cocktailparty.SinkCatalog.SinkBehaviour

def required_fields, do: []

defoverridable required_fields: 0
end
end
end
44 changes: 44 additions & 0 deletions lib/cocktailparty/sink_catalog/sink_type.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
defmodule Cocktailparty.SinkCatalog.SinkType do
@moduledoc """
Defines the available sink types for each connection type.
"""

@sink_types %{
"redis_pub" => [
%{type: "pub", module: Cocktailparty.SinkCatalog.RedisChannel, required_fields: [:channel]}
]
}

@doc """
Returns the list of source types available for the given connection type.
"""
def get_sink_types_for_connection(connection_type) do
Map.get(@sink_types, connection_type, [])
end

@doc """
Returns the module associated with the given connection type and source type.
"""
def get_module(connection_type, sink_type) do
@sink_types
|> Map.get(connection_type, [])
|> Enum.find(fn %{type: type} -> type == sink_type end)
|> case do
%{module: module} -> {:ok, module}
nil -> {:error, :unknown_source_type}
end
end

@doc """
Returns the required fields for the given connection type and source type.
"""
def get_required_fields(connection_type, sink_type) do
@sink_types
|> Map.get(connection_type, [])
|> Enum.find(fn %{type: type} -> type == sink_type end)
|> case do
%{required_fields: fields} -> {:ok, fields}
_ -> {:error, :unknown_source_type}
end
end
end
40 changes: 34 additions & 6 deletions lib/cocktailparty_web/controllers/admin/sink_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,28 @@ defmodule CocktailpartyWeb.Admin.SinkController do

def new(conn, _params) do
changeset = SinkCatalog.change_sink(%Sink{})
# get list of redis instances
instances = Input.list_sink_connections()
# get list of users
# get list of available sink connections
connections = Input.list_sink_connections()
# get list of users authorized to create sinks
users = SinkCatalog.list_authorized_users()

case instances do
# Build the map of connection IDs to sink types with required fields
connection_sink_types = build_connection_sink_types(connections)

case connections do
[] ->
conn
|> put_flash(:error, "A receiving redis instance is required to create a sink.")
|> put_flash(:error, "A receiving connection is required to create a sink.")
|> redirect(to: ~p"/admin/connections")

_ ->
render(conn, :new, changeset: changeset, connections: instances, users: users)
render(conn, :new,
changeset: changeset,
connections: connections,
connection_sink_types: connection_sink_types,
sink_types: [],
users: users
)
end
end

Expand Down Expand Up @@ -109,4 +118,23 @@ defmodule CocktailpartyWeb.Admin.SinkController do
|> put_flash(:info, "Sink deleted successfully.")
|> redirect(to: ~p"/admin/sinks")
end

defp build_connection_sink_types(connections) do
Enum.reduce(connections, %{}, fn connection, acc ->
sink_types = SinkCatalog.get_available_sink_types(connection.id)

sink_type_info =
Enum.map(sink_types, fn sink_type ->
{:ok, required_fields} =
SinkCatalog.SinkType.get_required_fields(connection.type, sink_type.type)

%{
type: sink_type.type,
required_fields: Enum.map(required_fields, &Atom.to_string/1)
}
end)

Map.put(acc, Integer.to_string(connection.id), sink_type_info)
end)
end
end
2 changes: 2 additions & 0 deletions lib/cocktailparty_web/controllers/admin/sink_html.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ defmodule CocktailpartyWeb.Admin.SinkHTML do
attr :changeset, Ecto.Changeset, required: true
attr :action, :string, required: true
attr :connections, :list, required: true
attr :connection_sink_types, :map, required: true
attr :sink_types, :list, required: true
attr :users, :list, required: true

def sink_form(assigns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,96 @@
</.error>
<.input field={f[:name]} type="text" label="Name" />
<.input field={f[:description]} type="text" label="Description" />
<.input field={f[:type]} type="text" label="Type" />
<.input
field={f[:connection_id]}
type="select"
label="Connection"
options={Enum.map(@connections, fn connection -> {connection.name, connection.id} end)}
/>
<.input
field={f[:user_id]}
type="select"
label="User"
options={Enum.map(@users, fn user -> {user.email, user.id} end)}
/>
<.input field={f[:type]} type="select" label="Source type" options={@source_types} />
<!-- Display Required Fields -->
<div class="mt-14">
<!-- Required fields will be displayed here -->
<dl class="-my-4 divide-y divide-zinc-100">
<div class="flex gap-4 py-4 sm:gap-8" id="required-fields"></div>
</dl>
</div>
<.input field={f[:config_yaml]} type="textarea" label="Config" />
<.input type="checkbox" field={f[:public]} label="Public" />
<:actions>
<.button>Save Sink</.button>
<.button>Save Source</.button>
</:actions>
</.simple_form>
<script>
// Include the connection_source_types map as a JavaScript variable
var connectionSourceTypes = <%= raw(Jason.encode!(@connection_source_types)) %>;

// Function to update the required fields display based on the selected source type
function updateRequiredFields() {
const connectionId = document.getElementById("source_connection_id").value;
const sourceType = document.getElementById("source_type").value;

const requiredFieldsDiv = document.getElementById("required-fields");
requiredFieldsDiv.innerHTML = '';

if (connectionSourceTypes[connectionId]) {
const sourceTypeInfo = connectionSourceTypes[connectionId].find(function(item) {
return item.type === sourceType;
});

if (sourceTypeInfo && sourceTypeInfo.required_fields) {
var heading = document.createElement("dt");
heading.setAttribute("class", "w-1/4 flex-none text-[0.8125rem] leading-6 text-zinc-500");
var content = document.createTextNode("Required fields:");
heading.appendChild(content);
requiredFieldsDiv.appendChild(heading);

sourceTypeInfo.required_fields.forEach(function(field) {
var listItem = document.createElement("dd");
listItem.setAttribute("class", "text-sm leading-6 text-zinc-700");
listItem.innerText = field;
requiredFieldsDiv.appendChild(listItem);
});
}
}
}

// Event listener for when the connection selection changes
document.getElementById("source_connection_id").addEventListener("change", function() {
const connectionId = this.value;
const sourceTypeSelect = document.getElementById("source_type");
const requiredFieldsDiv = document.getElementById("required-fields");

// Clear existing options and required fields
sourceTypeSelect.innerHTML = '';
requiredFieldsDiv.innerHTML = '';

if (connectionSourceTypes[connectionId]) {
// Populate the source type select with options
connectionSourceTypes[connectionId].forEach(function(sourceTypeInfo) {
const option = document.createElement("option");
option.value = sourceTypeInfo.type;
option.text = sourceTypeInfo.type;
sourceTypeSelect.add(option);
});

// Select the first source type and update required fields
if (sourceTypeSelect.options.length > 0) {
sourceTypeSelect.selectedIndex = 0;
updateRequiredFields();
}
} else {
// If no source types are available, show a message or disable the select
const option = document.createElement("option");
option.text = "No source types available";
option.disabled = true;
sourceTypeSelect.add(option);
}
});

// Event listener for when the source type selection changes
document.getElementById("source_type").addEventListener("change", updateRequiredFields);
updateRequiredFields();
</script>


2 changes: 2 additions & 0 deletions lib/cocktailparty_web/controllers/sink_html.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ defmodule CocktailpartyWeb.SinkHTML do
attr :changeset, Ecto.Changeset, required: true
attr :action, :string, required: true
attr :connections, :list, required: true
attr :connection_sink_types, :map, required: true
attr :sink_types, :list, required: true

def sink_form(assigns)
end

0 comments on commit bc434ad

Please sign in to comment.