Skip to content

Commit

Permalink
Feature/make pipelines (#7)
Browse files Browse the repository at this point in the history
* make pipelines functions

* update readme

* update figsize

* add loggers to pipelines

* add missing param
  • Loading branch information
azuur authored Jan 20, 2024
1 parent ca31542 commit bb8fe1b
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 59 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ Simple ML pipeline repo for experimenting with CI/CD / DevOps / MLOps.

## To do

- Use Python 3.11 in CI github action
- make pipelines functions
- add loggers to stuff
~~- Use Python 3.11 in CI github action~~
~~- make pipelines functions~~
~~- add loggers to stuff~~
- add db conn / func to save inference cases
- add deployment code...
- add versioning to training... in deployment?
8 changes: 6 additions & 2 deletions ml_pipelines/logic/common/feature_eng.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from logging import Logger

import numpy as np
import pandas as pd
from pydantic import BaseModel
Expand All @@ -9,15 +11,17 @@ class FeatureEngineeringParams(BaseModel):


def transform_features(
data: pd.DataFrame, feature_eng_params: FeatureEngineeringParams
data: pd.DataFrame, feature_eng_params: FeatureEngineeringParams, logger: Logger
):
logger.info(f"Transforming features for {len(data)} samples.")
data = data.copy()
data["X1"] = np.exp(data["X1"]) - feature_eng_params.x1_exp_mean
data["X2"] = np.log(data["X2"]) - feature_eng_params.x2_log_mean
return data


def fit_feature_transform(data: pd.DataFrame):
def fit_feature_transform(data: pd.DataFrame, logger: Logger):
logger.info(f"Fitting feature transforms on {len(data)} samples.")
x1_exp_mean = np.exp(data["X1"]).mean()
x2_log_mean = np.log(data["X2"]).mean()
return FeatureEngineeringParams(x1_exp_mean=x1_exp_mean, x2_log_mean=x2_log_mean)
10 changes: 3 additions & 7 deletions ml_pipelines/logic/common/model.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import pickle
from logging import Logger

import pandas as pd
from sklearn.linear_model import LogisticRegression


def load_model() -> LogisticRegression:
with open("model.pickle", "rb") as f:
return pickle.load(f)


def predict(model: LogisticRegression, x_matrix: pd.DataFrame):
def predict(model: LogisticRegression, x_matrix: pd.DataFrame, logger: Logger):
logger.info(f"Generating predictions for {len(x_matrix)} samples")
return model.predict_proba(x_matrix)[:, 1]
22 changes: 18 additions & 4 deletions ml_pipelines/logic/eval/eval.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
from logging import Logger

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import RocCurveDisplay, roc_auc_score


def calculate_metrics(y_true: pd.Series, y_score: np.ndarray):
def calculate_metrics(y_true: pd.Series, y_score: np.ndarray, logger: Logger):
logger.info(f"Calculating metrics for {len(y_true)} samples.")
return roc_auc_score(y_true, y_score)


def make_roc_plot(model: LogisticRegression, data: pd.DataFrame):
def make_roc_plot(model: LogisticRegression, data: pd.DataFrame, logger: Logger):
display = RocCurveDisplay.from_estimator(model, data[["X1", "X2"]], data["Y"])
return display

def plot(ax):
logger.info(f"Producing AUROC plot on {len(data)} samples.")
display.plot(ax)
ax.plot([0, 1], [0, 1])
return ax

return plot


def prob_calibration_plot(data: pd.DataFrame, y_score, step=0.005):
def make_calibration_plot(
data: pd.DataFrame, y_score: np.ndarray, logger: Logger, step=0.005
):
xs = []
ys = []

Expand All @@ -33,6 +45,7 @@ def prob_calibration_plot(data: pd.DataFrame, y_score, step=0.005):
m_bins = beta[1] * m * 1.5

def plot(ax):
logger.info(f"Producing calibration plot on {len(data)} samples.")
ax.scatter(xs, ys, c="red")
ax.plot(xs, ys, c="red")
ax.set_xlabel("Model scores")
Expand All @@ -43,3 +56,4 @@ def plot(ax):
ax.set_ylim(-0.05, 1.1 * m_bins)

return plot
return plot
11 changes: 8 additions & 3 deletions ml_pipelines/logic/serve/serve.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from logging import Logger

import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
Expand All @@ -17,7 +19,9 @@ class Point(BaseModel):


def create_fastapi_app(
model: LogisticRegression, feature_eng_params: FeatureEngineeringParams
model: LogisticRegression,
feature_eng_params: FeatureEngineeringParams,
logger: Logger,
):
app = FastAPI()

Expand All @@ -35,10 +39,10 @@ async def _(inputs: Point | list[Point]):
inputs = [inputs]
# Convert the input points to a numpy array
x_matrix = pd.DataFrame([{"X1": p.x1, "X2": p.x2} for p in inputs])
x_matrix = transform_features(x_matrix, feature_eng_params)
x_matrix = transform_features(x_matrix, feature_eng_params, logger)

# Make predictions using the pre-trained model
predictions = predict(model, x_matrix).tolist()
predictions = predict(model, x_matrix, logger).tolist()

if single:
predictions = predictions[0]
Expand All @@ -49,3 +53,4 @@ async def _(inputs: Point | list[Point]):
raise HTTPException(status_code=500, detail="Internal server error")

return app
return app
16 changes: 6 additions & 10 deletions ml_pipelines/logic/train/train.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import pickle
from logging import Logger

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split


def split_data(data: pd.DataFrame, random_state=123):
def split_data(data: pd.DataFrame, random_state: int, logger: Logger):
logger.info(f"Splitting data for train/test. {random_state = }")
train_data, test_data = train_test_split(
data,
train_size=0.8,
Expand All @@ -16,17 +17,12 @@ def split_data(data: pd.DataFrame, random_state=123):
return train_data, test_data


def train_model(train_data: pd.DataFrame):
def train_model(train_data: pd.DataFrame, logger: Logger):
logger.info("Training model.")
x_matrix = train_data[["X1", "X2"]]
y_matrix = train_data["Y"]

model = LogisticRegression(penalty=None)
model.fit(X=x_matrix, y=y_matrix)

logger.info("Model trained.")
return model


def save_model(model: LogisticRegression):
with open("model.pickle", "wb") as f:
pickle.dump(model, f)
pickle.dump(model, f)
74 changes: 53 additions & 21 deletions ml_pipelines/pipeline/eval_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,61 @@
import json
import logging
import pickle
import sys
from logging import Logger

import matplotlib.pyplot as plt
import pandas as pd
from sklearn.linear_model import LogisticRegression

from ml_pipelines.logic.common.model import load_model, predict
from ml_pipelines.logic.common.feature_eng import (
FeatureEngineeringParams,
transform_features,
)
from ml_pipelines.logic.common.model import predict
from ml_pipelines.logic.eval.eval import (
calculate_metrics,
make_calibration_plot,
make_roc_plot,
prob_calibration_plot,
)

# Input
model = load_model()
test_data = pd.read_csv("test_data.csv")

y_score = predict(model, test_data[["X1", "X2"]])
metrics = calculate_metrics(test_data["Y"], y_score)
roc_plot = make_roc_plot(model, test_data)
calibration_plot = prob_calibration_plot(test_data, y_score)

# Output
with open("metrics.txt", "w") as f:
f.write(str(metrics))
fig, ax = plt.subplots()
roc_plot.plot(ax=ax)
ax.plot([0, 1], [0, 1])
plt.savefig("roc_plot.png")

fig, ax = plt.subplots()
calibration_plot(ax=ax)
plt.savefig("calibration_plot.png")


def eval_pipeline(
model: LogisticRegression,
feature_eng_params: FeatureEngineeringParams,
data: pd.DataFrame,
logger: Logger,
):
logger.info("Starting evaluation pipeline.")
data = transform_features(data, feature_eng_params, logger)
y_score = predict(model, data[["X1", "X2"]], logger)
metrics = calculate_metrics(data["Y"], y_score, logger)

# Output
plots, axs = plt.subplots(1, 2, figsize=(10, 5))
make_roc_plot(model, data, logger)(ax=axs[0])
make_calibration_plot(data, y_score, logger)(ax=axs[1])
logger.info("Finished evaluation pipeline.")
return (metrics, plots)


with open("model.pickle", "rb") as f: # type: ignore
model: LogisticRegression = pickle.load(f)

with open("feature_eng_params.json") as f: # type: ignore
feature_eng_params = FeatureEngineeringParams(**json.loads(f.read()))

test_data = pd.read_csv("raw_test_data.csv")
logger = Logger(__file__)
logger.addHandler(logging.StreamHandler(sys.stdout))
(metrics, plots) = eval_pipeline(model, feature_eng_params, test_data, logger)

with open("metrics.txt", "w+") as f: # type: ignore
f.write(str(metrics)) # type: ignore
plots.savefig("calibration_plot.png")


# fig, ax = plt.subplots()
# for i in [0, 1]:
Expand All @@ -45,3 +73,7 @@
# ax.set_title("Scatter plot of training data")
# ax.legend(framealpha=1)
# plt.show()
# plt.show()
# ax.legend(framealpha=1)
# plt.show()
# plt.show()
49 changes: 40 additions & 9 deletions ml_pipelines/pipeline/train_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,59 @@
import logging
import pickle
import sys
from logging import Logger

import pandas as pd

from ml_pipelines.logic.common.feature_eng import (
fit_feature_transform,
transform_features,
)
from ml_pipelines.logic.train.train import save_model, split_data, train_model
from ml_pipelines.logic.train.train import split_data, train_model

# Input


def train_pipeline(data: pd.DataFrame, split_random_state: int, logger: Logger):
logger.info("Starting train pipeline.")
raw_train_data, raw_test_data = split_data(data, split_random_state, logger)
feature_eng_params = fit_feature_transform(raw_train_data, logger)
train_data = transform_features(raw_train_data, feature_eng_params, logger)
test_data = transform_features(raw_test_data, feature_eng_params, logger)
model = train_model(train_data, logger)
logger.info("Finished train pipeline.")
return (
model,
feature_eng_params,
raw_train_data,
raw_test_data,
train_data,
test_data,
)


logger = Logger(__file__)
logger.addHandler(logging.StreamHandler(sys.stdout))
data = pd.read_csv("data.csv")
(
model,
feature_eng_params,
raw_train_data,
raw_test_data,
train_data,
test_data,
) = train_pipeline(data, split_random_state=3825, logger=logger)

raw_train_data, raw_test_data = split_data(data, random_state=3397)
feature_eng_params = fit_feature_transform(raw_train_data)
train_data = transform_features(raw_train_data, feature_eng_params)
test_data = transform_features(raw_test_data, feature_eng_params)
model = train_model(train_data=train_data)

# Outputs
save_model(model)
with open("model.pickle", "wb") as f:
pickle.dump(model, f)
raw_train_data.to_csv("raw_train_data.csv", index=False)
raw_test_data.to_csv("raw_test_data.csv", index=False)
train_data.to_csv("train_data.csv", index=False)
test_data.to_csv("test_data.csv", index=False)
with open("feature_eng_params.json", "w") as f:
with open("feature_eng_params.json", "w") as f: # type: ignore
f.write(feature_eng_params.model_dump_json())
test_data.to_csv("test_data.csv", index=False)
with open("feature_eng_params.json", "w") as f:
with open("feature_eng_params.json", "w") as f: # type: ignore
f.write(feature_eng_params.model_dump_json())

0 comments on commit bb8fe1b

Please sign in to comment.