Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/make pipelines #7

Merged
merged 5 commits into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ Simple ML pipeline repo for experimenting with CI/CD / DevOps / MLOps.

## To do

- Use Python 3.11 in CI github action
- make pipelines functions
- add loggers to stuff
- ~~Use Python 3.11 in CI GitHub Action~~
- ~~Make pipelines functions~~
- ~~Add loggers to stuff~~
- add db conn / func to save inference cases
- add deployment code...
- add versioning to training... in deployment?
8 changes: 6 additions & 2 deletions ml_pipelines/logic/common/feature_eng.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from logging import Logger

import numpy as np
import pandas as pd
from pydantic import BaseModel
Expand All @@ -9,15 +11,17 @@ class FeatureEngineeringParams(BaseModel):


def transform_features(
    data: pd.DataFrame, feature_eng_params: FeatureEngineeringParams, logger: Logger
) -> pd.DataFrame:
    """Apply the fitted feature transforms to a copy of ``data``.

    Args:
        data: Frame with raw ``X1`` and ``X2`` columns. It is not modified;
            the transforms are applied to a copy.
        feature_eng_params: Fitted offsets; ``x1_exp_mean`` and
            ``x2_log_mean`` are read from it.
        logger: Receives an info message with the number of rows.

    Returns:
        A copy of ``data`` where ``X1`` is ``exp(X1) - x1_exp_mean`` and
        ``X2`` is ``log(X2) - x2_log_mean``. Other columns are untouched.
    """
    logger.info(f"Transforming features for {len(data)} samples.")
    # Work on a copy so the caller's frame keeps its raw values.
    data = data.copy()
    data["X1"] = np.exp(data["X1"]) - feature_eng_params.x1_exp_mean
    # NOTE(review): np.log is only finite for positive X2 - confirm callers
    # guarantee that.
    data["X2"] = np.log(data["X2"]) - feature_eng_params.x2_log_mean
    return data


def fit_feature_transform(
    data: pd.DataFrame, logger: Logger
) -> FeatureEngineeringParams:
    """Fit the feature-engineering offsets on ``data``.

    Args:
        data: Frame with ``X1`` and ``X2`` columns to fit on.
        logger: Receives an info message with the number of rows.

    Returns:
        ``FeatureEngineeringParams`` holding the mean of ``exp(X1)`` and the
        mean of ``log(X2)`` over ``data``.
    """
    logger.info(f"Fitting feature transforms on {len(data)} samples.")
    x1_exp_mean = np.exp(data["X1"]).mean()
    x2_log_mean = np.log(data["X2"]).mean()
    return FeatureEngineeringParams(x1_exp_mean=x1_exp_mean, x2_log_mean=x2_log_mean)
10 changes: 3 additions & 7 deletions ml_pipelines/logic/common/model.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import pickle
from logging import Logger

import pandas as pd
from sklearn.linear_model import LogisticRegression


def load_model() -> LogisticRegression:
    """Load the pickled model from ``model.pickle`` in the working directory.

    Returns:
        The object stored in ``model.pickle``.
    """
    # NOTE: unpickling can execute arbitrary code; only load a model file
    # produced by a trusted run.
    with open("model.pickle", "rb") as f:
        return pickle.load(f)


def predict(model: LogisticRegression, x_matrix: pd.DataFrame, logger: Logger):
    """Score ``x_matrix`` with ``model``.

    Args:
        model: Fitted classifier exposing ``predict_proba``.
        x_matrix: Feature rows to score.
        logger: Receives an info message with the number of rows.

    Returns:
        Column 1 of ``model.predict_proba(x_matrix)``, one value per row.
    """
    logger.info(f"Generating predictions for {len(x_matrix)} samples")
    return model.predict_proba(x_matrix)[:, 1]
22 changes: 18 additions & 4 deletions ml_pipelines/logic/eval/eval.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
from logging import Logger

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import RocCurveDisplay, roc_auc_score


def calculate_metrics(y_true: pd.Series, y_score: np.ndarray, logger: Logger):
    """Compute the evaluation metric for a set of scores.

    Args:
        y_true: True labels.
        y_score: Model scores aligned with ``y_true``.
        logger: Receives an info message with the number of samples.

    Returns:
        The value of ``roc_auc_score(y_true, y_score)``.
    """
    logger.info(f"Calculating metrics for {len(y_true)} samples.")
    return roc_auc_score(y_true, y_score)


def make_roc_plot(model: LogisticRegression, data: pd.DataFrame, logger: Logger):
    """Build a deferred ROC plot for ``model`` on ``data``.

    The ROC display is computed immediately from the ``X1``/``X2`` columns
    and the ``Y`` column; drawing is deferred to the returned callable.

    Args:
        model: Fitted estimator passed to ``RocCurveDisplay.from_estimator``.
        data: Frame with ``X1``, ``X2`` and ``Y`` columns.
        logger: Receives an info message each time the plot is drawn.

    Returns:
        A function ``plot(ax)`` that draws the ROC curve and the ``[0, 1]``
        diagonal on ``ax`` and returns ``ax``.
    """
    display = RocCurveDisplay.from_estimator(model, data[["X1", "X2"]], data["Y"])

    def plot(ax):
        logger.info(f"Producing AUROC plot on {len(data)} samples.")
        display.plot(ax)
        # Diagonal reference line from (0, 0) to (1, 1).
        ax.plot([0, 1], [0, 1])
        return ax

    return plot


