Feature/local deployment (#8)
* add prediction logging func to api

* Add local deployment for train pipeline

* reset python <3.10 type hints for typer

* added serve for local deployment

* Update readme
azuur authored Jan 20, 2024
1 parent bb8fe1b commit 52cab8f
Showing 14 changed files with 636 additions and 75 deletions.
11 changes: 8 additions & 3 deletions README.md
@@ -7,6 +7,11 @@ Simple ML pipeline repo for experimenting with CI/CD / DevOps / MLOps.
 ~~- Use Python 3.11 in CI github action~~
 ~~- make pipelines functions~~
 ~~- add loggers to stuff~~
-- add db conn / func to save inference cases
-- add deployment code...
-- add versioning to training... in deployment?
+~~- add local deployment code...~~
+~~- add versioning to training... in deployment?~~
+- add eval pipeline, model comparison
+- add "best model" mark. add "get_best_model"
+- add db conn / func to save inference cases (local deployment)
+- add Dockerfile
+- add build script to push to ECR (AWS deployment)
+- add rest of AWS deployment (using S3, EC2, AWS CodePipeline)
100 changes: 100 additions & 0 deletions ml_pipelines/deployment/local/common.py
@@ -0,0 +1,100 @@
import json
import os
import pickle
from pathlib import Path
from uuid import uuid4

import pandas as pd
from matplotlib.figure import Figure
from sklearn.linear_model import LogisticRegression

from ml_pipelines.logic.common.feature_eng import FeatureEngineeringParams
from ml_pipelines.pipeline.train_pipeline import TrainArtifacts


def make_version(prefix: str) -> str:
    version = str(uuid4())
    return f"{prefix}_{version}"


def get_raw_data(version: str, root_path: os.PathLike) -> pd.DataFrame:
    return pd.read_csv(Path(root_path) / version / "raw_data.csv")


class ModelVersionAlreadyExists(Exception):
    pass


def save_train_artifacts(
    version: str,
    root_path: os.PathLike,
    artifacts: TrainArtifacts,
):
    version_dir = Path(root_path) / version
    try:
        version_dir.mkdir(parents=True)
    except FileExistsError:
        raise ModelVersionAlreadyExists()

    with open(version_dir / "model.pickle", "wb") as f:
        pickle.dump(artifacts["model"], f)
    with open(version_dir / "feature_eng_params.json", "w") as f:  # type: ignore
        f.write(artifacts["feature_eng_params"].json())

    data_keys = [
        "raw_train_data",
        "raw_test_data",
        "train_data",
        "test_data",
    ]
    for key in data_keys:
        if key in artifacts:
            dataset = artifacts[key]  # type: ignore
            dataset.to_csv(version_dir / f"{key}.csv", index=False)


def get_train_artifacts(version: str, root_path: os.PathLike, load_data: bool = True):
    version_dir = Path(root_path) / version
    with open(version_dir / "model.pickle", "rb") as f:  # type: ignore
        model: LogisticRegression = pickle.load(f)

    with open(version_dir / "feature_eng_params.json") as f:  # type: ignore
        feature_eng_params = FeatureEngineeringParams(**json.loads(f.read()))

    if not load_data:
        return TrainArtifacts(
            model=model,
            feature_eng_params=feature_eng_params,
        )

    raw_train_data = pd.read_csv(version_dir / "raw_train_data.csv")
    raw_test_data = pd.read_csv(version_dir / "raw_test_data.csv")
    train_data = pd.read_csv(version_dir / "train_data.csv")
    test_data = pd.read_csv(version_dir / "test_data.csv")

    return TrainArtifacts(
        model=model,
        feature_eng_params=feature_eng_params,
        raw_train_data=raw_train_data,
        raw_test_data=raw_test_data,
        train_data=train_data,
        test_data=test_data,
    )


def save_eval_artifacts(
    version: str, root_path: os.PathLike, metrics: float, plots: Figure
):
    version_dir = Path(root_path) / version
    with open(version_dir / "metrics.txt", "w") as f:  # type: ignore
        f.write(str(metrics))  # type: ignore
    plots.savefig(str(version_dir / "calibration_plot.png"))


def get_latest_version(root_path: os.PathLike, filename: str) -> str:
    root_dir = Path(root_path)
    versions: list[tuple[str, float]] = []
    for version_dir in root_dir.iterdir():
        st_mtime = (version_dir / filename).stat().st_mtime
        versions.append((version_dir.stem, st_mtime))
    return max(versions, key=lambda t: t[1])[0]
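
Reviewer note: taken together, these helpers act as a minimal filesystem-backed model registry. Each version is a directory under a root path, and get_latest_version resolves "latest" by the modification time of a marker file (model.pickle for models, raw_data.csv for data), so copying artifact directories around can change which version counts as latest. A rough usage sketch follows; the "local_artifacts" path is an assumption, and `artifacts` stands in for the TrainArtifacts dict returned by train_pipeline, so this is illustrative rather than directly runnable.

# Illustrative only: "local_artifacts" is an assumed root directory, and
# `artifacts` would come from ml_pipelines.pipeline.train_pipeline.train_pipeline.
version = make_version(prefix="model")  # e.g. "model_<uuid4>"
save_train_artifacts(version, "local_artifacts", artifacts)

latest = get_latest_version("local_artifacts", "model.pickle")
reloaded = get_train_artifacts(latest, "local_artifacts", load_data=False)
model = reloaded["model"]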
Empty file.
Empty file.
64 changes: 64 additions & 0 deletions ml_pipelines/deployment/local/serve.py
@@ -0,0 +1,64 @@
import logging
import os
import sys
from logging import Logger
from typing import Union

import typer
import uvicorn

from ml_pipelines.deployment.local.common import get_latest_version, get_train_artifacts
from ml_pipelines.logic.serve.serve import Point, create_fastapi_app


def prediction_logging_func(predictions: list[tuple[Point, float]]):
    print("yay")  # noqa: T201
    return True, None


def run_serve(
    train_version: str,
    train_artifacts_root_path: os.PathLike,  # type: ignore
    logger: Logger,
    uvicorn_kwargs: dict,
):
    logger.info(f"Serving model {train_version}.")
    train_artifacts = get_train_artifacts(
        train_version, train_artifacts_root_path, load_data=False
    )
    model = train_artifacts["model"]
    feature_eng_params = train_artifacts["feature_eng_params"]
    app = create_fastapi_app(model, feature_eng_params, logger, prediction_logging_func)
    logger.info("Loaded model, set up endpoint.")

    uvicorn.run(app=app, **uvicorn_kwargs)


if __name__ == "__main__":
    from dotenv import load_dotenv

    load_dotenv()
    TRAIN_ARTIFACTS_ROOT_DIR = os.environ["TRAIN_ARTIFACTS_ROOT_DIR"]

    def main(
        train_version: Union[str, None] = None,  # noqa: UP007
        train_artifacts_root_path: str = TRAIN_ARTIFACTS_ROOT_DIR,
    ):
        logger = Logger(__file__)
        logger.addHandler(logging.StreamHandler(sys.stdout))

        if train_version is None:
            train_version = get_latest_version(
                train_artifacts_root_path,  # type: ignore
                "model.pickle",
            )

        uvicorn_kwargs: dict = {}
        run_serve(
            train_version=train_version,
            train_artifacts_root_path=train_artifacts_root_path,  # type: ignore
            logger=logger,
            uvicorn_kwargs=uvicorn_kwargs,
        )

    typer.run(main)
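
Reviewer note: with uvicorn_kwargs left empty, uvicorn serves on its defaults (127.0.0.1:8000). A client sketch against the POST /predict/ endpoint defined in ml_pipelines/logic/serve/serve.py might look like this; it assumes the default host/port and uses the requests library, which is not a declared dependency of this repo.

# Hypothetical client; assumes the server is running on uvicorn's default
# 127.0.0.1:8000 and that `requests` is installed.
import requests

# Single point: the endpoint accepts one Point object...
r = requests.post("http://127.0.0.1:8000/predict/", json={"x1": 0.5, "x2": -1.2})
print(r.json())  # {"predictions": <float>}

# ...or a batch of points, returning a list of predictions.
batch = [{"x1": 0.5, "x2": -1.2}, {"x1": 1.0, "x2": 2.0}]
r = requests.post("http://127.0.0.1:8000/predict/", json=batch)
print(r.json())  # {"predictions": [<float>, <float>]}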
81 changes: 81 additions & 0 deletions ml_pipelines/deployment/local/train.py
@@ -0,0 +1,81 @@
import logging
import os
import sys
from logging import Logger
from typing import Union

import typer

from ml_pipelines.deployment.local.common import (
    get_latest_version,
    get_raw_data,
    make_version,
    save_eval_artifacts,
    save_train_artifacts,
)
from ml_pipelines.pipeline.eval_pipeline import eval_pipeline
from ml_pipelines.pipeline.train_pipeline import train_pipeline


def run_train_pipeline(  # noqa: PLR0913
    raw_data_version: str,
    raw_data_root_path: os.PathLike,
    train_version: str,
    train_artifacts_root_path: os.PathLike,
    logger: Logger,
    split_random_state: int = 3825,
):
    logger.info(f"Running full training pipeline version {train_version}.")
    logger.info(f"Raw data version {raw_data_version}.")
    raw_data = get_raw_data(raw_data_version, raw_data_root_path)
    train_artifacts = train_pipeline(
        raw_data, split_random_state=split_random_state, logger=logger
    )
    save_train_artifacts(train_version, train_artifacts_root_path, train_artifacts)
    logger.info("Saved train artifacts.")
    metrics, plots = eval_pipeline(
        train_artifacts["model"],
        train_artifacts["feature_eng_params"],
        train_artifacts["raw_test_data"],
        logger,
    )
    save_eval_artifacts(train_version, train_artifacts_root_path, metrics, plots)
    logger.info("Saved eval artifacts.")


if __name__ == "__main__":
    from dotenv import load_dotenv

    load_dotenv()
    RAW_DATA_ROOT_DIR = os.environ["RAW_DATA_ROOT_DIR"]
    TRAIN_ARTIFACTS_ROOT_DIR = os.environ["TRAIN_ARTIFACTS_ROOT_DIR"]

    def main(
        raw_data_version: Union[str, None] = None,  # noqa: UP007
        train_version: Union[str, None] = None,  # noqa: UP007
        raw_data_root_path: str = RAW_DATA_ROOT_DIR,
        train_artifacts_root_path: str = TRAIN_ARTIFACTS_ROOT_DIR,
        split_random_state: int = 3825,
    ):
        logger = Logger(__file__)
        logger.addHandler(logging.StreamHandler(sys.stdout))

        if raw_data_version is None:
            raw_data_version = get_latest_version(
                raw_data_root_path,  # type: ignore
                "raw_data.csv",
            )

        if train_version is None:
            train_version = make_version(prefix="model")

        run_train_pipeline(
            raw_data_version=raw_data_version,
            raw_data_root_path=raw_data_root_path,  # type: ignore
            train_version=train_version,
            train_artifacts_root_path=train_artifacts_root_path,  # type: ignore
            logger=logger,
            split_random_state=split_random_state,
        )

    typer.run(main)
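
Reviewer note: the Typer CLI reads RAW_DATA_ROOT_DIR and TRAIN_ARTIFACTS_ROOT_DIR from the environment (or a .env file), but the pipeline can also be driven directly from Python. A minimal sketch, where the "data/raw" and "models" paths are assumptions and raw data is assumed to exist already:

# Hypothetical direct invocation; root paths are assumed, not from the repo.
import logging
import sys
from logging import Logger

from ml_pipelines.deployment.local.common import get_latest_version, make_version
from ml_pipelines.deployment.local.train import run_train_pipeline

logger = Logger(__file__)
logger.addHandler(logging.StreamHandler(sys.stdout))

run_train_pipeline(
    raw_data_version=get_latest_version("data/raw", "raw_data.csv"),
    raw_data_root_path="data/raw",
    train_version=make_version(prefix="model"),
    train_artifacts_root_path="models",
    logger=logger,
)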
2 changes: 1 addition & 1 deletion ml_pipelines/logic/eval/eval.py
@@ -8,7 +8,7 @@

 def calculate_metrics(y_true: pd.Series, y_score: np.ndarray, logger: Logger):
     logger.info(f"Calculating metrics for {len(y_true)} samples.")
-    return roc_auc_score(y_true, y_score)
+    return float(roc_auc_score(y_true, y_score))


 def make_roc_plot(model: LogisticRegression, data: pd.DataFrame, logger: Logger):
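
Reviewer note: the new float(...) cast matters because, depending on the scikit-learn version, roc_auc_score can return a NumPy scalar rather than a Python float; the cast keeps str() and JSON handling of the metric predictable. A quick illustration with made-up values:

from sklearn.metrics import roc_auc_score

score = roc_auc_score([0, 1, 1, 0], [0.1, 0.8, 0.9, 0.3])
# On older scikit-learn releases this is a numpy.float64; float() normalizes it.
metric = float(score)
print(metric)  # a plain Python float (1.0 for these made-up labels/scores)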
36 changes: 26 additions & 10 deletions ml_pipelines/logic/serve/serve.py
@@ -1,3 +1,4 @@
+from collections.abc import Callable
 from logging import Logger

 import pandas as pd
@@ -18,39 +19,54 @@ class Point(BaseModel):
     x2: float


+PredictionLoggingFunc = Callable[
+    [list[tuple[Point, float]]], tuple[bool, Exception | None]
+]
+
+
 def create_fastapi_app(
     model: LogisticRegression,
     feature_eng_params: FeatureEngineeringParams,
     logger: Logger,
+    prediction_logging_func: PredictionLoggingFunc,
 ):
     app = FastAPI()

-    @app.get("/predict/")
+    @app.post("/predict/")
     async def _(inputs: Point | list[Point]):
         """
         Endpoint to predict probabilities for the given list of points.
         :param points: List of points (x1, x2) (or single point)
         :return: List of probability predictions (or single prediction)
         """
         try:
-            logger.info("Responding to request for predictions. (GET /predict/)")
             single = False
             if isinstance(inputs, Point):
                 single = True
                 inputs = [inputs]
-            # Convert the input points to a numpy array
+            logger.info(f"Number of samples for inference: {len(inputs)}")
+
             x_matrix = pd.DataFrame([{"X1": p.x1, "X2": p.x2} for p in inputs])
             x_matrix = transform_features(x_matrix, feature_eng_params, logger)
+            predictions: list[float] = predict(model, x_matrix, logger).tolist()
+            logger.info("Computed inference.")

-            # Make predictions using the pre-trained model
-            predictions = predict(model, x_matrix, logger).tolist()
+            predictions_to_log = list(zip(inputs, predictions))
+            log_success, log_exception = prediction_logging_func(predictions_to_log)
+            if log_success and log_exception is None:
+                logger.info("Logged predictions for samples.")
+            else:
+                logger.warning(
+                    "Unable to log predictions for samples.", exc_info=log_exception
+                )

-            if single:
-                predictions = predictions[0]
+            output = predictions[0] if single else predictions
+            logger.info("Returning predictions.")
+            return {"predictions": output}

-            return {"predictions": predictions}
-
-        except Exception:
+        except Exception as e:
+            logger.error("Inference failed.", exc_info=e)
             raise HTTPException(status_code=500, detail="Internal server error")

-    return app
+    return app
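
Reviewer note: PredictionLoggingFunc makes the prediction-logging side effect pluggable. The endpoint hands over the (input, prediction) pairs and gets back a success flag plus an optional exception, which it only warns about, so a failing logger never breaks inference. A minimal file-backed implementation might look like the sketch below; the predictions.jsonl path is an assumption, not something the repo defines.

import json

from ml_pipelines.logic.serve.serve import Point


def log_predictions_to_jsonl(
    predictions: list[tuple[Point, float]],
) -> tuple[bool, Exception | None]:
    # Append one JSON record per (input point, predicted probability) pair.
    try:
        with open("predictions.jsonl", "a") as f:
            for point, pred in predictions:
                record = {"x1": point.x1, "x2": point.x2, "prediction": pred}
                f.write(json.dumps(record) + "\n")
        return True, None
    except Exception as e:  # report failure instead of raising into the endpoint
        return False, e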
4 changes: 3 additions & 1 deletion ml_pipelines/logic/train/train.py
@@ -7,13 +7,15 @@

 def split_data(data: pd.DataFrame, random_state: int, logger: Logger):
     logger.info(f"Splitting data for train/test. {random_state = }")
+    train_data: pd.DataFrame
+    test_data: pd.DataFrame
     train_data, test_data = train_test_split(
         data,
         train_size=0.8,
         random_state=random_state,
         stratify=data["Y"],
         shuffle=True,
-    )
+    )  # type: ignore
     return train_data, test_data
2 changes: 1 addition & 1 deletion ml_pipelines/pipeline/data_gen_pipeline.py
@@ -1,4 +1,4 @@
 from ml_pipelines.logic.common.dgp import generate_raw_data

 data = generate_raw_data(10_000, 813)
-data.to_csv("data.csv", index=False)
+data.to_csv("raw_data.csv", index=False)
20 changes: 0 additions & 20 deletions ml_pipelines/pipeline/eval_pipeline.py
@@ -1,7 +1,3 @@
-import json
-import logging
-import pickle
-import sys
 from logging import Logger

 import matplotlib.pyplot as plt
@@ -41,22 +37,6 @@ def eval_pipeline(
     return (metrics, plots)


-with open("model.pickle", "rb") as f:  # type: ignore
-    model: LogisticRegression = pickle.load(f)
-
-with open("feature_eng_params.json") as f:  # type: ignore
-    feature_eng_params = FeatureEngineeringParams(**json.loads(f.read()))
-
-test_data = pd.read_csv("raw_test_data.csv")
-logger = Logger(__file__)
-logger.addHandler(logging.StreamHandler(sys.stdout))
-(metrics, plots) = eval_pipeline(model, feature_eng_params, test_data, logger)
-
-with open("metrics.txt", "w+") as f:  # type: ignore
-    f.write(str(metrics))  # type: ignore
-plots.savefig("calibration_plot.png")
-
-
 # fig, ax = plt.subplots()
 # for i in [0, 1]:
 #     tmp = test_data.loc[test_data.Y == i, :]