Skip to content

Commit

Permalink
Set read_preference to primaryPreferred when topology single with rep…
Browse files Browse the repository at this point in the history
…lica set. (#266)

* Set read preference as primaryPreferred when topology is single and is replica.

* Put missing replica? in ServerDescription.parse_hello_response/1 and add test.

* update code format.
  • Loading branch information
ColaCheng authored Feb 6, 2025
1 parent 9a71d4f commit db726e9
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 26 deletions.
3 changes: 3 additions & 0 deletions lib/mongo/read_preference.ex
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,7 @@ defmodule Mongo.ReadPreference do

filter_nils(read_preference)
end

def to_topology_single_type({_, %{replica?: true} = _server_description}), do: %{mode: :primaryPreferred}
def to_topology_single_type(_), do: nil
end
16 changes: 12 additions & 4 deletions lib/mongo/server_description.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ defmodule Mongo.ServerDescription do
compression: [compressor_types],
read_only: boolean(),
logical_session_timeout: non_neg_integer,
supports_retryable_writes: boolean()
supports_retryable_writes: boolean(),
replica?: boolean()
}

@empty %{
Expand Down Expand Up @@ -69,7 +70,8 @@ defmodule Mongo.ServerDescription do
compression: [],
read_only: false,
logical_session_timeout: 30,
support_retryable_writes: false
support_retryable_writes: false,
replica?: false
}

def new() do
Expand Down Expand Up @@ -115,7 +117,8 @@ defmodule Mongo.ServerDescription do
compression: map_compressors(hello_response["compression"]),
read_only: hello_response["readOnly"] || false,
logical_session_timeout: hello_response["logicalSessionTimeoutMinutes"] || 30,
supports_retryable_writes: supports_retryable_writes
supports_retryable_writes: supports_retryable_writes,
replica?: replica?(server_type)
}
end

Expand Down Expand Up @@ -147,7 +150,8 @@ defmodule Mongo.ServerDescription do
compression: map_compressors(hello_response["compression"]),
read_only: hello_response["readOnly"] || false,
logical_session_timeout: hello_response["logicalSessionTimeoutMinutes"] || 30,
supports_retryable_writes: server_type != :standalone && max_wire_version >= @retryable_wire_version && hello_response["logicalSessionTimeoutMinutes"] != nil
supports_retryable_writes: server_type != :standalone && max_wire_version >= @retryable_wire_version && hello_response["logicalSessionTimeoutMinutes"] != nil,
replica?: replica?(server_type)
}
end

Expand Down Expand Up @@ -187,4 +191,8 @@ defmodule Mongo.ServerDescription do
[:zlib]
end
end

defp replica?(server_type) do
server_type in [:rs_primary, :rs_secondary, :rs_arbiter, :rs_other, :rs_ghost]
end
end
29 changes: 16 additions & 13 deletions lib/mongo/topology_description.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,20 @@ defmodule Mongo.TopologyDescription do
|> Keyword.get(:read_preference)
|> ReadPreference.merge_defaults()

{servers, read_prefs} =
{server, read_prefs} =
case topology.type do
:unknown ->
{[], nil}
{nil, nil}

:single ->
{topology.servers, nil}
server = pick_server(topology.servers)
{server, ReadPreference.to_topology_single_type(server)}

:sharded ->
{mongos_servers(topology), ReadPreference.to_mongos(read_preference)}
{topology |> mongos_servers() |> pick_server(), ReadPreference.to_mongos(read_preference)}

_other ->
{select_replica_set_server(topology, read_preference.mode, read_preference), ReadPreference.to_replica_set(read_preference)}
{topology |> select_replica_set_server(read_preference.mode, read_preference) |> pick_server(), ReadPreference.to_replica_set(read_preference)}
end

opts =
Expand All @@ -147,17 +148,12 @@ defmodule Mongo.TopologyDescription do
Keyword.put(opts, :read_preference, prefs)
end

server =
servers
|> Enum.take_random(1)
|> Enum.map(fn {server, description} -> {server, description.compression} end)

case server do
[] ->
nil ->
:empty