def prob_calibration_plot(data: pd.DataFrame, y_score, step=0.005):
def make_calibration_plot(
data: pd.DataFrame, y_score: np.ndarray, logger: Logger, step=0.005
):
xs = []
ys = []

Expand All @@ -33,6 +45,7 @@ def prob_calibration_plot(data: pd.DataFrame, y_score, step=0.005):
m_bins = beta[1] * m * 1.5

def plot(ax):
logger.info(f"Producing calibration plot on {len(data)} samples.")
ax.scatter(xs, ys, c="red")
ax.plot(xs, ys, c="red")
ax.set_xlabel("Model scores")
Expand All @@ -43,3 +56,4 @@ def plot(ax):
ax.set_ylim(-0.05, 1.1 * m_bins)

return plot
return plot
11 changes: 8 additions & 3 deletions ml_pipelines/logic/serve/serve.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from logging import Logger

import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
Expand All @@ -17,7 +19,9 @@ class Point(BaseModel):


def create_fastapi_app(
model: LogisticRegression, feature_eng_params: FeatureEngineeringParams
model: LogisticRegression,
feature_eng_params: FeatureEngineeringParams,
logger: Logger,
):
app = FastAPI()

Expand All @@ -35,10 +39,10 @@ async def _(inputs: Point | list[Point]):
inputs = [inputs]
# Convert the input points to a numpy array
x_matrix = pd.DataFrame([{"X1": p.x1, "X2": p.x2} for p in inputs])
x_matrix = transform_features(x_matrix, feature_eng_params)
x_matrix = transform_features(x_matrix, feature_eng_params, logger)

# Make predictions using the pre-trained model
predictions = predict(model, x_matrix).tolist()
predictions = predict(model, x_matrix, logger).tolist()

if single:
predictions = predictions[0]
Expand All @@ -49,3 +53,4 @@ async def _(inputs: Point | list[Point]):
raise HTTPException(status_code=500, detail="Internal server error")

return app
return app
16 changes: 6 additions & 10 deletions ml_pipelines/logic/train/train.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import pickle
from logging import Logger

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split


