Feature/local deployment #8

Merged · 5 commits · Jan 20, 2024
11 changes: 8 additions & 3 deletions README.md
@@ -7,6 +7,11 @@ Simple ML pipeline repo for experimenting with CI/CD / DevOps / MLOps.
 ~~- Use Python 3.11 in CI github action~~
 ~~- make pipelines functions~~
 ~~- add loggers to stuff~~
-- add db conn / func to save inference cases
-- add deployment code...
-- add versioning to training... in deployment?
+~~- add local deployment code...~~
+~~- add versioning to training... in deployment?~~
+- add eval pipeline, model comparison
+- add "best model" mark. add "get_best_model"
+- add db conn / func to save inference cases (local deployment)
+- add Dockerfile
+- add build script to push to ECR (AWS deployment)
+- add rest of AWS deployment (using S3, EC2, AWS CodePipeline)
100 changes: 100 additions & 0 deletions ml_pipelines/deployment/local/common.py
@@ -0,0 +1,100 @@
import json
import os
import pickle
from pathlib import Path
from uuid import uuid4

import pandas as pd
from matplotlib.figure import Figure
from sklearn.linear_model import LogisticRegression

from ml_pipelines.logic.common.feature_eng import FeatureEngineeringParams
from ml_pipelines.pipeline.train_pipeline import TrainArtifacts


def make_version(prefix: str) -> str:
version = str(uuid4())
return f"{prefix}_{version}"


def get_raw_data(version: str, root_path: os.PathLike) -> pd.DataFrame:
return pd.read_csv(Path(root_path) / version / "raw_data.csv")


class ModelVersionAlreadyExists(Exception):
pass


def save_train_artifacts(
version: str,
root_path: os.PathLike,
artifacts: TrainArtifacts,
):
version_dir = Path(root_path) / version
    try:
        version_dir.mkdir(parents=True)
    except FileExistsError as e:
        raise ModelVersionAlreadyExists(version) from e

with open(version_dir / "model.pickle", "wb") as f:
pickle.dump(artifacts["model"], f)
with open(version_dir / "feature_eng_params.json", "w") as f: # type: ignore
f.write(artifacts["feature_eng_params"].json())

data_keys = [
"raw_train_data",
"raw_test_data",
"train_data",
"test_data",
]
for key in data_keys:
if key in artifacts:
dataset = artifacts[key] # type: ignore
dataset.to_csv(version_dir / f"{key}.csv", index=False)


def get_train_artifacts(
    version: str, root_path: os.PathLike, load_data: bool = True
) -> TrainArtifacts:
version_dir = Path(root_path) / version
with open(version_dir / "model.pickle", "rb") as f: # type: ignore
model: LogisticRegression = pickle.load(f)

with open(version_dir / "feature_eng_params.json") as f: # type: ignore
feature_eng_params = FeatureEngineeringParams(**json.loads(f.read()))

if not load_data:
return TrainArtifacts(
model=model,
feature_eng_params=feature_eng_params,
)

raw_train_data = pd.read_csv(version_dir / "raw_train_data.csv")
raw_test_data = pd.read_csv(version_dir / "raw_test_data.csv")
train_data = pd.read_csv(version_dir / "train_data.csv")
test_data = pd.read_csv(version_dir / "test_data.csv")

return TrainArtifacts(
model=model,
feature_eng_params=feature_eng_params,
raw_train_data=raw_train_data,
raw_test_data=raw_test_data,
train_data=train_data,
test_data=test_data,
)


def save_eval_artifacts(
version: str, root_path: os.PathLike, metrics: float, plots: Figure
):
version_dir = Path(root_path) / version
with open(version_dir / "metrics.txt", "w") as f: # type: ignore
f.write(str(metrics)) # type: ignore
plots.savefig(str(version_dir / "calibration_plot.png"))


def get_latest_version(root_path: os.PathLike, filename: str) -> str:
root_dir = Path(root_path)
versions: list[tuple[str, float]] = []
for version_dir in root_dir.iterdir():
st_mtime = (version_dir / filename).stat().st_mtime
versions.append((version_dir.stem, st_mtime))
return max(versions, key=lambda t: t[1])[0]
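
Together these helpers implement a file-system artifact store: one directory per version under a root path, with `get_latest_version` picking the version whose marker file has the newest mtime (it assumes every directory under the root contains that file). A minimal round-trip sketch, with an illustrative root path that is not part of this PR:

```python
# Sketch of the layout these helpers assume:
#   <root>/<version>/model.pickle, feature_eng_params.json, *.csv
from ml_pipelines.deployment.local.common import (
    get_latest_version,
    get_train_artifacts,
    make_version,
)

root = "/tmp/train_artifacts"  # illustrative root; any directory works
version = make_version(prefix="model")  # e.g. "model_<uuid4>"

# ... after save_train_artifacts(version, root, artifacts) has run ...
latest = get_latest_version(root, "model.pickle")
artifacts = get_train_artifacts(latest, root, load_data=False)
model = artifacts["model"]
```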
Empty file.
Empty file.
64 changes: 64 additions & 0 deletions ml_pipelines/deployment/local/serve.py
@@ -0,0 +1,64 @@
import logging
import os
import sys
from logging import Logger
from typing import Union

import typer
import uvicorn

from ml_pipelines.deployment.local.common import get_latest_version, get_train_artifacts
from ml_pipelines.logic.serve.serve import Point, create_fastapi_app


def prediction_logging_func(predictions: list[tuple[Point, float]]):
    # Placeholder hook; the README to-do list tracks replacing this with a
    # real DB write that saves inference cases.
    print(f"Logged {len(predictions)} predictions.")  # noqa: T201
    return True, None


def run_serve(
train_version: str,
train_artifacts_root_path: os.PathLike, # type: ignore
logger: Logger,
uvicorn_kwargs: dict,
):
logger.info(f"Serving model {train_version}.")
train_artifacts = get_train_artifacts(
train_version, train_artifacts_root_path, load_data=False
)
model = train_artifacts["model"]
feature_eng_params = train_artifacts["feature_eng_params"]
app = create_fastapi_app(model, feature_eng_params, logger, prediction_logging_func)
logger.info("Loaded model, set up endpoint.")

uvicorn.run(app=app, **uvicorn_kwargs)


if __name__ == "__main__":
from dotenv import load_dotenv

load_dotenv()
TRAIN_ARTIFACTS_ROOT_DIR = os.environ["TRAIN_ARTIFACTS_ROOT_DIR"]

def main(
train_version: Union[str, None] = None, # noqa: UP007
train_artifacts_root_path: str = TRAIN_ARTIFACTS_ROOT_DIR,
):
logger = Logger(__file__)
logger.addHandler(logging.StreamHandler(sys.stdout))

if train_version is None:
train_version = get_latest_version(
train_artifacts_root_path, # type: ignore
"model.pickle",
)

uvicorn_kwargs: dict = {}
        run_serve(
train_version=train_version,
train_artifacts_root_path=train_artifacts_root_path, # type: ignore
logger=logger,
uvicorn_kwargs=uvicorn_kwargs,
)

typer.run(main)
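
Once `typer.run(main)` brings the app up, a quick smoke test can be sent with only the standard library. This assumes uvicorn's default bind of http://127.0.0.1:8000 (since `uvicorn_kwargs` is left empty above); the two sample points are arbitrary:

```python
import json
from urllib.request import Request, urlopen

# POST a batch of two points to the /predict/ endpoint.
payload = json.dumps([{"x1": 0.5, "x2": -1.2}, {"x1": 3.1, "x2": 0.0}]).encode()
req = Request(
    "http://127.0.0.1:8000/predict/",
    data=payload,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urlopen(req) as resp:
    print(json.loads(resp.read()))  # {"predictions": [<p1>, <p2>]}
```

Sending a single `{"x1": ..., "x2": ...}` object instead of a list returns a single prediction, matching the `Point | list[Point]` input model.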
81 changes: 81 additions & 0 deletions ml_pipelines/deployment/local/train.py
@@ -0,0 +1,81 @@
import logging
import os
import sys
from logging import Logger
from typing import Union

import typer

from ml_pipelines.deployment.local.common import (
get_latest_version,
get_raw_data,
make_version,
save_eval_artifacts,
save_train_artifacts,
)
from ml_pipelines.pipeline.eval_pipeline import eval_pipeline
from ml_pipelines.pipeline.train_pipeline import train_pipeline


def run_train_pipeline( # noqa: PLR0913
raw_data_version: str,
raw_data_root_path: os.PathLike,
train_version: str,
train_artifacts_root_path: os.PathLike,
logger: Logger,
split_random_state: int = 3825,
):
logger.info(f"Running full training pipeline version {train_version}.")
logger.info(f"Raw data version {raw_data_version}.")
raw_data = get_raw_data(raw_data_version, raw_data_root_path)
train_artifacts = train_pipeline(
raw_data, split_random_state=split_random_state, logger=logger
)
save_train_artifacts(train_version, train_artifacts_root_path, train_artifacts)
logger.info("Saved train artifacts.")
metrics, plots = eval_pipeline(
train_artifacts["model"],
train_artifacts["feature_eng_params"],
train_artifacts["raw_test_data"],
logger,
)
save_eval_artifacts(train_version, train_artifacts_root_path, metrics, plots)
logger.info("Saved eval artifacts.")


if __name__ == "__main__":
from dotenv import load_dotenv

load_dotenv()
RAW_DATA_ROOT_DIR = os.environ["RAW_DATA_ROOT_DIR"]
TRAIN_ARTIFACTS_ROOT_DIR = os.environ["TRAIN_ARTIFACTS_ROOT_DIR"]

def main(
raw_data_version: Union[str, None] = None, # noqa: UP007
train_version: Union[str, None] = None, # noqa: UP007
raw_data_root_path: str = RAW_DATA_ROOT_DIR,
train_artifacts_root_path: str = TRAIN_ARTIFACTS_ROOT_DIR,
split_random_state: int = 3825,
):
logger = Logger(__file__)
logger.addHandler(logging.StreamHandler(sys.stdout))

if raw_data_version is None:
raw_data_version = get_latest_version(
raw_data_root_path, # type: ignore
"raw_data.csv",
)

if train_version is None:
train_version = make_version(prefix="model")

        run_train_pipeline(
raw_data_version=raw_data_version,
raw_data_root_path=raw_data_root_path, # type: ignore
train_version=train_version,
train_artifacts_root_path=train_artifacts_root_path, # type: ignore
logger=logger,
split_random_state=split_random_state,
)

typer.run(main)
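
The same run can also be driven programmatically; this sketch mirrors what `main()` wires up, with illustrative root paths and raw-data version that are not part of the repo:

```python
import logging
import sys
from logging import Logger

from ml_pipelines.deployment.local.common import make_version
from ml_pipelines.deployment.local.train import run_train_pipeline

logger = Logger(__file__)
logger.addHandler(logging.StreamHandler(sys.stdout))

run_train_pipeline(
    raw_data_version="data_0001",  # assumed to exist under the raw-data root
    raw_data_root_path="/tmp/raw_data",  # illustrative
    train_version=make_version(prefix="model"),
    train_artifacts_root_path="/tmp/train_artifacts",  # illustrative
    logger=logger,
)
```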
2 changes: 1 addition & 1 deletion ml_pipelines/logic/eval/eval.py
@@ -8,7 +8,7 @@

 def calculate_metrics(y_true: pd.Series, y_score: np.ndarray, logger: Logger):
     logger.info(f"Calculating metrics for {len(y_true)} samples.")
-    return roc_auc_score(y_true, y_score)
+    return float(roc_auc_score(y_true, y_score))


 def make_roc_plot(model: LogisticRegression, data: pd.DataFrame, logger: Logger):
36 changes: 26 additions & 10 deletions ml_pipelines/logic/serve/serve.py
@@ -1,3 +1,4 @@
+from collections.abc import Callable
 from logging import Logger

 import pandas as pd
@@ -18,39 +19,54 @@ class Point(BaseModel):
     x2: float


+PredictionLoggingFunc = Callable[
+    [list[tuple[Point, float]]], tuple[bool, Exception | None]
+]
+
+
 def create_fastapi_app(
     model: LogisticRegression,
     feature_eng_params: FeatureEngineeringParams,
     logger: Logger,
+    prediction_logging_func: PredictionLoggingFunc,
 ):
     app = FastAPI()

-    @app.get("/predict/")
+    @app.post("/predict/")
     async def _(inputs: Point | list[Point]):
         """
         Endpoint to predict probabilities for the given list of points.
         :param inputs: List of points (x1, x2) (or single point)
         :return: List of probability predictions (or single prediction)
         """
         try:
             logger.info("Responding to request for predictions. (POST /predict/)")
             single = False
             if isinstance(inputs, Point):
                 single = True
                 inputs = [inputs]
-            # Convert the input points to a numpy array
+            logger.info(f"Number of samples for inference: {len(inputs)}")
+
             x_matrix = pd.DataFrame([{"X1": p.x1, "X2": p.x2} for p in inputs])
             x_matrix = transform_features(x_matrix, feature_eng_params, logger)
+            predictions: list[float] = predict(model, x_matrix, logger).tolist()
+            logger.info("Computed inference.")

-            # Make predictions using the pre-trained model
-            predictions = predict(model, x_matrix, logger).tolist()
+            predictions_to_log = list(zip(inputs, predictions))
+            log_success, log_exception = prediction_logging_func(predictions_to_log)
+            if log_success and log_exception is None:
+                logger.info("Logged predictions for samples.")
+            else:
+                logger.warning(
+                    "Unable to log predictions for samples.", exc_info=log_exception
+                )

-            if single:
-                predictions = predictions[0]
+            output = predictions[0] if single else predictions
+            logger.info("Returning predictions.")
+            return {"predictions": output}

-            return {"predictions": predictions}
-
-        except Exception:
+        except Exception as e:
+            logger.error("Inference failed.", exc_info=e)
             raise HTTPException(status_code=500, detail="Internal server error")

-    return app
\ No newline at end of file
+    return app
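
The new `PredictionLoggingFunc` contract takes the (point, prediction) pairs and reports success plus an optional exception. As a sketch of what the README's "save inference cases" to-do could plug in here, using only the standard library (the table name and DB path are assumptions, not part of this PR):

```python
import sqlite3

from ml_pipelines.logic.serve.serve import Point


def sqlite_prediction_logger(
    predictions: list[tuple[Point, float]],
) -> tuple[bool, Exception | None]:
    # Persist each (point, prediction) pair; table/path are illustrative.
    try:
        with sqlite3.connect("inference_cases.db") as conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS inference_cases"
                " (x1 REAL, x2 REAL, prediction REAL)"
            )
            conn.executemany(
                "INSERT INTO inference_cases VALUES (?, ?, ?)",
                [(p.x1, p.x2, pred) for p, pred in predictions],
            )
        return True, None
    except Exception as e:  # noqa: BLE001
        return False, e
```

Passed as `prediction_logging_func` to `create_fastapi_app`, this would satisfy the same contract the placeholder in serve.py fills today.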
4 changes: 3 additions & 1 deletion ml_pipelines/logic/train/train.py
@@ -7,13 +7,15 @@

 def split_data(data: pd.DataFrame, random_state: int, logger: Logger):
     logger.info(f"Splitting data for train/test. {random_state = }")
+    train_data: pd.DataFrame
+    test_data: pd.DataFrame
     train_data, test_data = train_test_split(
         data,
         train_size=0.8,
         random_state=random_state,
         stratify=data["Y"],
         shuffle=True,
-    )
+    )  # type: ignore
     return train_data, test_data


2 changes: 1 addition & 1 deletion ml_pipelines/pipeline/data_gen_pipeline.py
@@ -1,4 +1,4 @@
 from ml_pipelines.logic.common.dgp import generate_raw_data

 data = generate_raw_data(10_000, 813)
-data.to_csv("data.csv", index=False)
+data.to_csv("raw_data.csv", index=False)
20 changes: 0 additions & 20 deletions ml_pipelines/pipeline/eval_pipeline.py
@@ -1,7 +1,3 @@
-import json
-import logging
-import pickle
-import sys
 from logging import Logger

 import matplotlib.pyplot as plt
@@ -41,22 +37,6 @@ def eval_pipeline(
     return (metrics, plots)


-with open("model.pickle", "rb") as f:  # type: ignore
-    model: LogisticRegression = pickle.load(f)
-
-with open("feature_eng_params.json") as f:  # type: ignore
-    feature_eng_params = FeatureEngineeringParams(**json.loads(f.read()))
-
-test_data = pd.read_csv("raw_test_data.csv")
-logger = Logger(__file__)
-logger.addHandler(logging.StreamHandler(sys.stdout))
-(metrics, plots) = eval_pipeline(model, feature_eng_params, test_data, logger)
-
-with open("metrics.txt", "w+") as f:  # type: ignore
-    f.write(str(metrics))  # type: ignore
-plots.savefig("calibration_plot.png")
-
-
 # fig, ax = plt.subplots()
 # for i in [0, 1]:
 #     tmp = test_data.loc[test_data.Y == i, :]