Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 64 additions & 67 deletions lib/realtime/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,26 +115,32 @@ defmodule Realtime.Api do
Logger.debug("create_tenant #{inspect(attrs, pretty: true)}")
tenant_id = Map.get(attrs, :external_id) || Map.get(attrs, "external_id")

%Tenant{}
|> Tenant.changeset(attrs)
|> region_aware_write(:insert, tenant_id)
if master_region?() do
%Tenant{}
|> Tenant.changeset(attrs)
|> Repo.insert()
else
call(:create_tenant, [attrs], tenant_id)
end
end

@doc """
Updates a tenant.

## Examples

iex> update_tenant(tenant, %{field: new_value})
{:ok, %Tenant{}}

iex> update_tenant(tenant, %{field: bad_value})
{:error, %Ecto.Changeset{}}

"""
def update_tenant(%Tenant{} = tenant, attrs) do
@spec update_tenant_by_external_id(binary(), map()) :: {:ok, Tenant.t()} | {:error, term()}
def update_tenant_by_external_id(tenant_id, attrs) when is_binary(tenant_id) do
if master_region?() do
tenant_id
|> get_tenant_by_external_id(use_replica?: false)
|> update_tenant(attrs)
else
call(:update_tenant_by_external_id, [tenant_id, attrs], tenant_id)
end
end

defp update_tenant(%Tenant{} = tenant, attrs) do
changeset = Tenant.changeset(tenant, attrs)
updated = region_aware_write(changeset, :update, tenant.external_id)
updated = Repo.update(changeset)

case updated do
{:ok, tenant} ->
Expand All @@ -150,42 +156,31 @@ defmodule Realtime.Api do
updated
end

@doc """
Deletes a tenant.

## Examples

iex> delete_tenant(tenant)
{:ok, %Tenant{}}

iex> delete_tenant(tenant)
{:error, %Ecto.Changeset{}}

"""
def delete_tenant(%Tenant{} = tenant), do: Repo.delete(tenant)

@spec delete_tenant_by_external_id(String.t()) :: boolean()
def delete_tenant_by_external_id(id) do
from(t in Tenant, where: t.external_id == ^id)
|> region_aware_write(:delete_all, id)
|> case do
{num, _} when num > 0 -> true
_ -> false
if master_region?() do
query = from(t in Tenant, where: t.external_id == ^id)
{num, _} = Repo.delete_all(query)
num > 0
else
call(:delete_tenant_by_external_id, [id], id)
end
end

@spec get_tenant_by_external_id(String.t(), atom()) :: Tenant.t() | nil
def get_tenant_by_external_id(external_id, repo \\ :replica)
when repo in [:primary, :replica] do
repo =
case repo do
:primary -> Repo
:replica -> Replica.replica()
end
@spec get_tenant_by_external_id(String.t(), Keyword.t()) :: Tenant.t() | nil
def get_tenant_by_external_id(external_id, opts \\ []) do
use_replica? = Keyword.get(opts, :use_replica?, true)

Tenant
|> repo.get_by(external_id: external_id)
|> repo.preload(:extensions)
cond do
use_replica? ->
Replica.replica().get_by(Tenant, external_id: external_id) |> Replica.replica().preload(:extensions)

!use_replica? and master_region?() ->
Repo.get_by(Tenant, external_id: external_id) |> Repo.preload(:extensions)

true ->
call(:get_tenant_by_external_id, [external_id, opts], external_id)
end
end

defp list_extensions(type) do
Expand All @@ -195,26 +190,36 @@ defmodule Realtime.Api do
end

def rename_settings_field(from, to) do
for extension <- list_extensions("postgres_cdc_rls") do
{value, settings} = Map.pop(extension.settings, from)
new_settings = Map.put(settings, to, value)

extension
|> Changeset.cast(%{settings: new_settings}, [:settings])
|> region_aware_write(:update!, extension.external_id)
if master_region?() do
for extension <- list_extensions("postgres_cdc_rls") do
{value, settings} = Map.pop(extension.settings, from)
new_settings = Map.put(settings, to, value)

extension
|> Changeset.cast(%{settings: new_settings}, [:settings])
|> Repo.update()
end
else
call(:rename_settings_field, [from, to], from)
end
end

@spec preload_counters(nil | Realtime.Api.Tenant.t(), any()) :: nil | Realtime.Api.Tenant.t()
@doc """
Updates the migrations_ran field for a tenant.
"""
@spec update_migrations_ran(binary(), integer()) :: {:ok, Tenant.t()} | {:error, term()}
def update_migrations_ran(external_id, count) do
external_id
|> Cache.get_tenant_by_external_id()
|> Tenant.changeset(%{migrations_ran: count})
|> region_aware_write(:update!, external_id)
|> tap(fn _ -> Cache.distributed_invalidate_tenant_cache(external_id) end)
if master_region?() do
tenant = get_tenant_by_external_id(external_id, use_replica?: false)

