Skip to content

Commit

Permalink
Merge branch 'stomp' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
gallypette committed Oct 4, 2024
2 parents 9e8e868 + 6f9a1e6 commit 5bf58b6
Show file tree
Hide file tree
Showing 75 changed files with 2,211 additions and 689 deletions.
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,5 @@ config :remote_ip, debug: false

# TimeZone data
config :elixir, :time_zone_database, Tzdata.TimeZoneDatabase

config :tzdata, :autoupdate, :disabled
29 changes: 14 additions & 15 deletions lib/cocktailparty/broker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Cocktailparty.Broker do

defstruct [
:pubsub,
:redis_instance,
:connection,
subscribed: [],
subscribing: []
]
Expand All @@ -18,20 +18,20 @@ defmodule Cocktailparty.Broker do
end

def init(opts) do
redis_instance = opts[:redis_instance]
connection = opts[:connection]

{:ok, pubsub} =
PubSub.start_link(
host: redis_instance.hostname,
port: redis_instance.port,
name: {:global, "pubsub_" <> Integer.to_string(redis_instance.id)}
host: connection.config["hostname"],
port: connection.config["port"],
name: {:global, "pubsub_" <> Integer.to_string(connection.id)}
)

# Logger.info("Starting pubsub for #{redis_instance.name}")
Logger.info("Starting pubsub for #{redis_instance.id}")
# Logger.info("Starting pubsub for #{connection.name}")
Logger.info("Starting pubsub for #{connection.id}")

# Get sources from the catalog
sources = Catalog.list_redis_instance_sources(redis_instance.id)
sources = Catalog.list_connection_sources(connection.id)

subscribing =
Enum.reduce(sources, [], fn source, subscribing ->
Expand All @@ -41,8 +41,7 @@ defmodule Cocktailparty.Broker do
[source | subscribing]
end)

{:ok,
%{subscribing: subscribing, pubsub: pubsub, subscribed: [], redis_instance: redis_instance}}
{:ok, %{subscribing: subscribing, pubsub: pubsub, subscribed: [], connection: connection}}
end

# Receiving a connection notification from Redix about source we are subscribing to.
Expand All @@ -63,7 +62,7 @@ defmodule Cocktailparty.Broker do
subscribing: subscribing,
subscribed: subscribed,
pubsub: state.pubsub,
redis_instance: state.redis_instance
connection: state.connection
}

# Log the subscription
Expand All @@ -89,7 +88,7 @@ defmodule Cocktailparty.Broker do
subscribing: subscribing,
subscribed: subscribed,
pubsub: state.pubsub,
redis_instance: state.redis_instance
connection: state.connection
}

Logger.info("Disconnected from #{inspect(current_sub.name)}")
Expand Down Expand Up @@ -156,7 +155,7 @@ defmodule Cocktailparty.Broker do
subscribing: subscribing,
pubsub: state.pubsub,
subscribed: subscribed,
redis_instance: state.redis_instance
connection: state.connection
}}
end

Expand All @@ -171,7 +170,7 @@ defmodule Cocktailparty.Broker do
subscribing: subscribing,
pubsub: state.pubsub,
subscribed: state.subscribed,
redis_instance: state.redis_instance
connection: state.connection
}}
end

Expand Down Expand Up @@ -205,7 +204,7 @@ defmodule Cocktailparty.Broker do
subscribing: state.subscribing,
pubsub: state.pubsub,
subscribed: state.subscribed,
redis_instance: state.redis_instance
connection: state.connection
}}
end
end
132 changes: 105 additions & 27 deletions lib/cocktailparty/catalog.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ defmodule Cocktailparty.Catalog do
The Catalog context.
"""
require Logger
import Cocktailparty.Util
import Ecto.Changeset

import Ecto.Query, warn: false
alias Cocktailparty.Input.RedisInstance
alias Cocktailparty.Input.Connection
alias Cocktailparty.Input
alias Cocktailparty.Repo
alias Cocktailparty.Catalog.Source
alias Cocktailparty.Catalog.{Source, SourceType}
alias Cocktailparty.Accounts.User
alias Cocktailparty.Accounts
alias Cocktailparty.UserManagement
alias CocktailpartyWeb.Tracker
alias Cocktailparty.Catalog.SourceManager

@doc """
Returns the list of sources.
Expand All @@ -25,7 +29,15 @@ defmodule Cocktailparty.Catalog do
def list_sources do
Repo.all(Source)
|> Repo.preload(:users)
|> Repo.preload(:redis_instance)
|> Repo.preload(:connection)
end

