diff --git a/mlcube/mlcube/__main__.py b/mlcube/mlcube/__main__.py index 3a69e475..45b5eff1 100644 --- a/mlcube/mlcube/__main__.py +++ b/mlcube/mlcube/__main__.py @@ -49,7 +49,7 @@ def parser_process(value: t.Text, state: click.parser.ParsingState): def _parse_cli_args(ctx: t.Optional[click.core.Context], mlcube: t.Text, platform: t.Optional[t.Text], - workspace: t.Optional[t.Text], + workspace: t.Optional[t.Text], aws: t.Optional[t.Text], resolve: bool) -> t.Tuple[t.Optional[t.Type[Runner]], DictConfig]: """ Args: @@ -74,7 +74,7 @@ def _parse_cli_args(ctx: t.Optional[click.core.Context], mlcube: t.Text, platfor else: runner_cls, runner_config = None, None mlcube_config = MLCubeConfig.create_mlcube_config( - os.path.join(mlcube_inst.path, mlcube_inst.file), mlcube_cli_args, task_cli_args, runner_config, workspace, + os.path.join(mlcube_inst.path, mlcube_inst.file), mlcube_cli_args, task_cli_args, runner_config, workspace, aws, resolve=resolve, runner_cls=runner_cls ) return runner_cls, mlcube_config @@ -103,6 +103,10 @@ def _parse_cli_args(ctx: t.Optional[click.core.Context], mlcube: t.Text, platfor '--workspace', required=False, type=str, default=None, help="Workspace location that is used to store input/output artifacts of MLCube tasks." ) +aws_option = click.option( + '--aws', required=False, type=str, default=os.path.join(os.getcwd(), 'aws_credentials.json'), + help="Aws config file location that contains credentials." +) @click.group(name='mlcube', help="MLCube 📦 is a packaging tool for ML models") @@ -132,7 +136,7 @@ def show_config(ctx: click.core.Context, mlcube: t.Text, platform: t.Text, works workspace: Workspace path to use. If not specified, default workspace inside MLCube directory is used. resolve: if True, compute values in MLCube configuration. """ - _, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace, resolve) + _, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace, resolve, aws=None) print(OmegaConf.to_yaml(mlcube_config)) @@ -148,11 +152,29 @@ def configure(ctx: click.core.Context, mlcube: t.Text, platform: t.Text) -> None mlcube: Path to MLCube root directory or mlcube.yaml file. platform: Platform to use to configure this MLCube for (docker, singularity, gcp, k8s etc). """ - runner_cls, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace=None, resolve=True) + runner_cls, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace=None, aws=None, resolve=True) docker_runner = runner_cls(mlcube_config, task=None) docker_runner.configure() +@cli.command(name='upload', help='upload MLCube image.', + context_settings=dict(ignore_unknown_options=True, allow_extra_args=True)) +@mlcube_option +@platform_option +@aws_option +@click.pass_context +def upload(ctx: click.core.Context, mlcube: t.Text, platform: t.Text, aws: t.Text) -> None: + """ + Args: + ctx: Click context. We need this to get access to extra CLI arguments. + mlcube: Path to MLCube root directory or mlcube.yaml file. + platform: Platform to use to configure this MLCube for (docker, singularity, gcp, k8s etc). + """ + runner_cls, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace=None, aws=aws, resolve=True) + docker_runner = runner_cls(mlcube_config, task=None) + docker_runner.upload() + + @cli.command(name='run', help='Run MLCube ML task.', context_settings=dict(ignore_unknown_options=True, allow_extra_args=True)) @mlcube_option @@ -169,7 +191,7 @@ def run(ctx: click.core.Context, mlcube: t.Text, platform: t.Text, task: t.Text, task: Comma separated list of tasks to run. workspace: Workspace path to use. If not specified, default workspace inside MLCube directory is used. """ - runner_cls, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace, resolve=True) + runner_cls, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace, aws=None, resolve=True) mlcube_tasks: t.List[str] = list((mlcube_config.get('tasks', None) or {}).keys()) # Tasks in this MLCube. tasks: t.List[str] = CliParser.parse_list_arg(task, default=None) # Requested tasks. @@ -197,7 +219,7 @@ def run(ctx: click.core.Context, mlcube: t.Text, platform: t.Text, task: t.Text, @cli.command(name='describe', help='Describe MLCube.') @mlcube_option def describe(mlcube: t.Text) -> None: - _, mlcube_config = _parse_cli_args(None, mlcube, None, None, resolve=True) + _, mlcube_config = _parse_cli_args(None, mlcube, None, None, None, resolve=True) print(f"MLCube") print(f" path = {mlcube_config.runtime.root}") print(f" name = {mlcube_config.name}:{mlcube_config.get('version', 'latest')}") diff --git a/mlcube/mlcube/config.py b/mlcube/mlcube/config.py index 373552e5..806b3cf5 100644 --- a/mlcube/mlcube/config.py +++ b/mlcube/mlcube/config.py @@ -48,7 +48,7 @@ def get_uri(value: t.Text) -> t.Text: @staticmethod def create_mlcube_config(mlcube_config_file: t.Text, mlcube_cli_args: t.Optional[DictConfig] = None, task_cli_args: t.Optional[t.Dict] = None, runner_config: t.Optional[DictConfig] = None, - workspace: t.Optional[t.Text] = None, resolve: bool = True, + workspace: t.Optional[t.Text] = None, aws: t.Optional[t.Text] = None, resolve: bool = True, runner_cls: t.Optional[t.Type[Runner]] = None) -> DictConfig: """ Create MLCube mlcube merging different configs - base, global, local and cli. Args: @@ -91,6 +91,7 @@ def create_mlcube_config(mlcube_config_file: t.Text, mlcube_cli_args: t.Optional # located inside workspace (internal or custom), users are encouraged not to use ${runtime.workspace} or # ${workspace} in their MLCube configuration files. mlcube_config['workspace'] = actual_workspace + mlcube_config['aws'] = aws # Merge, for instance, docker runner config from system settings with docker config from MLCube config. if runner_cls: runner_cls.CONFIG.merge(mlcube_config) diff --git a/mlcube/mlcube/runner.py b/mlcube/mlcube/runner.py index 51f42a10..97710ccf 100644 --- a/mlcube/mlcube/runner.py +++ b/mlcube/mlcube/runner.py @@ -50,5 +50,8 @@ def __init__(self, mlcube: t.Union[DictConfig, t.Dict], task: t.Optional[t.Text] def configure(self) -> None: ... + def upload(self) -> None: + ... + def run(self) -> None: ... diff --git a/runners/mlcube_docker/mlcube_docker/docker_run.py b/runners/mlcube_docker/mlcube_docker/docker_run.py index 53fbc499..8ce1de9a 100644 --- a/runners/mlcube_docker/mlcube_docker/docker_run.py +++ b/runners/mlcube_docker/mlcube_docker/docker_run.py @@ -1,13 +1,17 @@ import os import logging import typing as t -from omegaconf import (DictConfig, OmegaConf) +from omegaconf import DictConfig, OmegaConf from mlcube.shell import Shell -from mlcube.runner import (Runner, RunnerConfig) +from mlcube.runner import Runner, RunnerConfig from mlcube.errors import IllegalParameterValueError +import base64 +import json +import boto3 +import docker as docker_pkg -__all__ = ['Config', 'DockerRun'] +__all__ = ["Config", "DockerRun"] from mlcube.validate import Validate @@ -18,41 +22,43 @@ class Config(RunnerConfig): """ Helper class to manage `docker` environment configuration.""" class BuildStrategy(object): - PULL = 'pull' - AUTO = 'auto' - ALWAYS = 'always' + PULL = "pull" + AUTO = "auto" + ALWAYS = "always" @staticmethod def validate(build_strategy: t.Text) -> None: - if build_strategy not in ('pull', 'auto', 'always'): - raise IllegalParameterValueError('build_strategy', build_strategy, "['pull', 'auto', 'always']") + if build_strategy not in ("pull", "auto", "always"): + raise IllegalParameterValueError( + "build_strategy", build_strategy, "['pull', 'auto', 'always']" + ) - DEFAULT = OmegaConf.create({ - 'runner': 'docker', - - 'image': '${docker.image}', # Image name. - 'docker': 'docker', # Executable (docker, podman, sudo docker ...). - - 'env_args': {}, # Environmental variables for build and run actions. - - 'gpu_args': '', # Docker run arguments when accelerator_count > 0. - 'cpu_args': '', # Docker run arguments when accelerator_count == 0. - - 'build_args': {}, # Docker build arguments - 'build_context': '.', # Docker build context relative to $MLCUBE_ROOT. Default is $MLCUBE_ROOT. - 'build_file': 'Dockerfile', # Docker file relative to $MLCUBE_ROOT, default is: - # `$MLCUBE_ROOT/Dockerfile`. - 'build_strategy': 'pull', # How to configure MLCube: - # 'pull': never try to build, always pull - # 'auto': build if image not found and dockerfile found - # 'always': build even if image found - # TODO: The above variable may be confusing. Is `configure_strategy` better? Docker uses `--pull` - # switch as build arg to force pulling the base image. - }) + DEFAULT = OmegaConf.create( + { + "runner": "docker", + "image": "${docker.image}", # Image name. + "docker": "docker", # Executable (docker, podman, sudo docker ...). + "env_args": {}, # Environmental variables for build and run actions. + "gpu_args": "", # Docker run arguments when accelerator_count > 0. + "cpu_args": "", # Docker run arguments when accelerator_count == 0. + "build_args": {}, # Docker build arguments + "build_context": ".", # Docker build context relative to $MLCUBE_ROOT. Default is $MLCUBE_ROOT. + "build_file": "Dockerfile", # Docker file relative to $MLCUBE_ROOT, default is: + # `$MLCUBE_ROOT/Dockerfile`. + "build_strategy": "pull", # How to configure MLCube: + # 'pull': never try to build, always pull + # 'auto': build if image not found and dockerfile found + # 'always': build even if image found + # TODO: The above variable may be confusing. Is `configure_strategy` better? Docker uses `--pull` + # switch as build arg to force pulling the base image. + } + ) @staticmethod def merge(mlcube: DictConfig) -> None: - mlcube.runner = OmegaConf.merge(mlcube.runner, mlcube.get('docker', OmegaConf.create({}))) + mlcube.runner = OmegaConf.merge( + mlcube.runner, mlcube.get("docker", OmegaConf.create({})) + ) @staticmethod def validate(mlcube: DictConfig) -> None: @@ -63,15 +69,20 @@ def validate(mlcube: DictConfig) -> None: Initialized configuration. """ # Make sure all parameters present with their default values. - validator = Validate(mlcube.runner, 'runner') - _ = validator.check_unknown_keys(Config.DEFAULT.keys())\ - .check_values(['image', 'docker', 'build_strategy'], str, blanks=False) + validator = Validate(mlcube.runner, "runner") + _ = validator.check_unknown_keys(Config.DEFAULT.keys()).check_values( + ["image", "docker", "build_strategy"], str, blanks=False + ) Config.BuildStrategy.validate(mlcube.runner.build_strategy) if isinstance(mlcube.runner.build_args, DictConfig): - mlcube.runner.build_args = Shell.to_cli_args(mlcube.runner.build_args, parent_arg='--build-arg') + mlcube.runner.build_args = Shell.to_cli_args( + mlcube.runner.build_args, parent_arg="--build-arg" + ) if isinstance(mlcube.runner.env_args, DictConfig): - mlcube.runner.env_args = Shell.to_cli_args(mlcube.runner.env_args, parent_arg='-e') + mlcube.runner.env_args = Shell.to_cli_args( + mlcube.runner.env_args, parent_arg="-e" + ) class DockerRun(Runner): @@ -82,10 +93,143 @@ class DockerRun(Runner): def __init__(self, mlcube: t.Union[DictConfig, t.Dict], task: t.Text) -> None: super().__init__(mlcube, task) + # get AWS credentials + aws_credentials = self.read_aws_credentials(self.mlcube.aws) + access_key_id = aws_credentials["access_key_id"] + secret_access_key = aws_credentials["secret_access_key"] + aws_region = aws_credentials["region"] + + # get AWS ECR login token + self.ecr_client = boto3.client( + "ecr", + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name=aws_region, + ) + + def list_aws_repositories(self): + """List existing repositories (images) on ECR""" + try: + response = self.ecr_client.describe_repositories(maxResults=1000)[ + "repositories" + ] + list_repos = [x["repositoryName"] for x in response] + return list_repos + except: + msg = "ECR client failed, make sure you provided the correct credentials" + raise RuntimeError(msg) + + @staticmethod + def read_aws_credentials(filename): + """Read AWS credentials from file. + + :param filename: Credentials filename + :param filename: str, optional + :return: Dictionary of AWS credentials. + :rtype: Dict[str, str] + """ + + try: + with open(filename) as json_data: + credentials = json.load(json_data) + + for variable in ("access_key_id", "secret_access_key", "region"): + if variable not in credentials.keys(): + msg = '"{}" cannot be found in {}'.format(variable, filename) + raise KeyError(msg) + + except FileNotFoundError: + try: + credentials = { + "access_key_id": os.environ["AWS_ACCESS_KEY_ID"], + "secret_access_key": os.environ["AWS_SECRET_ACCESS_KEY"], + "region": os.environ["AWS_REGION"], + } + except KeyError: + msg = "no AWS credentials found in file or environment variables" + raise RuntimeError(msg) + + return credentials + + def upload(self) -> None: + + # build Docker image + docker_client = docker_pkg.from_env() + self.configure() + + ecr_credentials = self.ecr_client.get_authorization_token()[ + "authorizationData" + ][0] + + ecr_username = "AWS" + + ecr_password = ( + base64.b64decode(ecr_credentials["authorizationToken"]) + .replace(b"AWS:", b"") + .decode("utf-8") + ) + + ecr_url = ecr_credentials["proxyEndpoint"] + + print("docker logging....") + # get Docker to login/authenticate with ECR + response = docker_client.login( + username=ecr_username, password=ecr_password, registry=ecr_url + ) + + print(response["Status"]) + + image_name = self.mlcube.docker.image.split(":")[0] + image_tag = self.mlcube.docker.image.split(":")[1] + # tag image for AWS ECR + ecr_repo_name = "{}/{}".format( + ecr_url.replace("https://", ""), self.mlcube.docker.image + ) + + list_repos = self.list_aws_repositories() + if image_name not in list_repos: + print("Creating repository...") + response = self.ecr_client.create_repository(repositoryName=image_name) + if response["ResponseMetadata"]["HTTPStatusCode"] == 200: + print("Repository created with URI:") + print(response["repository"]["repositoryUri"]) + else: + print("ERROR while creating repository") + + print("tagging image...") + image = docker_client.images.get(self.mlcube.docker.image) + image.tag(ecr_repo_name, tag=image_tag) + + # push image to AWS ECR + print("pushing image...") + print(self.mlcube.docker.image) + print(ecr_repo_name) + docker: t.Text = self.mlcube.runner.docker + Shell.run( + [ + "echo", + ecr_password, + "|", + docker, + "login", + ecr_url, + "--username", + ecr_username, + "--password-stdin", + "&&", + docker, + "push", + ecr_repo_name, + ], + on_error="raise", + ) + def configure(self) -> None: """Build Docker image on a current host.""" image: t.Text = self.mlcube.runner.image - context: t.Text = os.path.join(self.mlcube.runtime.root, self.mlcube.runner.build_context) + context: t.Text = os.path.join( + self.mlcube.runtime.root, self.mlcube.runner.build_context + ) recipe: t.Text = os.path.join(context, self.mlcube.runner.build_file) docker: t.Text = self.mlcube.runner.docker @@ -93,18 +237,32 @@ def configure(self) -> None: build_strategy: t.Text = self.mlcube.runner.build_strategy build_recipe_exists: bool = os.path.exists(recipe) if build_strategy == Config.BuildStrategy.PULL or not build_recipe_exists: - logger.info("Will pull image (%s) because (build_strategy=%s, build_recipe_exists=%r)", - image, build_strategy, build_recipe_exists) - Shell.run([docker, 'pull', image], on_error='raise') + logger.info( + "Will pull image (%s) because (build_strategy=%s, build_recipe_exists=%r)", + image, + build_strategy, + build_recipe_exists, + ) + Shell.run([docker, "pull", image], on_error="raise") if build_recipe_exists: - logger.warning("Docker recipe exists (%s), but your build strategy is '%s', and so the image has been " - "pulled, not built. Make sure your image is up-to-data with your source code.", - recipe, build_strategy) + logger.warning( + "Docker recipe exists (%s), but your build strategy is '%s', and so the image has been " + "pulled, not built. Make sure your image is up-to-data with your source code.", + recipe, + build_strategy, + ) else: - logger.info("Will build image (%s) because (build_strategy=%s, build_recipe_exists=%r)", - image, build_strategy, build_recipe_exists) + logger.info( + "Will build image (%s) because (build_strategy=%s, build_recipe_exists=%r)", + image, + build_strategy, + build_recipe_exists, + ) build_args: t.Text = self.mlcube.runner.build_args - Shell.run([docker, 'build', build_args, '-t', image, '-f', recipe, context], on_error='raise') + Shell.run( + [docker, "build", build_args, "-t", image, "-f", recipe, context], + on_error="raise", + ) def run(self) -> None: """ Run a cube. """ @@ -112,18 +270,30 @@ def run(self) -> None: image: t.Text = self.mlcube.runner.image build_strategy: t.Text = self.mlcube.runner.build_strategy - if build_strategy == Config.BuildStrategy.ALWAYS or not Shell.docker_image_exists(docker, image): - logger.warning("Docker image (%s) does not exist or build strategy is 'always'. " - "Will run 'configure' phase.", image) + if ( + build_strategy == Config.BuildStrategy.ALWAYS + or not Shell.docker_image_exists(docker, image) + ): + logger.warning( + "Docker image (%s) does not exist or build strategy is 'always'. " + "Will run 'configure' phase.", + image, + ) try: self.configure() except RuntimeError: - context: t.Text = os.path.join(self.mlcube.runtime.root, self.mlcube.runner.build_context) + context: t.Text = os.path.join( + self.mlcube.runtime.root, self.mlcube.runner.build_context + ) recipe: t.Text = os.path.join(context, self.mlcube.runner.build_file) - if build_strategy == Config.BuildStrategy.PULL and os.path.exists(recipe): - logger.warning("MLCube configuration failed. Docker recipe (%s) exists, but your build strategy is " - "set to pull. Rerun with: -Prunner.build_strategy=auto to build image locally.", - recipe) + if build_strategy == Config.BuildStrategy.PULL and os.path.exists( + recipe + ): + logger.warning( + "MLCube configuration failed. Docker recipe (%s) exists, but your build strategy is " + "set to pull. Rerun with: -Prunner.build_strategy=auto to build image locally.", + recipe, + ) raise # Deal with user-provided workspace Shell.sync_workspace(self.mlcube, self.task) @@ -132,9 +302,13 @@ def run(self) -> None: mounts, task_args = Shell.generate_mounts_and_args(self.mlcube, self.task) logger.info(f"mounts={mounts}, task_args={task_args}") - volumes = Shell.to_cli_args(mounts, sep=':', parent_arg='--volume') + volumes = Shell.to_cli_args(mounts, sep=":", parent_arg="--volume") env_args = self.mlcube.runner.env_args - num_gpus: int = self.mlcube.platform.get('accelerator_count', None) or 0 + num_gpus: int = self.mlcube.platform.get("accelerator_count", None) or 0 run_args: t.Text = self.mlcube.runner.cpu_args if num_gpus == 0 else self.mlcube.runner.gpu_args - Shell.run([docker, 'run', run_args, env_args, volumes, image, ' '.join(task_args)], on_error='raise') + Shell.run( + [docker, "run", run_args, env_args, volumes, image, " ".join(task_args)], + on_error="raise", + ) +