@@ -115,9 +115,13 @@ defmodule Realtime.Api do
115115 Logger . debug ( "create_tenant #{ inspect ( attrs , pretty: true ) } " )
116116 tenant_id = Map . get ( attrs , :external_id ) || Map . get ( attrs , "external_id" )
117117
118- % Tenant { }
119- |> Tenant . changeset ( attrs )
120- |> region_aware_write ( :insert , tenant_id )
118+ if master_region? ( ) do
119+ % Tenant { }
120+ |> Tenant . changeset ( attrs )
121+ |> Repo . insert ( )
122+ else
123+ call ( :create_tenant , [ attrs ] , tenant_id )
124+ end
121125 end
122126
123127 @ doc """
@@ -132,9 +136,17 @@ defmodule Realtime.Api do
132136 {:error, %Ecto.Changeset{}}
133137
134138 """
135- def update_tenant ( % Tenant { } = tenant , attrs ) do
139+ def update_tenant ( tenant_id , attrs ) when is_binary ( tenant_id ) do
140+ replica = Replica . replica ( )
141+ tenant = replica . get_by ( Tenant , external_id: tenant_id )
142+ update_tenant ( tenant , attrs )
143+ end
144+
145+ def update_tenant ( % Tenant { external_id: external_id } = tenant , attrs ) do
136146 changeset = Tenant . changeset ( tenant , attrs )
137- updated = region_aware_write ( changeset , :update , tenant . external_id )
147+
148+ updated =
149+ if master_region? ( ) , do: Repo . update ( changeset ) , else: call ( :update_tenant , [ external_id , attrs ] , external_id )
138150
139151 case updated do
140152 { :ok , tenant } ->
@@ -150,38 +162,23 @@ defmodule Realtime.Api do
150162 updated
151163 end
152164
153- @ doc """
154- Deletes a tenant.
155-
156- ## Examples
157-
158- iex> delete_tenant(tenant)
159- {:ok, %Tenant{}}
160-
161- iex> delete_tenant(tenant)
162- {:error, %Ecto.Changeset{}}
163-
164- """
165- def delete_tenant ( % Tenant { } = tenant ) , do: Repo . delete ( tenant )
166-
167165 @ spec delete_tenant_by_external_id ( String . t ( ) ) :: boolean ( )
168166 def delete_tenant_by_external_id ( id ) do
169- from ( t in Tenant , where: t . external_id == ^ id )
170- |> region_aware_write ( :delete_all , id )
171- |> case do
172- { num , _ } when num > 0 -> true
173- _ -> false
167+ if master_region? ( ) do
168+ from ( t in Tenant , where: t . external_id == ^ id )
169+ |> Repo . delete_all ( )
170+ |> case do
171+ { num , _ } when num > 0 -> true
172+ _ -> false
173+ end
174+ else
175+ call ( :delete_tenant_by_external_id , [ id ] , id )
174176 end
175177 end
176178
177- @ spec get_tenant_by_external_id ( String . t ( ) , atom ( ) ) :: Tenant . t ( ) | nil
178- def get_tenant_by_external_id ( external_id , repo \\ :replica )
179- when repo in [ :primary , :replica ] do
180- repo =
181- case repo do
182- :primary -> Repo
183- :replica -> Replica . replica ( )
184- end
179+ @ spec get_tenant_by_external_id ( String . t ( ) ) :: Tenant . t ( ) | nil
180+ def get_tenant_by_external_id ( external_id ) do
181+ repo = Replica . replica ( )
185182
186183 Tenant
187184 |> repo . get_by ( external_id: external_id )
@@ -195,26 +192,36 @@ defmodule Realtime.Api do
195192 end
196193
197194 def rename_settings_field ( from , to ) do
198- for extension <- list_extensions ( "postgres_cdc_rls" ) do
199- { value , settings } = Map . pop ( extension . settings , from )
200- new_settings = Map . put ( settings , to , value )
201-
202- extension
203- |> Changeset . cast ( % { settings: new_settings } , [ :settings ] )
204- |> region_aware_write ( :update! , extension . external_id )
195+ if master_region? ( ) do
196+ for extension <- list_extensions ( "postgres_cdc_rls" ) do
197+ { value , settings } = Map . pop ( extension . settings , from )
198+ new_settings = Map . put ( settings , to , value )
199+
200+ extension
201+ |> Changeset . cast ( % { settings: new_settings } , [ :settings ] )
202+ |> Repo . update ( )
203+ end
204+ else
205+ call ( :rename_settings_field , [ from , to ] , from )
205206 end
206207 end
207208
209+ @ spec preload_counters ( nil | Realtime.Api.Tenant . t ( ) , any ( ) ) :: nil | Realtime.Api.Tenant . t ( )
208210 @ doc """
209211 Updates the migrations_ran field for a tenant.
210212 """
211213 @ spec update_migrations_ran ( binary ( ) , integer ( ) ) :: { :ok , Tenant . t ( ) } | { :error , term ( ) }
212214 def update_migrations_ran ( external_id , count ) do
213- external_id
214- |> Cache . get_tenant_by_external_id ( )
215- |> Tenant . changeset ( % { migrations_ran: count } )
216- |> region_aware_write ( :update! , external_id )
217- |> tap ( fn _ -> Cache . distributed_invalidate_tenant_cache ( external_id ) end )
215+ if master_region? ( ) do
216+ tenant = Cache . get_tenant_by_external_id ( external_id )
217+
218+ tenant
219+ |> Tenant . changeset ( % { migrations_ran: count } )
220+ |> Repo . update ( )
221+ |> tap ( fn _ -> Cache . distributed_invalidate_tenant_cache ( external_id ) end )
222+ else
223+ call ( :update_migrations_ran , [ external_id , count ] , external_id )
224+ end
218225 end
219226
220227 def preload_counters ( nil ) , do: nil
@@ -257,21 +264,13 @@ defmodule Realtime.Api do
257264
258265 defp maybe_restart_db_connection ( _changeset ) , do: nil
259266
260- defp local_call ? do
267+ defp master_region ? do
261268 region = Application . get_env ( :realtime , :region )
262269 master_region = Application . get_env ( :realtime , :master_region ) || region
263270 region == master_region
264271 end
265272
266- defp region_aware_write ( % struct { } = argument , operation , tenant_id ) when struct in [ Changeset , Ecto.Query ] do
267- if local_call? ( ) ,
268- do: local_call ( operation , [ argument ] , tenant_id ) ,
269- else: remote_call ( operation , [ argument ] , tenant_id )
270- end
271-
272- defp local_call ( operation , args , _tenant_id ) , do: apply ( Realtime.Repo , operation , args )
273-
274- defp remote_call ( operation , args , tenant_id ) do
273+ defp call ( operation , args , tenant_id ) do
275274 master_region = Application . get_env ( :realtime , :master_region )
276275
277276 with { :ok , master_node } <- Nodes . node_from_region ( master_region , self ( ) ) ,
@@ -281,7 +280,7 @@ defmodule Realtime.Api do
281280 end
282281
283282 defp wrapped_call ( master_node , operation , args , tenant_id ) do
284- case GenRpc . call ( master_node , Realtime.Repo , operation , args , tenant_id: tenant_id ) do
283+ case GenRpc . call ( master_node , __MODULE__ , operation , args , tenant_id: tenant_id ) do
285284 { :error , :rpc_error , reason } -> { :error , reason }
286285 { :error , reason } -> { :error , reason }
287286 result -> { :ok , result }
0 commit comments