tenant
|> Tenant.changeset(%{migrations_ran: count})
|> Repo.update()
|> tap(fn _ -> Cache.distributed_invalidate_tenant_cache(external_id) end)
else
call(:update_migrations_ran, [external_id, count], external_id)
end
end

def preload_counters(nil), do: nil
Expand Down Expand Up @@ -257,21 +262,13 @@ defmodule Realtime.Api do

defp maybe_restart_db_connection(_changeset), do: nil

defp local_call? do
defp master_region? do
region = Application.get_env(:realtime, :region)
master_region = Application.get_env(:realtime, :master_region) || region
region == master_region
end

defp region_aware_write(%struct{} = argument, operation, tenant_id) when struct in [Changeset, Ecto.Query] do
if local_call?(),
do: local_call(operation, [argument], tenant_id),
else: remote_call(operation, [argument], tenant_id)
end

defp local_call(operation, args, _tenant_id), do: apply(Realtime.Repo, operation, args)

defp remote_call(operation, args, tenant_id) do
defp call(operation, args, tenant_id) do
master_region = Application.get_env(:realtime, :master_region)

with {:ok, master_node} <- Nodes.node_from_region(master_region, self()),
Expand All @@ -281,7 +278,7 @@ defmodule Realtime.Api do
end

defp wrapped_call(master_node, operation, args, tenant_id) do
case GenRpc.call(master_node, Realtime.Repo, operation, args, tenant_id: tenant_id) do
case GenRpc.call(master_node, __MODULE__, operation, args, tenant_id: tenant_id) do
{:error, :rpc_error, reason} -> {:error, reason}
{:error, reason} -> {:error, reason}
result -> {:ok, result}
Expand Down
2 changes: 0 additions & 2 deletions lib/realtime/repo_replica.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@ defmodule Realtime.Repo.Replica do
# Do not create module if replica isn't set or configuration is not present
cond do
is_nil(replica) ->
Logger.info("Replica region not found, defaulting to Realtime.Repo")
Realtime.Repo

is_nil(replica_conf) ->
Logger.info("Replica config not found for #{region} region")
Realtime.Repo

true ->
Expand Down
6 changes: 2 additions & 4 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,7 @@ defmodule Realtime.Tenants do
@spec suspend_tenant_by_external_id(String.t()) :: {:ok, Tenant.t()} | {:error, term()}
def suspend_tenant_by_external_id(external_id) do
external_id
|> Cache.get_tenant_by_external_id()
|> Api.update_tenant(%{suspend: true})
|> Api.update_tenant_by_external_id(%{suspend: true})
|> tap(fn _ -> broadcast_operation_event(:suspend_tenant, external_id) end)
end

Expand All @@ -469,8 +468,7 @@ defmodule Realtime.Tenants do
@spec unsuspend_tenant_by_external_id(String.t()) :: {:ok, Tenant.t()} | {:error, term()}
def unsuspend_tenant_by_external_id(external_id) do
external_id
|> Cache.get_tenant_by_external_id()
|> Api.update_tenant(%{suspend: false})
|> Api.update_tenant_by_external_id(%{suspend: false})
|> tap(fn _ -> broadcast_operation_event(:unsuspend_tenant, external_id) end)
end

Expand Down
8 changes: 4 additions & 4 deletions lib/realtime_web/controllers/tenant_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ defmodule RealtimeWeb.TenantController do
)

def update(conn, %{"tenant_id" => external_id, "tenant" => tenant_params}) do
tenant = Api.get_tenant_by_external_id(external_id)
tenant = Api.get_tenant_by_external_id(external_id, use_replica?: false)

case tenant do
nil ->
Expand All @@ -160,7 +160,7 @@ defmodule RealtimeWeb.TenantController do
end

tenant ->
with {:ok, %Tenant{} = tenant} <- Api.update_tenant(tenant, tenant_params) do
with {:ok, %Tenant{} = tenant} <- Api.update_tenant_by_external_id(tenant.external_id, tenant_params) do
conn
|> put_status(:ok)
|> put_resp_header("location", Routes.tenant_path(conn, :show, tenant))
Expand Down Expand Up @@ -192,7 +192,7 @@ defmodule RealtimeWeb.TenantController do
def delete(conn, %{"tenant_id" => tenant_id}) do
stop_all_timeout = Enum.count(PostgresCdc.available_drivers()) * 1_000

with %Tenant{} = tenant <- Api.get_tenant_by_external_id(tenant_id, :primary),
with %Tenant{} = tenant <- Api.get_tenant_by_external_id(tenant_id, use_replica: false),
_ <- Tenants.suspend_tenant_by_external_id(tenant_id),
true <- Api.delete_tenant_by_external_id(tenant_id),
true <- Cache.distributed_invalidate_tenant_cache(tenant_id),
Expand Down Expand Up @@ -231,7 +231,7 @@ defmodule RealtimeWeb.TenantController do
)