def split_data(data: pd.DataFrame, random_state=123):
def split_data(data: pd.DataFrame, random_state: int, logger: Logger):
logger.info(f"Splitting data for train/test. {random_state = }")
train_data, test_data = train_test_split(
data,
train_size=0.8,
Expand All @@ -16,17 +17,12 @@ def split_data(data: pd.DataFrame, random_state=123):
return train_data, test_data


def train_model(train_data: pd.DataFrame, logger: Logger) -> LogisticRegression:
    """Fit a logistic regression on the training frame.

    Args:
        train_data: Frame with feature columns ``X1`` and ``X2`` and target
            column ``Y``.
        logger: Receives info messages before and after fitting.

    Returns:
        The fitted ``LogisticRegression`` (constructed with ``penalty=None``).
    """
    logger.info("Training model.")
    x_matrix = train_data[["X1", "X2"]]
    y_matrix = train_data["Y"]

    model = LogisticRegression(penalty=None)
    model.fit(X=x_matrix, y=y_matrix)

    logger.info("Model trained.")
    return model


def save_model(model: LogisticRegression) -> None:
    """Pickle ``model`` to ``model.pickle`` in the working directory.

    Args:
        model: The model object to serialize. An existing ``model.pickle``
            is overwritten.
    """
    with open("model.pickle", "wb") as f:
        pickle.dump(model, f)
74 changes: 53 additions & 21 deletions ml_pipelines/pipeline/eval_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,61 @@
import json
import logging
import pickle
import sys
from logging import Logger

import matplotlib.pyplot as plt
import pandas as pd
from sklearn.linear_model import LogisticRegression

from ml_pipelines.logic.common.model import load_model, predict
from ml_pipelines.logic.common.feature_eng import (
FeatureEngineeringParams,
transform_features,
)
from ml_pipelines.logic.common.model import predict
from ml_pipelines.logic.eval.eval import (
calculate_metrics,
make_calibration_plot,
make_roc_plot,
prob_calibration_plot,
)

# Input
model = load_model()
test_data = pd.read_csv("test_data.csv")

y_score = predict(model, test_data[["X1", "X2"]])
metrics = calculate_metrics(test_data["Y"], y_score)
roc_plot = make_roc_plot(model, test_data)
calibration_plot = prob_calibration_plot(test_data, y_score)

# Output
with open("metrics.txt", "w") as f:
f.write(str(metrics))
fig, ax = plt.subplots()
roc_plot.plot(ax=ax)
ax.plot([0, 1], [0, 1])
plt.savefig("roc_plot.png")

fig, ax = plt.subplots()
calibration_plot(ax=ax)
plt.savefig("calibration_plot.png")


def eval_pipeline(
    model: LogisticRegression,
    feature_eng_params: FeatureEngineeringParams,
    data: pd.DataFrame,
    logger: Logger,
):
    """Evaluate ``model`` on raw ``data`` and build the diagnostic figure.

    The raw frame is transformed with ``feature_eng_params``, scored with
    ``model``, and summarized with ``calculate_metrics``. A 1x2 figure is
    then drawn: the ROC plot on the left axis and the calibration plot on
    the right axis.

    Args:
        model: Fitted classifier.
        feature_eng_params: Fitted feature-engineering parameters.
        data: Raw frame with ``X1``, ``X2`` and ``Y`` columns.
        logger: Passed to every step; also logs pipeline start and finish.

    Returns:
        A ``(metrics, figure)`` tuple.
    """
    logger.info("Starting evaluation pipeline.")
    features = transform_features(data, feature_eng_params, logger)
    scores = predict(model, features[["X1", "X2"]], logger)
    auc = calculate_metrics(features["Y"], scores, logger)

    # Output
    figure, (roc_ax, calibration_ax) = plt.subplots(1, 2, figsize=(10, 5))
    draw_roc = make_roc_plot(model, features, logger)
    draw_roc(ax=roc_ax)
    draw_calibration = make_calibration_plot(features, scores, logger)
    draw_calibration(ax=calibration_ax)
    logger.info("Finished evaluation pipeline.")
    return (auc, figure)


# Inputs: the pickled model, the fitted feature-engineering parameters and
# the raw test split, all read from the working directory.
# NOTE: unpickling can execute arbitrary code; only load a trusted model file.
with open("model.pickle", "rb") as f:  # type: ignore
    model: LogisticRegression = pickle.load(f)

with open("feature_eng_params.json") as f:  # type: ignore
    feature_eng_params = FeatureEngineeringParams(**json.load(f))

test_data = pd.read_csv("raw_test_data.csv")

# Log to stdout so pipeline progress is visible when run as a script.
logger = Logger(__file__)
logger.addHandler(logging.StreamHandler(sys.stdout))

metrics, plots = eval_pipeline(model, feature_eng_params, test_data, logger)

# Outputs: the metric as text and the combined figure as a PNG.
with open("metrics.txt", "w+") as f:  # type: ignore
    f.write(str(metrics))  # type: ignore
plots.savefig("calibration_plot.png")


# fig, ax = plt.subplots()
# for i in [0, 1]:
Expand All @@ -45,3 +73,7 @@
# ax.set_title("Scatter plot of training data")
# ax.legend(framealpha=1)
# plt.show()
# plt.show()
# ax.legend(framealpha=1)
# plt.show()
# plt.show()
49 changes: 40 additions & 9 deletions ml_pipelines/pipeline/train_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,59 @@
import logging
import pickle
import sys
from logging import Logger

import pandas as pd

from ml_pipelines.logic.common.feature_eng import (
fit_feature_transform,
transform_features,
)
from ml_pipelines.logic.train.train import save_model, split_data, train_model
from ml_pipelines.logic.train.train import split_data, train_model

# Input


def train_pipeline(data: pd.DataFrame, split_random_state: int, logger: Logger):
    """Split ``data``, fit the feature transforms and train the model.

    The feature transforms are fitted on the raw training split only and
    then applied to both splits; the model is trained on the transformed
    training split.

    Args:
        data: Full raw dataset.
        split_random_state: Seed forwarded to ``split_data``.
        logger: Passed to every step; also logs pipeline start and finish.

    Returns:
        A 6-tuple ``(model, feature_eng_params, raw_train_data,
        raw_test_data, train_data, test_data)``.
    """
    logger.info("Starting train pipeline.")
    raw_train, raw_test = split_data(data, split_random_state, logger)
    params = fit_feature_transform(raw_train, logger)
    # Transform the training split first, then the test split.
    train = transform_features(raw_train, params, logger)
    test = transform_features(raw_test, params, logger)
    fitted = train_model(train, logger)
    logger.info("Finished train pipeline.")
    return (fitted, params, raw_train, raw_test, train, test)


# Log to stdout so pipeline progress is visible when run as a script.
# NOTE(review): the logging docs recommend logging.getLogger() over
# instantiating Logger directly; switching would also need an explicit
# level, so it is left as-is here.
logger = Logger(__file__)
logger.addHandler(logging.StreamHandler(sys.stdout))
data = pd.read_csv("data.csv")
(
    model,
    feature_eng_params,
    raw_train_data,
    raw_test_data,
    train_data,
    test_data,
) = train_pipeline(data, split_random_state=3825, logger=logger)

# Outputs: the pickled model, the raw and transformed splits, and the fitted
# feature-engineering parameters, all written to the working directory.
with open("model.pickle", "wb") as f:
    pickle.dump(model, f)
raw_train_data.to_csv("raw_train_data.csv", index=False)
raw_test_data.to_csv("raw_test_data.csv", index=False)
train_data.to_csv("train_data.csv", index=False)
test_data.to_csv("test_data.csv", index=False)
with open("feature_eng_params.json", "w") as f:  # type: ignore
    f.write(feature_eng_params.model_dump_json())