Skip to content

Commit

Permalink
Merge pull request #19 from CyberAgentAILab/feat/multi-task
Browse files Browse the repository at this point in the history
Support multi-task learning
  • Loading branch information
TomeHirata authored Aug 12, 2024
2 parents 1d8043a + 85fc069 commit cf31d38
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 194 deletions.
Binary file modified docs/source/_static/qte.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
17 changes: 13 additions & 4 deletions docs/source/get_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ Then, let's build an empirical cumulative distribution function (CDF).
estimator = dte_adj.SimpleDistributionEstimator()
estimator.fit(X, D, Y)
cdf = estimator.predict(D, Y)
locations = np.linspace(Y.min(), Y.max(), 20)
cdf = estimator.predict(1, locations)
Distributional treatment effect (DTE) can be computed easily in the following code.

.. code-block:: python
locations = np.linspace(Y.min(), Y.max(), 20)
dte, lower_bound, upper_bound = estimator.predict_dte(target_treatment_arm=1, control_treatment_arm=0, locations=locations, variance_type="simple")
A convenience function is available to visualize distribution effects. This method can be used for other distribution parameters including Probability Treatment Effect (PTE) and Quantile Treatment Effect (QTE).
Expand All @@ -89,7 +89,7 @@ In the following example, we use Logistic Regression. Please make sure that your
logit = LogisticRegression()
estimator = dte_adj.AdjustedDistributionEstimator(logit, folds=3)
estimator.fit(X, D, Y)
cdf = estimator.predict(D, Y)
cdf = estimator.predict(1, locations)
DTE can be computed and visualized in the following code.

Expand Down Expand Up @@ -155,4 +155,13 @@ To compute QTE, we use "predict_qte" method. The confidence band is computed by
:alt: QTE of adjusted estimator
:height: 300px
:width: 450px
:align: center
:align: center

You can use any model with "predict_proba" or "predict" method to adjust the distribution function estimation. For example, the following code use XGBoost classifier to estimate the conditional distribution.