[{addr, compression}] ->
{:ok, {addr, merge_compression(opts, compression)}}
{addr, server_description} ->
{:ok, {addr, merge_compression(opts, server_description.compression)}}
end
end

Expand All @@ -182,6 +178,13 @@ defmodule Mongo.TopologyDescription do
end
end

defp pick_server(servers) do
case Enum.take_random(servers, 1) do
[] -> nil
[server] -> server
end
end

defp mongos_servers(%{:servers => servers}) do
Enum.filter(servers, fn {_, server} -> server.type == :mongos end)
end
Expand Down
8 changes: 8 additions & 0 deletions test/mongo/topology_description_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,12 @@ defmodule Mongo.TopologyDescriptionTest do

assert :single = TopologyDescription.get_type(opts)
end

test "Set read_preference to :primaryPreferred when topology is single and server is replica set" do
assert {:ok, {_, opts}} = TopologyDescription.select_servers(single(), :read, [])
assert nil == Keyword.get(opts, :read_preference)

assert {:ok, {_, opts}} = TopologyDescription.select_servers(single_with_repl_set(), :read, [])
assert :primaryPreferred = Keyword.get(opts, :read_preference) |> Map.get(:mode)
end
end
62 changes: 53 additions & 9 deletions test/support/topology_test_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,43 @@ defmodule Mongo.TopologyTestData do
set_version: nil,
tag_set: %{},
type: :standalone,
compression: []
compression: [],
replica?: false
}
}
}

def single_with_repl_set(),
do: %{
set_name: nil,
type: :single,
compatibility_error: nil,
compatible: true,
local_threshold_ms: 15,
max_election_id: nil,
max_set_version: nil,
servers: %{
"localhost:27017" => %{
address: "localhost:27017",
arbiters: [],
election_id: nil,
error: nil,
hosts: [],
last_update_time: nil,
last_write_date: nil,
max_wire_version: 4,
me: nil,
min_wire_version: 0,
op_time: nil,
passives: [],
primary: nil,
round_trip_time: 44,
set_name: nil,
set_version: nil,
tag_set: %{},
type: :standalone,
compression: [],
replica?: true
}
}
}
Expand Down Expand Up @@ -64,7 +100,8 @@ defmodule Mongo.TopologyTestData do
set_version: nil,
tag_set: %{},
type: :mongos,
compression: []
compression: [],
replica?: false
}
}
}
Expand Down Expand Up @@ -102,7 +139,8 @@ defmodule Mongo.TopologyTestData do
"localhost:27018",
"localhost:27019",
"localhost:27020"
]
],
replica?: true
},
"localhost:27019" => %{
address: "localhost:27019",
Expand All @@ -127,7 +165,8 @@ defmodule Mongo.TopologyTestData do
"localhost:27018",
"localhost:27019",
"localhost:27020"
]
],
replica?: true
},
"localhost:27020" => %{
address: "localhost:27020",
Expand All @@ -152,7 +191,8 @@ defmodule Mongo.TopologyTestData do
"localhost:27018",
"localhost:27019",
"localhost:27020"
]
],
replica?: true
}
}
}
Expand Down Expand Up @@ -186,7 +226,8 @@ defmodule Mongo.TopologyTestData do
tag_set: %{},
type: :unknown,
hosts: [],
compression: []
compression: [],
replica?: true
},
"localhost:27019" => %{
address: "localhost:27019",
Expand All @@ -211,7 +252,8 @@ defmodule Mongo.TopologyTestData do
"localhost:27019",
"localhost:27020"
],
compression: []
compression: [],
replica?: true
},
"localhost:27020" => %{
address: "localhost:27020",
Expand All @@ -236,7 +278,8 @@ defmodule Mongo.TopologyTestData do
"localhost:27019",
"localhost:27020"
],
compression: []
compression: [],
replica?: true
}
}
}
Expand Down Expand Up @@ -270,7 +313,8 @@ defmodule Mongo.TopologyTestData do
tag_set: %{},
type: :rs_primary,
hosts: ["localhost:27018"],
compression: []
compression: [],
replica?: true
}
}
}
Expand Down

0 comments on commit db726e9

Please sign in to comment.