def reload(conn, %{"tenant_id" => tenant_id}) do
case Tenants.get_tenant_by_external_id(tenant_id) do
case Api.get_tenant_by_external_id(tenant_id, use_replica?: false) do
nil ->
log_error("TenantNotFound", "Tenant not found")

Expand Down
2 changes: 1 addition & 1 deletion lib/realtime_web/plugs/assign_tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule RealtimeWeb.Plugs.AssignTenant do

def call(%Plug.Conn{host: host} = conn, _opts) do
with {:ok, external_id} <- Database.get_external_id(host),
%Tenant{} = tenant <- Api.get_tenant_by_external_id(external_id) do
%Tenant{} = tenant <- Api.get_tenant_by_external_id(external_id, use_replica?: true) do
Logger.metadata(external_id: external_id, project: external_id)
OpenTelemetry.Tracer.set_attributes(external_id: external_id)

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.66.1",
version: "2.66.2",
elixir: "~> 1.18",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
32 changes: 16 additions & 16 deletions test/integration/region_aware_routing_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ defmodule Realtime.Integration.RegionAwareRoutingTest do

Mimic.expect(Realtime.GenRpc, :call, fn node, mod, func, args, opts ->
assert node == master_node
assert mod == Realtime.Repo
assert func == :insert
assert mod == Realtime.Api
assert func == :create_tenant
assert opts[:tenant_id] == external_id

call_original(GenRpc, :call, [node, mod, func, args, opts])
Expand Down Expand Up @@ -80,16 +80,16 @@ defmodule Realtime.Integration.RegionAwareRoutingTest do
Realtime.GenRpc
|> Mimic.expect(:call, fn node, mod, func, args, opts ->
assert node == master_node
assert mod == Realtime.Repo
assert func == :insert
assert mod == Realtime.Api
assert func == :create_tenant
assert opts[:tenant_id] == tenant_attrs["external_id"]

call_original(GenRpc, :call, [node, mod, func, args, opts])
end)
|> Mimic.expect(:call, fn node, mod, func, args, opts ->
assert node == master_node
assert mod == Realtime.Repo
assert func == :update
assert mod == Realtime.Api
assert func == :update_tenant_by_external_id
assert opts[:tenant_id] == tenant_attrs["external_id"]

call_original(GenRpc, :call, [node, mod, func, args, opts])
Expand All @@ -98,7 +98,7 @@ defmodule Realtime.Integration.RegionAwareRoutingTest do
tenant = tenant_fixture(tenant_attrs)

new_name = "updated_via_routing"
result = Api.update_tenant(tenant, %{name: new_name})
result = Api.update_tenant_by_external_id(tenant.external_id, %{name: new_name})

assert {:ok, %Tenant{} = updated} = result
assert updated.name == new_name
Expand All @@ -123,16 +123,16 @@ defmodule Realtime.Integration.RegionAwareRoutingTest do
Realtime.GenRpc
|> Mimic.expect(:call, fn node, mod, func, args, opts ->
assert node == master_node
assert mod == Realtime.Repo
assert func == :insert
assert mod == Realtime.Api
assert func == :create_tenant
assert opts[:tenant_id] == tenant_attrs["external_id"]

call_original(GenRpc, :call, [node, mod, func, args, opts])
end)
|> Mimic.expect(:call, fn node, mod, func, args, opts ->
assert node == master_node
assert mod == Realtime.Repo
assert func == :delete_all
assert mod == Realtime.Api
assert func == :delete_tenant_by_external_id
assert opts[:tenant_id] == tenant_attrs["external_id"]

call_original(GenRpc, :call, [node, mod, func, args, opts])
Expand Down Expand Up @@ -164,16 +164,16 @@ defmodule Realtime.Integration.RegionAwareRoutingTest do
Realtime.GenRpc
|> Mimic.expect(:call, fn node, mod, func, args, opts ->
assert node == master_node
assert mod == Realtime.Repo
assert func == :insert
assert mod == Realtime.Api
assert func == :create_tenant
assert opts[:tenant_id] == tenant_attrs["external_id"]

call_original(GenRpc, :call, [node, mod, func, args, opts])
end)
|> Mimic.expect(:call, fn node, mod, func, args, opts ->
assert node == master_node
assert mod == Realtime.Repo
assert func == :update!
assert mod == Realtime.Api
assert func == :update_migrations_ran
assert opts[:tenant_id] == tenant_attrs["external_id"]

call_original(GenRpc, :call, [node, mod, func, args, opts])
Expand All @@ -184,7 +184,7 @@ defmodule Realtime.Integration.RegionAwareRoutingTest do
new_migrations_ran = 5
result = Api.update_migrations_ran(tenant.external_id, new_migrations_ran)

assert %Tenant{} = updated = result
assert {:ok, updated} = result
assert updated.migrations_ran == new_migrations_ran

reloaded = Realtime.Repo.get(Tenant, tenant.id)
Expand Down
Loading
Loading