.. code-block:: python
import xgboost as xgb
estimator = dte_adj.AdjustedDistributionEstimator(xgb.XGBClassifier(), folds=3)
estimator.fit(X, D, Y)
cdf = estimator.predict(1, locations)
130 changes: 81 additions & 49 deletions dte_adj/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ def predict_qte(

qte_var = qtes.var(axis=0)

qte_lower = qte + norm.ppf(alpha / 2) / np.sqrt(qte_var)
qte_upper = qte + norm.ppf(1 - alpha / 2) / np.sqrt(qte_var)
qte_lower = qte + norm.ppf(alpha / 2) * np.sqrt(qte_var)
qte_upper = qte + norm.ppf(1 - alpha / 2) * np.sqrt(qte_var)

return qte, qte_lower, qte_upper

Expand All @@ -155,14 +155,14 @@ def _compute_dtes(
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Compute expected DTEs."""
treatment_cdf, treatment_cdf_mat = self._compute_cumulative_distribution(
np.full(locations.shape, target_treatment_arm),
target_treatment_arm,
locations,
self.confoundings,
self.treatment_arms,
self.outcomes,
)
control_cdf, control_cdf_mat = self._compute_cumulative_distribution(
np.full(locations.shape, control_treatment_arm),
control_treatment_arm,
locations,
self.confoundings,
self.treatment_arms,
Expand Down Expand Up @@ -207,7 +207,7 @@ def _compute_ptes(
"""Compute expected PTEs."""
treatment_cumulative_pre, treatment_cdf_mat_pre = (
self._compute_cumulative_distribution(
np.full(locations.shape, target_treatment_arm),
target_treatment_arm,
locations,
self.confoundings,
self.treatment_arms,
Expand All @@ -216,7 +216,7 @@ def _compute_ptes(
)
treatment_cumulative_post, treatment_cdf_mat_post = (
self._compute_cumulative_distribution(
np.full(locations.shape, target_treatment_arm),
target_treatment_arm,
locations + width,
self.confoundings,
self.treatment_arms,
Expand All @@ -226,7 +226,7 @@ def _compute_ptes(
treatment_pdf = treatment_cumulative_post - treatment_cumulative_pre
control_cumulative_pre, control_cdf_mat_pre = (
self._compute_cumulative_distribution(
np.full(locations.shape, control_treatment_arm),
control_treatment_arm,
locations,
self.confoundings,
self.treatment_arms,
Expand All @@ -235,7 +235,7 @@ def _compute_ptes(
)
control_cumulative_post, control_cdf_mat_post = (
self._compute_cumulative_distribution(
np.full(locations.shape, control_treatment_arm),
control_treatment_arm,
locations + width,
self.confoundings,
self.treatment_arms,
Expand Down Expand Up @@ -291,7 +291,7 @@ def find_quantile(quantile, arm):
while low <= high:
mid = (low + high) // 2
val, _ = self._compute_cumulative_distribution(
np.full((1), arm),
arm,
np.full((1), locations[mid]),
confoundings,
treatment_arms,
Expand Down Expand Up @@ -339,11 +339,11 @@ def fit(

return self

def predict(self, treatment_arms: np.ndarray, locations: np.ndarray) -> np.ndarray:
def predict(self, treatment_arm: int, locations: np.ndarray) -> np.ndarray:
"""Compute cumulative distribution values.
Args:
treatment_arms (np.ndarray): The index of the treatment arm.
treatment_arm (int): The index of the treatment arm.
outcomes (np.ndarray): Scalar values to be used for computing the cumulative distribution.
Returns:
Expand All @@ -354,15 +354,13 @@ def predict(self, treatment_arms: np.ndarray, locations: np.ndarray) -> np.ndarr
"This estimator has not been trained yet. Please call fit first"
)

unincluded_arms = set(treatment_arms) - set(self.treatment_arms)

if len(unincluded_arms) > 0:
if treatment_arm not in self.treatment_arms:
raise ValueError(
f"This treatment_arms argument contains arms not included in the training data: {unincluded_arms}"
f"This target treatment arm was not included in the training data: {treatment_arm}"
)

return self._compute_cumulative_distribution(
treatment_arms,
treatment_arm,
locations,
self.confoundings,
self.treatment_arms,
Expand All @@ -371,7 +369,7 @@ def predict(self, treatment_arms: np.ndarray, locations: np.ndarray) -> np.ndarr

def _compute_cumulative_distribution(
self,
target_treatment_arms: np.ndarray,
target_treatment_arm: int,
locations: np.ndarray,
confoundings: np.ndarray,
treatment_arms: np.ndarray,
Expand All @@ -396,7 +394,7 @@ def __init__(self):

def _compute_cumulative_distribution(
self,
target_treatment_arms: np.ndarray,
target_treatment_arm: int,
locations: np.ndarray,
confoundings: np.ndarray,
treatment_arms: np.ndarray,
Expand All @@ -405,7 +403,7 @@ def _compute_cumulative_distribution(
"""Compute the cumulative distribution values.
Args:
target_treatment_arms (np.ndarray): The index of the treatment arm.
target_treatment_arm (int): The index of the treatment arm.
locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
confoundings: (np.ndarray): An array of confounding variables in the observed data.
treatment_arms (np.ndarray): An array of treatment arms in the observed data.
Expand All @@ -426,22 +424,23 @@ def _compute_cumulative_distribution(
d_confounding[arm] = selected_confounding[sorted_indices]
d_outcome[arm] = selected_outcome[sorted_indices]
cumulative_distribution = np.zeros(locations.shape)
for i, (outcome, arm) in enumerate(zip(locations, target_treatment_arms)):
for i, outcome in enumerate(locations):
cumulative_distribution[i] = (
np.searchsorted(d_outcome[arm], outcome, side="right")
) / d_outcome[arm].shape[0]
np.searchsorted(d_outcome[target_treatment_arm], outcome, side="right")
) / len(d_outcome[target_treatment_arm])
return cumulative_distribution, np.zeros((n_obs, n_loc))


class AdjustedDistributionEstimator(DistributionEstimatorBase):
"""A class is for estimating the adjusted distribution function and computing the Distributional parameters based on the trained conditional estimator."""

def __init__(self, base_model, folds=3):
def __init__(self, base_model, folds=3, is_multi_task=False):
"""Initializes the AdjustedDistributionEstimator.
Args:
base_model (scikit-learn estimator): The base model implementing used for conditional distribution function estimators. The model should implement fit(data, targets) and predict_proba(data).
folds (int): The number of folds for cross-fitting.
is_multi_task(bool): Whether to use multi-task learning. If True, your base model needs to support multi-task prediction (n_samples, n_features) -> (n_samples, n_targets).
Returns:
AdjustedDistributionEstimator: An instance of the estimator.
Expand All @@ -454,11 +453,12 @@ def __init__(self, base_model, folds=3):
)
self.base_model = base_model
self.folds = folds
self.is_multi_task = is_multi_task
super().__init__()

def _compute_cumulative_distribution(
self,
target_treatment_arms: np.ndarray,
target_treatment_arm: int,
locations: np.ndarray,
confoundings: np.ndarray,
treatment_arms: np.ndarray,
Expand All @@ -467,7 +467,7 @@ def _compute_cumulative_distribution(
"""Compute the cumulative distribution values.
Args:
target_treatment_arms (np.ndarray): The index of the treatment arm.
target_treatment_arm (int): The index of the treatment arm.
locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
confoundings: (np.ndarray): An array of confounding variables in the observed data.
treatment_arm (np.ndarray): An array of treatment arms in the observed data.
Expand All @@ -476,43 +476,75 @@ def _compute_cumulative_distribution(
Returns:
np.ndarray: Estimated cumulative distribution values.
"""
n_obs = outcomes.shape[0]
n_records = outcomes.shape[0]
n_loc = locations.shape[0]
cumulative_distribution = np.zeros(locations.shape)
superset_prediction = np.zeros((n_obs, n_loc))
for i, (location, arm) in enumerate(zip(locations, target_treatment_arms)):
confounding_in_arm = confoundings[treatment_arms == arm]
outcome_in_arm = outcomes[treatment_arms == arm]
subset_prediction = np.zeros(outcome_in_arm.shape[0])
binominal = (outcome_in_arm <= location) * 1
cdf = binominal.mean()
cumulative_distribution = np.zeros(n_loc)
superset_prediction = np.zeros((n_records, n_loc))
treatment_mask = treatment_arms == target_treatment_arm
if self.is_multi_task:
confounding_in_arm = confoundings[treatment_mask]
n_records_in_arm = len(confounding_in_arm)
outcome_in_arm = outcomes[treatment_mask] # (n_records)
subset_prediction = np.zeros(
(n_records_in_arm, n_loc)
) # (n_records_in_arm, n_loc)
binominal = (outcomes.reshape(-1, 1) <= locations) * 1 # (n_records, n_loc)
cdf = binominal[treatment_mask].mean(axis=0) # (n_loc)
for fold in range(self.folds):
subset_mask = (
np.arange(confounding_in_arm.shape[0]) % self.folds == fold
)
confounding_train = confounding_in_arm[~subset_mask]
confounding_fit = confounding_in_arm[subset_mask]
superset_mask = np.arange(n_records) % self.folds == fold
subset_mask = superset_mask & treatment_mask
subset_mask_inner = superset_mask[treatment_mask]
confounding_train = confoundings[~subset_mask]
confounding_fit = confoundings[subset_mask]
binominal_train = binominal[~subset_mask]
superset_mask = np.arange(self.outcomes.shape[0]) % self.folds == fold
if np.unique(binominal_train).shape[0] == 1:
subset_prediction[subset_mask] = binominal_train[0]
superset_prediction[superset_mask, i] = binominal_train[0]
continue
model = deepcopy(self.base_model)
model.fit(confounding_train, binominal_train)
subset_prediction[subset_mask] = self._compute_model_prediction(
subset_prediction[subset_mask_inner] = self._compute_model_prediction(
model, confounding_fit
)
superset_prediction[superset_mask, i] = self._compute_model_prediction(
superset_prediction[superset_mask] = self._compute_model_prediction(
model, confoundings[superset_mask]
)
cumulative_distribution[i] = (
cdf - subset_prediction.mean() + superset_prediction[:, i].mean()
)
cumulative_distribution = (
cdf - subset_prediction.mean(axis=0) + superset_prediction.mean(axis=0)
) # (n_loc)
else:
for i, location in enumerate(locations):
confounding_in_arm = confoundings[treatment_mask]
outcome_in_arm = outcomes[treatment_mask]
subset_prediction = np.zeros(outcome_in_arm.shape[0])
binominal = (outcomes <= location) * 1 # (n_records)
cdf = binominal[treatment_mask].mean()
for fold in range(self.folds):
superset_mask = np.arange(n_records) % self.folds == fold
subset_mask = superset_mask & treatment_mask
subset_mask_inner = superset_mask[treatment_mask]
confounding_train = confoundings[~subset_mask]
confounding_fit = confoundings[subset_mask]
binominal_train = binominal[~subset_mask]
if len(np.unique(binominal_train)) == 1:
subset_prediction[subset_mask_inner] = binominal_train[0]
superset_prediction[superset_mask, i] = binominal_train[0]
continue
model = deepcopy(self.base_model)
model.fit(confounding_train, binominal_train)
subset_prediction[subset_mask_inner] = (
self._compute_model_prediction(model, confounding_fit)
)
superset_prediction[superset_mask, i] = (
self._compute_model_prediction(
model, confoundings[superset_mask]
)
)
cumulative_distribution[i] = (
cdf - subset_prediction.mean() + superset_prediction[:, i].mean()
)
return cumulative_distribution, superset_prediction

def _compute_model_prediction(self, model, confoundings: np.ndarray) -> np.ndarray:
if hasattr(model, "predict_proba"):
if self.is_multi_task:
return model.predict_proba(confoundings)
return model.predict_proba(confoundings)[:, 1]
else:
return model.predict(confoundings)
Loading

0 comments on commit cf31d38

Please sign in to comment.