Skip to content

Commit

Permalink
chg: [sinks] wip
Browse files Browse the repository at this point in the history
  • Loading branch information
gallypette committed Nov 29, 2024
1 parent bc434ad commit 31b96f0
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 174 deletions.
1 change: 1 addition & 0 deletions lib/cocktailparty/catalog/source_type.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Cocktailparty.Catalog.SourceType do

@source_types %{
"redis_pub_sub" => [
# TODO change to sub
%{type: "pubsub", module: Cocktailparty.Catalog.RedisChannel, required_fields: [:channel]}
],
"redis" => [
Expand Down
60 changes: 58 additions & 2 deletions lib/cocktailparty/sink_catalog/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,64 @@ defmodule Cocktailparty.SinkCatalog.Sink do
@doc false
def changeset(sink, attrs) do
sink
|> cast(attrs, [:name, :type, :description, :config, :connection_id, :user_id])
|> validate_required([:name, :type, :config])
|> cast(attrs, [:name, :type, :description, :config_yaml, :connection_id, :user_id])
|> validate_required([:name, :type])
|> validate_required_config_yaml(attrs)
|> unique_constraint(:name)
|> parse_yaml()
|> validate_config_fields()
end

defp validate_required_config_yaml(changeset, attrs) do
# Only validate presence of config_yaml if it's provided in attrs
if Map.has_key?(attrs, "config_yaml") do
validate_required(changeset, [:config_yaml])
else
changeset
end
end

defp parse_yaml(changeset) do
config_yaml = get_field(changeset, :config_yaml) || ""

case YamlElixir.read_from_string(config_yaml) do
{:ok, config_map} when is_map(config_map) ->
put_change(changeset, :config, config_map)

{:ok, _} ->
add_error(changeset, :config_yaml, "must be a valid YAML mapping")

{:error, reason} ->
add_error(changeset, :config_yaml, "invalid YAML format: #{inspect(reason)}")
end
end

defp validate_config_fields(changeset) do
if changeset.valid? do
connection_id = get_field(changeset, :connection_id)
sink_type = get_field(changeset, :type)
config = get_field(changeset, :config) || %{}

connection = Cocktailparty.Input.get_connection!(connection_id)
with {:ok, required_fields} <- Cocktailparty.SinkCatalog.SinkType.get_required_fields(connection.type, sink_type) do
missing_fields =
required_fields
|> Enum.filter(fn field -> Map.get(config, to_string(field)) in [nil, ""] end)

if missing_fields == [] do
changeset
else
Enum.reduce(missing_fields, changeset, fn field, acc ->
add_error(acc, :config_yaml, "missing required field: #{field}")
end)
end
else
_ -> add_error(changeset, :type, "is invalid")
end
else
changeset
end
end


end
2 changes: 1 addition & 1 deletion lib/cocktailparty/sink_catalog/sink_type.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Cocktailparty.SinkCatalog.SinkType do
"""

@sink_types %{
"redis_pub" => [
"redis_pub_sub" => [
%{type: "pub", module: Cocktailparty.SinkCatalog.RedisChannel, required_fields: [:channel]}
]
}
Expand Down
73 changes: 62 additions & 11 deletions lib/cocktailparty_web/controllers/admin/sink_controller.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule CocktailpartyWeb.Admin.SinkController do
use CocktailpartyWeb, :controller

import Cocktailparty.Util

alias Cocktailparty.SinkCatalog
alias Cocktailparty.SinkCatalog.Sink
alias Cocktailparty.Input
Expand Down Expand Up @@ -45,12 +47,22 @@ defmodule CocktailpartyWeb.Admin.SinkController do
|> redirect(to: ~p"/admin/sinks/#{sink}")

{:error, %Ecto.Changeset{} = changeset} ->
# get list of redis instances
instances = Input.list_connections()
sink_types = get_sink_types_from_params(sink_params)

# get list of connections
connections = Input.list_connections()
connection_sink_types = build_connection_sink_types(connections)

# get list of users
users = SinkCatalog.list_authorized_users()

render(conn, :new, changeset: changeset, connections: instances, users: users)
render(conn, :new,
changeset: changeset,
connections: connections,
connection_sink_types: connection_sink_types,
sink_types: sink_types,
users: users
)
end
end

Expand All @@ -64,29 +76,49 @@ defmodule CocktailpartyWeb.Admin.SinkController do
def edit(conn, %{"id" => id}) do
sink = SinkCatalog.get_sink!(id)
changeset = SinkCatalog.change_sink(sink)
# get list of redis instances
instances = Input.list_sink_connections()
# get list of connections
connections = Input.list_sink_connections()

# Convert the config map to a YAML string and set it in the changeset
config_yaml = map_to_yaml!(sink.config)
changeset = Ecto.Changeset.put_change(changeset, :config_yaml, config_yaml)

# Build the map of connection IDs to sink types with required fields
connection_sink_types = build_connection_sink_types(connections)

# Get source types for the current connection
connection = Input.get_connection!(sink.connection_id)

sink_types =
SinkCatalog.get_available_sink_types(connection.id) |> Enum.map(&{&1.type, &1.type})

# get list of users
users = SinkCatalog.list_authorized_users()

case instances do

case connections do
[] ->
conn
|> put_flash(:error, "A receiving redis instance is required to edit a sink.")
|> put_flash(:error, "A receiving connection is required to edit a sink.")
|> redirect(to: ~p"/admin/connections")

_ ->
render(conn, :edit,
sink: sink,
changeset: changeset,
connections: instances,
connections: connections,
connection_sink_types: connection_sink_types,
source_types: sink_types,
users: users
)
end
end

def update(conn, %{"id" => id, "sink" => sink_params}) do
sink = SinkCatalog.get_sink!(id)
# get list of redis instances
connections = Input.list_sink_connections()
connection_sink_types = build_connection_sink_types(connections)

case SinkCatalog.update_sink(sink, sink_params) do
{:ok, sink} ->
Expand All @@ -95,20 +127,31 @@ defmodule CocktailpartyWeb.Admin.SinkController do
|> redirect(to: ~p"/admin/sinks/#{sink}")

{:error, %Ecto.Changeset{} = changeset} ->
# get list of redis instances
instances = Input.list_sink_connections()
# get list of users
users = SinkCatalog.list_authorized_users()

render(conn, :edit,
sink: sink,
changeset: changeset,
connections: instances,
connections: connections,
connection_sink_types: connection_sink_types,
users: users
)
end

# {:error, reason = %YamlElixir.ParsingError{}} ->
# conn
# |> put_flash(:error, "Failed to parse YAML: #{reason.message}")
# |> render("edit.html",
# source: source,
# changeset: Source.changeset(source, source_params),
# connections: connections
# )
# end
end



def delete(conn, %{"id" => id}) do
sink = SinkCatalog.get_sink!(id)

Expand Down Expand Up @@ -137,4 +180,12 @@ defmodule CocktailpartyWeb.Admin.SinkController do
Map.put(acc, Integer.to_string(connection.id), sink_type_info)
end)
end

defp get_sink_types_from_params(%{"connection_id" => connection_id})
when connection_id != "" do
connection = Input.get_connection!(connection_id)
SinkCatalog.get_available_sink_types(connection.id) |> Enum.map(&{&1.type, &1.type})
end

defp get_sink_types_from_params(_), do: []
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
changeset={@changeset}
action={~p"/admin/sinks/#{@sink}"}
connections={@connections}
connection_sink_types={@connection_sink_types}
sink_types={@sink_types}
users={@users}
/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
changeset={@changeset}
action={~p"/admin/sinks"}
connections={@connections}
connection_sink_types={@connection_sink_types}
sink_types={@sink_types}
users={@users}
/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
label="Connection"
options={Enum.map(@connections, fn connection -> {connection.name, connection.id} end)}
/>
<.input field={f[:type]} type="select" label="Source type" options={@source_types} />
<.input field={f[:type]} type="select" label="Sink type" options={@sink_types} />
<.input
field={f[:user_id]}
type="select"
label="User"
options={Enum.map(@users, fn user -> {user.email, user.id} end)}
/>
<!-- Display Required Fields -->
<div class="mt-14">
<!-- Required fields will be displayed here -->
Expand All @@ -19,36 +25,35 @@
</dl>
</div>
<.input field={f[:config_yaml]} type="textarea" label="Config" />
<.input type="checkbox" field={f[:public]} label="Public" />
<:actions>
<.button>Save Source</.button>
<.button>Save Sink</.button>
</:actions>
</.simple_form>
<script>
// Include the connection_source_types map as a JavaScript variable
var connectionSourceTypes = <%= raw(Jason.encode!(@connection_source_types)) %>;
// Include the connection_sink_types map as a JavaScript variable
var connectionSinkTypes = <%= raw(Jason.encode!(@connection_sink_types)) %>;

// Function to update the required fields display based on the selected source type
// Function to update the required fields display based on the selected sink type
function updateRequiredFields() {
const connectionId = document.getElementById("source_connection_id").value;
const sourceType = document.getElementById("source_type").value;
const connectionId = document.getElementById("sink_connection_id").value;
const sinkType = document.getElementById("sink_type").value;

const requiredFieldsDiv = document.getElementById("required-fields");
requiredFieldsDiv.innerHTML = '';

if (connectionSourceTypes[connectionId]) {
const sourceTypeInfo = connectionSourceTypes[connectionId].find(function(item) {
return item.type === sourceType;
if (connectionSinkTypes[connectionId]) {
const sinkTypeInfo = connectionSinkTypes[connectionId].find(function(item) {
return item.type === sinkType;
});

if (sourceTypeInfo && sourceTypeInfo.required_fields) {
if (sinkTypeInfo && sinkTypeInfo.required_fields) {
var heading = document.createElement("dt");
heading.setAttribute("class", "w-1/4 flex-none text-[0.8125rem] leading-6 text-zinc-500");
var content = document.createTextNode("Required fields:");
heading.appendChild(content);
requiredFieldsDiv.appendChild(heading);

sourceTypeInfo.required_fields.forEach(function(field) {
sinkTypeInfo.required_fields.forEach(function(field) {
var listItem = document.createElement("dd");
listItem.setAttribute("class", "text-sm leading-6 text-zinc-700");
listItem.innerText = field;
Expand All @@ -59,40 +64,40 @@
}

// Event listener for when the connection selection changes
document.getElementById("source_connection_id").addEventListener("change", function() {
document.getElementById("sink_connection_id").addEventListener("change", function() {
const connectionId = this.value;
const sourceTypeSelect = document.getElementById("source_type");
const sinkTypeSelect = document.getElementById("sink_type");
const requiredFieldsDiv = document.getElementById("required-fields");

// Clear existing options and required fields
sourceTypeSelect.innerHTML = '';
sinkTypeSelect.innerHTML = '';
requiredFieldsDiv.innerHTML = '';

if (connectionSourceTypes[connectionId]) {
// Populate the source type select with options
connectionSourceTypes[connectionId].forEach(function(sourceTypeInfo) {
if (connectionSinkTypes[connectionId]) {
// Populate the sink type select with options
connectionSinkTypes[connectionId].forEach(function(sinkTypeInfo) {
const option = document.createElement("option");
option.value = sourceTypeInfo.type;
option.text = sourceTypeInfo.type;
sourceTypeSelect.add(option);
option.value = sinkTypeInfo.type;
option.text = sinkTypeInfo.type;
sinkTypeSelect.add(option);
});

// Select the first source type and update required fields
if (sourceTypeSelect.options.length > 0) {
sourceTypeSelect.selectedIndex = 0;
// Select the first sink type and update required fields
if (sinkTypeSelect.options.length > 0) {
sinkTypeSelect.selectedIndex = 0;
updateRequiredFields();
}
} else {
// If no source types are available, show a message or disable the select
// If no sink types are available, show a message or disable the select
const option = document.createElement("option");
option.text = "No source types available";
option.text = "No sink types available";
option.disabled = true;
sourceTypeSelect.add(option);
sinkTypeSelect.add(option);
}
});

// Event listener for when the source type selection changes
document.getElementById("source_type").addEventListener("change", updateRequiredFields);
// Event listener for when the sink type selection changes
document.getElementById("sink_type").addEventListener("change", updateRequiredFields);
updateRequiredFields();
</script>

Expand Down
Loading

0 comments on commit 31b96f0

Please sign in to comment.