diff --git a/nebula/controller/federation/controllers/docker_federation_controller.py b/nebula/controller/federation/controllers/docker_federation_controller.py index 890aa1262..5fa2f3b66 100644 --- a/nebula/controller/federation/controllers/docker_federation_controller.py +++ b/nebula/controller/federation/controllers/docker_federation_controller.py @@ -7,8 +7,8 @@ import docker from nebula.controller.federation.federation_controller import FederationController from nebula.controller.federation.scenario_builder import ScenarioBuilder -from nebula.controller.federation.utils_requests import factory_requests_path -from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest +from nebula.controller.federation.utils_requests import factory_requests +from nebula.controller.federation.utils_requests import RemoveScenarioRequest, NodeUpdateRequest, NodeDoneRequest from typing import Dict from nebula.config.config import Config from nebula.core.utils.certificate import generate_ca_certificate @@ -209,6 +209,36 @@ async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest asyncio.create_task(self._send_to_hub("done", payload, federation_id=federation_id)) return {"message": "Nodes done received successfully"} + async def remove_scenario(self, federation_id: str, remove_scenario_request: RemoveScenarioRequest): + if(await self._check_active_federation(federation_id)): + self.logger.info(f"WARNING: Cannot remove files from active federation: ({federation_id})") + return False + + folder_name = remove_scenario_request.user+"_"+remove_scenario_request.scenario_name + scenario_config_path = os.path.join(self.config_dir, folder_name) + scenario_log_path = os.path.join(self.log_dir, folder_name) + + if not os.path.exists(scenario_config_path): + self.logger.info(f"ERROR {scenario_config_path} - no config folder found") + if not os.path.exists(scenario_log_path): + self.logger.info(f"ERROR {scenario_log_path} - no log folder found") + + try: + shutil.rmtree(scenario_config_path) + self.logger.info(f"Removed config folder {scenario_config_path}") + except Exception as e: + self.logger.info(f"Could not remove config folder {scenario_config_path}: {e}") + return False + + try: + shutil.rmtree(scenario_log_path) + self.logger.info(f"Removed log folder {scenario_log_path}") + except Exception as e: + self.logger.info(f"Could not remove log folder {scenario_log_path}: {e}") + return False + + return True + """ ############################### # FUNCTIONALITIES # ############################### @@ -235,6 +265,13 @@ async def _remove_nebula_federation_from_pool(self, federation_id: str) -> Nebul self.logger.info(f"ERROR: trying to remove ({federation_id}) from federations pool..") return None + async def _check_active_federation(self, federation_id: str) -> bool: + async with self._federations_dict_lock: + if federation_id in self.nfp: + return True + else: + return False + async def _update_federation_on_pool(self, federation_id: str, user: str, nf: NebulaFederationDocker): updated = False async with self._federations_dict_lock: @@ -246,11 +283,9 @@ async def _update_federation_on_pool(self, federation_id: str, user: str, nf: Ne self.logger.info(f"ERROR: trying to update ({federation_id}) on federations pool..") return updated - async def _send_to_hub(self, path, payload, scenario_name="", federation_id="" ): + async def _send_to_hub(self, operation, payload, **kwargs): try: - url_request = self._hub_url + factory_requests_path(path, scenario_name, federation_id) - # self.logger.info(f"Sending to hub, url: {url_request}") - # self.logger.info(f"payload sent to hub, data: {payload}") + url_request = self._hub_url + factory_requests(operation, **kwargs) await APIUtils.post(url_request, payload) except Exception as e: self.logger.info(f"Failed to send update to Hub: {e}") @@ -263,13 +298,12 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat self.root_path = os.environ.get("NEBULA_ROOT_HOST") self.host_platform = os.environ.get("NEBULA_HOST_PLATFORM") - # self.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), scenario_name) - # self.log_dir = os.path.join(os.environ.get("NEBULA_LOGS_DIR"), scenario_name) + self.config_dir = os.environ.get("NEBULA_CONFIG_DIR") + self.log_dir = os.environ.get("NEBULA_LOGS_DIR") federation.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), scenario_name) federation.log_dir = os.path.join(os.environ.get("NEBULA_LOGS_DIR"), scenario_name) self.cert_dir = os.environ.get("NEBULA_CERTS_DIR") self.advanced_analytics = os.environ.get("NEBULA_ADVANCED_ANALYTICS", "False") == "True" - #self.config = Config(entity="FederationController") self.env_tag = os.environ.get("NEBULA_ENV_TAG", "dev") self.prefix_tag = os.environ.get("NEBULA_PREFIX_TAG", "dev") self.user_tag = os.environ.get("NEBULA_USER_TAG", os.environ.get("USER", "unknown")) diff --git a/nebula/controller/federation/controllers/processes_federation_controller.py b/nebula/controller/federation/controllers/processes_federation_controller.py index f7f208677..20eef4666 100644 --- a/nebula/controller/federation/controllers/processes_federation_controller.py +++ b/nebula/controller/federation/controllers/processes_federation_controller.py @@ -7,8 +7,8 @@ import docker from nebula.controller.federation.federation_controller import FederationController from nebula.controller.federation.scenario_builder import ScenarioBuilder -from nebula.controller.federation.utils_requests import factory_requests_path -from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest +from nebula.controller.federation.utils_requests import factory_requests +from nebula.controller.federation.utils_requests import RemoveScenarioRequest, NodeUpdateRequest, NodeDoneRequest from typing import Dict from fastapi import Request from nebula.config.config import Config @@ -195,6 +195,36 @@ async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest asyncio.create_task(self._send_to_hub("done", payload, federation_id=federation_id)) return {"message": "Nodes done received successfully"} + async def remove_scenario(self, federation_id: str, remove_scenario_request: RemoveScenarioRequest): + if(await self._check_active_federation(federation_id)): + self.logger.info(f"WARNING: Cannot remove files from active federation: ({federation_id})") + return False + + folder_name = remove_scenario_request.user+"_"+remove_scenario_request.scenario_name + scenario_config_path = os.path.join(self.config_dir, folder_name) + scenario_log_path = os.path.join(self.log_dir, folder_name) + + if not os.path.exists(scenario_config_path): + self.logger.info(f"ERROR {scenario_config_path} - no config folder found") + if not os.path.exists(scenario_log_path): + self.logger.info(f"ERROR {scenario_log_path} - no log folder found") + + try: + shutil.rmtree(scenario_config_path) + self.logger.info(f"Removed config folder {scenario_config_path}") + except Exception as e: + self.logger.info(f"Could not remove config folder {scenario_config_path}: {e}") + return False + + try: + shutil.rmtree(scenario_log_path) + self.logger.info(f"Removed log folder {scenario_log_path}") + except Exception as e: + self.logger.info(f"Could not remove log folder {scenario_log_path}: {e}") + return False + + return True + """ ############################### # FUNCTIONALITIES # ############################### @@ -220,12 +250,17 @@ async def _remove_nebula_federation_from_pool(self, federation_id: str) -> Nebul else: self.logger.info(f"ERROR: trying to remove ({federation_id}) from federations pool..") return None + + async def _check_active_federation(self, federation_id: str) -> bool: + async with self._federations_dict_lock: + if federation_id in self.nfp: + return True + else: + return False - async def _send_to_hub(self, path, payload, scenario_name="", federation_id="" ): + async def _send_to_hub(self, operation, payload, **kwargs): try: - url_request = self._hub_url + factory_requests_path(path, scenario_name, federation_id) - # self.logger.info(f"Seding to hub, url: {url_request}") - # self.logger.info(f"payload sent to hub, data: {payload}") + url_request = self._hub_url + factory_requests(operation, **kwargs) await APIUtils.post(url_request, payload) except Exception as e: self.logger.info(f"Failed to send update to Hub: {e}") @@ -238,13 +273,12 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat self.root_path = os.environ.get("NEBULA_ROOT_HOST") self.host_platform = os.environ.get("NEBULA_HOST_PLATFORM") - # self.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), scenario_name) - # self.log_dir = os.path.join(os.environ.get("NEBULA_LOGS_DIR"), scenario_name) + self.config_dir = os.environ.get("NEBULA_CONFIG_DIR") + self.log_dir = os.environ.get("NEBULA_LOGS_DIR") federation.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), scenario_name) federation.log_dir = os.path.join(os.environ.get("NEBULA_LOGS_DIR"), scenario_name) self.cert_dir = os.environ.get("NEBULA_CERTS_DIR") self.advanced_analytics = os.environ.get("NEBULA_ADVANCED_ANALYTICS", "False") == "True" - # self.config = Config(entity="scenarioManagement") self.env_tag = os.environ.get("NEBULA_ENV_TAG", "dev") self.prefix_tag = os.environ.get("NEBULA_PREFIX_TAG", "dev") self.user_tag = os.environ.get("NEBULA_USER_TAG", os.environ.get("USER", "unknown")) diff --git a/nebula/controller/federation/federation_api.py b/nebula/controller/federation/federation_api.py index b6edd00b5..f7513c828 100644 --- a/nebula/controller/federation/federation_api.py +++ b/nebula/controller/federation/federation_api.py @@ -10,7 +10,7 @@ from nebula.utils import LoggerUtils from nebula.controller.federation.federation_controller import FederationController from nebula.controller.federation.factory_federation_controller import federation_controller_factory -from nebula.controller.federation.utils_requests import RunScenarioRequest, StopScenarioRequest, NodeUpdateRequest, NodeDoneRequest, Routes +from nebula.controller.federation.utils_requests import RemoveScenarioRequest, RunScenarioRequest, StopScenarioRequest, NodeUpdateRequest, NodeDoneRequest, Routes fed_controllers: Dict[str, FederationController] = {} @@ -103,6 +103,19 @@ async def node_done( return await controller.node_done(federation_id, node_done_request) else: return {"message": "Experiment type not allowed on responde for Node done message.."} + +@app.post(Routes.REMOVE) +async def scenario_remove( + federation_id: str, + remove_scenario_request: RemoveScenarioRequest, +): + global fed_controllers + experiment_type = remove_scenario_request.experiment_type + controller = fed_controllers.get(experiment_type, None) + if controller: + return await controller.remove_scenario(federation_id, remove_scenario_request) + else: + return {"message": "Experiment type not allowed on responde for scenario remove message.."} if __name__ == "__main__": # Parse args from command line diff --git a/nebula/controller/federation/federation_controller.py b/nebula/controller/federation/federation_controller.py index 99bdda0bc..290aae62d 100644 --- a/nebula/controller/federation/federation_controller.py +++ b/nebula/controller/federation/federation_controller.py @@ -2,7 +2,7 @@ from fastapi import Request from typing import Dict from nebula.controller.federation.scenario_builder import ScenarioBuilder -from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest +from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest, RemoveScenarioRequest import logging class NebulaFederation(ABC): @@ -32,4 +32,8 @@ async def update_nodes(self, federation_id: str, node_update_request: NodeUpdate abstractmethod async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest): + pass + + abstractmethod + async def remove_scenario(self, federation_id: str, remove_scenario_request: RemoveScenarioRequest): pass \ No newline at end of file diff --git a/nebula/controller/federation/utils_requests.py b/nebula/controller/federation/utils_requests.py index e6fe76c2b..e51d7f22a 100644 --- a/nebula/controller/federation/utils_requests.py +++ b/nebula/controller/federation/utils_requests.py @@ -19,6 +19,11 @@ class NodeDoneRequest(BaseModel): name: str federation_id: str +class RemoveScenarioRequest(BaseModel): + experiment_type: str + user: str + scenario_name: str + class Routes: INIT = "/init" RUN = "/scenarios/run" @@ -26,21 +31,18 @@ class Routes: UPDATE = "/nodes/{federation_id}/update" DONE = "/nodes/{federation_id}/done" FINISH = "/scenarios/{federation_id}/finish" + REMOVE = "scenario/{federation_id}/remove" + + @classmethod + def format(cls, route: str, **kwargs) -> str: + return getattr(cls, route).format(**kwargs) -def factory_requests_path(resource: str, scenario_name: str = "", federation_id: str = "") -> str: - if resource == "init": - return "/init" - elif resource == "run": - return Routes.RUN - elif resource == "stop": - return Routes.STOP.format(federation_id=federation_id) - elif resource == "update": - return Routes.UPDATE.format(federation_id=federation_id) - elif resource == "done": - return Routes.DONE.format(federation_id=federation_id) - elif resource == "finish": - return Routes.FINISH.format(federation_id=federation_id) - else: - raise Exception(f"resource not found: {resource}") +def factory_requests(resource: str, **kwargs) -> str: + try: + return Routes.format(resource.upper(), **kwargs) + except AttributeError: + raise ValueError(f"Resource not found: {resource}") + except KeyError as e: + raise ValueError(f"Missing parameter for route '{resource}': {e}") \ No newline at end of file