Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import docker
from nebula.controller.federation.federation_controller import FederationController
from nebula.controller.federation.scenario_builder import ScenarioBuilder
from nebula.controller.federation.utils_requests import factory_requests_path
from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest
from nebula.controller.federation.utils_requests import factory_requests
from nebula.controller.federation.utils_requests import RemoveScenarioRequest, NodeUpdateRequest, NodeDoneRequest
from typing import Dict
from nebula.config.config import Config
from nebula.core.utils.certificate import generate_ca_certificate
Expand Down Expand Up @@ -209,6 +209,36 @@ async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest
asyncio.create_task(self._send_to_hub("done", payload, federation_id=federation_id))
return {"message": "Nodes done received successfully"}

async def remove_scenario(self, federation_id: str, remove_scenario_request: RemoveScenarioRequest):
if(await self._check_active_federation(federation_id)):
self.logger.info(f"WARNING: Cannot remove files from active federation: ({federation_id})")
return False

folder_name = remove_scenario_request.user+"_"+remove_scenario_request.scenario_name
scenario_config_path = os.path.join(self.config_dir, folder_name)
scenario_log_path = os.path.join(self.log_dir, folder_name)

if not os.path.exists(scenario_config_path):
self.logger.info(f"ERROR {scenario_config_path} - no config folder found")
if not os.path.exists(scenario_log_path):
self.logger.info(f"ERROR {scenario_log_path} - no log folder found")

try:
shutil.rmtree(scenario_config_path)
self.logger.info(f"Removed config folder {scenario_config_path}")
except Exception as e:
self.logger.info(f"Could not remove config folder {scenario_config_path}: {e}")
return False

try:
shutil.rmtree(scenario_log_path)
self.logger.info(f"Removed log folder {scenario_log_path}")
except Exception as e:
self.logger.info(f"Could not remove log folder {scenario_log_path}: {e}")
return False

return True

