from copy import deepcopy
import logging
import numpy as np
import pandas as pd
from tqdm import tqdm
from scipy.stats import norm
from causalml.inference.meta.base import BaseLearner
from causalml.inference.meta.utils import check_treatment_vector, check_p_conditions, convert_pd_to_np
from causalml.inference.meta.explainer import Explainer
from causalml.metrics import regression_metrics, classification_metrics
from causalml.propensity import compute_propensity_score
logger = logging.getLogger('causalml')
[docs]class BaseXLearner(BaseLearner):
"""A parent class for X-learner regressor classes.
An X-learner estimates treatment effects with four machine learning models.
Details of X-learner are available at Kunzel et al. (2018) (https://arxiv.org/abs/1706.03461).
"""
def __init__(self,
learner=None,
control_outcome_learner=None,
treatment_outcome_learner=None,
control_effect_learner=None,
treatment_effect_learner=None,
ate_alpha=.05,
control_name=0):
"""Initialize a X-learner.
Args:
learner (optional): a model to estimate outcomes and treatment effects in both the control and treatment
groups
control_outcome_learner (optional): a model to estimate outcomes in the control group
treatment_outcome_learner (optional): a model to estimate outcomes in the treatment group
control_effect_learner (optional): a model to estimate treatment effects in the control group
treatment_effect_learner (optional): a model to estimate treatment effects in the treatment group
ate_alpha (float, optional): the confidence level alpha of the ATE estimate
control_name (str or int, optional): name of control group
"""
assert (learner is not None) or ((control_outcome_learner is not None) and
(treatment_outcome_learner is not None) and
(control_effect_learner is not None) and
(treatment_effect_learner is not None))
if control_outcome_learner is None:
self.model_mu_c = deepcopy(learner)
else:
self.model_mu_c = control_outcome_learner
if treatment_outcome_learner is None:
self.model_mu_t = deepcopy(learner)
else:
self.model_mu_t = treatment_outcome_learner
if control_effect_learner is None:
self.model_tau_c = deepcopy(learner)
else:
self.model_tau_c = control_effect_learner
if treatment_effect_learner is None:
self.model_tau_t = deepcopy(learner)
else:
self.model_tau_t = treatment_effect_learner
self.ate_alpha = ate_alpha
self.control_name = control_name
self.propensity = None
self.propensity_model = None
def __repr__(self):
return ('{}(control_outcome_learner={},\n'
'\ttreatment_outcome_learner={},\n'
'\tcontrol_effect_learner={},\n'
'\ttreatment_effect_learner={})'.format(self.__class__.__name__,
self.model_mu_c.__repr__(),
self.model_mu_t.__repr__(),
self.model_tau_c.__repr__(),
self.model_tau_t.__repr__()))
[docs] def fit(self, X, treatment, y, p=None):
"""Fit the inference model.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_treatment_vector(treatment, self.control_name)
self.t_groups = np.unique(treatment[treatment != self.control_name])
self.t_groups.sort()
if p is None:
self._set_propensity_models(X=X, treatment=treatment, y=y)
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
self._classes = {group: i for i, group in enumerate(self.t_groups)}
self.models_mu_c = {group: deepcopy(self.model_mu_c) for group in self.t_groups}
self.models_mu_t = {group: deepcopy(self.model_mu_t) for group in self.t_groups}
self.models_tau_c = {group: deepcopy(self.model_tau_c) for group in self.t_groups}
self.models_tau_t = {group: deepcopy(self.model_tau_t) for group in self.t_groups}
self.vars_c = {}
self.vars_t = {}
for group in self.t_groups:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
X_filt = X[mask]
y_filt = y[mask]
w = (treatment_filt == group).astype(int)
# Train outcome models
self.models_mu_c[group].fit(X_filt[w == 0], y_filt[w == 0])
self.models_mu_t[group].fit(X_filt[w == 1], y_filt[w == 1])
# Calculate variances and treatment effects
var_c = (y_filt[w == 0] - self.models_mu_c[group].predict(X_filt[w == 0])).var()
self.vars_c[group] = var_c
var_t = (y_filt[w == 1] - self.models_mu_t[group].predict(X_filt[w == 1])).var()
self.vars_t[group] = var_t
# Train treatment models
d_c = self.models_mu_t[group].predict(X_filt[w == 0]) - y_filt[w == 0]
d_t = y_filt[w == 1] - self.models_mu_c[group].predict(X_filt[w == 1])
self.models_tau_c[group].fit(X_filt[w == 0], d_c)
self.models_tau_t[group].fit(X_filt[w == 1], d_t)
[docs] def predict(self, X, treatment=None, y=None, p=None, return_components=False,
verbose=True):
"""Predict treatment effects.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series, optional): a treatment vector
y (np.array or pd.Series, optional): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
return_components (bool, optional): whether to return outcome for treatment and control seperately
verbose (bool, optional): whether to output progress logs
Returns:
(numpy.ndarray): Predictions of treatment effects.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
if p is None:
logger.info('Generating propensity score')
p = dict()
for group in self.t_groups:
p_model = self.propensity_model[group]
p[group] = p_model.predict(X)
else:
p = self._format_p(p, self.t_groups)
te = np.zeros((X.shape[0], self.t_groups.shape[0]))
dhat_cs = {}
dhat_ts = {}
for i, group in enumerate(self.t_groups):
model_tau_c = self.models_tau_c[group]
model_tau_t = self.models_tau_t[group]
dhat_cs[group] = model_tau_c.predict(X)
dhat_ts[group] = model_tau_t.predict(X)
_te = (p[group] * dhat_cs[group] + (1 - p[group]) * dhat_ts[group]).reshape(-1, 1)
te[:, i] = np.ravel(_te)
if (y is not None) and (treatment is not None) and verbose:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
X_filt = X[mask]
y_filt = y[mask]
w = (treatment_filt == group).astype(int)
yhat = np.zeros_like(y_filt, dtype=float)
yhat[w == 0] = self.models_mu_c[group].predict(X_filt[w == 0])
yhat[w == 1] = self.models_mu_t[group].predict(X_filt[w == 1])
logger.info('Error metrics for group {}'.format(group))
regression_metrics(y_filt, yhat, w)
if not return_components:
return te
else:
return te, dhat_cs, dhat_ts
[docs] def fit_predict(self, X, treatment, y, p=None, return_ci=False, n_bootstraps=1000, bootstrap_size=10000,
return_components=False, verbose=True):
"""Fit the treatment effect and outcome models of the R learner and predict treatment effects.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
return_ci (bool): whether to return confidence intervals
n_bootstraps (int): number of bootstrap iterations
bootstrap_size (int): number of samples per bootstrap
return_components (bool, optional): whether to return outcome for treatment and control seperately
verbose (str): whether to output progress logs
Returns:
(numpy.ndarray): Predictions of treatment effects. Output dim: [n_samples, n_treatment]
If return_ci, returns CATE [n_samples, n_treatment], LB [n_samples, n_treatment],
UB [n_samples, n_treatment]
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
self.fit(X, treatment, y, p)
if p is None:
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
te = self.predict(X, treatment=treatment, y=y, p=p, return_components=return_components)
if not return_ci:
return te
else:
t_groups_global = self.t_groups
_classes_global = self._classes
models_mu_c_global = deepcopy(self.models_mu_c)
models_mu_t_global = deepcopy(self.models_mu_t)
models_tau_c_global = deepcopy(self.models_tau_c)
models_tau_t_global = deepcopy(self.models_tau_t)
te_bootstraps = np.zeros(shape=(X.shape[0], self.t_groups.shape[0], n_bootstraps))
logger.info('Bootstrap Confidence Intervals')
for i in tqdm(range(n_bootstraps)):
te_b = self.bootstrap(X, treatment, y, p, size=bootstrap_size)
te_bootstraps[:, :, i] = te_b
te_lower = np.percentile(te_bootstraps, (self.ate_alpha / 2) * 100, axis=2)
te_upper = np.percentile(te_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=2)
# set member variables back to global (currently last bootstrapped outcome)
self.t_groups = t_groups_global
self._classes = _classes_global
self.models_mu_c = deepcopy(models_mu_c_global)
self.models_mu_t = deepcopy(models_mu_t_global)
self.models_tau_c = deepcopy(models_tau_c_global)
self.models_tau_t = deepcopy(models_tau_t_global)
return (te, te_lower, te_upper)
[docs] def estimate_ate(self, X, treatment, y, p=None, bootstrap_ci=False, n_bootstraps=1000, bootstrap_size=10000):
"""Estimate the Average Treatment Effect (ATE).
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
bootstrap_ci (bool): whether run bootstrap for confidence intervals
n_bootstraps (int): number of bootstrap iterations
bootstrap_size (int): number of samples per bootstrap
Returns:
The mean and confidence interval (LB, UB) of the ATE estimate.
"""
te, dhat_cs, dhat_ts = self.fit_predict(X, treatment, y, p, return_components=True)
X, treatment, y = convert_pd_to_np(X, treatment, y)
if p is None:
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
ate = np.zeros(self.t_groups.shape[0])
ate_lb = np.zeros(self.t_groups.shape[0])
ate_ub = np.zeros(self.t_groups.shape[0])
for i, group in enumerate(self.t_groups):
_ate = te[:, i].mean()
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
w = (treatment_filt == group).astype(int)
prob_treatment = float(sum(w)) / w.shape[0]
dhat_c = dhat_cs[group][mask]
dhat_t = dhat_ts[group][mask]
p_filt = p[group][mask]
# SE formula is based on the lower bound formula (7) from Imbens, Guido W., and Jeffrey M. Wooldridge. 2009.
# "Recent Developments in the Econometrics of Program Evaluation." Journal of Economic Literature
se = np.sqrt((
self.vars_t[group] / prob_treatment + self.vars_c[group] / (1 - prob_treatment) +
(p_filt * dhat_c + (1 - p_filt) * dhat_t).var()
) / w.shape[0])
_ate_lb = _ate - se * norm.ppf(1 - self.ate_alpha / 2)
_ate_ub = _ate + se * norm.ppf(1 - self.ate_alpha / 2)
ate[i] = _ate
ate_lb[i] = _ate_lb
ate_ub[i] = _ate_ub
if not bootstrap_ci:
return ate, ate_lb, ate_ub
else:
t_groups_global = self.t_groups
_classes_global = self._classes
models_mu_c_global = deepcopy(self.models_mu_c)
models_mu_t_global = deepcopy(self.models_mu_t)
models_tau_c_global = deepcopy(self.models_tau_c)
models_tau_t_global = deepcopy(self.models_tau_t)
logger.info('Bootstrap Confidence Intervals for ATE')
ate_bootstraps = np.zeros(shape=(self.t_groups.shape[0], n_bootstraps))
for n in tqdm(range(n_bootstraps)):
cate_b = self.bootstrap(X, treatment, y, p, size=bootstrap_size)
ate_bootstraps[:, n] = cate_b.mean()
ate_lower = np.percentile(ate_bootstraps, (self.ate_alpha / 2) * 100, axis=1)
ate_upper = np.percentile(ate_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=1)
# set member variables back to global (currently last bootstrapped outcome)
self.t_groups = t_groups_global
self._classes = _classes_global
self.models_mu_c = deepcopy(models_mu_c_global)
self.models_mu_t = deepcopy(models_mu_t_global)
self.models_tau_c = deepcopy(models_tau_c_global)
self.models_tau_t = deepcopy(models_tau_t_global)
return ate, ate_lower, ate_upper
[docs]class BaseXRegressor(BaseXLearner):
"""
A parent class for X-learner regressor classes.
"""
def __init__(self,
learner=None,
control_outcome_learner=None,
treatment_outcome_learner=None,
control_effect_learner=None,
treatment_effect_learner=None,
ate_alpha=.05,
control_name=0):
"""Initialize an X-learner regressor.
Args:
learner (optional): a model to estimate outcomes and treatment effects in both the control and treatment
groups
control_outcome_learner (optional): a model to estimate outcomes in the control group
treatment_outcome_learner (optional): a model to estimate outcomes in the treatment group
control_effect_learner (optional): a model to estimate treatment effects in the control group
treatment_effect_learner (optional): a model to estimate treatment effects in the treatment group
ate_alpha (float, optional): the confidence level alpha of the ATE estimate
control_name (str or int, optional): name of control group
"""
super().__init__(
learner=learner,
control_outcome_learner=control_outcome_learner,
treatment_outcome_learner=treatment_outcome_learner,
control_effect_learner=control_effect_learner,
treatment_effect_learner=treatment_effect_learner,
ate_alpha=ate_alpha,
control_name=control_name)
[docs]class BaseXClassifier(BaseXLearner):
"""
A parent class for X-learner classifier classes.
"""
def __init__(self,
outcome_learner=None,
effect_learner=None,
control_outcome_learner=None,
treatment_outcome_learner=None,
control_effect_learner=None,
treatment_effect_learner=None,
ate_alpha=.05,
control_name=0):
"""Initialize an X-learner classifier.
Args:
outcome_learner (optional): a model to estimate outcomes in both the control and treatment groups.
Should be a classifier.
effect_learner (optional): a model to estimate treatment effects in both the control and treatment groups.
Should be a regressor.
control_outcome_learner (optional): a model to estimate outcomes in the control group.
Should be a classifier.
treatment_outcome_learner (optional): a model to estimate outcomes in the treatment group.
Should be a classifier.
control_effect_learner (optional): a model to estimate treatment effects in the control group.
Should be a regressor.
treatment_effect_learner (optional): a model to estimate treatment effects in the treatment group
Should be a regressor.
ate_alpha (float, optional): the confidence level alpha of the ATE estimate
control_name (str or int, optional): name of control group
"""
if outcome_learner is not None:
control_outcome_learner = outcome_learner
treatment_outcome_learner = outcome_learner
if effect_learner is not None:
control_effect_learner = effect_learner
treatment_effect_learner = effect_learner
super().__init__(
learner=None,
control_outcome_learner=control_outcome_learner,
treatment_outcome_learner=treatment_outcome_learner,
control_effect_learner=control_effect_learner,
treatment_effect_learner=treatment_effect_learner,
ate_alpha=ate_alpha,
control_name=control_name)
if ((control_outcome_learner is None) or (treatment_outcome_learner is None)) and (
(control_effect_learner is None) or (treatment_effect_learner is None)):
raise ValueError("Either the outcome learner or the effect learner pair must be specified.")
[docs] def fit(self, X, treatment, y, p=None):
"""Fit the inference model.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_treatment_vector(treatment, self.control_name)
self.t_groups = np.unique(treatment[treatment != self.control_name])
self.t_groups.sort()
if p is None:
self._set_propensity_models(X=X, treatment=treatment, y=y)
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
self._classes = {group: i for i, group in enumerate(self.t_groups)}
self.models_mu_c = {group: deepcopy(self.model_mu_c) for group in self.t_groups}
self.models_mu_t = {group: deepcopy(self.model_mu_t) for group in self.t_groups}
self.models_tau_c = {group: deepcopy(self.model_tau_c) for group in self.t_groups}
self.models_tau_t = {group: deepcopy(self.model_tau_t) for group in self.t_groups}
self.vars_c = {}
self.vars_t = {}
for group in self.t_groups:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
X_filt = X[mask]
y_filt = y[mask]
w = (treatment_filt == group).astype(int)
# Train outcome models
self.models_mu_c[group].fit(X_filt[w == 0], y_filt[w == 0])
self.models_mu_t[group].fit(X_filt[w == 1], y_filt[w == 1])
# Calculate variances and treatment effects
var_c = (y_filt[w == 0] - self.models_mu_c[group].predict_proba(X_filt[w == 0])[:, 1]).var()
self.vars_c[group] = var_c
var_t = (y_filt[w == 1] - self.models_mu_t[group].predict_proba(X_filt[w == 1])[:, 1]).var()
self.vars_t[group] = var_t
# Train treatment models
d_c = self.models_mu_t[group].predict_proba(X_filt[w == 0])[:, 1] - y_filt[w == 0]
d_t = y_filt[w == 1] - self.models_mu_c[group].predict_proba(X_filt[w == 1])[:, 1]
self.models_tau_c[group].fit(X_filt[w == 0], d_c)
self.models_tau_t[group].fit(X_filt[w == 1], d_t)
[docs] def predict(self, X, treatment=None, y=None, p=None, return_components=False,
verbose=True):
"""Predict treatment effects.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series, optional): a treatment vector
y (np.array or pd.Series, optional): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
return_components (bool, optional): whether to return outcome for treatment and control seperately
return_p_score (bool, optional): whether to return propensity score
verbose (bool, optional): whether to output progress logs
Returns:
(numpy.ndarray): Predictions of treatment effects.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
if p is None:
logger.info('Generating propensity score')
p = dict()
for group in self.t_groups:
p_model = self.propensity_model[group]
p[group] = p_model.predict(X)
else:
p = self._format_p(p, self.t_groups)
te = np.zeros((X.shape[0], self.t_groups.shape[0]))
dhat_cs = {}
dhat_ts = {}
for i, group in enumerate(self.t_groups):
model_tau_c = self.models_tau_c[group]
model_tau_t = self.models_tau_t[group]
dhat_cs[group] = model_tau_c.predict(X)
dhat_ts[group] = model_tau_t.predict(X)
_te = (p[group] * dhat_cs[group] + (1 - p[group]) * dhat_ts[group]).reshape(-1, 1)
te[:, i] = np.ravel(_te)
if (y is not None) and (treatment is not None) and verbose:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
X_filt = X[mask]
y_filt = y[mask]
w = (treatment_filt == group).astype(int)
yhat = np.zeros_like(y_filt, dtype=float)
yhat[w == 0] = self.models_mu_c[group].predict_proba(X_filt[w == 0])[:, 1]
yhat[w == 1] = self.models_mu_t[group].predict_proba(X_filt[w == 1])[:, 1]
logger.info('Error metrics for group {}'.format(group))
classification_metrics(y_filt, yhat, w)
if not return_components:
return te
else:
return te, dhat_cs, dhat_ts