Update all except serve
azuur committed Feb 5, 2024
1 parent f4b5d8c commit bdaadb3
Showing 4 changed files with 92 additions and 74 deletions.
6 changes: 6 additions & 0 deletions ml_pipelines/__main__.py
@@ -1,5 +1,8 @@
 import typer
 
+from ml_pipelines.deployment.aws.eval import main as eval_aws
+from ml_pipelines.deployment.aws.serve import main as serve_aws
+from ml_pipelines.deployment.aws.train import main as train_aws
 from ml_pipelines.deployment.local.eval import main as eval_local
 from ml_pipelines.deployment.local.serve import main as serve_local
 from ml_pipelines.deployment.local.train import main as train_local
@@ -9,5 +12,8 @@
 app.command("train_local")(train_local)
 app.command("eval_local")(eval_local)
 app.command("serve_local")(serve_local)
+app.command("train_aws")(train_aws)
+app.command("eval_aws")(eval_aws)
+app.command("serve_aws")(serve_aws)
 
 app()
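Usage note (not part of the commit): Typer registers the command names as given and converts parameter names to dashed options, so the new pipelines should be invocable roughly like this, assuming the package is run as a module; bucket names are placeholders:

    python -m ml_pipelines train_aws --raw-data-bucket my-raw-data --train-artifacts-bucket my-artifacts
    python -m ml_pipelines eval_aws --tag-best-model
    python -m ml_pipelines serve_aws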
43 changes: 21 additions & 22 deletions ml_pipelines/deployment/aws/eval.py
@@ -7,74 +7,73 @@

 import typer
 
-from ml_pipelines.deployment.common.eval import run_eval_comparison_pipeline
-from ml_pipelines.deployment.local.io import (
+from ml_pipelines.deployment.aws.io import (
     get_all_available_train_versions,
     get_latest_version,
     get_raw_data,
     get_train_artifacts,
     tag_best_version,
 )
+from ml_pipelines.deployment.common.eval import run_eval_comparison_pipeline
 
 
 def main(
-    raw_data_root_path: Union[str, None] = None,  # noqa: UP007
-    train_artifacts_root_path: Union[str, None] = None,  # noqa: UP007, E501
+    raw_data_bucket: Union[str, None] = None,  # noqa: UP007
+    train_artifacts_bucket: Union[str, None] = None,  # noqa: UP007, E501
     raw_data_version: Union[str, None] = None,  # noqa: UP007
     train_versions: Union[list[str], None] = None,  # noqa: UP007
     tag_best_model: bool = False,
 ):
     """
-    Runs the model evaluation and comparison pipeline using local paths
+    Runs the model evaluation and comparison pipeline using AWS S3 buckets
     for inputs and outputs.
-    If `raw_data_root_path` is null, the command searches for the RAW_DATA_ROOT_PATH
-    environment variable, and if not present, assumes this to be "/".
+    If `raw_data_bucket` is null, the command searches for the RAW_DATA_BUCKET
+    environment variable.
-    If `train_artifacts_root_path` is null, the command searches for the
-    TRAIN_ARTIFACTS_ROOT_PATH environment variable, and if not present,
-    assumes this to be "/".
+    If `train_artifacts_bucket` is null, the command searches for the
+    TRAIN_ARTIFACTS_BUCKET environment variable.
     If `raw_data_version` is null, the command searches for the latest version in
-    `raw_data_root_path`.
+    `raw_data_bucket`.
     If `train_versions` is null or empty, the command automatically evaluates all
-    models found in `train_artifacts_root_path`.
+    models found in `train_artifacts_bucket`.
     If `tag_best_model` is set (to true) and more than one model version is evaluated,
     the best performing one is tagged as the best version.
     """
     logger = Logger(__file__)
     logger.addHandler(logging.StreamHandler(sys.stdout))
 
-    if raw_data_root_path is None:
-        raw_data_root_path = os.environ.get("RAW_DATA_ROOT_PATH", "/")
-    if train_artifacts_root_path is None:
-        train_artifacts_root_path = os.environ.get("TRAIN_ARTIFACTS_ROOT_PATH", "/")
+    if raw_data_bucket is None:
+        raw_data_bucket = os.environ["RAW_DATA_BUCKET"]
+    if train_artifacts_bucket is None:
+        train_artifacts_bucket = os.environ["TRAIN_ARTIFACTS_BUCKET"]
 
     if raw_data_version is None:
         raw_data_version = get_latest_version(
-            raw_data_root_path,  # type: ignore
+            raw_data_bucket,  # type: ignore
             "raw_data.csv",
         )
 
     if not train_versions:
-        train_versions = get_all_available_train_versions(  # type: ignore
-            train_artifacts_root_path  # type: ignore
+        train_versions = get_all_available_train_versions(
+            train_artifacts_bucket  # type: ignore
        )
 
     get_raw_data_func = partial(
         get_raw_data,
-        raw_data_root_path,  # type: ignore
+        raw_data_bucket,  # type: ignore
     )
     get_train_artifacts_func = partial(
         get_train_artifacts,
-        train_artifacts_root_path,  # type: ignore
+        train_artifacts_bucket,  # type: ignore
         load_data=False,
     )
     tag_best_version_func = partial(
         tag_best_version,
-        train_artifacts_root_path,  # type: ignore
+        train_artifacts_bucket,  # type: ignore
     )
 
     run_eval_comparison_pipeline(  # noqa: PLR0913
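A minimal usage sketch for the new AWS entry point (not part of the commit; bucket names are placeholders and must point at existing S3 buckets):

    import os

    from ml_pipelines.deployment.aws.eval import main as eval_aws

    # Placeholder bucket names; main() falls back to these env vars when the
    # corresponding arguments are omitted.
    os.environ["RAW_DATA_BUCKET"] = "my-raw-data-bucket"
    os.environ["TRAIN_ARTIFACTS_BUCKET"] = "my-train-artifacts-bucket"

    # Resolves the latest raw data version, evaluates every train version
    # found in the artifacts bucket, and tags the best performer.
    eval_aws(tag_best_model=True)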
108 changes: 61 additions & 47 deletions ml_pipelines/deployment/aws/io.py
@@ -1,8 +1,6 @@
 import json
-import os
 import pickle
-from io import StringIO
-from pathlib import Path
+from io import BytesIO, StringIO
 from tempfile import NamedTemporaryFile
 from uuid import uuid4
 
@@ -59,14 +57,18 @@ def save_train_artifacts(
         raise ModelVersionAlreadyExists()
 
     model_pickle_key = s3_prefix + "model.pickle"
-    with NamedTemporaryFile() as f:
-        pickle.dump(artifacts["model"], f)
-        s3.upload_file(f.name, bucket_name, model_pickle_key)
+    s3.upload_fileobj(
+        Fileobj=BytesIO(pickle.dumps(artifacts["model"])),
+        Bucket=bucket_name,
+        Key=model_pickle_key,
+    )
 
     feature_eng_params_key = s3_prefix + "feature_eng_params.json"
-    with NamedTemporaryFile(mode="w") as f:
-        f.write(artifacts["feature_eng_params"].json())
-        s3.upload_file(f.name, bucket_name, feature_eng_params_key)
+    s3.upload_fileobj(
+        Fileobj=BytesIO(artifacts["feature_eng_params"].json().encode()),
+        Bucket=bucket_name,
+        Key=feature_eng_params_key,
+    )
 
     data_keys = ["raw_train_data", "raw_test_data", "train_data", "test_data"]
     for key in data_keys:
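The hunk above replaces NamedTemporaryFile plus upload_file with fully in-memory uploads. A standalone sketch of the same idiom, with a placeholder bucket and key (any picklable object works):

    import pickle
    from io import BytesIO

    import boto3

    s3 = boto3.client("s3")

    # Serialize in memory and stream the bytes straight to S3; nothing
    # touches the local filesystem, and there is no temp file to flush
    # before the upload reads it back by name.
    payload = BytesIO(pickle.dumps({"example": "object"}))
    s3.upload_fileobj(Fileobj=payload, Bucket="example-bucket", Key="v1/model.pickle")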
@@ -84,16 +86,14 @@ def get_train_artifacts(bucket_name: str, version: str, load_data: bool = True):
     s3_prefix = f"{version}/"
 
     model_key = s3_prefix + "model.pickle"
-    s3.download_file(bucket_name, model_key, "/tmp/model.pickle")
-    with open("/tmp/model.pickle", "rb") as f:
-        model: LogisticRegression = pickle.load(f)
-
-    feature_eng_params_key = s3_prefix + "feature_eng_params.json"
-    s3.download_file(
-        bucket_name, feature_eng_params_key, "/tmp/feature_eng_params.json"
+    model: LogisticRegression = pickle.load(
+        s3.get_object(Bucket=bucket_name, Key=model_key)["Body"]
     )
-    with open("/tmp/feature_eng_params.json") as f:
-        feature_eng_params = FeatureEngineeringParams(**json.loads(f.read()))
+
+    feature_eng_params_key = s3_prefix + "feature_eng_params.json"
+    fep = s3.get_object(Bucket=bucket_name, Key=feature_eng_params_key)["Body"].read()
+    feature_eng_params = FeatureEngineeringParams(**json.loads(fep.decode()))
 
     if not load_data:
         return TrainArtifacts(
@@ -107,9 +107,7 @@ def get_train_artifacts(bucket_name: str, version: str, load_data: bool = True):
         csv_key = s3_prefix + f"{key}.csv"
         data_dict[key] = pd.read_csv(
             StringIO(
-                s3.get_object(Bucket=bucket_name, Key=csv_key)["Body"]
-                .read()
-                .decode("utf-8")
+                s3.get_object(Bucket=bucket_name, Key=csv_key)["Body"].read().decode()
             )
         )
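
The download side mirrors the upload change: get_object returns a streaming Body whose bytes can be consumed directly, with no intermediate /tmp files. A small sketch with placeholder names (decode() defaults to UTF-8):

    import json
    from io import StringIO

    import boto3
    import pandas as pd

    s3 = boto3.client("s3")

    # JSON: read the body bytes and decode.
    body = s3.get_object(Bucket="example-bucket", Key="v1/params.json")["Body"]
    params = json.loads(body.read().decode())

    # CSV: wrap the decoded text in StringIO for pandas.
    csv_body = s3.get_object(Bucket="example-bucket", Key="v1/data.csv")["Body"]
    df = pd.read_csv(StringIO(csv_body.read().decode()))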

@@ -129,56 +127,72 @@ def save_eval_artifacts(bucket_name: str, version: str, metrics: float, plots: F
     s3_prefix = f"{version}/"
 
     metrics_key = s3_prefix + "metrics.txt"
-    with open("/tmp/metrics.txt", "w") as f:
-        f.write(str(metrics))
-    s3.upload_file("/tmp/metrics.txt", bucket_name, metrics_key)
+    s3.upload_fileobj(
+        Fileobj=BytesIO(str(metrics).encode()),
+        Bucket=bucket_name,
+        Key=metrics_key,
+    )
 
     plot_key = s3_prefix + "calibration_plot.png"
-    plots.savefig("/tmp/calibration_plot.png")
-    s3.upload_file("/tmp/calibration_plot.png", bucket_name, plot_key)
+    buf = BytesIO()
+    plots.savefig(buf)
+    buf.seek(0)
+    s3.upload_fileobj(
+        Fileobj=buf,
+        Bucket=bucket_name,
+        Key=plot_key,
+    )
 
 
 def get_all_available_train_versions(bucket_name: str):
     s3 = boto3.client("s3")
 
     # List objects in the specified prefix
     objects = s3.list_objects(Bucket=bucket_name, Prefix="")
-    version_names = [Path(obj["Key"]).stem for obj in objects.get("Contents", [])]
-    return version_names
+    return list(
+        {
+            obj["Key"].split("/")[0]
+            for obj in objects.get("Contents", [])
+            if len(obj["Key"].split("/")) == 2  # noqa: PLR2004
+        }
+    )
 
 
 def get_latest_version(bucket_name: str, filename: str) -> str:
     s3 = boto3.client("s3")
 
     # List objects in the specified prefix
     objects = s3.list_objects(Bucket=bucket_name, Prefix="")
 
-    dirs = {
-        obj["Key"].split("/")[0]
-        for obj in objects.get("Contents", [])
-        if obj["Size"] == 0 and obj["Key"].endswith("/")
-    }
+    depth = 2
     versions = [
         (obj["Key"].split("/")[0], obj["LastModified"])
         for obj in objects.get("Contents", [])
-        if obj["Key"].endswith(filename) and obj["Key"].split("/")[0] in dirs
+        if obj["Key"].endswith(filename) and len(obj["Key"].split("/")) == depth
     ]
 
     return max(versions, key=lambda t: t[1])[0]
 
 
-def get_best_version(train_artifacts_root_path: os.PathLike):
-    train_dir = Path(train_artifacts_root_path)
-    if "best_model" not in set(f for f in train_dir.iterdir() if f.is_file()):
+def get_best_version(bucket_name: str):
+    s3 = boto3.client("s3")
+    objects = s3.list_objects(Bucket=bucket_name, Prefix="").get("Contents", [])
+    keys = {o["Key"] for o in objects}
+    if "best_model" not in keys:
         return None
-    with open(train_dir / "best_model") as f:
-        return f.read()
+    return (
+        s3.get_object(Bucket=bucket_name, Key="best_model")["Body"]
+        .read()
+        .decode("utf-8")
+    )
 
 
-def tag_best_version(train_version: str, train_artifacts_root_path: os.PathLike):
-    train_dir = Path(train_artifacts_root_path)
-    with open(train_dir / "best_model", "w") as f:
-        f.write(train_version)
+def tag_best_version(bucket_name: str, train_version: str):
+    s3 = boto3.client("s3")
+    s3.upload_fileobj(
+        Fileobj=BytesIO(train_version.encode()),
+        Bucket=bucket_name,
+        Key="best_model",
+    )
 
 
 def prediction_logging_func(predictions: list[tuple[Point, float]]):
-    print("yay")  # noqa: T201
+    print("logged preds!")  # noqa: T201
     return True, None
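
Taken together, the reworked helpers assume a flat `<version>/<artifact>` key layout: keys of depth 2 are version artifacts, and the depth filter also keeps the top-level `best_model` tag object out of the version listing. A hedged round-trip sketch (placeholder bucket and version; the bucket must exist and AWS credentials must be configured):

    from ml_pipelines.deployment.aws.io import (
        get_all_available_train_versions,
        get_best_version,
        tag_best_version,
    )

    bucket = "example-artifacts-bucket"  # placeholder

    # Lists version prefixes, e.g. ["some-version-id", ...]; the top-level
    # "best_model" key has depth 1 and is excluded.
    print(get_all_available_train_versions(bucket))

    tag_best_version(bucket, "some-version-id")
    assert get_best_version(bucket) == "some-version-id"

Note that list_objects returns at most 1,000 keys per call, so buckets with many versions would need a paginator.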
9 changes: 4 additions & 5 deletions ml_pipelines/deployment/aws/train.py
@@ -28,12 +28,11 @@ def main(
     Runs the feature engineering and training pipeline using local paths
     for inputs and outputs.
-    If `raw_data_root_path` is null, the command searches for the RAW_DATA_ROOT_PATH
-    environment variable, and if not present, assumes this to be "/".
+    If `raw_data_bucket` is null, the command searches for the RAW_DATA_BUCKET
+    environment variable.
-    If `train_artifacts_root_path` is null, the command searches for the
-    TRAIN_ARTIFACTS_ROOT_PATH environment variable, and if not present,
-    assumes this to be "/".
+    If `train_artifacts_bucket` is null, the command searches for the
+    TRAIN_ARTIFACTS_BUCKET environment variable.
     If `raw_data_version` is null, the command searches for the latest version in
     `raw_data_root_path`.
