import logging
import numpy as np
from tqdm import tqdm
from scipy.stats import norm
from sklearn.dummy import DummyRegressor
import statsmodels.api as sm
from copy import deepcopy
from causalml.inference.meta.base import BaseLearner
from causalml.inference.meta.utils import check_treatment_vector, convert_pd_to_np
from causalml.metrics import regression_metrics, classification_metrics
logger = logging.getLogger("causalml")
class StatsmodelsOLS:
"""A sklearn style wrapper class for statsmodels' OLS."""
def __init__(self, cov_type="HC1", alpha=0.05):
"""Initialize a statsmodels' OLS wrapper class object.
Args:
cov_type (str, optional): covariance estimator type.
alpha (float, optional): the confidence level alpha.
"""
self.cov_type = cov_type
self.alpha = alpha
def fit(self, X, y):
"""Fit OLS.
Args:
X (np.matrix): a feature matrix
y (np.array): a label vector
"""
# Append ones. The first column is for the treatment indicator.
X = sm.add_constant(X, prepend=False, has_constant="add")
self.model = sm.OLS(y, X).fit(cov_type=self.cov_type)
self.coefficients = self.model.params
self.conf_ints = self.model.conf_int(alpha=self.alpha)
def predict(self, X):
# Append ones. The first column is for the treatment indicator.
X = sm.add_constant(X, prepend=False, has_constant="add")
return self.model.predict(X)
[docs]class BaseSLearner(BaseLearner):
"""A parent class for S-learner classes.
An S-learner estimates treatment effects with one machine learning model.
Details of S-learner are available at `Kunzel et al. (2018) <https://arxiv.org/abs/1706.03461>`_.
"""
def __init__(self, learner=None, ate_alpha=0.05, control_name=0):
"""Initialize an S-learner.
Args:
learner (optional): a model to estimate the treatment effect
control_name (str or int, optional): name of control group
"""
if learner is not None:
self.model = learner
else:
self.model = DummyRegressor()
self.ate_alpha = ate_alpha
self.control_name = control_name
def __repr__(self):
return "{}(model={})".format(self.__class__.__name__, self.model.__repr__())
[docs] def fit(self, X, treatment, y, p=None):
"""Fit the inference model
Args:
X (np.matrix, np.array, or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_treatment_vector(treatment, self.control_name)
self.t_groups = np.unique(treatment[treatment != self.control_name])
self.t_groups.sort()
self._classes = {group: i for i, group in enumerate(self.t_groups)}
self.models = {group: deepcopy(self.model) for group in self.t_groups}
for group in self.t_groups:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
X_filt = X[mask]
y_filt = y[mask]
w = (treatment_filt == group).astype(int)
X_new = np.hstack((w.reshape((-1, 1)), X_filt))
self.models[group].fit(X_new, y_filt)
[docs] def predict(
self, X, treatment=None, y=None, p=None, return_components=False, verbose=True
):
"""Predict treatment effects.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series, optional): a treatment vector
y (np.array or pd.Series, optional): an outcome vector
return_components (bool, optional): whether to return outcome for treatment and control seperately
verbose (bool, optional): whether to output progress logs
Returns:
(numpy.ndarray): Predictions of treatment effects.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
yhat_cs = {}
yhat_ts = {}
for group in self.t_groups:
model = self.models[group]
# set the treatment column to zero (the control group)
X_new = np.hstack((np.zeros((X.shape[0], 1)), X))
yhat_cs[group] = model.predict(X_new)
# set the treatment column to one (the treatment group)
X_new[:, 0] = 1
yhat_ts[group] = model.predict(X_new)
if (y is not None) and (treatment is not None) and verbose:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
w = (treatment_filt == group).astype(int)
y_filt = y[mask]
yhat = np.zeros_like(y_filt, dtype=float)
yhat[w == 0] = yhat_cs[group][mask][w == 0]
yhat[w == 1] = yhat_ts[group][mask][w == 1]
logger.info("Error metrics for group {}".format(group))
regression_metrics(y_filt, yhat, w)
te = np.zeros((X.shape[0], self.t_groups.shape[0]))
for i, group in enumerate(self.t_groups):
te[:, i] = yhat_ts[group] - yhat_cs[group]
if not return_components:
return te
else:
return te, yhat_cs, yhat_ts
[docs] def fit_predict(
self,
X,
treatment,
y,
p=None,
return_ci=False,
n_bootstraps=1000,
bootstrap_size=10000,
return_components=False,
verbose=True,
):
"""Fit the inference model of the S learner and predict treatment effects.
Args:
X (np.matrix, np.array, or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
return_ci (bool, optional): whether to return confidence intervals
n_bootstraps (int, optional): number of bootstrap iterations
bootstrap_size (int, optional): number of samples per bootstrap
return_components (bool, optional): whether to return outcome for treatment and control seperately
verbose (bool, optional): whether to output progress logs
Returns:
(numpy.ndarray): Predictions of treatment effects. Output dim: [n_samples, n_treatment].
If return_ci, returns CATE [n_samples, n_treatment], LB [n_samples, n_treatment],
UB [n_samples, n_treatment]
"""
self.fit(X, treatment, y)
te = self.predict(X, treatment, y, return_components=return_components)
if not return_ci:
return te
else:
t_groups_global = self.t_groups
_classes_global = self._classes
models_global = deepcopy(self.models)
te_bootstraps = np.zeros(
shape=(X.shape[0], self.t_groups.shape[0], n_bootstraps)
)
logger.info("Bootstrap Confidence Intervals")
for i in tqdm(range(n_bootstraps)):
te_b = self.bootstrap(X, treatment, y, size=bootstrap_size)
te_bootstraps[:, :, i] = te_b
te_lower = np.percentile(te_bootstraps, (self.ate_alpha / 2) * 100, axis=2)
te_upper = np.percentile(
te_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=2
)
# set member variables back to global (currently last bootstrapped outcome)
self.t_groups = t_groups_global
self._classes = _classes_global
self.models = deepcopy(models_global)
return (te, te_lower, te_upper)
[docs] def estimate_ate(
self,
X,
treatment,
y,
p=None,
return_ci=False,
bootstrap_ci=False,
n_bootstraps=1000,
bootstrap_size=10000,
pretrain=False,
):
"""Estimate the Average Treatment Effect (ATE).
Args:
X (np.matrix, np.array, or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
return_ci (bool, optional): whether to return confidence intervals
bootstrap_ci (bool): whether to return confidence intervals
n_bootstraps (int): number of bootstrap iterations
bootstrap_size (int): number of samples per bootstrap
pretrain (bool): whether a model has been fit, default False.
Returns:
The mean and confidence interval (LB, UB) of the ATE estimate.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
if pretrain:
te, yhat_cs, yhat_ts = self.predict(X, treatment, y, return_components=True)
else:
te, yhat_cs, yhat_ts = self.fit_predict(
X, treatment, y, return_components=True
)
ate = np.zeros(self.t_groups.shape[0])
ate_lb = np.zeros(self.t_groups.shape[0])
ate_ub = np.zeros(self.t_groups.shape[0])
for i, group in enumerate(self.t_groups):
_ate = te[:, i].mean()
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
y_filt = y[mask]
w = (treatment_filt == group).astype(int)
prob_treatment = float(sum(w)) / w.shape[0]
yhat_c = yhat_cs[group][mask]
yhat_t = yhat_ts[group][mask]
se = np.sqrt(
(
(y_filt[w == 0] - yhat_c[w == 0]).var() / (1 - prob_treatment)
+ (y_filt[w == 1] - yhat_t[w == 1]).var() / prob_treatment
+ (yhat_t - yhat_c).var()
)
/ y_filt.shape[0]
)
_ate_lb = _ate - se * norm.ppf(1 - self.ate_alpha / 2)
_ate_ub = _ate + se * norm.ppf(1 - self.ate_alpha / 2)
ate[i] = _ate
ate_lb[i] = _ate_lb
ate_ub[i] = _ate_ub
if not return_ci:
return ate
elif return_ci and not bootstrap_ci:
return ate, ate_lb, ate_ub
else:
t_groups_global = self.t_groups
_classes_global = self._classes
models_global = deepcopy(self.models)
logger.info("Bootstrap Confidence Intervals for ATE")
ate_bootstraps = np.zeros(shape=(self.t_groups.shape[0], n_bootstraps))
for n in tqdm(range(n_bootstraps)):
ate_b = self.bootstrap(X, treatment, y, size=bootstrap_size)
ate_bootstraps[:, n] = ate_b.mean()
ate_lower = np.percentile(
ate_bootstraps, (self.ate_alpha / 2) * 100, axis=1
)
ate_upper = np.percentile(
ate_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=1
)
# set member variables back to global (currently last bootstrapped outcome)
self.t_groups = t_groups_global
self._classes = _classes_global
self.models = deepcopy(models_global)
return ate, ate_lower, ate_upper
[docs]class BaseSRegressor(BaseSLearner):
"""
A parent class for S-learner regressor classes.
"""
def __init__(self, learner=None, ate_alpha=0.05, control_name=0):
"""Initialize an S-learner regressor.
Args:
learner (optional): a model to estimate the treatment effect
control_name (str or int, optional): name of control group
"""
super().__init__(
learner=learner, ate_alpha=ate_alpha, control_name=control_name
)
[docs]class BaseSClassifier(BaseSLearner):
"""
A parent class for S-learner classifier classes.
"""
def __init__(self, learner=None, ate_alpha=0.05, control_name=0):
"""Initialize an S-learner classifier.
Args:
learner (optional): a model to estimate the treatment effect.
Should have a predict_proba() method.
control_name (str or int, optional): name of control group
"""
super().__init__(
learner=learner, ate_alpha=ate_alpha, control_name=control_name
)
[docs] def predict(
self, X, treatment=None, y=None, p=None, return_components=False, verbose=True
):
"""Predict treatment effects.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series, optional): a treatment vector
y (np.array or pd.Series, optional): an outcome vector
return_components (bool, optional): whether to return outcome for treatment and control seperately
verbose (bool, optional): whether to output progress logs
Returns:
(numpy.ndarray): Predictions of treatment effects.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
yhat_cs = {}
yhat_ts = {}
for group in self.t_groups:
model = self.models[group]
# set the treatment column to zero (the control group)
X_new = np.hstack((np.zeros((X.shape[0], 1)), X))
yhat_cs[group] = model.predict_proba(X_new)[:, 1]
# set the treatment column to one (the treatment group)
X_new[:, 0] = 1
yhat_ts[group] = model.predict_proba(X_new)[:, 1]
if y is not None and (treatment is not None) and verbose:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
w = (treatment_filt == group).astype(int)
y_filt = y[mask]
yhat = np.zeros_like(y_filt, dtype=float)
yhat[w == 0] = yhat_cs[group][mask][w == 0]
yhat[w == 1] = yhat_ts[group][mask][w == 1]
logger.info("Error metrics for group {}".format(group))
classification_metrics(y_filt, yhat, w)
te = np.zeros((X.shape[0], self.t_groups.shape[0]))
for i, group in enumerate(self.t_groups):
te[:, i] = yhat_ts[group] - yhat_cs[group]
if not return_components:
return te
else:
return te, yhat_cs, yhat_ts
[docs]class LRSRegressor(BaseSRegressor):
def __init__(self, ate_alpha=0.05, control_name=0):
"""Initialize an S-learner with a linear regression model.
Args:
ate_alpha (float, optional): the confidence level alpha of the ATE estimate
control_name (str or int, optional): name of control group
"""
super().__init__(StatsmodelsOLS(alpha=ate_alpha), ate_alpha, control_name)
[docs] def estimate_ate(self, X, treatment, y, p=None, pretrain=False):
"""Estimate the Average Treatment Effect (ATE).
Args:
X (np.matrix, np.array, or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
Returns:
The mean and confidence interval (LB, UB) of the ATE estimate.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
if not pretrain:
self.fit(X, treatment, y)
ate = np.zeros(self.t_groups.shape[0])
ate_lb = np.zeros(self.t_groups.shape[0])
ate_ub = np.zeros(self.t_groups.shape[0])
for i, group in enumerate(self.t_groups):
ate[i] = self.models[group].coefficients[0]
ate_lb[i] = self.models[group].conf_ints[0, 0]
ate_ub[i] = self.models[group].conf_ints[0, 1]
return ate, ate_lb, ate_ub