""" ###############################
# FUNCTIONALITIES #
###############################
Expand All @@ -235,6 +265,13 @@ async def _remove_nebula_federation_from_pool(self, federation_id: str) -> Nebul
self.logger.info(f"ERROR: trying to remove ({federation_id}) from federations pool..")
return None

async def _check_active_federation(self, federation_id: str) -> bool:
async with self._federations_dict_lock:
if federation_id in self.nfp:
return True
else:
return False

async def _update_federation_on_pool(self, federation_id: str, user: str, nf: NebulaFederationDocker):
updated = False
async with self._federations_dict_lock:
Expand All @@ -246,11 +283,9 @@ async def _update_federation_on_pool(self, federation_id: str, user: str, nf: Ne
self.logger.info(f"ERROR: trying to update ({federation_id}) on federations pool..")
return updated

async def _send_to_hub(self, path, payload, scenario_name="", federation_id="" ):
async def _send_to_hub(self, operation, payload, **kwargs):
try:
url_request = self._hub_url + factory_requests_path(path, scenario_name, federation_id)
# self.logger.info(f"Sending to hub, url: {url_request}")
# self.logger.info(f"payload sent to hub, data: {payload}")
url_request = self._hub_url + factory_requests(operation, **kwargs)
await APIUtils.post(url_request, payload)
except Exception as e:
self.logger.info(f"Failed to send update to Hub: {e}")
Expand All @@ -263,13 +298,12 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat

self.root_path = os.environ.get("NEBULA_ROOT_HOST")
self.host_platform = os.environ.get("NEBULA_HOST_PLATFORM")
# self.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), scenario_name)
# self.log_dir = os.path.join(os.environ.get("NEBULA_LOGS_DIR"), scenario_name)
self.config_dir = os.environ.get("NEBULA_CONFIG_DIR")
self.log_dir = os.environ.get("NEBULA_LOGS_DIR")
federation.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), scenario_name)
federation.log_dir = os.path.join(os.environ.get("NEBULA_LOGS_DIR"), scenario_name)
self.cert_dir = os.environ.get("NEBULA_CERTS_DIR")
self.advanced_analytics = os.environ.get("NEBULA_ADVANCED_ANALYTICS", "False") == "True"
#self.config = Config(entity="FederationController")
self.env_tag = os.environ.get("NEBULA_ENV_TAG", "dev")
self.prefix_tag = os.environ.get("NEBULA_PREFIX_TAG", "dev")
self.user_tag = os.environ.get("NEBULA_USER_TAG", os.environ.get("USER", "unknown"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import docker
from nebula.controller.federation.federation_controller import FederationController
from nebula.controller.federation.scenario_builder import ScenarioBuilder
from nebula.controller.federation.utils_requests import factory_requests_path
from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest
from nebula.controller.federation.utils_requests import factory_requests
from nebula.controller.federation.utils_requests import RemoveScenarioRequest, NodeUpdateRequest, NodeDoneRequest
from typing import Dict
from fastapi import Request
from nebula.config.config import Config
Expand Down Expand Up @@ -195,6 +195,36 @@ async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest
asyncio.create_task(self._send_to_hub("done", payload, federation_id=federation_id))
return {"message": "Nodes done received successfully"}

async def remove_scenario(self, federation_id: str, remove_scenario_request: RemoveScenarioRequest):
if(await self._check_active_federation(federation_id)):
self.logger.info(f"WARNING: Cannot remove files from active federation: ({federation_id})")
return False

folder_name = remove_scenario_request.user+"_"+remove_scenario_request.scenario_name
scenario_config_path = os.path.join(self.config_dir, folder_name)
scenario_log_path = os.path.join(self.log_dir, folder_name)

if not os.path.exists(scenario_config_path):
self.logger.info(f"ERROR {scenario_config_path} - no config folder found")
if not os.path.exists(scenario_log_path):
self.logger.info(f"ERROR {scenario_log_path} - no log folder found")

try:
shutil.rmtree(scenario_config_path)
self.logger.info(f"Removed config folder {scenario_config_path}")
except Exception as e:
self.logger.info(f"Could not remove config folder {scenario_config_path}: {e}")
return False

try:
shutil.rmtree(scenario_log_path)
self.logger.info(f"Removed log folder {scenario_log_path}")
except Exception as e:
self.logger.info(f"Could not remove log folder {scenario_log_path}: {e}")
return False

return True

""" ###############################
# FUNCTIONALITIES #
###############################
Expand All @@ -220,12 +250,17 @@ async def _remove_nebula_federation_from_pool(self, federation_id: str) -> Nebul
else:
self.logger.info(f"ERROR: trying to remove ({federation_id}) from federations pool..")
return None

async def _check_active_federation(self, federation_id: str) -> bool:
async with self._federations_dict_lock:
if federation_id in self.nfp:
return True
else:
return False

async def _send_to_hub(self, path, payload, scenario_name="", federation_id="" ):
async def _send_to_hub(self, operation, payload, **kwargs):
try:
url_request = self._hub_url + factory_requests_path(path, scenario_name, federation_id)
# self.logger.info(f"Seding to hub, url: {url_request}")
# self.logger.info(f"payload sent to hub, data: {payload}")
url_request = self._hub_url + factory_requests(operation, **kwargs)
await APIUtils.post(url_request, payload)
except Exception as e:
self.logger.info(f"Failed to send update to Hub: {e}")
Expand All @@ -238,13 +273,12 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat

