from copy import deepcopy
import logging
import numpy as np
from tqdm import tqdm
from scipy.stats import norm
from causalml.inference.meta.base import BaseLearner
from causalml.inference.meta.utils import (
check_treatment_vector,
convert_pd_to_np,
)
from causalml.metrics import regression_metrics, classification_metrics
logger = logging.getLogger("causalml")
[docs]class BaseXLearner(BaseLearner):
"""A parent class for X-learner regressor classes.
An X-learner estimates treatment effects with four machine learning models.
Details of X-learner are available at `Kunzel et al. (2018) <https://arxiv.org/abs/1706.03461>`_.
"""
def __init__(
self,
learner=None,
control_outcome_learner=None,
treatment_outcome_learner=None,
control_effect_learner=None,
treatment_effect_learner=None,
ate_alpha=0.05,
control_name=0,
):
"""Initialize a X-learner.
Args:
learner (optional): a model to estimate outcomes and treatment effects in both the control and treatment
groups
control_outcome_learner (optional): a model to estimate outcomes in the control group
treatment_outcome_learner (optional): a model to estimate outcomes in the treatment group
control_effect_learner (optional): a model to estimate treatment effects in the control group
treatment_effect_learner (optional): a model to estimate treatment effects in the treatment group
ate_alpha (float, optional): the confidence level alpha of the ATE estimate
control_name (str or int, optional): name of control group
"""
assert (learner is not None) or (
(control_outcome_learner is not None)
and (treatment_outcome_learner is not None)
and (control_effect_learner is not None)
and (treatment_effect_learner is not None)
)
if control_outcome_learner is None:
self.model_mu_c = deepcopy(learner)
else:
self.model_mu_c = control_outcome_learner
if treatment_outcome_learner is None:
self.model_mu_t = deepcopy(learner)
else:
self.model_mu_t = treatment_outcome_learner
if control_effect_learner is None:
self.model_tau_c = deepcopy(learner)
else:
self.model_tau_c = control_effect_learner
if treatment_effect_learner is None:
self.model_tau_t = deepcopy(learner)
else:
self.model_tau_t = treatment_effect_learner
self.ate_alpha = ate_alpha
self.control_name = control_name
self.propensity = None
self.propensity_model = None
def __repr__(self):
return (
"{}(control_outcome_learner={},\n"
"\ttreatment_outcome_learner={},\n"
"\tcontrol_effect_learner={},\n"
"\ttreatment_effect_learner={})".format(
self.__class__.__name__,
self.model_mu_c.__repr__(),
self.model_mu_t.__repr__(),
self.model_tau_c.__repr__(),
self.model_tau_t.__repr__(),
)
)
[docs] def fit(self, X, treatment, y, p=None):
"""Fit the inference model.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_treatment_vector(treatment, self.control_name)
self.t_groups = np.unique(treatment[treatment != self.control_name])
self.t_groups.sort()
if p is None:
self._set_propensity_models(X=X, treatment=treatment, y=y)
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
self._classes = {group: i for i, group in enumerate(self.t_groups)}
self.models_mu_c = {group: deepcopy(self.model_mu_c) for group in self.t_groups}
self.models_mu_t = {group: deepcopy(self.model_mu_t) for group in self.t_groups}
self.models_tau_c = {
group: deepcopy(self.model_tau_c) for group in self.t_groups
}
self.models_tau_t = {
group: deepcopy(self.model_tau_t) for group in self.t_groups
}
self.vars_c = {}
self.vars_t = {}
for group in self.t_groups:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
X_filt = X[mask]
y_filt = y[mask]
w = (treatment_filt == group).astype(int)
# Train outcome models
self.models_mu_c[group].fit(X_filt[w == 0], y_filt[w == 0])
self.models_mu_t[group].fit(X_filt[w == 1], y_filt[w == 1])
# Calculate variances and treatment effects
var_c = (
y_filt[w == 0] - self.models_mu_c[group].predict(X_filt[w == 0])
).var()
self.vars_c[group] = var_c
var_t = (
y_filt[w == 1] - self.models_mu_t[group].predict(X_filt[w == 1])
).var()
self.vars_t[group] = var_t
# Train treatment models
d_c = self.models_mu_t[group].predict(X_filt[w == 0]) - y_filt[w == 0]
d_t = y_filt[w == 1] - self.models_mu_c[group].predict(X_filt[w == 1])
self.models_tau_c[group].fit(X_filt[w == 0], d_c)
self.models_tau_t[group].fit(X_filt[w == 1], d_t)
[docs] def predict(
self, X, treatment=None, y=None, p=None, return_components=False, verbose=True
):
"""Predict treatment effects.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series, optional): a treatment vector
y (np.array or pd.Series, optional): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
return_components (bool, optional): whether to return outcome for treatment and control seperately
verbose (bool, optional): whether to output progress logs
Returns:
(numpy.ndarray): Predictions of treatment effects.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
if p is None:
logger.info("Generating propensity score")
p = dict()
for group in self.t_groups:
p_model = self.propensity_model[group]
p[group] = p_model.predict(X)
else:
p = self._format_p(p, self.t_groups)
te = np.zeros((X.shape[0], self.t_groups.shape[0]))
dhat_cs = {}
dhat_ts = {}
for i, group in enumerate(self.t_groups):
model_tau_c = self.models_tau_c[group]
model_tau_t = self.models_tau_t[group]
dhat_cs[group] = model_tau_c.predict(X)
dhat_ts[group] = model_tau_t.predict(X)
_te = (p[group] * dhat_cs[group] + (1 - p[group]) * dhat_ts[group]).reshape(
-1, 1
)
te[:, i] = np.ravel(_te)
if (y is not None) and (treatment is not None) and verbose:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
X_filt = X[mask]
y_filt = y[mask]
w = (treatment_filt == group).astype(int)
yhat = np.zeros_like(y_filt, dtype=float)
yhat[w == 0] = self.models_mu_c[group].predict(X_filt[w == 0])
yhat[w == 1] = self.models_mu_t[group].predict(X_filt[w == 1])
logger.info("Error metrics for group {}".format(group))
regression_metrics(y_filt, yhat, w)
if not return_components:
return te
else:
return te, dhat_cs, dhat_ts
[docs] def fit_predict(
self,
X,
treatment,
y,
p=None,
return_ci=False,
n_bootstraps=1000,
bootstrap_size=10000,
return_components=False,
verbose=True,
):
"""Fit the treatment effect and outcome models of the R learner and predict treatment effects.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
return_ci (bool): whether to return confidence intervals
n_bootstraps (int): number of bootstrap iterations
bootstrap_size (int): number of samples per bootstrap
return_components (bool, optional): whether to return outcome for treatment and control seperately
verbose (str): whether to output progress logs
Returns:
(numpy.ndarray): Predictions of treatment effects. Output dim: [n_samples, n_treatment]
If return_ci, returns CATE [n_samples, n_treatment], LB [n_samples, n_treatment],
UB [n_samples, n_treatment]
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
self.fit(X, treatment, y, p)
if p is None:
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
te = self.predict(
X, treatment=treatment, y=y, p=p, return_components=return_components
)
if not return_ci:
return te
else:
t_groups_global = self.t_groups
_classes_global = self._classes
models_mu_c_global = deepcopy(self.models_mu_c)
models_mu_t_global = deepcopy(self.models_mu_t)
models_tau_c_global = deepcopy(self.models_tau_c)
models_tau_t_global = deepcopy(self.models_tau_t)
te_bootstraps = np.zeros(
shape=(X.shape[0], self.t_groups.shape[0], n_bootstraps)
)
logger.info("Bootstrap Confidence Intervals")
for i in tqdm(range(n_bootstraps)):
te_b = self.bootstrap(X, treatment, y, p, size=bootstrap_size)
te_bootstraps[:, :, i] = te_b
te_lower = np.percentile(te_bootstraps, (self.ate_alpha / 2) * 100, axis=2)
te_upper = np.percentile(
te_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=2
)
# set member variables back to global (currently last bootstrapped outcome)
self.t_groups = t_groups_global
self._classes = _classes_global
self.models_mu_c = deepcopy(models_mu_c_global)
self.models_mu_t = deepcopy(models_mu_t_global)
self.models_tau_c = deepcopy(models_tau_c_global)
self.models_tau_t = deepcopy(models_tau_t_global)
return (te, te_lower, te_upper)
[docs] def estimate_ate(
self,
X,
treatment,
y,
p=None,
bootstrap_ci=False,
n_bootstraps=1000,
bootstrap_size=10000,
pretrain=False,
):
"""Estimate the Average Treatment Effect (ATE).
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
bootstrap_ci (bool): whether run bootstrap for confidence intervals
n_bootstraps (int): number of bootstrap iterations
bootstrap_size (int): number of samples per bootstrap
pretrain (bool): whether a model has been fit, default False.
Returns:
The mean and confidence interval (LB, UB) of the ATE estimate.
"""
if pretrain:
if p is None:
# when p is null, use pretrain propensity score
if not self.propensity:
raise ValueError("no propensity score, please call fit() first")
te, dhat_cs, dhat_ts = self.predict(
X, treatment, y, p=self.propensity, return_components=True
)
else:
p = self._format_p(p, self.t_groups)
te, dhat_cs, dhat_ts = self.predict(
X, treatment, y, p=p, return_components=True
)
else:
te, dhat_cs, dhat_ts = self.fit_predict(
X, treatment, y, p, return_components=True
)
X, treatment, y = convert_pd_to_np(X, treatment, y)
if p is None:
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
ate = np.zeros(self.t_groups.shape[0])
ate_lb = np.zeros(self.t_groups.shape[0])
ate_ub = np.zeros(self.t_groups.shape[0])
for i, group in enumerate(self.t_groups):
_ate = te[:, i].mean()
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
w = (treatment_filt == group).astype(int)
prob_treatment = float(sum(w)) / w.shape[0]
dhat_c = dhat_cs[group][mask]
dhat_t = dhat_ts[group][mask]
p_filt = p[group][mask]
# SE formula is based on the lower bound formula (7) from Imbens, Guido W., and Jeffrey M. Wooldridge. 2009.
# "Recent Developments in the Econometrics of Program Evaluation." Journal of Economic Literature
se = np.sqrt(
(
self.vars_t[group] / prob_treatment
+ self.vars_c[group] / (1 - prob_treatment)
+ (p_filt * dhat_c + (1 - p_filt) * dhat_t).var()
)
/ w.shape[0]
)
_ate_lb = _ate - se * norm.ppf(1 - self.ate_alpha / 2)
_ate_ub = _ate + se * norm.ppf(1 - self.ate_alpha / 2)
ate[i] = _ate
ate_lb[i] = _ate_lb
ate_ub[i] = _ate_ub
if not bootstrap_ci:
return ate, ate_lb, ate_ub
else:
t_groups_global = self.t_groups
_classes_global = self._classes
models_mu_c_global = deepcopy(self.models_mu_c)
models_mu_t_global = deepcopy(self.models_mu_t)
models_tau_c_global = deepcopy(self.models_tau_c)
models_tau_t_global = deepcopy(self.models_tau_t)
logger.info("Bootstrap Confidence Intervals for ATE")
ate_bootstraps = np.zeros(shape=(self.t_groups.shape[0], n_bootstraps))
for n in tqdm(range(n_bootstraps)):
cate_b = self.bootstrap(X, treatment, y, p, size=bootstrap_size)
ate_bootstraps[:, n] = cate_b.mean()
ate_lower = np.percentile(
ate_bootstraps, (self.ate_alpha / 2) * 100, axis=1
)
ate_upper = np.percentile(
ate_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=1
)
# set member variables back to global (currently last bootstrapped outcome)
self.t_groups = t_groups_global
self._classes = _classes_global
self.models_mu_c = deepcopy(models_mu_c_global)
self.models_mu_t = deepcopy(models_mu_t_global)
self.models_tau_c = deepcopy(models_tau_c_global)
self.models_tau_t = deepcopy(models_tau_t_global)
return ate, ate_lower, ate_upper
[docs]class BaseXRegressor(BaseXLearner):
"""
A parent class for X-learner regressor classes.
"""
def __init__(
self,
learner=None,
control_outcome_learner=None,
treatment_outcome_learner=None,
control_effect_learner=None,
treatment_effect_learner=None,
ate_alpha=0.05,
control_name=0,
):
"""Initialize an X-learner regressor.
Args:
learner (optional): a model to estimate outcomes and treatment effects in both the control and treatment
groups
control_outcome_learner (optional): a model to estimate outcomes in the control group
treatment_outcome_learner (optional): a model to estimate outcomes in the treatment group
control_effect_learner (optional): a model to estimate treatment effects in the control group
treatment_effect_learner (optional): a model to estimate treatment effects in the treatment group
ate_alpha (float, optional): the confidence level alpha of the ATE estimate
control_name (str or int, optional): name of control group
"""
super().__init__(
learner=learner,
control_outcome_learner=control_outcome_learner,
treatment_outcome_learner=treatment_outcome_learner,
control_effect_learner=control_effect_learner,
treatment_effect_learner=treatment_effect_learner,
ate_alpha=ate_alpha,
control_name=control_name,
)
[docs]class BaseXClassifier(BaseXLearner):
"""
A parent class for X-learner classifier classes.
"""
def __init__(
self,
outcome_learner=None,
effect_learner=None,
control_outcome_learner=None,
treatment_outcome_learner=None,
control_effect_learner=None,
treatment_effect_learner=None,
ate_alpha=0.05,
control_name=0,
):
"""Initialize an X-learner classifier.
Args:
outcome_learner (optional): a model to estimate outcomes in both the control and treatment groups.
Should be a classifier.
effect_learner (optional): a model to estimate treatment effects in both the control and treatment groups.
Should be a regressor.
control_outcome_learner (optional): a model to estimate outcomes in the control group.
Should be a classifier.
treatment_outcome_learner (optional): a model to estimate outcomes in the treatment group.
Should be a classifier.
control_effect_learner (optional): a model to estimate treatment effects in the control group.
Should be a regressor.
treatment_effect_learner (optional): a model to estimate treatment effects in the treatment group
Should be a regressor.
ate_alpha (float, optional): the confidence level alpha of the ATE estimate
control_name (str or int, optional): name of control group
"""
if outcome_learner is not None:
control_outcome_learner = outcome_learner
treatment_outcome_learner = outcome_learner
if effect_learner is not None:
control_effect_learner = effect_learner
treatment_effect_learner = effect_learner
super().__init__(
learner=None,
control_outcome_learner=control_outcome_learner,
treatment_outcome_learner=treatment_outcome_learner,
control_effect_learner=control_effect_learner,
treatment_effect_learner=treatment_effect_learner,
ate_alpha=ate_alpha,
control_name=control_name,
)
if (
(control_outcome_learner is None) or (treatment_outcome_learner is None)
) and ((control_effect_learner is None) or (treatment_effect_learner is None)):
raise ValueError(
"Either the outcome learner or the effect learner pair must be specified."
)
[docs] def fit(self, X, treatment, y, p=None):
"""Fit the inference model.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_treatment_vector(treatment, self.control_name)
self.t_groups = np.unique(treatment[treatment != self.control_name])
self.t_groups.sort()
if p is None:
self._set_propensity_models(X=X, treatment=treatment, y=y)
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
self._classes = {group: i for i, group in enumerate(self.t_groups)}
self.models_mu_c = {group: deepcopy(self.model_mu_c) for group in self.t_groups}
self.models_mu_t = {group: deepcopy(self.model_mu_t) for group in self.t_groups}
self.models_tau_c = {
group: deepcopy(self.model_tau_c) for group in self.t_groups
}
self.models_tau_t = {
group: deepcopy(self.model_tau_t) for group in self.t_groups
}
self.vars_c = {}
self.vars_t = {}
for group in self.t_groups:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
X_filt = X[mask]
y_filt = y[mask]
w = (treatment_filt == group).astype(int)
# Train outcome models
self.models_mu_c[group].fit(X_filt[w == 0], y_filt[w == 0])
self.models_mu_t[group].fit(X_filt[w == 1], y_filt[w == 1])
# Calculate variances and treatment effects
var_c = (
y_filt[w == 0]
- self.models_mu_c[group].predict_proba(X_filt[w == 0])[:, 1]
).var()
self.vars_c[group] = var_c
var_t = (
y_filt[w == 1]
- self.models_mu_t[group].predict_proba(X_filt[w == 1])[:, 1]
).var()
self.vars_t[group] = var_t
# Train treatment models
d_c = (
self.models_mu_t[group].predict_proba(X_filt[w == 0])[:, 1]
- y_filt[w == 0]
)
d_t = (
y_filt[w == 1]
- self.models_mu_c[group].predict_proba(X_filt[w == 1])[:, 1]
)
self.models_tau_c[group].fit(X_filt[w == 0], d_c)
self.models_tau_t[group].fit(X_filt[w == 1], d_t)
[docs] def predict(
self, X, treatment=None, y=None, p=None, return_components=False, verbose=True
):
"""Predict treatment effects.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series, optional): a treatment vector
y (np.array or pd.Series, optional): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
return_components (bool, optional): whether to return outcome for treatment and control seperately
return_p_score (bool, optional): whether to return propensity score
verbose (bool, optional): whether to output progress logs
Returns:
(numpy.ndarray): Predictions of treatment effects.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
if p is None:
logger.info("Generating propensity score")
p = dict()
for group in self.t_groups:
p_model = self.propensity_model[group]
p[group] = p_model.predict(X)
else:
p = self._format_p(p, self.t_groups)
te = np.zeros((X.shape[0], self.t_groups.shape[0]))
dhat_cs = {}
dhat_ts = {}
for i, group in enumerate(self.t_groups):
model_tau_c = self.models_tau_c[group]
model_tau_t = self.models_tau_t[group]
dhat_cs[group] = model_tau_c.predict(X)
dhat_ts[group] = model_tau_t.predict(X)
_te = (p[group] * dhat_cs[group] + (1 - p[group]) * dhat_ts[group]).reshape(
-1, 1
)
te[:, i] = np.ravel(_te)
if (y is not None) and (treatment is not None) and verbose:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
X_filt = X[mask]
y_filt = y[mask]
w = (treatment_filt == group).astype(int)
yhat = np.zeros_like(y_filt, dtype=float)
yhat[w == 0] = self.models_mu_c[group].predict_proba(X_filt[w == 0])[
:, 1
]
yhat[w == 1] = self.models_mu_t[group].predict_proba(X_filt[w == 1])[
:, 1
]
logger.info("Error metrics for group {}".format(group))
classification_metrics(y_filt, yhat, w)
if not return_components:
return te
else:
return te, dhat_cs, dhat_ts