diff --git a/src/common/client.py b/src/common/client.py index 9fdbc4b..0f36380 100644 --- a/src/common/client.py +++ b/src/common/client.py @@ -415,3 +415,21 @@ def sentinels_primary(self, hostname: str) -> list[dict[str, str]]: return self.exec_cli_command( command=["sentinel", "sentinels", PRIMARY_NAME], hostname=hostname ) + + def set(self, hostname: str, *args: str) -> bool: + """Set a sentinel configuration parameter through the CLI. + + Args: + hostname (str): The hostname to connect to. + *args (str): The sentinel configuration parameters to set, as a variable list of strings. + + Returns: + bool: True if the command executed successfully, False otherwise. + """ + return ( + self.exec_cli_command( + command=["sentinel", "set"] + list(args), + hostname=hostname, + ) + == "OK" + ) diff --git a/src/events/base_events.py b/src/events/base_events.py index e63e6c2..1cb7130 100644 --- a/src/events/base_events.py +++ b/src/events/base_events.py @@ -76,6 +76,9 @@ def __init__(self, charm: "ValkeyCharm"): self.framework.observe( self.charm.on[PEER_RELATION].relation_changed, self._on_peer_relation_changed ) + self.framework.observe( + self.charm.on[PEER_RELATION].relation_departed, self._on_peer_relation_departed + ) self.framework.observe(self.charm.on.update_status, self._on_update_status) self.framework.observe(self.charm.on.leader_elected, self._on_leader_elected) self.framework.observe(self.charm.on.config_changed, self._on_config_changed) @@ -227,12 +230,18 @@ def _on_unit_fully_started(self, event: UnitFullyStarted) -> None: def _on_peer_relation_changed(self, event: ops.RelationChangedEvent) -> None: """Handle event received by all units when a unit's relation data changes.""" + self._reconfigure_quorum_if_necessary() + if not self.charm.unit.is_leader(): return for lock in [StartLock(self.charm.state)]: lock.process() + def _on_peer_relation_departed(self, event: ops.RelationDepartedEvent) -> None: + """Handle event received by all units when a unit departs.""" + self._reconfigure_quorum_if_necessary() + def _on_update_status(self, event: ops.UpdateStatusEvent) -> None: """Handle the update-status event.""" if not self.charm.state.unit_server.is_started: @@ -524,3 +533,24 @@ def _set_state_for_going_away(self) -> None: ) self.charm.state.unit_server.update({"scale_down_state": ScaleDownState.GOING_AWAY}) + + def _reconfigure_quorum_if_necessary(self) -> None: + """Reconfigure the sentinel quorum if it does not match the current cluster size.""" + # if the unit / all units are being removed, we do not need to reconfigure the quorum + if ( + not self.charm.state.unit_server.is_active + or self.charm.state.unit_server.is_being_removed + or self.model.app.planned_units() == 0 + ): + return + + if self.charm.sentinel_manager.get_configured_quorum() != self.charm.config_manager.quorum: + logger.debug("Updating sentinel quorum to match current cluster size") + try: + self.charm.sentinel_manager.set_quorum(self.charm.config_manager.quorum) + self.charm.config_manager.set_sentinel_config_properties( + self.charm.sentinel_manager.get_primary_ip() + ) + except ValkeyWorkloadCommandError as e: + logger.error(f"Failed to update sentinel quorum: {e}") + # not critical to defer here, we can wait for the next relation change or config change to try again diff --git a/src/managers/config.py b/src/managers/config.py index 9290c1d..c68508a 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -21,7 +21,6 @@ CHARM_USERS_ROLE_MAP, CLIENT_PORT, PRIMARY_NAME, - QUORUM_NUMBER, SENTINEL_PORT, SENTINEL_TLS_PORT, TLS_PORT, @@ -256,10 +255,7 @@ def get_sentinel_config_properties( def _generate_sentinel_configs(self, primary_endpoint: str) -> dict[str, str]: """Generate the sentinel config properties based on the current cluster state.""" sentinel_configs = {} - # TODO consider adding quorum calculation based on number of planned_units and the parity of the number of units - sentinel_configs["monitor"] = ( - f"{PRIMARY_NAME} {primary_endpoint} {TLS_PORT} {QUORUM_NUMBER}" - ) + sentinel_configs["monitor"] = f"{PRIMARY_NAME} {primary_endpoint} {TLS_PORT} {self.quorum}" # auth settings # auth-user is used by sentinel to authenticate to the valkey primary sentinel_configs["auth-user"] = f"{PRIMARY_NAME} {CharmUsers.VALKEY_SENTINEL.value}" @@ -363,6 +359,12 @@ def configure_services(self, primary_endpoint: str) -> None: ) raise ValkeyConfigurationError("Failed to set configuration") from e + @property + def quorum(self) -> int: + """Calculate the quorum based on the number of units in the cluster.""" + num_units = len([server for server in self.state.servers if server.is_active]) + return (num_units // 2) + 1 + def get_statuses(self, scope: Scope, recompute: bool = False) -> list[StatusObject]: """Compute the config manager's statuses.""" status_list: list[StatusObject] = [] diff --git a/src/managers/sentinel.py b/src/managers/sentinel.py index 41cfe41..b0803db 100644 --- a/src/managers/sentinel.py +++ b/src/managers/sentinel.py @@ -22,7 +22,7 @@ ) from core.base_workload import WorkloadBase from core.cluster_state import ClusterState -from literals import CharmUsers +from literals import PRIMARY_NAME, CharmUsers from statuses import CharmStatuses logger = logging.getLogger(__name__) @@ -316,3 +316,13 @@ def get_primary_ip_for_scale_down(self) -> str: 40s covers both substrates """ return self.get_primary_ip() + + def get_configured_quorum(self) -> int: + """Get the currently configured quorum for the sentinel cluster.""" + client = self._get_sentinel_client() + return int(client.primary(self.state.bind_address)["quorum"]) + + def set_quorum(self, quorum: int) -> None: + """Set the quorum for the sentinel cluster.""" + client = self._get_sentinel_client() + client.set(self.state.bind_address, PRIMARY_NAME, "quorum", str(quorum)) diff --git a/tests/integration/continuous_writes.py b/tests/integration/continuous_writes.py index 3cc44bc..f267878 100644 --- a/tests/integration/continuous_writes.py +++ b/tests/integration/continuous_writes.py @@ -89,7 +89,7 @@ async def _create_glide_client(self, config: Optional[SimpleNamespace] = None) - glide_config = GlideClientConfiguration( addresses=addresses, client_name="continuous_writes_client", - request_timeout=500, + request_timeout=1000, credentials=credentials, reconnect_strategy=BackoffStrategy(num_of_retries=1, factor=50, exponent_base=2), ) @@ -253,7 +253,7 @@ async def _make_client(conf: SimpleNamespace) -> GlideClient: glide_config = GlideClientConfiguration( addresses=addresses, client_name="continuous_writes_worker", - request_timeout=500, + request_timeout=1000, credentials=credentials, reconnect_strategy=BackoffStrategy(num_of_retries=1, factor=50, exponent_base=2), ) diff --git a/tests/integration/ha/test_scaling.py b/tests/integration/ha/test_scaling.py index 4121e96..cd6a70d 100644 --- a/tests/integration/ha/test_scaling.py +++ b/tests/integration/ha/test_scaling.py @@ -21,6 +21,7 @@ get_number_connected_replicas, get_password, get_primary_ip, + get_quorum, remove_number_units, seed_valkey, ) @@ -58,6 +59,15 @@ async def test_seed_data(juju: jubilant.Juju) -> None: await seed_valkey(juju, target_gb=1) +async def test_check_quorum(juju: jubilant.Juju) -> None: + """Check quorum value.""" + app_name = existing_app(juju) or APP_NAME + init_units_count = len(juju.status().apps[app_name].units) + assert get_quorum(juju, f"{app_name}/0") == (init_units_count // 2) + 1, ( + "Unexpected quorum value after initial deploy" + ) + + async def test_scale_up(juju: jubilant.Juju, c_writes) -> None: """Make sure new units are added to the valkey downtime.""" app_name = existing_app(juju) or APP_NAME @@ -78,6 +88,9 @@ async def test_scale_up(juju: jubilant.Juju, c_writes) -> None: f"Expected {init_units_count + 2} units, got {num_units}." ) + for unit in juju.status().apps[app_name].units: + assert get_quorum(juju, unit) == (num_units // 2) + 1 + # check if all units have been added to the cluster hostnames = get_cluster_hostnames(juju, app_name) @@ -145,6 +158,9 @@ async def test_scale_down_one_unit(juju: jubilant.Juju, substrate: Substrate, c_ f"Expected {init_units_count - 1} units, got {num_units}." ) + for unit in juju.status().apps[app_name].units: + assert get_quorum(juju, unit) == (num_units // 2) + 1 + number_of_replicas = await get_number_connected_replicas( hostnames=get_cluster_hostnames(juju, app_name), username=CharmUsers.VALKEY_ADMIN.value, @@ -225,6 +241,9 @@ async def test_scale_down_multiple_units( f"Expected {init_units_count - 3} connected replicas, got {number_of_replicas}." ) + for unit in juju.status().apps[app_name].units: + assert get_quorum(juju, unit) == (num_units // 2) + 1 + c_writes.update() await assert_continuous_writes_increasing( diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 4b3bf70..e6decb2 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -33,6 +33,7 @@ INTERNAL_USERS_PASSWORD_CONFIG, INTERNAL_USERS_SECRET_LABEL_SUFFIX, PEER_RELATION, + SENTINEL_PORT, TLS_PORT, CharmUsers, Substrate, @@ -467,10 +468,15 @@ async def seed_valkey(juju: jubilant.Juju, target_gb: float = 1.0) -> None: def exec_valkey_cli( - hostname: str, username: str, password: str, command: str + hostname: str, + username: str, + password: str, + command: str, + port: int = CLIENT_PORT, + json: bool = False, ) -> valkey_cli_result: """Execute a Valkey CLI command and returns the output as a string.""" - command = f"valkey-cli --no-auth-warning -h {hostname} -p {CLIENT_PORT} --user {username} --pass {password} {command}" + command = f"valkey-cli --no-auth-warning -h {hostname} -p {port} --user {username} --pass {password} {'--json' if json else ''} {command}" result = subprocess.run( command.split(), check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) @@ -479,6 +485,27 @@ def exec_valkey_cli( ) +def get_quorum(juju: jubilant.Juju, unit_name: str) -> int: + """Get the currently configured sentinel quorum.""" + status = juju.status() + model_info = juju.show_model() + units = status.get_units(APP_NAME) + unit_endpoint = ( + units[unit_name].public_address + if model_info.type != "kubernetes" + else units[unit_name].address + ) + result = exec_valkey_cli( + hostname=unit_endpoint, + username=CharmUsers.SENTINEL_CHARM_ADMIN.value, + password=get_password(juju, user=CharmUsers.SENTINEL_CHARM_ADMIN), + command="SENTINEL primary primary", + port=SENTINEL_PORT, + json=True, + ) + return int(json.loads(result.stdout)["quorum"]) + + async def set_key( hostnames: list[str], username: str, diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index a89ab1e..5f27728 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -84,7 +84,7 @@ async def test_update_admin_password(juju: jubilant.Juju) -> None: # wait for config-changed hook to finish executing juju.wait( - lambda status: are_apps_active_and_agents_idle(status, APP_NAME, idle_period=10), + lambda status: are_apps_active_and_agents_idle(status, APP_NAME, idle_period=30), timeout=1200, ) @@ -164,7 +164,7 @@ async def test_update_admin_password_wrong_username(juju: jubilant.Juju) -> None set_password(juju, username=CharmUsers.VALKEY_ADMIN.value, password=new_password) # wait for config-changed hook to finish executing juju.wait( - lambda status: are_apps_active_and_agents_idle(status, APP_NAME, idle_period=10), + lambda status: are_apps_active_and_agents_idle(status, APP_NAME, idle_period=30), timeout=1200, ) @@ -224,7 +224,7 @@ async def test_user_secret_permissions(juju: jubilant.Juju) -> None: with fast_forward(juju): juju.grant_secret(identifier=secret_name, app=APP_NAME) juju.wait( - lambda status: are_apps_active_and_agents_idle(status, APP_NAME, idle_period=10), + lambda status: are_apps_active_and_agents_idle(status, APP_NAME, idle_period=30), timeout=1200, ) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index f83aa99..5e18ef6 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -13,6 +13,7 @@ INTERNAL_USERS_PASSWORD_CONFIG, INTERNAL_USERS_SECRET_LABEL_SUFFIX, PEER_RELATION, + PRIMARY_NAME, STATUS_PEERS_RELATION, CharmUsers, StartState, @@ -344,7 +345,10 @@ def test_update_status_leader_unit(cloud_spec): containers={container}, ) - with patch("managers.tls.TLSManager.will_certificate_expire"): + with ( + patch("managers.tls.TLSManager.will_certificate_expire"), + patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), + ): state_out = ctx.run(ctx.on.update_status(), state_in) assert state_out.unit_status == ActiveStatus() @@ -363,7 +367,10 @@ def test_update_status_non_leader_unit(cloud_spec): relations={relation, status_peer_relation}, containers={container}, ) - with patch("managers.tls.TLSManager.will_certificate_expire"): + with ( + patch("managers.tls.TLSManager.will_certificate_expire"), + patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), + ): state_out = ctx.run(ctx.on.update_status(), state_in) assert state_out.unit_status == ActiveStatus() @@ -696,7 +703,10 @@ def test_relation_changed_event_leader_setting_starting_member(cloud_spec): containers={container}, model=testing.Model(name="my-vm-model", type="lxd", cloud_spec=cloud_spec), ) - with patch("managers.tls.TLSManager.will_certificate_expire"): + with ( + patch("managers.tls.TLSManager.will_certificate_expire"), + patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), + ): state_out = ctx.run(ctx.on.relation_changed(relation), state_in) assert state_out.get_relation(1).local_app_data.get("start-member") == "valkey/1" @@ -718,7 +728,10 @@ def test_relation_changed_event_leader_clears_starting_member(cloud_spec): containers={container}, model=testing.Model(name="my-vm-model", type="lxd", cloud_spec=cloud_spec), ) - with patch("managers.tls.TLSManager.will_certificate_expire"): + with ( + patch("managers.tls.TLSManager.will_certificate_expire"), + patch("common.client.SentinelClient.primary", return_value={"quorum": "2"}), + ): state_out = ctx.run(ctx.on.relation_changed(relation), state_in) assert state_out.get_relation(1).local_app_data.get("start-member") is None @@ -745,6 +758,63 @@ def test_relation_changed_event_leader_leaves_starting_member_as_is(cloud_spec): containers={container}, model=testing.Model(name="my-vm-model", type="lxd", cloud_spec=cloud_spec), ) - with patch("managers.tls.TLSManager.will_certificate_expire"): + with ( + patch("managers.tls.TLSManager.will_certificate_expire"), + patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), + ): state_out = ctx.run(ctx.on.relation_changed(relation), state_in) assert state_out.get_relation(1).local_app_data.get("start-member") == "valkey/1" + + +def test_relation_changed_event_update_quorum(cloud_spec): + ctx = testing.Context(ValkeyCharm, app_trusted=True) + relation = testing.PeerRelation( + id=1, + endpoint=PEER_RELATION, + local_app_data={"start-member": "valkey/1"}, + local_unit_data={"start-state": StartState.STARTED.value}, + peers_data={1: {"start-state": StartState.STARTED.value}}, + ) + container = testing.Container(name=CONTAINER, can_connect=True) + + state_in = testing.State( + leader=True, + relations={relation}, + containers={container}, + model=testing.Model(name="my-vm-model", type="lxd", cloud_spec=cloud_spec), + ) + with ( + patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), + patch("common.client.SentinelClient.set") as mock_set, + patch("managers.sentinel.SentinelManager.get_primary_ip", return_value="127.1.0.1"), + ): + ctx.run(ctx.on.relation_changed(relation), state_in) + mock_set.assert_called_once_with("127.1.1.1", PRIMARY_NAME, "quorum", "2") + + +def test_relation_changed_event_do_not_update_quorum(cloud_spec): + ctx = testing.Context(ValkeyCharm, app_trusted=True) + relation = testing.PeerRelation( + id=1, + endpoint=PEER_RELATION, + local_app_data={"start-member": "valkey/1"}, + local_unit_data={"start-state": StartState.STARTED.value}, + peers_data={ + 1: {"start-state": StartState.STARTED.value}, + 2: {"start-state": StartState.STARTED.value}, + }, + ) + container = testing.Container(name=CONTAINER, can_connect=True) + + state_in = testing.State( + leader=True, + relations={relation}, + containers={container}, + model=testing.Model(name="my-vm-model", type="lxd", cloud_spec=cloud_spec), + ) + with ( + patch("common.client.SentinelClient.primary", return_value={"quorum": "2"}), + patch("common.client.SentinelClient.set") as mock_set, + ): + ctx.run(ctx.on.relation_changed(relation), state_in) + mock_set.assert_not_called() diff --git a/tests/unit/test_tls.py b/tests/unit/test_tls.py index 2f49a08..6b7296d 100644 --- a/tests/unit/test_tls.py +++ b/tests/unit/test_tls.py @@ -698,6 +698,7 @@ def test_internal_peer_ca_rotation_single_unit(cloud_spec): patch("managers.tls.TLSManager.rehash_ca_certificates"), patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, patch("managers.sentinel.SentinelManager.restart_service"), + patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), ): state_out = ctx.run(ctx.on.relation_changed(peer_relation, remote_unit=1), state_in) @@ -739,6 +740,7 @@ def test_internal_peer_ca_rotation_started(cloud_spec): patch("managers.tls.TLSManager.rehash_ca_certificates"), patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, patch("managers.sentinel.SentinelManager.restart_service"), + patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), ): state_out = ctx.run(ctx.on.relation_changed(peer_relation, remote_unit=1), state_in) @@ -783,6 +785,7 @@ def test_ca_rotation_not_all_units_added(cloud_spec): ) with ( patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, + patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), ): state_out = ctx.run(ctx.on.relation_changed(peer_relation), state_in) @@ -828,6 +831,7 @@ def test_ca_rotation_all_units_added(cloud_spec): with ( patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, patch("managers.sentinel.SentinelManager.restart_service"), + patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), ): state_out = ctx.run(ctx.on.relation_changed(peer_relation), state_in) @@ -872,6 +876,7 @@ def test_ca_rotation_not_all_units_ca_updated(cloud_spec): ) with ( patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, + patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), ): state_out = ctx.run(ctx.on.relation_changed(peer_relation), state_in) @@ -918,6 +923,7 @@ def test_ca_rotation_all_units_ca_updated(cloud_spec): patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, patch("managers.sentinel.SentinelManager.restart_service"), patch("managers.tls.TLSManager.rehash_ca_certificates"), + patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), ): state_out = ctx.run(ctx.on.relation_changed(peer_relation), state_in)