diff --git a/framework/py/flwr/common/exit/exit.py b/framework/py/flwr/common/exit/exit.py index 106911a6e82a..f5550071ba5d 100644 --- a/framework/py/flwr/common/exit/exit.py +++ b/framework/py/flwr/common/exit/exit.py @@ -126,6 +126,10 @@ def _try_obtain_telemetry_event() -> EventType | None: return EventType.RUN_SUPERNODE_LEAVE if sys.argv[0].endswith("flwr-agentapp"): return EventType.FLWR_AGENTAPP_RUN_LEAVE + if sys.argv[0].endswith("flwr-model"): + return EventType.FLWR_MODEL_RUN_LEAVE + if sys.argv[0].endswith("flwr-connector"): + return EventType.FLWR_CONNECTOR_RUN_LEAVE if sys.argv[0].endswith("flwr-serverapp"): return EventType.FLWR_SERVERAPP_RUN_LEAVE if sys.argv[0].endswith("flwr-clientapp"): diff --git a/framework/py/flwr/common/telemetry.py b/framework/py/flwr/common/telemetry.py index 32919911caca..1aa924abe3ab 100644 --- a/framework/py/flwr/common/telemetry.py +++ b/framework/py/flwr/common/telemetry.py @@ -160,6 +160,14 @@ def _generate_next_value_(name: str, start: int, count: int, last_values: list[A FLWR_AGENTAPP_RUN_ENTER = auto() FLWR_AGENTAPP_RUN_LEAVE = auto() + # CLI: flwr-model + FLWR_MODEL_RUN_ENTER = auto() + FLWR_MODEL_RUN_LEAVE = auto() + + # CLI: flwr-connector + FLWR_CONNECTOR_RUN_ENTER = auto() + FLWR_CONNECTOR_RUN_LEAVE = auto() + # --- Simulation Engine ------------------------------------------------------------ # Python API: `run_simulation` diff --git a/framework/py/flwr/supercore/cli/__init__.py b/framework/py/flwr/supercore/cli/__init__.py index 679ec22284b8..a40979ddc116 100644 --- a/framework/py/flwr/supercore/cli/__init__.py +++ b/framework/py/flwr/supercore/cli/__init__.py @@ -17,8 +17,12 @@ from .flower_superexec import flower_superexec from .flwr_agentapp import flwr_agentapp +from .flwr_connector import flwr_connector +from .flwr_model import flwr_model __all__ = [ "flower_superexec", "flwr_agentapp", + "flwr_connector", + "flwr_model", ] diff --git a/framework/py/flwr/supercore/cli/flwr_connector.py b/framework/py/flwr/supercore/cli/flwr_connector.py new file mode 100644 index 000000000000..cfa70f0aa602 --- /dev/null +++ b/framework/py/flwr/supercore/cli/flwr_connector.py @@ -0,0 +1,76 @@ +# Copyright 2026 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""`flwr-connector` command.""" + + +import argparse +from logging import DEBUG, INFO +from queue import Queue + +from flwr.common.args import add_args_flwr_app_common +from flwr.common.constant import SERVERAPPIO_API_DEFAULT_CLIENT_ADDRESS +from flwr.common.exit import ExitCode, flwr_exit +from flwr.common.logger import log, mirror_output_to_queue, restore_output +from flwr.supercore.executors.run_connector import run_connector + + +def flwr_connector() -> None: + """Run process-isolated Flower ConnectorApp.""" + args = _parse_args_run_flwr_connector().parse_args() + + if not args.insecure: + flwr_exit( + ExitCode.COMMON_TLS_NOT_SUPPORTED, + "`flwr-connector` does not support TLS yet.", + ) + + # Capture stdout/stderr + log_queue: Queue[str | None] = Queue() + mirror_output_to_queue(log_queue) + + log(INFO, "Start `flwr-connector` process") + log( + DEBUG, + "`flwr-connector` will attempt to connect to SuperLink's " + "ServerAppIo API at %s", + args.serverappio_api_address, + ) + run_connector( + serverappio_api_address=args.serverappio_api_address, + log_queue=log_queue, + token=args.token, + certificates=None, + parent_pid=args.parent_pid, + runtime_dependency_install=args.runtime_dependency_install, + ) + + # Restore stdout/stderr + restore_output() + + +def _parse_args_run_flwr_connector() -> argparse.ArgumentParser: + """Parse `flwr-connector` command line arguments.""" + parser = argparse.ArgumentParser( + description="Run a Flower ConnectorApp", + ) + parser.add_argument( + "--serverappio-api-address", + default=SERVERAPPIO_API_DEFAULT_CLIENT_ADDRESS, + type=str, + help="Address of SuperLink's ServerAppIo API (IPv4, IPv6, or a domain name)." + f"By default, it is set to {SERVERAPPIO_API_DEFAULT_CLIENT_ADDRESS}.", + ) + add_args_flwr_app_common(parser=parser) + return parser diff --git a/framework/py/flwr/supercore/cli/flwr_connector_test.py b/framework/py/flwr/supercore/cli/flwr_connector_test.py new file mode 100644 index 000000000000..8829f1ec3d1f --- /dev/null +++ b/framework/py/flwr/supercore/cli/flwr_connector_test.py @@ -0,0 +1,129 @@ +# Copyright 2026 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests for ConnectorApp process CLI parsing and wiring.""" + + +import importlib +from types import SimpleNamespace +from unittest.mock import Mock, patch + +import pytest + +from flwr.common.constant import SERVERAPPIO_API_DEFAULT_CLIENT_ADDRESS + +from .flwr_connector import _parse_args_run_flwr_connector + +flwr_connector_module = importlib.import_module("flwr.supercore.cli.flwr_connector") + + +def test_parse_flwr_connector_requires_token() -> None: + """The ConnectorApp process CLI should require a token.""" + with pytest.raises(SystemExit): + _parse_args_run_flwr_connector().parse_args([]) + + +def test_parse_flwr_connector_rejects_run_once() -> None: + """The removed deprecated flag should no longer parse.""" + with pytest.raises(SystemExit): + _parse_args_run_flwr_connector().parse_args( + ["--token", "test-token", "--run-once"] + ) + + +def test_parse_flwr_connector_parses_tokenized_invocation() -> None: + """The ConnectorApp process CLI should still parse the supported flags.""" + args = _parse_args_run_flwr_connector().parse_args( + [ + "--token", + "test-token", + "--insecure", + "--parent-pid", + "1234", + "--allow-runtime-dependency-installation", + ] + ) + + assert args.serverappio_api_address == SERVERAPPIO_API_DEFAULT_CLIENT_ADDRESS + assert args.token == "test-token" + assert args.insecure is True + assert args.parent_pid == 1234 + assert args.runtime_dependency_install is True + + +def test_flwr_connector_parses_args_before_mirroring_output() -> None: + """Argument parsing should happen before stdout/stderr redirection.""" + + class _Parser: + def parse_args(self) -> SimpleNamespace: + """Raise a parser error before any side effects happen.""" + raise SystemExit(2) + + mirror_output_to_queue = Mock() + + with ( + patch.object(flwr_connector_module, "_parse_args_run_flwr_connector", _Parser), + patch.object( + flwr_connector_module, + "mirror_output_to_queue", + mirror_output_to_queue, + ), + pytest.raises(SystemExit), + ): + flwr_connector_module.flwr_connector() + + mirror_output_to_queue.assert_not_called() + + +def test_flwr_connector_forwards_cli_args() -> None: + """The ConnectorApp CLI should forward parsed args to the runtime.""" + args = SimpleNamespace( + insecure=True, + serverappio_api_address="127.0.0.1:9091", + token="test-token", + parent_pid=321, + runtime_dependency_install=True, + ) + + class _Parser: + def parse_args(self) -> SimpleNamespace: + """Return a fixed namespace for CLI forwarding tests.""" + return args + + mirror_output_to_queue = Mock() + restore_output = Mock() + run_connector = Mock() + + with ( + patch.object(flwr_connector_module, "_parse_args_run_flwr_connector", _Parser), + patch.object( + flwr_connector_module, + "mirror_output_to_queue", + mirror_output_to_queue, + ), + patch.object(flwr_connector_module, "restore_output", restore_output), + patch.object(flwr_connector_module, "run_connector", run_connector), + ): + flwr_connector_module.flwr_connector() + + mirror_output_to_queue.assert_called_once() + restore_output.assert_called_once_with() + run_connector.assert_called_once() + kwargs = run_connector.call_args.kwargs + assert kwargs["serverappio_api_address"] == "127.0.0.1:9091" + assert kwargs["log_queue"] is mirror_output_to_queue.call_args.args[0] + assert kwargs["token"] == "test-token" + assert kwargs["certificates"] is None + assert kwargs["parent_pid"] == 321 + assert kwargs["runtime_dependency_install"] is True diff --git a/framework/py/flwr/supercore/cli/flwr_model.py b/framework/py/flwr/supercore/cli/flwr_model.py new file mode 100644 index 000000000000..d81f1a60a099 --- /dev/null +++ b/framework/py/flwr/supercore/cli/flwr_model.py @@ -0,0 +1,75 @@ +# Copyright 2026 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""`flwr-model` command.""" + + +import argparse +from logging import DEBUG, INFO +from queue import Queue + +from flwr.common.args import add_args_flwr_app_common +from flwr.common.constant import SERVERAPPIO_API_DEFAULT_CLIENT_ADDRESS +from flwr.common.exit import ExitCode, flwr_exit +from flwr.common.logger import log, mirror_output_to_queue, restore_output +from flwr.supercore.executors.run_model import run_model + + +def flwr_model() -> None: + """Run process-isolated Flower ModelApp.""" + args = _parse_args_run_flwr_model().parse_args() + + if not args.insecure: + flwr_exit( + ExitCode.COMMON_TLS_NOT_SUPPORTED, + "`flwr-model` does not support TLS yet.", + ) + + # Capture stdout/stderr + log_queue: Queue[str | None] = Queue() + mirror_output_to_queue(log_queue) + + log(INFO, "Start `flwr-model` process") + log( + DEBUG, + "`flwr-model` will attempt to connect to SuperLink's ServerAppIo API at %s", + args.serverappio_api_address, + ) + run_model( + serverappio_api_address=args.serverappio_api_address, + log_queue=log_queue, + token=args.token, + certificates=None, + parent_pid=args.parent_pid, + runtime_dependency_install=args.runtime_dependency_install, + ) + + # Restore stdout/stderr + restore_output() + + +def _parse_args_run_flwr_model() -> argparse.ArgumentParser: + """Parse `flwr-model` command line arguments.""" + parser = argparse.ArgumentParser( + description="Run a Flower ModelApp", + ) + parser.add_argument( + "--serverappio-api-address", + default=SERVERAPPIO_API_DEFAULT_CLIENT_ADDRESS, + type=str, + help="Address of SuperLink's ServerAppIo API (IPv4, IPv6, or a domain name)." + f"By default, it is set to {SERVERAPPIO_API_DEFAULT_CLIENT_ADDRESS}.", + ) + add_args_flwr_app_common(parser=parser) + return parser diff --git a/framework/py/flwr/supercore/cli/flwr_model_test.py b/framework/py/flwr/supercore/cli/flwr_model_test.py new file mode 100644 index 000000000000..9e33a1bc1656 --- /dev/null +++ b/framework/py/flwr/supercore/cli/flwr_model_test.py @@ -0,0 +1,127 @@ +# Copyright 2026 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests for ModelApp process CLI parsing and wiring.""" + + +import importlib +from types import SimpleNamespace +from unittest.mock import Mock, patch + +import pytest + +from flwr.common.constant import SERVERAPPIO_API_DEFAULT_CLIENT_ADDRESS + +from .flwr_model import _parse_args_run_flwr_model + +flwr_model_module = importlib.import_module("flwr.supercore.cli.flwr_model") + + +def test_parse_flwr_model_requires_token() -> None: + """The ModelApp process CLI should require a token.""" + with pytest.raises(SystemExit): + _parse_args_run_flwr_model().parse_args([]) + + +def test_parse_flwr_model_rejects_run_once() -> None: + """The removed deprecated flag should no longer parse.""" + with pytest.raises(SystemExit): + _parse_args_run_flwr_model().parse_args(["--token", "test-token", "--run-once"]) + + +def test_parse_flwr_model_parses_tokenized_invocation() -> None: + """The ModelApp process CLI should still parse the supported flags.""" + args = _parse_args_run_flwr_model().parse_args( + [ + "--token", + "test-token", + "--insecure", + "--parent-pid", + "1234", + "--allow-runtime-dependency-installation", + ] + ) + + assert args.serverappio_api_address == SERVERAPPIO_API_DEFAULT_CLIENT_ADDRESS + assert args.token == "test-token" + assert args.insecure is True + assert args.parent_pid == 1234 + assert args.runtime_dependency_install is True + + +def test_flwr_model_parses_args_before_mirroring_output() -> None: + """Argument parsing should happen before stdout/stderr redirection.""" + + class _Parser: + def parse_args(self) -> SimpleNamespace: + """Raise a parser error before any side effects happen.""" + raise SystemExit(2) + + mirror_output_to_queue = Mock() + + with ( + patch.object(flwr_model_module, "_parse_args_run_flwr_model", _Parser), + patch.object( + flwr_model_module, + "mirror_output_to_queue", + mirror_output_to_queue, + ), + pytest.raises(SystemExit), + ): + flwr_model_module.flwr_model() + + mirror_output_to_queue.assert_not_called() + + +def test_flwr_model_forwards_cli_args() -> None: + """The ModelApp CLI should forward parsed args to the runtime.""" + args = SimpleNamespace( + insecure=True, + serverappio_api_address="127.0.0.1:9091", + token="test-token", + parent_pid=321, + runtime_dependency_install=True, + ) + + class _Parser: + def parse_args(self) -> SimpleNamespace: + """Return a fixed namespace for CLI forwarding tests.""" + return args + + mirror_output_to_queue = Mock() + restore_output = Mock() + run_model = Mock() + + with ( + patch.object(flwr_model_module, "_parse_args_run_flwr_model", _Parser), + patch.object( + flwr_model_module, + "mirror_output_to_queue", + mirror_output_to_queue, + ), + patch.object(flwr_model_module, "restore_output", restore_output), + patch.object(flwr_model_module, "run_model", run_model), + ): + flwr_model_module.flwr_model() + + mirror_output_to_queue.assert_called_once() + restore_output.assert_called_once_with() + run_model.assert_called_once() + kwargs = run_model.call_args.kwargs + assert kwargs["serverappio_api_address"] == "127.0.0.1:9091" + assert kwargs["log_queue"] is mirror_output_to_queue.call_args.args[0] + assert kwargs["token"] == "test-token" + assert kwargs["certificates"] is None + assert kwargs["parent_pid"] == 321 + assert kwargs["runtime_dependency_install"] is True diff --git a/framework/py/flwr/supercore/executors/__init__.py b/framework/py/flwr/supercore/executors/__init__.py index 83130220f0b5..be0c0617bd01 100644 --- a/framework/py/flwr/supercore/executors/__init__.py +++ b/framework/py/flwr/supercore/executors/__init__.py @@ -16,7 +16,11 @@ from .run_agentapp import run_agentapp +from .run_connector import run_connector +from .run_model import run_model __all__ = [ "run_agentapp", + "run_connector", + "run_model", ] diff --git a/framework/py/flwr/supercore/executors/run_connector.py b/framework/py/flwr/supercore/executors/run_connector.py new file mode 100644 index 000000000000..8952119d57ae --- /dev/null +++ b/framework/py/flwr/supercore/executors/run_connector.py @@ -0,0 +1,74 @@ +# Copyright 2026 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Flower ConnectorApp process.""" + + +from pathlib import Path +from queue import Queue + +from flwr.common import EventType +from flwr.common.constant import RUNTIME_DEPENDENCY_INSTALL +from flwr.common.exit import ExitCode, flwr_exit, register_signal_handlers +from flwr.common.logger import stop_log_uploader +from flwr.supercore.app_utils import start_parent_process_monitor +from flwr.supercore.superexec.dependency_installer import ( + cleanup_app_runtime_environment, +) + + +def run_connector( # pylint: disable=R0913, R0917 + serverappio_api_address: str, + log_queue: Queue[str | None], + token: str, + certificates: bytes | None = None, + parent_pid: int | None = None, + runtime_dependency_install: bool = RUNTIME_DEPENDENCY_INSTALL, +) -> None: + """Run Flower ConnectorApp process. + + This runtime is intentionally a stub until ConnectorApp execution support is added. + """ + # Monitor the main process in case of SIGKILL + if parent_pid is not None: + start_parent_process_monitor(parent_pid) + + log_uploader = None + runtime_env_dir: Path | None = None + + def on_exit() -> None: + if log_uploader: + stop_log_uploader(log_queue, log_uploader) + cleanup_app_runtime_environment(runtime_env_dir) + + register_signal_handlers( + event_type=EventType.FLWR_CONNECTOR_RUN_LEAVE, + exit_message="Run stopped by user.", + exit_handlers=[on_exit], + ) + + _ = ( + serverappio_api_address, + log_queue, + token, + certificates, + parent_pid, + runtime_dependency_install, + ) + flwr_exit( + ExitCode.SERVERAPP_EXCEPTION, + "`flwr-connector` is not implemented yet.", + event_type=EventType.FLWR_CONNECTOR_RUN_LEAVE, + event_details={"success": False}, + ) diff --git a/framework/py/flwr/supercore/executors/run_model.py b/framework/py/flwr/supercore/executors/run_model.py new file mode 100644 index 000000000000..df81d3fd4e34 --- /dev/null +++ b/framework/py/flwr/supercore/executors/run_model.py @@ -0,0 +1,74 @@ +# Copyright 2026 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Flower ModelApp process.""" + + +from pathlib import Path +from queue import Queue + +from flwr.common import EventType +from flwr.common.constant import RUNTIME_DEPENDENCY_INSTALL +from flwr.common.exit import ExitCode, flwr_exit, register_signal_handlers +from flwr.common.logger import stop_log_uploader +from flwr.supercore.app_utils import start_parent_process_monitor +from flwr.supercore.superexec.dependency_installer import ( + cleanup_app_runtime_environment, +) + + +def run_model( # pylint: disable=R0913, R0917 + serverappio_api_address: str, + log_queue: Queue[str | None], + token: str, + certificates: bytes | None = None, + parent_pid: int | None = None, + runtime_dependency_install: bool = RUNTIME_DEPENDENCY_INSTALL, +) -> None: + """Run Flower ModelApp process. + + This runtime is intentionally a stub until ModelApp execution support is added. + """ + # Monitor the main process in case of SIGKILL + if parent_pid is not None: + start_parent_process_monitor(parent_pid) + + log_uploader = None + runtime_env_dir: Path | None = None + + def on_exit() -> None: + if log_uploader: + stop_log_uploader(log_queue, log_uploader) + cleanup_app_runtime_environment(runtime_env_dir) + + register_signal_handlers( + event_type=EventType.FLWR_MODEL_RUN_LEAVE, + exit_message="Run stopped by user.", + exit_handlers=[on_exit], + ) + + _ = ( + serverappio_api_address, + log_queue, + token, + certificates, + parent_pid, + runtime_dependency_install, + ) + flwr_exit( + ExitCode.SERVERAPP_EXCEPTION, + "`flwr-model` is not implemented yet.", + event_type=EventType.FLWR_MODEL_RUN_LEAVE, + event_details={"success": False}, + ) diff --git a/framework/pyproject.toml b/framework/pyproject.toml index 3babb071f2f4..c946150d211c 100644 --- a/framework/pyproject.toml +++ b/framework/pyproject.toml @@ -83,6 +83,8 @@ flower-superexec = "flwr.supercore.cli:flower_superexec" flwr-serverapp = "flwr.server.serverapp:flwr_serverapp" flwr-clientapp = "flwr.supernode.cli:flwr_clientapp" flwr-agentapp = "flwr.supercore.cli:flwr_agentapp" +flwr-model = "flwr.supercore.cli:flwr_model" +flwr-connector = "flwr.supercore.cli:flwr_connector" [project.urls] homepage = "https://flower.ai"