@doc """
Returns the list of available source types for a given connection.
"""
def get_available_source_types(connection_id) do
connection = Input.get_connection!(connection_id)
SourceType.get_source_types_for_connection(connection.type)
end

@doc """
Expand Down Expand Up @@ -87,27 +99,70 @@ defmodule Cocktailparty.Catalog do

Repo.all(query)
|> Repo.preload(:users)
|> Repo.preload(:redis_instance)
|> Repo.preload(:connection)
end

@doc """
Returns the list of sources for a redis instance
## Examples
iex> list_redis_instance_sources(redis_instance_id)
iex> list_connection_sources(connection_id)
[%Source{}, ...]
"""
def list_redis_instance_sources(redis_instance_id) do
def list_connection_sources(connection_id) do
Repo.all(
from s in Source,
join: r in RedisInstance,
on: s.redis_instance_id == r.id,
where: r.id == ^redis_instance_id
join: r in Connection,
on: s.connection_id == r.id,
where: r.id == ^connection_id
)
end

@doc """
Gets a single source, with its config as a map
Raises `Ecto.NoResultsError` if the Source does not exist.
## Examples
iex> get_source!(123)
%Source{}
iex> get_source!(456)
** (Ecto.NoResultsError)
"""
def get_source_map!(id) do
Repo.get!(Source, id)
|> Repo.preload(:users)
|> Repo.preload(:connection)
end

@doc """
Gets a single source, with its config as a string
Raises `Ecto.NoResultsError` if the Source does not exist.
## Examples
iex> get_source!(123)
%Source{}
iex> get_source!(456)
** (Ecto.NoResultsError)
"""
def get_source_text!(id) do
source = Repo.get!(Source, id)

source
|> Repo.preload(:users)
|> Repo.preload(:connection)
|> Map.put(:config, map_to_yaml!(source.config))
end

@doc """
Gets a single source.
Expand All @@ -125,7 +180,7 @@ defmodule Cocktailparty.Catalog do
def get_source!(id) do
Repo.get!(Source, id)
|> Repo.preload(:users)
|> Repo.preload(:redis_instance)
|> Repo.preload(:connection)
end

@doc """
Expand All @@ -141,13 +196,14 @@ defmodule Cocktailparty.Catalog do
"""
def create_source(attrs \\ %{}) do
Cocktailparty.Input.get_redis_instance!(attrs["redis_instance_id"])
Cocktailparty.Input.get_connection_map!(attrs["connection_id"])
|> Ecto.build_assoc(:sources)
|> change_source(attrs)
|> validate_source_type()
|> Repo.insert()
|> case do
{:ok, source} ->
_ = notify_broker(source, {:new_source, source})
SourceManager.start_source(source)
notify_monitor({:subscribe, "feed:" <> Integer.to_string(source.id)})
{:ok, source}

Expand All @@ -156,6 +212,19 @@ defmodule Cocktailparty.Catalog do
end
end

defp validate_source_type(changeset) do
connection_id = get_field(changeset, :connection_id)
source_type = get_field(changeset, :type)

with %Connection{type: connection_type} <- Input.get_connection!(connection_id),
source_types <- SourceType.get_source_types_for_connection(connection_type),
true <- Enum.any?(source_types, fn %{type: type} -> type == source_type end) do
changeset
else
_ -> add_error(changeset, :type, "is not valid for the selected connection type")
end
end

