from copy import deepcopy
import logging
import numpy as np
from tqdm import tqdm
from scipy.stats import norm
from sklearn.model_selection import cross_val_predict, KFold, train_test_split
from xgboost import XGBRegressor
from causalml.inference.meta.base import BaseLearner
from causalml.inference.meta.utils import (
check_treatment_vector,
get_xgboost_objective_metric,
convert_pd_to_np,
get_weighted_variance,
)
from causalml.propensity import ElasticNetPropensityModel
logger = logging.getLogger("causalml")
[docs]class BaseRLearner(BaseLearner):
"""A parent class for R-learner classes.
An R-learner estimates treatment effects with two machine learning models and the propensity score.
Details of R-learner are available at `Nie and Wager (2019) <https://arxiv.org/abs/1712.04912>`_.
"""
def __init__(
self,
learner=None,
outcome_learner=None,
effect_learner=None,
propensity_learner=ElasticNetPropensityModel(),
ate_alpha=0.05,
control_name=0,
n_fold=5,
random_state=None,
cv_n_jobs=-1,
):
"""Initialize an R-learner.
Args:
learner (optional): a model to estimate outcomes and treatment effects
outcome_learner (optional): a model to estimate outcomes
effect_learner (optional): a model to estimate treatment effects. It needs to take `sample_weight` as an
input argument for `fit()`
propensity_learner (optional): a model to estimate propensity scores. `ElasticNetPropensityModel()` will
be used by default.
ate_alpha (float, optional): the confidence level alpha of the ATE estimate
control_name (str or int, optional): name of control group
n_fold (int, optional): the number of cross validation folds for outcome_learner
random_state (int or RandomState, optional): a seed (int) or random number generator (RandomState)
cv_n_jobs (int, optional): number of parallel jobs to run for cross_val_predict. -1 means using all
processors
"""
assert (learner is not None) or (
(outcome_learner is not None) and (effect_learner is not None)
)
assert propensity_learner is not None
self.model_mu = (
outcome_learner if outcome_learner is not None else deepcopy(learner)
)
self.model_tau = (
effect_learner if effect_learner is not None else deepcopy(learner)
)
self.model_p = propensity_learner
self.ate_alpha = ate_alpha
self.control_name = control_name
self.random_state = random_state
self.cv = KFold(n_splits=n_fold, shuffle=True, random_state=random_state)
self.cv_n_jobs = cv_n_jobs
self.propensity = None
self.propensity_model = None
def __repr__(self):
return (
f"{self.__class__.__name__}\n"
f"\toutcome_learner={self.model_mu.__repr__()}\n"
f"\teffect_learner={self.model_tau.__repr__()}\n"
f"\tpropensity_learner={self.model_p.__repr__()}"
)
[docs] def fit(self, X, treatment, y, p=None, sample_weight=None, verbose=True):
"""Fit the treatment effect and outcome models of the R learner.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
sample_weight (np.array or pd.Series, optional): an array of sample weights indicating the
weight of each observation for `effect_learner`. If None, it assumes equal weight.
verbose (bool, optional): whether to output progress logs
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_treatment_vector(treatment, self.control_name)
if sample_weight is not None:
assert len(sample_weight) == len(
y
), "Data length must be equal for sample_weight and the input data"
sample_weight = convert_pd_to_np(sample_weight)
self.t_groups = np.unique(treatment[treatment != self.control_name])
self.t_groups.sort()
if p is None:
self._set_propensity_models(X=X, treatment=treatment, y=y)
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
self._classes = {group: i for i, group in enumerate(self.t_groups)}
self.models_tau = {group: deepcopy(self.model_tau) for group in self.t_groups}
self.vars_c = {}
self.vars_t = {}
if verbose:
logger.info("generating out-of-fold CV outcome estimates")
yhat = cross_val_predict(self.model_mu, X, y, cv=self.cv, n_jobs=self.cv_n_jobs)
for group in self.t_groups:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
X_filt = X[mask]
y_filt = y[mask]
yhat_filt = yhat[mask]
p_filt = p[group][mask]
w = (treatment_filt == group).astype(int)
weight = (w - p_filt) ** 2
diff_c = y_filt[w == 0] - yhat_filt[w == 0]
diff_t = y_filt[w == 1] - yhat_filt[w == 1]
if sample_weight is not None:
sample_weight_filt = sample_weight[mask]
sample_weight_filt_c = sample_weight_filt[w == 0]
sample_weight_filt_t = sample_weight_filt[w == 1]
self.vars_c[group] = get_weighted_variance(diff_c, sample_weight_filt_c)
self.vars_t[group] = get_weighted_variance(diff_t, sample_weight_filt_t)
weight *= sample_weight_filt # update weight
else:
self.vars_c[group] = diff_c.var()
self.vars_t[group] = diff_t.var()
if verbose:
logger.info(
"training the treatment effect model for {} with R-loss".format(
group
)
)
self.models_tau[group].fit(
X_filt, (y_filt - yhat_filt) / (w - p_filt), sample_weight=weight
)
[docs] def predict(self, X, p=None):
"""Predict treatment effects.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
Returns:
(numpy.ndarray): Predictions of treatment effects.
"""
X = convert_pd_to_np(X)
te = np.zeros((X.shape[0], self.t_groups.shape[0]))
for i, group in enumerate(self.t_groups):
dhat = self.models_tau[group].predict(X)
te[:, i] = dhat
return te
[docs] def fit_predict(
self,
X,
treatment,
y,
p=None,
sample_weight=None,
return_ci=False,
n_bootstraps=1000,
bootstrap_size=10000,
verbose=True,
):
"""Fit the treatment effect and outcome models of the R learner and predict treatment effects.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
sample_weight (np.array or pd.Series, optional): an array of sample weights indicating the
weight of each observation for `effect_learner`. If None, it assumes equal weight.
return_ci (bool): whether to return confidence intervals
n_bootstraps (int): number of bootstrap iterations
bootstrap_size (int): number of samples per bootstrap
verbose (bool): whether to output progress logs
Returns:
(numpy.ndarray): Predictions of treatment effects. Output dim: [n_samples, n_treatment].
If return_ci, returns CATE [n_samples, n_treatment], LB [n_samples, n_treatment],
UB [n_samples, n_treatment]
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
self.fit(X, treatment, y, p, sample_weight, verbose=verbose)
te = self.predict(X)
if not return_ci:
return te
else:
t_groups_global = self.t_groups
_classes_global = self._classes
model_mu_global = deepcopy(self.model_mu)
models_tau_global = deepcopy(self.models_tau)
te_bootstraps = np.zeros(
shape=(X.shape[0], self.t_groups.shape[0], n_bootstraps)
)
logger.info("Bootstrap Confidence Intervals")
for i in tqdm(range(n_bootstraps)):
if p is None:
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
te_b = self.bootstrap(X, treatment, y, p, size=bootstrap_size)
te_bootstraps[:, :, i] = te_b
te_lower = np.percentile(te_bootstraps, (self.ate_alpha / 2) * 100, axis=2)
te_upper = np.percentile(
te_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=2
)
# set member variables back to global (currently last bootstrapped outcome)
self.t_groups = t_groups_global
self._classes = _classes_global
self.model_mu = deepcopy(model_mu_global)
self.models_tau = deepcopy(models_tau_global)
return (te, te_lower, te_upper)
[docs] def estimate_ate(
self,
X,
treatment=None,
y=None,
p=None,
sample_weight=None,
bootstrap_ci=False,
n_bootstraps=1000,
bootstrap_size=10000,
pretrain=False,
):
"""Estimate the Average Treatment Effect (ATE).
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): only needed when pretrain=False, a treatment vector
y (np.array or pd.Series):only needed when pretrain=False, an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
sample_weight (np.array or pd.Series, optional): an array of sample weights indicating the
weight of each observation for `effect_learner`. If None, it assumes equal weight.
bootstrap_ci (bool): whether run bootstrap for confidence intervals
n_bootstraps (int): number of bootstrap iterations
bootstrap_size (int): number of samples per bootstrap
pretrain (bool): whether a model has been fit, default False.
Returns:
The mean and confidence interval (LB, UB) of the ATE estimate.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
if pretrain:
te = self.predict(X, p)
else:
if not len(treatment) or not len(y):
raise ValueError("treatmeng and y must be provided when pretrain=False")
te = self.fit_predict(X, treatment, y, p, sample_weight, return_ci=False)
ate = np.zeros(self.t_groups.shape[0])
ate_lb = np.zeros(self.t_groups.shape[0])
ate_ub = np.zeros(self.t_groups.shape[0])
for i, group in enumerate(self.t_groups):
w = (treatment == group).astype(int)
prob_treatment = float(sum(w)) / X.shape[0]
_ate = te[:, i].mean()
se = (
np.sqrt(
(self.vars_t[group] / prob_treatment)
+ (self.vars_c[group] / (1 - prob_treatment))
+ te[:, i].var()
)
/ X.shape[0]
)
_ate_lb = _ate - se * norm.ppf(1 - self.ate_alpha / 2)
_ate_ub = _ate + se * norm.ppf(1 - self.ate_alpha / 2)
ate[i] = _ate
ate_lb[i] = _ate_lb
ate_ub[i] = _ate_ub
if not bootstrap_ci:
return ate, ate_lb, ate_ub
else:
t_groups_global = self.t_groups
_classes_global = self._classes
model_mu_global = deepcopy(self.model_mu)
models_tau_global = deepcopy(self.models_tau)
logger.info("Bootstrap Confidence Intervals for ATE")
ate_bootstraps = np.zeros(shape=(self.t_groups.shape[0], n_bootstraps))
for n in tqdm(range(n_bootstraps)):
if p is None:
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
cate_b = self.bootstrap(X, treatment, y, p, size=bootstrap_size)
ate_bootstraps[:, n] = cate_b.mean()
ate_lower = np.percentile(
ate_bootstraps, (self.ate_alpha / 2) * 100, axis=1
)
ate_upper = np.percentile(
ate_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=1
)
# set member variables back to global (currently last bootstrapped outcome)
self.t_groups = t_groups_global
self._classes = _classes_global
self.model_mu = deepcopy(model_mu_global)
self.models_tau = deepcopy(models_tau_global)
return ate, ate_lower, ate_upper
[docs]class BaseRRegressor(BaseRLearner):
"""
A parent class for R-learner regressor classes.
"""
def __init__(
self,
learner=None,
outcome_learner=None,
effect_learner=None,
propensity_learner=ElasticNetPropensityModel(),
ate_alpha=0.05,
control_name=0,
n_fold=5,
random_state=None,
):
"""Initialize an R-learner regressor.
Args:
learner (optional): a model to estimate outcomes and treatment effects
outcome_learner (optional): a model to estimate outcomes
effect_learner (optional): a model to estimate treatment effects. It needs to take `sample_weight` as an
input argument for `fit()`
propensity_learner (optional): a model to estimate propensity scores. `ElasticNetPropensityModel()` will
be used by default.
ate_alpha (float, optional): the confidence level alpha of the ATE estimate
control_name (str or int, optional): name of control group
n_fold (int, optional): the number of cross validation folds for outcome_learner
random_state (int or RandomState, optional): a seed (int) or random number generator (RandomState)
"""
super().__init__(
learner=learner,
outcome_learner=outcome_learner,
effect_learner=effect_learner,
propensity_learner=propensity_learner,
ate_alpha=ate_alpha,
control_name=control_name,
n_fold=n_fold,
random_state=random_state,
)
[docs]class BaseRClassifier(BaseRLearner):
"""
A parent class for R-learner classifier classes.
"""
def __init__(
self,
outcome_learner=None,
effect_learner=None,
propensity_learner=ElasticNetPropensityModel(),
ate_alpha=0.05,
control_name=0,
n_fold=5,
random_state=None,
):
"""Initialize an R-learner classifier.
Args:
outcome_learner: a model to estimate outcomes. Should be a classifier.
effect_learner: a model to estimate treatment effects. It needs to take `sample_weight` as an
input argument for `fit()`. Should be a regressor.
propensity_learner (optional): a model to estimate propensity scores. `ElasticNetPropensityModel()` will
be used by default.
ate_alpha (float, optional): the confidence level alpha of the ATE estimate
control_name (str or int, optional): name of control group
n_fold (int, optional): the number of cross validation folds for outcome_learner
random_state (int or RandomState, optional): a seed (int) or random number generator (RandomState)
"""
super().__init__(
learner=None,
outcome_learner=outcome_learner,
effect_learner=effect_learner,
propensity_learner=propensity_learner,
ate_alpha=ate_alpha,
control_name=control_name,
n_fold=n_fold,
random_state=random_state,
)
if (outcome_learner is None) and (effect_learner is None):
raise ValueError(
"Either the outcome learner or the effect learner must be specified."
)
[docs] def fit(self, X, treatment, y, p=None, sample_weight=None, verbose=True):
"""Fit the treatment effect and outcome models of the R learner.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
sample_weight (np.array or pd.Series, optional): an array of sample weights indicating the
weight of each observation for `effect_learner`. If None, it assumes equal weight.
verbose (bool, optional): whether to output progress logs
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_treatment_vector(treatment, self.control_name)
if sample_weight is not None:
assert len(sample_weight) == len(
y
), "Data length must be equal for sample_weight and the input data"
sample_weight = convert_pd_to_np(sample_weight)
self.t_groups = np.unique(treatment[treatment != self.control_name])
self.t_groups.sort()
if p is None:
self._set_propensity_models(X=X, treatment=treatment, y=y)
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
self._classes = {group: i for i, group in enumerate(self.t_groups)}
self.models_tau = {group: deepcopy(self.model_tau) for group in self.t_groups}
self.vars_c = {}
self.vars_t = {}
if verbose:
logger.info("generating out-of-fold CV outcome estimates")
yhat = cross_val_predict(
self.model_mu, X, y, cv=self.cv, method="predict_proba", n_jobs=-1
)[:, 1]
for group in self.t_groups:
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
X_filt = X[mask]
y_filt = y[mask]
yhat_filt = yhat[mask]
p_filt = p[group][mask]
w = (treatment_filt == group).astype(int)
weight = (w - p_filt) ** 2
diff_c = y_filt[w == 0] - yhat_filt[w == 0]
diff_t = y_filt[w == 1] - yhat_filt[w == 1]
if sample_weight is not None:
sample_weight_filt = sample_weight[mask]
sample_weight_filt_c = sample_weight_filt[w == 0]
sample_weight_filt_t = sample_weight_filt[w == 1]
self.vars_c[group] = get_weighted_variance(diff_c, sample_weight_filt_c)
self.vars_t[group] = get_weighted_variance(diff_t, sample_weight_filt_t)
weight *= sample_weight_filt # update weight
else:
self.vars_c[group] = diff_c.var()
self.vars_t[group] = diff_t.var()
if verbose:
logger.info(
"training the treatment effect model for {} with R-loss".format(
group
)
)
self.models_tau[group].fit(
X_filt, (y_filt - yhat_filt) / (w - p_filt), sample_weight=weight
)
[docs] def predict(self, X, p=None):
"""Predict treatment effects.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
Returns:
(numpy.ndarray): Predictions of treatment effects.
"""
X = convert_pd_to_np(X)
te = np.zeros((X.shape[0], self.t_groups.shape[0]))
for i, group in enumerate(self.t_groups):
dhat = self.models_tau[group].predict(X)
te[:, i] = dhat
return te
[docs]class XGBRRegressor(BaseRRegressor):
def __init__(
self,
early_stopping=True,
test_size=0.3,
early_stopping_rounds=30,
effect_learner_objective="reg:squarederror",
effect_learner_n_estimators=500,
random_state=42,
*args,
**kwargs,
):
"""Initialize an R-learner regressor with XGBoost model using pairwise ranking objective.
Args:
early_stopping: whether or not to use early stopping when fitting effect learner
test_size (float, optional): the proportion of the dataset to use as validation set when early stopping is
enabled
early_stopping_rounds (int, optional): validation metric needs to improve at least once in every
early_stopping_rounds round(s) to continue training
effect_learner_objective (str, optional): the learning objective for the effect learner
(default = 'reg:squarederror')
effect_learner_n_estimators (int, optional): number of trees to fit for the effect learner (default = 500)
"""
assert isinstance(random_state, int), "random_state should be int."
objective, metric = get_xgboost_objective_metric(effect_learner_objective)
self.effect_learner_objective = objective
self.effect_learner_eval_metric = metric
self.effect_learner_n_estimators = effect_learner_n_estimators
self.early_stopping = early_stopping
if self.early_stopping:
self.test_size = test_size
self.early_stopping_rounds = early_stopping_rounds
super().__init__(
outcome_learner=XGBRegressor(random_state=random_state, *args, **kwargs),
effect_learner=XGBRegressor(
objective=self.effect_learner_objective,
n_estimators=self.effect_learner_n_estimators,
random_state=random_state,
*args,
**kwargs,
),
)
[docs] def fit(self, X, treatment, y, p=None, sample_weight=None, verbose=True):
"""Fit the treatment effect and outcome models of the R learner.
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
y (np.array or pd.Series): an outcome vector
p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the
single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores.
sample_weight (np.array or pd.Series, optional): an array of sample weights indicating the
weight of each observation for `effect_learner`. If None, it assumes equal weight.
verbose (bool, optional): whether to output progress logs
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_treatment_vector(treatment, self.control_name)
# initialize equal sample weight if it's not provided, for simplicity purpose
sample_weight = (
convert_pd_to_np(sample_weight)
if sample_weight is not None
else convert_pd_to_np(np.ones(len(y)))
)
assert len(sample_weight) == len(
y
), "Data length must be equal for sample_weight and the input data"
self.t_groups = np.unique(treatment[treatment != self.control_name])
self.t_groups.sort()
if p is None:
self._set_propensity_models(X=X, treatment=treatment, y=y)
p = self.propensity
else:
p = self._format_p(p, self.t_groups)
self._classes = {group: i for i, group in enumerate(self.t_groups)}
self.models_tau = {group: deepcopy(self.model_tau) for group in self.t_groups}
self.vars_c = {}
self.vars_t = {}
if verbose:
logger.info("generating out-of-fold CV outcome estimates")
yhat = cross_val_predict(self.model_mu, X, y, cv=self.cv, n_jobs=-1)
for group in self.t_groups:
treatment_mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[treatment_mask]
w = (treatment_filt == group).astype(int)
X_filt = X[treatment_mask]
y_filt = y[treatment_mask]
yhat_filt = yhat[treatment_mask]
p_filt = p[group][treatment_mask]
sample_weight_filt = sample_weight[treatment_mask]
if verbose:
logger.info(
"training the treatment effect model for {} with R-loss".format(
group
)
)
if self.early_stopping:
(
X_train_filt,
X_test_filt,
y_train_filt,
y_test_filt,
yhat_train_filt,
yhat_test_filt,
w_train,
w_test,
p_train_filt,
p_test_filt,
sample_weight_train_filt,
sample_weight_test_filt,
) = train_test_split(
X_filt,
y_filt,
yhat_filt,
w,
p_filt,
sample_weight_filt,
test_size=self.test_size,
random_state=self.random_state,
)
self.models_tau[group].fit(
X=X_train_filt,
y=(y_train_filt - yhat_train_filt) / (w_train - p_train_filt),
sample_weight=sample_weight_train_filt
* ((w_train - p_train_filt) ** 2),
eval_set=[
(
X_test_filt,
(y_test_filt - yhat_test_filt) / (w_test - p_test_filt),
)
],
sample_weight_eval_set=[
sample_weight_test_filt * ((w_test - p_test_filt) ** 2)
],
eval_metric=self.effect_learner_eval_metric,
early_stopping_rounds=self.early_stopping_rounds,
verbose=verbose,
)
else:
self.models_tau[group].fit(
X_filt,
(y_filt - yhat_filt) / (w - p_filt),
sample_weight=sample_weight_filt * ((w - p_filt) ** 2),
eval_metric=self.effect_learner_eval_metric,
)
diff_c = y_filt[w == 0] - yhat_filt[w == 0]
diff_t = y_filt[w == 1] - yhat_filt[w == 1]
sample_weight_filt_c = sample_weight_filt[w == 0]
sample_weight_filt_t = sample_weight_filt[w == 1]
self.vars_c[group] = get_weighted_variance(diff_c, sample_weight_filt_c)
self.vars_t[group] = get_weighted_variance(diff_t, sample_weight_filt_t)