diff --git a/nebula/controller/federation/controllers/docker_federation_controller.py b/nebula/controller/federation/controllers/docker_federation_controller.py index 5fa2f3b66..30870df78 100644 --- a/nebula/controller/federation/controllers/docker_federation_controller.py +++ b/nebula/controller/federation/controllers/docker_federation_controller.py @@ -4,6 +4,7 @@ import os import shutil from nebula.utils import DockerUtils, APIUtils +from datetime import datetime, timezone import docker from nebula.controller.federation.federation_controller import FederationController from nebula.controller.federation.scenario_builder import ScenarioBuilder @@ -13,9 +14,10 @@ from nebula.config.config import Config from nebula.core.utils.certificate import generate_ca_certificate from nebula.core.utils.locker import Locker +from nebula.controller.federation.resource_manager import ResourceManager, ReleaseDevicesEvent, RAMOverusedEvent class NebulaFederationDocker(): - def __init__(self): + def __init__(self, timestamp: datetime): self.scenario_name = "" self.participants_alive = 0 self.round_per_participant = {} @@ -31,6 +33,7 @@ def __init__(self): self.participants_alive_lock = Locker("participants_alive_lock", async_lock=True) self.config_dir = "" self.log_dir = "" + self.timestamp = timestamp async def get_additionals_to_be_deployed(self, config) -> list: async with self.federation_deployment_lock: @@ -85,12 +88,12 @@ def nfp(self): ############################### """ - async def run_scenario(self, federation_id: str, scenario_data: Dict, user: str): + async def run_scenario(self, federation_id: str, scenario_data: Dict, user: str, rol: str): #TODO maintain files on memory, not read them again federation = await self._add_nebula_federation_to_pool(federation_id, user) scenario_info = {} if federation: - scenario_builder = ScenarioBuilder(federation_id, user=user) + scenario_builder = ScenarioBuilder(federation_id, user=user, rol=rol) await 
self._initialize_scenario(scenario_builder, scenario_data, federation) generate_ca_certificate(dir_path=self.cert_dir) await self._load_configuration_and_start_nodes(scenario_builder, federation) @@ -200,6 +203,8 @@ async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest self.logger.info(f"Node-Done received from node on federation ID: ({federation_id})") if await nebula_federation.is_experiment_finish(): + asyncio.create_task(self._release_devices(federation_id)) + payload = node_done_request.model_dump() self.logger.info(f"All nodes have finished on federation ID: ({federation_id}), reporting to hub..") await self._remove_nebula_federation_from_pool(federation_id) @@ -248,7 +253,7 @@ async def _add_nebula_federation_to_pool(self, federation_id: str, user: str): fed = None async with self._federations_dict_lock: if not federation_id in self.nfp: - fed = NebulaFederationDocker() + fed = NebulaFederationDocker(datetime.now(timezone.utc)) self.nfp[federation_id] = fed self.logger.info(f"SUCCESS: new ID: ({federation_id}) added to the pool") else: @@ -264,6 +269,13 @@ async def _remove_nebula_federation_from_pool(self, federation_id: str) -> Nebul else: self.logger.info(f"ERROR: trying to remove ({federation_id}) from federations pool..") return None + + async def _get_most_recent_federation(self): + async with self._federations_dict_lock: + if not self.nfp: + return None + federation_id = max(self.nfp, key=lambda k: self.nfp[k].timestamp) + return federation_id async def _check_active_federation(self, federation_id: str) -> bool: async with self._federations_dict_lock: @@ -360,7 +372,7 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat self.logger.info(f"ERROR while creating files: {e}") try: - participant_config = sb.build_scenario_config_for_node(index, node) + participant_config = await sb.build_scenario_config_for_node(index, node) #self.logger.info(f"dictionary: {participant_config}") except Exception as 
e: self.logger.info(f"ERROR while building configuration for node: {e}") @@ -615,3 +627,21 @@ def _start_node(self, scenario_name, node, network_name, base_network_name, base json.dump(metadata, f, indent=2) return success + + """ ############################### + # RESOURCE MANAGEMENT # + ############################### + """ + + async def initialize_resources_functionalities(self): + await ResourceManager.get_instance().subscribe_resource_event(RAMOverusedEvent, self._ram_overused_event_callback) + + async def _ram_overused_event_callback(self, roe: RAMOverusedEvent): + federation_id = await self._get_most_recent_federation() + if federation_id: + await self.stop_scenario(federation_id) + + async def _release_devices(self, federation_id: str): + rde = ReleaseDevicesEvent(federation_id) + await ResourceManager.get_instance().publish_recource_event(rde) + \ No newline at end of file diff --git a/nebula/controller/federation/controllers/processes_federation_controller.py b/nebula/controller/federation/controllers/processes_federation_controller.py index 20eef4666..cd668af70 100644 --- a/nebula/controller/federation/controllers/processes_federation_controller.py +++ b/nebula/controller/federation/controllers/processes_federation_controller.py @@ -4,19 +4,19 @@ import os import shutil from nebula.utils import APIUtils -import docker +from datetime import datetime, timezone from nebula.controller.federation.federation_controller import FederationController from nebula.controller.federation.scenario_builder import ScenarioBuilder from nebula.controller.federation.utils_requests import factory_requests from nebula.controller.federation.utils_requests import RemoveScenarioRequest, NodeUpdateRequest, NodeDoneRequest from typing import Dict -from fastapi import Request from nebula.config.config import Config from nebula.core.utils.certificate import generate_ca_certificate from nebula.core.utils.locker import Locker +from nebula.controller.federation.resource_manager import 
ResourceManager, ReleaseDevicesEvent, RAMOverusedEvent class NebulaFederationProcesses(): - def __init__(self): + def __init__(self, timestamp: datetime): self.scenario_name = "" self.participants_alive = 0 self.round_per_participant = {} @@ -32,6 +32,7 @@ def __init__(self): self.participants_alive_lock = Locker("participants_alive_lock", async_lock=True) self.config_dir = "" self.log_dir = "" + self.timestamp = timestamp async def get_additionals_to_be_deployed(self, config) -> list: async with self.federation_deployment_lock: @@ -86,12 +87,12 @@ def nfp(self): ############################### """ - async def run_scenario(self, federation_id: str, scenario_data: Dict, user: str): + async def run_scenario(self, federation_id: str, scenario_data: Dict, user: str, rol: str): #TODO maintain files on memory, not read them again federation = await self._add_nebula_federation_to_pool(federation_id, user) scenario_info = {} if federation: - scenario_builder = ScenarioBuilder(federation_id, user=user) + scenario_builder = ScenarioBuilder(federation_id, user=user, rol=rol) await self._initialize_scenario(scenario_builder, scenario_data, federation) generate_ca_certificate(dir_path=self.cert_dir) await self._load_configuration_and_start_nodes(scenario_builder, federation) @@ -186,6 +187,8 @@ async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest self.logger.info(f"Node-Done received from node on federation ID: ({federation_id})") if await nebula_federation.is_experiment_finish(): + asyncio.create_task(self._release_devices(federation_id)) + payload = node_done_request.model_dump() self.logger.info(f"All nodes have finished on federation ID: ({federation_id}), reporting to hub..") await self._remove_nebula_federation_from_pool(federation_id) @@ -234,7 +237,7 @@ async def _add_nebula_federation_to_pool(self, federation_id: str, user: str): fed = None async with self._federations_dict_lock: if not federation_id in self.nfp: - fed = 
NebulaFederationProcesses() + fed = NebulaFederationProcesses(datetime.now(timezone.utc)) self.nfp[federation_id] = fed self.logger.info(f"SUCCESS: new ID: ({federation_id}) added to the pool") else: @@ -251,6 +254,13 @@ async def _remove_nebula_federation_from_pool(self, federation_id: str) -> Nebul self.logger.info(f"ERROR: trying to remove ({federation_id}) from federations pool..") return None + async def _get_most_recent_federation(self): + async with self._federations_dict_lock: + if not self.nfp: + return None + federation_id = max(self.nfp, key=lambda k: self.nfp[k].timestamp) + return federation_id + async def _check_active_federation(self, federation_id: str) -> bool: async with self._federations_dict_lock: if federation_id in self.nfp: @@ -335,7 +345,7 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat self.logger.info(f"ERROR while creating files: {e}") try: - participant_config = sb.build_scenario_config_for_node(index, node) + participant_config = await sb.build_scenario_config_for_node(index, node) #self.logger.info(f"dictionary: {participant_config}") except Exception as e: self.logger.info(f"ERROR while building configuration for node: {e}") @@ -560,3 +570,20 @@ def _write_commands_on_file(self, commands: str, federation: NebulaFederationPro os.chmod(f"{federation.config_dir}/current_scenario_commands.sh", 0o755) except Exception as e: raise Exception(f"Error starting nodes as processes: {e}") + + """ ############################### + # RESOURCE MANAGEMENT # + ############################### + """ + + async def initialize_resources_functionalities(self): + await ResourceManager.get_instance().subscribe_resource_event(RAMOverusedEvent, self._ram_overused_event_callback) + + async def _ram_overused_event_callback(self, roe: RAMOverusedEvent): + federation_id = await self._get_most_recent_federation() + if federation_id: + await self.stop_scenario(federation_id) + + async def _release_devices(self, federation_id: str): 
+ rde = ReleaseDevicesEvent(federation_id) + await ResourceManager.get_instance().publish_recource_event(rde) \ No newline at end of file diff --git a/nebula/controller/federation/federation_api.py b/nebula/controller/federation/federation_api.py index f7513c828..e81ae5ec3 100644 --- a/nebula/controller/federation/federation_api.py +++ b/nebula/controller/federation/federation_api.py @@ -11,6 +11,7 @@ from nebula.controller.federation.federation_controller import FederationController from nebula.controller.federation.factory_federation_controller import federation_controller_factory from nebula.controller.federation.utils_requests import RemoveScenarioRequest, RunScenarioRequest, StopScenarioRequest, NodeUpdateRequest, NodeDoneRequest, Routes +from nebula.controller.federation.resource_manager import ResourceManager fed_controllers: Dict[str, FederationController] = {} @@ -30,9 +31,14 @@ async def lifespan(app: FastAPI): controller_host = os.environ.get("NEBULA_CONTROLLER_HOST") hub_url = f"http://{controller_host}:{hub_port}" + # Initialize resource manager to assign devices availables to federations + #TODO get maxRAM from environ + ResourceManager.get_instance(logger=logger, verbose=False).init() + #["docker", "processes", "physical"] for exp_type in ["docker", "process"]: fed_controllers[exp_type] = federation_controller_factory(exp_type, hub_url, logger) + await fed_controllers[exp_type].initialize_resources_functionalities() logger.info(f"{exp_type} Federation controller created.") yield @@ -59,7 +65,7 @@ async def run_scenario(run_scenario_request: RunScenarioRequest): logger.info(f"[API]: run experiment request for deployment type: {experiment_type}") controller = fed_controllers.get(experiment_type, None) if controller: - return await controller.run_scenario(run_scenario_request.federation_id, run_scenario_request.scenario_data, run_scenario_request.user) + return await controller.run_scenario(run_scenario_request.federation_id, 
import asyncio
import importlib.util
import inspect
import logging
import random
from abc import ABC, abstractmethod
from collections.abc import Callable
from typing import Dict, List

import psutil

from nebula.core.utils.locker import Locker

""" ###############################
    #      RESOURCE EVENTS        #
    ###############################
"""


class ResourceEvent(ABC):
    """
    Abstract base class for all resource-related events in the system.
    """

    @abstractmethod
    async def get_event_data(self):
        """
        Retrieve the data associated with the event.

        Returns:
            Any: The event-specific data payload.
        """
        pass


class ReleaseDevicesEvent(ResourceEvent):
    """Signals that the devices held by a finished federation can be reclaimed."""

    def __init__(self, federation_id):
        self._federation_id = federation_id

    async def get_event_data(self):
        # Payload is the federation whose devices should be released.
        return self._federation_id


class RAMOverusedEvent(ResourceEvent):
    """Signals that host RAM usage crossed the configured maximum."""

    def __init__(self):
        pass

    async def get_event_data(self):
        # Carries no payload; the event type itself is the information.
        pass


""" ###############################
    #   RESOURCE MANAGER CLASS    #
    ###############################
"""


class ResourceManager:
    """
    Singleton that tracks GPU devices, assigns them to federations, and
    publishes/dispatches resource events (device release, RAM overuse).
    """

    _instance = None
    _lock = Locker("event_manager")

    def __new__(cls, *args, **kwargs):
        """Singleton implementation."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialize(*args, **kwargs)
            return cls._instance

    def _initialize(self, logger, verbose=False):
        """Single initialization (guarded so repeated construction is a no-op)."""
        if hasattr(self, "_initialized"):
            return
        self._initialized = True
        self._logger: logging.Logger = logger
        self._subscribers: dict[type, list] = {}
        self._resources_events_lock = Locker("resources_events_lock", async_lock=True)
        self._devices: List = []
        self._available_devices: List = []
        self._available_devices_lock = Locker("available_devices_lock", async_lock=True)
        self._currently_used_devices: Dict[str, str] = {}
        self._currently_used_devices_lock = Locker("currently_used_devices_lock", async_lock=True)
        self._max_devices_per_user = 0
        self._monitor_cooldown = 10  # seconds between RAM polls
        self._max_ram = None  # RAM-usage threshold (percent); monitor disabled while None
        self._monitor_task = None
        self._verbose = verbose

    @staticmethod
    def get_instance(logger=None, verbose=False):
        """Static method to obtain the ResourceManager instance."""
        if ResourceManager._instance is None:
            ResourceManager(logger=logger, verbose=verbose)
        return ResourceManager._instance

    @property
    def cud(self):
        """
        Currently used devices Dictionary
        [Federation ID, Devices]
        """
        return self._currently_used_devices

    """ ###############################
        # RESOURCE EVENTS MANAGEMENT  #
        ###############################
    """

    async def subscribe_resource_event(self, resource_event: type[ResourceEvent], callback: Callable):
        """Register a callback for a specific type of ResourceEvent."""
        async with self._resources_events_lock:
            self._subscribers.setdefault(resource_event, []).append(callback)

    # NOTE(review): name keeps the historical misspelling ("recource") because
    # the federation controllers already call it under this name.
    async def publish_recource_event(self, resource_event: ResourceEvent):
        """Trigger all callbacks registered for a specific type of ResourceEvent."""
        async with self._resources_events_lock:
            # Snapshot the callback list so callbacks run OUTSIDE the lock and
            # may themselves subscribe/publish without deadlocking.
            callbacks = list(self._subscribers.get(type(resource_event), []))

        for callback in callbacks:
            try:
                if asyncio.iscoroutinefunction(callback) or inspect.iscoroutine(callback):
                    await callback(resource_event)
                else:
                    callback(resource_event)
            except Exception as e:
                # Preserve the original traceback for debugging.
                raise Exception(f"{e}") from e

    """ ###############################
        #       FUNCTIONALITIES       #
        ###############################
    """

    async def init(self):
        """Discover devices, wire internal subscriptions, start the RAM monitor."""
        await self._get_available_gpu()
        await self.subscribe_resource_event(ReleaseDevicesEvent, self._release_device_used)
        if self._max_ram:
            self._monitor_task = asyncio.create_task(self._monitor_resources())

    async def _get_available_gpu(self):
        """Populate the device pool with NVIDIA GPU indices (best-effort)."""
        if importlib.util.find_spec("pynvml") is None:
            return
        try:
            import pynvml

            await asyncio.to_thread(pynvml.nvmlInit)
            # nvmlDeviceGetCount returns an int; GPUs are addressed by index,
            # so the pool is the list of indices [0, count).
            count = await asyncio.to_thread(pynvml.nvmlDeviceGetCount)
            self._devices = list(range(count))
            self._max_devices_per_user = len(self._devices)
            await self._update_available_devices(self._devices)
        except Exception:
            # NVML unavailable or failed: leave the pool empty (no GPUs).
            pass

    def _verify_valid_devices(self, devices: List):
        """Return True when every requested device is a known device."""
        return all(d in self._devices for d in devices)

    def _devices_allowed_for_permissions(self, permissions: str):
        """Admins may take every device; any other role is limited to one."""
        if permissions == "admin":
            return self._max_devices_per_user
        else:
            return 1

    async def _remove_available_devices(self, devices: List):
        """Take the given devices out of the available pool."""
        async with self._available_devices_lock:
            # Remove element-wise: list.remove(devices) would look for the
            # list itself as a single element and raise ValueError.
            for device in devices:
                if device in self._available_devices:
                    self._available_devices.remove(device)
            if self._verbose:
                self._logger.info(f"[ResourceManager] - REMOVE available devices: {devices}")

    async def _update_available_devices(self, devices: List):
        """Return the given devices to the available pool."""
        async with self._available_devices_lock:
            self._available_devices.extend(devices)
            if self._verbose:
                self._logger.info(f"[ResourceManager] - UPDATE available devices: {devices}")

    async def _get_devices(self, n: int):
        """Reserve up to *n* random devices from the pool (may return fewer, or [])."""
        async with self._available_devices_lock:
            n_devices = min(n, len(self._available_devices))
            if n_devices > 0:
                devices = random.sample(self._available_devices, n_devices)
                # Remove each reserved device individually (see note above).
                for device in devices:
                    self._available_devices.remove(device)
            else:
                devices = []
            return devices

    async def assign_device_to_federation(self, federation_id: str, permissions: str):
        """
        Allocate devices to a federation according to its permissions.

        Returns:
            List: the devices reserved for this federation (possibly empty).
        """
        n_devices = self._devices_allowed_for_permissions(permissions=permissions)
        devices = await self._get_devices(n_devices)
        async with self._currently_used_devices_lock:
            self.cud[federation_id] = devices
            if self._verbose:
                self._logger.info(f"[ResourceManager] - ALLOCATED federation ID: {federation_id}, devices: {devices}")
        return devices

    async def _release_device_used(self, rde: ReleaseDevicesEvent):
        """Callback for ReleaseDevicesEvent: return a federation's devices to the pool."""
        federation_id = await rde.get_event_data()
        async with self._currently_used_devices_lock:
            devices = self.cud.pop(federation_id, None)
        # Re-add outside the usage lock to avoid holding two locks at once.
        if devices:
            await self._update_available_devices(devices)
        else:
            raise Exception(f"Not found devices for federation ID: ({federation_id})")

    """ ###############################
        #      RESOURCES MONITOR      #
        ###############################
    """

    async def _monitor_resources(self):
        """Periodically poll RAM usage; publish RAMOverusedEvent above the threshold."""
        while True:
            await asyncio.sleep(self._monitor_cooldown)
            memory_info = await asyncio.to_thread(psutil.virtual_memory)
            if memory_info.percent > self._max_ram:
                # Fire-and-forget so the monitor loop is never blocked by handlers.
                asyncio.create_task(self.publish_recource_event(RAMOverusedEvent()))
                if self._verbose:
                    self._logger.info(f"[ResourceManager] - MONITOR RAM overused detected")
b/nebula/controller/federation/scenario_builder.py index ab4f2ffd9..f77b72c58 100644 --- a/nebula/controller/federation/scenario_builder.py +++ b/nebula/controller/federation/scenario_builder.py @@ -7,9 +7,10 @@ from nebula.config.config import Config from nebula.core.utils.certificate import generate_certificate from nebula.core.datasets.nebuladataset import NebulaDataset, factory_nebuladataset, factory_dataset_setup +from nebula.controller.federation.resource_manager import ResourceManager class ScenarioBuilder(): - def __init__(self, federation_id, user): + def __init__(self, federation_id, user, rol): self._scenario_data = None self._config_setup = None self.logger = logging.getLogger("Federation-Controller") @@ -17,6 +18,7 @@ def __init__(self, federation_id, user): self._scenario_name = "" self._federation_id = federation_id self._user = user + self._rol = rol @property def sd(self): @@ -387,7 +389,7 @@ def _mobility_assign(self, nodes, mobile_participants_percent): ############################### """ - def build_scenario_config_for_node(self, index, node) -> dict: + async def build_scenario_config_for_node(self, index, node) -> dict: self.logger.info(f"Start building the scenario configuration for participant {index}") def recursive_defaultdict(): @@ -434,7 +436,7 @@ def dictify(d): participant_config["training_args"]["epochs"] = int(self.sd["epochs"]) participant_config["training_args"]["trainer"] = "lightning" participant_config["device_args"]["accelerator"] = self.sd["accelerator"] - participant_config["device_args"]["gpu_id"] = self.sd["gpu_id"] + participant_config["device_args"]["gpu_id"] = await ResourceManager.get_instance().assign_device_to_federation(self._federation_id, self._rol) #self.sd["gpu_id"] participant_config["device_args"]["logging"] = self.sd["logginglevel"] participant_config["aggregator_args"]["algorithm"] = self.sd["agg_algorithm"] participant_config["aggregator_args"]["aggregation_timeout"] = 60 diff --git 
a/nebula/controller/federation/utils_requests.py b/nebula/controller/federation/utils_requests.py index e51d7f22a..84a9e0995 100644 --- a/nebula/controller/federation/utils_requests.py +++ b/nebula/controller/federation/utils_requests.py @@ -4,11 +4,11 @@ class RunScenarioRequest(BaseModel): scenario_data: Dict[str, Any] user: str + rol: str federation_id: str class StopScenarioRequest(BaseModel): experiment_type: str - federation_id: str class NodeUpdateRequest(BaseModel): config: Dict[str, Any] = {}