@doc """
Updates a source.
Expand All @@ -173,6 +242,8 @@ defmodule Cocktailparty.Catalog do

# Preserve the existing users association
changeset = Ecto.Changeset.put_assoc(changeset, :users, source.users)
# TODO
# remove broker logic -- call the source itself

case changeset do
%Ecto.Changeset{
Expand All @@ -181,21 +252,28 @@ defmodule Cocktailparty.Catalog do
}
when source.channel != new_channel ->
# We ask the broker to delete the source with the old channel
notify_broker(source, {:delete_source, source})
# TODO Terminate the source gen_server
# notify_broker(source, {:delete_source, source})
# We notify the monitor
notify_monitor({:unsubscribe, "feed:" <> Integer.to_string(source.id)})

# We update the source
{:ok, source} = Repo.update(changeset)
{:ok, source} =
changeset
|> validate_source_type()
|> Repo.update()

# And we ask the broker and the pubsubmonitor to subscribe to the updated source
notify_broker(source, {:new_source, source})
# TODO Create a new source gen_server
# notify_broker(source, {:new_source, source})
notify_monitor({:subscribe, "feed:" <> Integer.to_string(source.id)})

{:ok, source}

_ ->
Repo.update(changeset)
changeset
|> validate_source_type()
|> Repo.update()
end
end

Expand All @@ -215,7 +293,7 @@ defmodule Cocktailparty.Catalog do
Repo.delete(source)
|> case do
{:ok, source} ->
_ = notify_broker(source, {:delete_source, source})
:ok = SourceManager.stop_source(source.id)
notify_monitor({:unsubscribe, "feed:" <> Integer.to_string(source.id)})

# kick users who subscribed to the source
Expand All @@ -225,8 +303,8 @@ defmodule Cocktailparty.Catalog do
kick_all_users_from_source(source.id)
{:ok, source}

{:error, msg} ->
{:error, msg}
{:error, changeset} ->
{:error, changeset}
end
end

Expand Down Expand Up @@ -256,7 +334,7 @@ defmodule Cocktailparty.Catalog do
end

def is_subscribed?(source_id, user_id) do
src = get_source!(source_id)
src = get_source_map!(source_id)
user = Accounts.get_user!(user_id)

if Enum.member?(src.users, user) do
Expand All @@ -279,7 +357,7 @@ defmodule Cocktailparty.Catalog do
"""
def subscribe(source_id, user_id) do
src = get_source!(source_id)
src = get_source_map!(source_id)
user = Accounts.get_user!(user_id)
user_list = src.users |> Enum.concat([user])

Expand All @@ -301,7 +379,7 @@ defmodule Cocktailparty.Catalog do
"""
def mass_subscribe(source_id) do
source = get_source!(source_id)
source = get_source_map!(source_id)
all_users = UserManagement.list_users_short()

get_src_users =
Expand Down Expand Up @@ -528,7 +606,7 @@ defmodule Cocktailparty.Catalog do

def get_broker(%Source{} = source) do
# locate the reponsible broker process
case GenServer.whereis({:global, "broker_" <> Integer.to_string(source.redis_instance_id)}) do
case GenServer.whereis({:global, "broker_" <> Integer.to_string(source.connection_id)}) do
{name, node} ->
# TODO
Logger.error("TODO: contacting remote broker in the cluster: #{node}/#{name}")
Expand All @@ -538,7 +616,7 @@ defmodule Cocktailparty.Catalog do
# TODO
Logger.error(
"looks like broker_" <>
Integer.to_string(source.redis_instance_id) <> " is dead - should not happen"
Integer.to_string(source.connection_id) <> " is dead - should not happen"
)

nil
Expand All @@ -562,9 +640,9 @@ defmodule Cocktailparty.Catalog do
Repo.one(query)
end

defp notify_broker(%Source{} = source, msg) do
GenServer.cast(get_broker(source), msg)
end
# defp notify_broker(%Source{} = source, msg) do
# GenServer.cast(get_broker(source), msg)
# end

defp notify_monitor(msg) do
GenServer.cast({:global, Cocktailparty.PubSubMonitor}, msg)
Expand Down
Loading

0 comments on commit 5bf58b6

Please sign in to comment.