self.root_path = os.environ.get("NEBULA_ROOT_HOST")
self.host_platform = os.environ.get("NEBULA_HOST_PLATFORM")
# self.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), scenario_name)
# self.log_dir = os.path.join(os.environ.get("NEBULA_LOGS_DIR"), scenario_name)
self.config_dir = os.environ.get("NEBULA_CONFIG_DIR")
self.log_dir = os.environ.get("NEBULA_LOGS_DIR")
federation.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), scenario_name)
federation.log_dir = os.path.join(os.environ.get("NEBULA_LOGS_DIR"), scenario_name)
self.cert_dir = os.environ.get("NEBULA_CERTS_DIR")
self.advanced_analytics = os.environ.get("NEBULA_ADVANCED_ANALYTICS", "False") == "True"
# self.config = Config(entity="scenarioManagement")
self.env_tag = os.environ.get("NEBULA_ENV_TAG", "dev")
self.prefix_tag = os.environ.get("NEBULA_PREFIX_TAG", "dev")
self.user_tag = os.environ.get("NEBULA_USER_TAG", os.environ.get("USER", "unknown"))
Expand Down
15 changes: 14 additions & 1 deletion nebula/controller/federation/federation_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from nebula.utils import LoggerUtils
from nebula.controller.federation.federation_controller import FederationController
from nebula.controller.federation.factory_federation_controller import federation_controller_factory
from nebula.controller.federation.utils_requests import RunScenarioRequest, StopScenarioRequest, NodeUpdateRequest, NodeDoneRequest, Routes
from nebula.controller.federation.utils_requests import RemoveScenarioRequest, RunScenarioRequest, StopScenarioRequest, NodeUpdateRequest, NodeDoneRequest, Routes

fed_controllers: Dict[str, FederationController] = {}

Expand Down Expand Up @@ -103,6 +103,19 @@ async def node_done(
return await controller.node_done(federation_id, node_done_request)
else:
return {"message": "Experiment type not allowed on responde for Node done message.."}

@app.post(Routes.REMOVE)
async def scenario_remove(
federation_id: str,
remove_scenario_request: RemoveScenarioRequest,
):
global fed_controllers
experiment_type = remove_scenario_request.experiment_type
controller = fed_controllers.get(experiment_type, None)
if controller:
return await controller.remove_scenario(federation_id, remove_scenario_request)
else:
return {"message": "Experiment type not allowed on responde for scenario remove message.."}

if __name__ == "__main__":
# Parse args from command line
Expand Down
6 changes: 5 additions & 1 deletion nebula/controller/federation/federation_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from fastapi import Request
from typing import Dict
from nebula.controller.federation.scenario_builder import ScenarioBuilder
from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest
from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest, RemoveScenarioRequest
import logging

class NebulaFederation(ABC):
Expand Down Expand Up @@ -32,4 +32,8 @@ async def update_nodes(self, federation_id: str, node_update_request: NodeUpdate

abstractmethod
async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest):
pass

abstractmethod
async def remove_scenario(self, federation_id: str, remove_scenario_request: RemoveScenarioRequest):
pass
32 changes: 17 additions & 15 deletions nebula/controller/federation/utils_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,30 @@ class NodeDoneRequest(BaseModel):
name: str
federation_id: str

class RemoveScenarioRequest(BaseModel):
experiment_type: str
user: str
scenario_name: str

class Routes:
INIT = "/init"
RUN = "/scenarios/run"
STOP = "/scenarios/{federation_id}/stop"
UPDATE = "/nodes/{federation_id}/update"
DONE = "/nodes/{federation_id}/done"
FINISH = "/scenarios/{federation_id}/finish"
REMOVE = "scenario/{federation_id}/remove"

@classmethod
def format(cls, route: str, **kwargs) -> str:
return getattr(cls, route).format(**kwargs)

def factory_requests_path(resource: str, scenario_name: str = "", federation_id: str = "") -> str:
if resource == "init":
return "/init"
elif resource == "run":
return Routes.RUN
elif resource == "stop":
return Routes.STOP.format(federation_id=federation_id)
elif resource == "update":
return Routes.UPDATE.format(federation_id=federation_id)
elif resource == "done":
return Routes.DONE.format(federation_id=federation_id)
elif resource == "finish":
return Routes.FINISH.format(federation_id=federation_id)
else:
raise Exception(f"resource not found: {resource}")
def factory_requests(resource: str, **kwargs) -> str:
try:
return Routes.format(resource.upper(), **kwargs)
except AttributeError:
raise ValueError(f"Resource not found: {resource}")
except KeyError as e:
raise ValueError(f"Missing parameter for route '{resource}': {e}")