@@ -115,9 +115,13 @@ defmodule Realtime.Api do
115115 Logger . debug ( "create_tenant #{ inspect ( attrs , pretty: true ) } " )
116116 tenant_id = Map . get ( attrs , :external_id ) || Map . get ( attrs , "external_id" )
117117
118- % Tenant { }
119- |> Tenant . changeset ( attrs )
120- |> region_aware_write ( :insert , tenant_id )
118+ if master_region? ( ) do
119+ % Tenant { }
120+ |> Tenant . changeset ( attrs )
121+ |> Repo . insert ( )
122+ else
123+ call ( :create_tenant , [ attrs ] , tenant_id )
124+ end
121125 end
122126
123127 @ doc """
@@ -132,9 +136,18 @@ defmodule Realtime.Api do
132136 {:error, %Ecto.Changeset{}}
133137
134138 """
135- def update_tenant ( % Tenant { } = tenant , attrs ) do
139+ @ spec update_tenant ( binary ( ) | Tenant . t ( ) , map ( ) ) :: { :ok , Tenant . t ( ) } | { :error , term ( ) }
140+ def update_tenant ( tenant_id , attrs ) when is_binary ( tenant_id ) do
141+ replica = Replica . replica ( )
142+ tenant = replica . get_by ( Tenant , external_id: tenant_id )
143+ update_tenant ( tenant , attrs )
144+ end
145+
146+ def update_tenant ( % Tenant { external_id: external_id } = tenant , attrs ) do
136147 changeset = Tenant . changeset ( tenant , attrs )
137- updated = region_aware_write ( changeset , :update , tenant . external_id )
148+
149+ updated =
150+ if master_region? ( ) , do: Repo . update ( changeset ) , else: call ( :update_tenant , [ external_id , attrs ] , external_id )
138151
139152 case updated do
140153 { :ok , tenant } ->
@@ -150,38 +163,23 @@ defmodule Realtime.Api do
150163 updated
151164 end
152165
153- @ doc """
154- Deletes a tenant.
155-
156- ## Examples
157-
158- iex> delete_tenant(tenant)
159- {:ok, %Tenant{}}
160-
161- iex> delete_tenant(tenant)
162- {:error, %Ecto.Changeset{}}
163-
164- """
165- def delete_tenant ( % Tenant { } = tenant ) , do: Repo . delete ( tenant )
166-
167166 @ spec delete_tenant_by_external_id ( String . t ( ) ) :: boolean ( )
168167 def delete_tenant_by_external_id ( id ) do
169- from ( t in Tenant , where: t . external_id == ^ id )
170- |> region_aware_write ( :delete_all , id )
171- |> case do
172- { num , _ } when num > 0 -> true
173- _ -> false
168+ if master_region? ( ) do
169+ from ( t in Tenant , where: t . external_id == ^ id )
170+ |> Repo . delete_all ( )
171+ |> case do
172+ { num , _ } when num > 0 -> true
173+ _ -> false
174+ end
175+ else
176+ call ( :delete_tenant_by_external_id , [ id ] , id )
174177 end
175178 end
176179
177- @ spec get_tenant_by_external_id ( String . t ( ) , atom ( ) ) :: Tenant . t ( ) | nil
178- def get_tenant_by_external_id ( external_id , repo \\ :replica )
179- when repo in [ :primary , :replica ] do
180- repo =
181- case repo do
182- :primary -> Repo
183- :replica -> Replica . replica ( )
184- end
180+ @ spec get_tenant_by_external_id ( String . t ( ) ) :: Tenant . t ( ) | nil
181+ def get_tenant_by_external_id ( external_id ) do
182+ repo = Replica . replica ( )
185183
186184 Tenant
187185 |> repo . get_by ( external_id: external_id )
@@ -195,26 +193,36 @@ defmodule Realtime.Api do
195193 end
196194
197195 def rename_settings_field ( from , to ) do
198- for extension <- list_extensions ( "postgres_cdc_rls" ) do
199- { value , settings } = Map . pop ( extension . settings , from )
200- new_settings = Map . put ( settings , to , value )
201-
202- extension
203- |> Changeset . cast ( % { settings: new_settings } , [ :settings ] )
204- |> region_aware_write ( :update! , extension . external_id )
196+ if master_region? ( ) do
197+ for extension <- list_extensions ( "postgres_cdc_rls" ) do
198+ { value , settings } = Map . pop ( extension . settings , from )
199+ new_settings = Map . put ( settings , to , value )
200+
201+ extension
202+ |> Changeset . cast ( % { settings: new_settings } , [ :settings ] )
203+ |> Repo . update ( )
204+ end
205+ else
206+ call ( :rename_settings_field , [ from , to ] , from )
205207 end
206208 end
207209
210+ @ spec preload_counters ( nil | Realtime.Api.Tenant . t ( ) , any ( ) ) :: nil | Realtime.Api.Tenant . t ( )
208211 @ doc """
209212 Updates the migrations_ran field for a tenant.
210213 """
211214 @ spec update_migrations_ran ( binary ( ) , integer ( ) ) :: { :ok , Tenant . t ( ) } | { :error , term ( ) }
212215 def update_migrations_ran ( external_id , count ) do
213- external_id
214- |> Cache . get_tenant_by_external_id ( )
215- |> Tenant . changeset ( % { migrations_ran: count } )
216- |> region_aware_write ( :update! , external_id )
217- |> tap ( fn _ -> Cache . distributed_invalidate_tenant_cache ( external_id ) end )
216+ if master_region? ( ) do
217+ tenant = Cache . get_tenant_by_external_id ( external_id )
218+
219+ tenant
220+ |> Tenant . changeset ( % { migrations_ran: count } )
221+ |> Repo . update ( )
222+ |> tap ( fn _ -> Cache . distributed_invalidate_tenant_cache ( external_id ) end )
223+ else
224+ call ( :update_migrations_ran , [ external_id , count ] , external_id )
225+ end
218226 end
219227
220228 def preload_counters ( nil ) , do: nil
@@ -257,21 +265,13 @@ defmodule Realtime.Api do
257265
258266 defp maybe_restart_db_connection ( _changeset ) , do: nil
259267
260- defp local_call ? do
268+ defp master_region ? do
261269 region = Application . get_env ( :realtime , :region )
262270 master_region = Application . get_env ( :realtime , :master_region ) || region
263271 region == master_region
264272 end
265273
266- defp region_aware_write ( % struct { } = argument , operation , tenant_id ) when struct in [ Changeset , Ecto.Query ] do
267- if local_call? ( ) ,
268- do: local_call ( operation , [ argument ] , tenant_id ) ,
269- else: remote_call ( operation , [ argument ] , tenant_id )
270- end
271-
272- defp local_call ( operation , args , _tenant_id ) , do: apply ( Realtime.Repo , operation , args )
273-
274- defp remote_call ( operation , args , tenant_id ) do
274+ defp call ( operation , args , tenant_id ) do
275275 master_region = Application . get_env ( :realtime , :master_region )
276276
277277 with { :ok , master_node } <- Nodes . node_from_region ( master_region , self ( ) ) ,
@@ -281,7 +281,7 @@ defmodule Realtime.Api do
281281 end
282282
283283 defp wrapped_call ( master_node , operation , args , tenant_id ) do
284- case GenRpc . call ( master_node , Realtime.Repo , operation , args , tenant_id: tenant_id ) do
284+ case GenRpc . call ( master_node , __MODULE__ , operation , args , tenant_id: tenant_id ) do
285285 { :error , :rpc_error , reason } -> { :error , reason }
286286 { :error , reason } -> { :error , reason }
287287 result -> { :ok , result }
0 commit comments