Source code for causalml.inference.meta.tlearner

import copy
from copy import deepcopy
import logging
import numpy as np
from packaging import version
from scipy.stats import norm
import sklearn
from sklearn.exceptions import ConvergenceWarning
from sklearn.neural_network import MLPRegressor

if version.parse(sklearn.__version__) >= version.parse("0.22.0"):
    from sklearn.utils._testing import ignore_warnings
else:
    from sklearn.utils.testing import ignore_warnings
from tqdm import tqdm
from xgboost import XGBRegressor

from causalml.inference.meta.base import BaseLearner
from causalml.inference.meta.utils import check_treatment_vector, convert_pd_to_np
from causalml.metrics import regression_metrics, classification_metrics

logger = logging.getLogger("causalml")


[docs] class BaseTLearner(BaseLearner): """A parent class for T-learner regressor classes. A T-learner estimates treatment effects with two machine learning models. Details of T-learner are available at `Kunzel et al. (2018) <https://arxiv.org/abs/1706.03461>`_. """ def __init__( self, learner=None, control_learner=None, treatment_learner=None, ate_alpha=0.05, control_name=0, ): """Initialize a T-learner. Args: learner (model): a model to estimate control and treatment outcomes. control_learner (model, optional): a model to estimate control outcomes treatment_learner (model, optional): a model to estimate treatment outcomes ate_alpha (float, optional): the confidence level alpha of the ATE estimate control_name (str or int, optional): name of control group """ assert (learner is not None) or ( (control_learner is not None) and (treatment_learner is not None) ) if control_learner is None: self.model_c = deepcopy(learner) else: self.model_c = control_learner # Preserve the unfitted template so repeated fit() calls always start fresh. self._model_c_template = self.model_c if treatment_learner is None: self.model_t = deepcopy(learner) else: self.model_t = treatment_learner # Preserve the unfitted template so repeated fit() calls always start fresh. self._model_t_template = self.model_t self.ate_alpha = ate_alpha self.control_name = control_name self.bootstrap_models_ = None def __repr__(self): return "{}(model_c={}, model_t={})".format( self.__class__.__name__, self.model_c.__repr__(), self.model_t.__repr__() ) def _unfitted_clone(self): template = copy.copy(self) for attr in ("models_c", "models_t", "bootstrap_models_"): if hasattr(template, attr): delattr(template, attr) template.model_c = self._model_c_template template.model_t = self._model_t_template return template
[docs] @ignore_warnings(category=ConvergenceWarning) def fit( self, X, treatment, y, p=None, store_bootstraps=False, n_bootstraps=200, bootstrap_size=10000, random_state=None, n_jobs=1, ): """Fit the inference model Args: X (np.matrix or np.array or pd.Dataframe): a feature matrix treatment (np.array or pd.Series): a treatment vector y (np.array or pd.Series): an outcome vector p: unused, kept for API consistency store_bootstraps (bool, optional): if True, trains a bootstrap ensemble during fit and stores it in self.bootstrap_models_ for post-fit CI estimation via predict(return_ci=True). Default: False. n_bootstraps (int, optional): number of bootstrap iterations. Default: 200. Note: storing N bootstraps of a GBM-based learner with k treatment groups holds 2*N*k model objects in memory. Monitor RAM for large N or heavy base learners. n_jobs (int, optional): number of parallel jobs for bootstrap fitting. -1 uses all available cores. Default: 1. bootstrap_size (int, optional): number of samples per bootstrap. Default: 10000. random_state (int, optional): random seed for reproducible bootstrap sampling. """ X, treatment, y = convert_pd_to_np(X, treatment, y) check_treatment_vector(treatment, self.control_name) self.t_groups = np.unique(treatment[treatment != self.control_name]) self.t_groups.sort() self._classes = {group: i for i, group in enumerate(self.t_groups)} self.models_t = {group: deepcopy(self.model_t) for group in self.t_groups} # model_c is trained on the control group, which is identical for every # treatment group, so fit it once. Deepcopy from the unfitted template so # re-calling fit() always starts from a clean state (safe with warm_start). control_mask = treatment == self.control_name self.model_c = deepcopy(self._model_c_template) self.model_c.fit(X[control_mask], y[control_mask]) # Expose as a shared-reference dict to preserve the public models_c API. self.models_c = {group: self.model_c for group in self.t_groups} for group in self.t_groups: treatment_mask = treatment == group self.models_t[group].fit(X[treatment_mask], y[treatment_mask]) if store_bootstraps: self.fit_bootstrap_ensemble( X=X, treatment=treatment, y=y, n_bootstraps=n_bootstraps, bootstrap_size=bootstrap_size, random_state=random_state, n_jobs=n_jobs, ) else: self.bootstrap_models_ = None
def _compute_bootstrap_ci(self, X): """Compute bootstrap CI using stored ensemble. Args: X (np.matrix or np.array or pd.Dataframe): a feature matrix Returns: (te_lower, te_upper): percentile CI bounds, each of shape [n_samples, n_treatment] """ if self.bootstrap_models_ is None: raise ValueError( "No bootstrap ensemble found. Call fit(..., store_bootstraps=True) first." ) te_bootstraps = np.zeros( (X.shape[0], self.t_groups.shape[0], len(self.bootstrap_models_)) ) for b, learner_b in enumerate(self.bootstrap_models_): te_bootstraps[:, :, b] = learner_b.predict(X) te_lower = np.percentile(te_bootstraps, (self.ate_alpha / 2) * 100, axis=2) te_upper = np.percentile(te_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=2) return te_lower, te_upper
[docs] def predict( self, X, treatment=None, y=None, p=None, return_components=False, verbose=True, return_ci=False, ): """Predict treatment effects. Args: X (np.matrix or np.array or pd.Dataframe): a feature matrix treatment (np.array or pd.Series, optional): a treatment vector y (np.array or pd.Series, optional): an outcome vector return_components (bool, optional): whether to return outcome for treatment and control separately verbose (bool, optional): whether to output progress logs return_ci (bool, optional): whether to return confidence intervals using the stored bootstrap ensemble. Requires fit() to have been called with store_bootstraps=True. CI width is controlled by self.ate_alpha set at init time. Returns: (numpy.ndarray): Predictions of treatment effects. If return_ci=True, returns (te, te_lower, te_upper) each of shape [n_samples, n_treatment]. return_ci=True and return_components=True cannot be used together. """ if return_ci and return_components: raise ValueError("return_ci and return_components cannot both be True.") X, treatment, y = convert_pd_to_np(X, treatment, y) yhat_ts = {} yhat_c = self.model_c.predict(X) # Build a shared-reference dict so return_components callers keep the # yhat_cs[group] indexing API without duplicating the underlying array. yhat_cs = {group: yhat_c for group in self.t_groups} for group in self.t_groups: yhat_ts[group] = self.models_t[group].predict(X) if (y is not None) and (treatment is not None) and verbose: mask = (treatment == group) | (treatment == self.control_name) treatment_filt = treatment[mask] y_filt = y[mask] w = (treatment_filt == group).astype(int) yhat = np.zeros_like(y_filt, dtype=float) yhat[w == 0] = yhat_c[mask][w == 0] yhat[w == 1] = yhat_ts[group][mask][w == 1] logger.info("Error metrics for group {}".format(group)) regression_metrics(y_filt, yhat, w) te = np.zeros((X.shape[0], self.t_groups.shape[0])) for i, group in enumerate(self.t_groups): te[:, i] = yhat_ts[group] - yhat_c if return_ci: te_lower, te_upper = self._compute_bootstrap_ci(X) return te, te_lower, te_upper if not return_components: return te else: return te, yhat_cs, yhat_ts
[docs] def fit_predict( self, X, treatment, y, p=None, return_ci=False, n_bootstraps=1000, bootstrap_size=10000, return_components=False, verbose=True, ): """Fit the inference model of the T learner and predict treatment effects. Args: X (np.matrix or np.array or pd.Dataframe): a feature matrix treatment (np.array or pd.Series): a treatment vector y (np.array or pd.Series): an outcome vector return_ci (bool): whether to return confidence intervals n_bootstraps (int): number of bootstrap iterations bootstrap_size (int): number of samples per bootstrap return_components (bool, optional): whether to return outcome for treatment and control seperately verbose (str): whether to output progress logs Returns: (numpy.ndarray): Predictions of treatment effects. Output dim: [n_samples, n_treatment]. If return_ci, returns CATE [n_samples, n_treatment], LB [n_samples, n_treatment], UB [n_samples, n_treatment] """ X, treatment, y = convert_pd_to_np(X, treatment, y) self.fit(X, treatment, y) te = self.predict(X, treatment, y, return_components=return_components) if not return_ci: return te else: t_groups_global = self.t_groups _classes_global = self._classes model_c_global = deepcopy(self.model_c) models_t_global = deepcopy(self.models_t) te_bootstraps = np.zeros( shape=(X.shape[0], self.t_groups.shape[0], n_bootstraps) ) logger.info("Bootstrap Confidence Intervals") for i in tqdm(range(n_bootstraps)): te_b = self.bootstrap(X, treatment, y, size=bootstrap_size) te_bootstraps[:, :, i] = te_b te_lower = np.percentile(te_bootstraps, (self.ate_alpha / 2) * 100, axis=2) te_upper = np.percentile( te_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=2 ) # set member variables back to global (currently last bootstrapped outcome) self.t_groups = t_groups_global self._classes = _classes_global self.model_c = deepcopy(model_c_global) self.models_c = {group: self.model_c for group in self.t_groups} self.models_t = deepcopy(models_t_global) return (te, te_lower, te_upper)
[docs] def estimate_ate( self, X, treatment, y, p=None, bootstrap_ci=False, n_bootstraps=1000, bootstrap_size=10000, pretrain=False, ): """Estimate the Average Treatment Effect (ATE). Args: X (np.matrix or np.array or pd.Dataframe): a feature matrix treatment (np.array or pd.Series): a treatment vector y (np.array or pd.Series): an outcome vector bootstrap_ci (bool): whether to return confidence intervals n_bootstraps (int): number of bootstrap iterations bootstrap_size (int): number of samples per bootstrap Returns: The mean and confidence interval (LB, UB) of the ATE estimate. pretrain (bool): whether a model has been fit, default False. """ X, treatment, y = convert_pd_to_np(X, treatment, y) if pretrain: te, yhat_cs, yhat_ts = self.predict(X, treatment, y, return_components=True) else: te, yhat_cs, yhat_ts = self.fit_predict( X, treatment, y, return_components=True ) ate = np.zeros(self.t_groups.shape[0]) ate_lb = np.zeros(self.t_groups.shape[0]) ate_ub = np.zeros(self.t_groups.shape[0]) for i, group in enumerate(self.t_groups): _ate = te[:, i].mean() mask = (treatment == group) | (treatment == self.control_name) treatment_filt = treatment[mask] y_filt = y[mask] w = (treatment_filt == group).astype(int) prob_treatment = float(sum(w)) / w.shape[0] yhat_c = yhat_cs[group][mask] yhat_t = yhat_ts[group][mask] se = np.sqrt( ( (y_filt[w == 0] - yhat_c[w == 0]).var() / (1 - prob_treatment) + (y_filt[w == 1] - yhat_t[w == 1]).var() / prob_treatment + (yhat_t - yhat_c).var() ) / y_filt.shape[0] ) _ate_lb = _ate - se * norm.ppf(1 - self.ate_alpha / 2) _ate_ub = _ate + se * norm.ppf(1 - self.ate_alpha / 2) ate[i] = _ate ate_lb[i] = _ate_lb ate_ub[i] = _ate_ub if not bootstrap_ci: return ate, ate_lb, ate_ub else: t_groups_global = self.t_groups _classes_global = self._classes model_c_global = deepcopy(self.model_c) models_t_global = deepcopy(self.models_t) logger.info("Bootstrap Confidence Intervals for ATE") ate_bootstraps = np.zeros(shape=(self.t_groups.shape[0], n_bootstraps)) for n in tqdm(range(n_bootstraps)): ate_b = self.bootstrap(X, treatment, y, size=bootstrap_size) ate_bootstraps[:, n] = ate_b.mean(axis=0) ate_lower = np.percentile( ate_bootstraps, (self.ate_alpha / 2) * 100, axis=1 ) ate_upper = np.percentile( ate_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=1 ) # set member variables back to global (currently last bootstrapped outcome) self.t_groups = t_groups_global self._classes = _classes_global self.model_c = deepcopy(model_c_global) self.models_c = {group: self.model_c for group in self.t_groups} self.models_t = deepcopy(models_t_global) return ate, ate_lower, ate_upper
[docs] class BaseTRegressor(BaseTLearner): """ A parent class for T-learner regressor classes. """ def __init__( self, learner=None, control_learner=None, treatment_learner=None, ate_alpha=0.05, control_name=0, ): """Initialize a T-learner regressor. Args: learner (model): a model to estimate control and treatment outcomes. control_learner (model, optional): a model to estimate control outcomes treatment_learner (model, optional): a model to estimate treatment outcomes ate_alpha (float, optional): the confidence level alpha of the ATE estimate control_name (str or int, optional): name of control group """ super().__init__( learner=learner, control_learner=control_learner, treatment_learner=treatment_learner, ate_alpha=ate_alpha, control_name=control_name, )
[docs] class BaseTClassifier(BaseTLearner): """ A parent class for T-learner classifier classes. """ def __init__( self, learner=None, control_learner=None, treatment_learner=None, ate_alpha=0.05, control_name=0, ): """Initialize a T-learner classifier. Args: learner (model): a model to estimate control and treatment outcomes. control_learner (model, optional): a model to estimate control outcomes treatment_learner (model, optional): a model to estimate treatment outcomes ate_alpha (float, optional): the confidence level alpha of the ATE estimate control_name (str or int, optional): name of control group """ super().__init__( learner=learner, control_learner=control_learner, treatment_learner=treatment_learner, ate_alpha=ate_alpha, control_name=control_name, )
[docs] def predict( self, X, treatment=None, y=None, p=None, return_components=False, verbose=True, return_ci=False, ): """Predict treatment effects. Args: X (np.matrix or np.array or pd.Dataframe): a feature matrix treatment (np.array or pd.Series, optional): a treatment vector y (np.array or pd.Series, optional): an outcome vector verbose (bool, optional): whether to output progress logs Returns: (numpy.ndarray): Predictions of treatment effects. """ if return_ci and return_components: raise ValueError("return_ci and return_components cannot both be True.") yhat_ts = {} yhat_c = self.model_c.predict_proba(X)[:, 1] yhat_cs = {group: yhat_c for group in self.t_groups} for group in self.t_groups: yhat_ts[group] = self.models_t[group].predict_proba(X)[:, 1] if (y is not None) and (treatment is not None) and verbose: mask = (treatment == group) | (treatment == self.control_name) treatment_filt = treatment[mask] y_filt = y[mask] w = (treatment_filt == group).astype(int) yhat = np.zeros_like(y_filt, dtype=float) yhat[w == 0] = yhat_c[mask][w == 0] yhat[w == 1] = yhat_ts[group][mask][w == 1] logger.info("Error metrics for group {}".format(group)) classification_metrics(y_filt, yhat, w) te = np.zeros((X.shape[0], self.t_groups.shape[0])) for i, group in enumerate(self.t_groups): te[:, i] = yhat_ts[group] - yhat_c if return_ci: te_lower, te_upper = self._compute_bootstrap_ci(X) return te, te_lower, te_upper if not return_components: return te else: return te, yhat_cs, yhat_ts
[docs] class XGBTRegressor(BaseTRegressor): def __init__(self, ate_alpha=0.05, control_name=0, *args, **kwargs): """Initialize a T-learner with two XGBoost models.""" super().__init__( learner=XGBRegressor(*args, **kwargs), ate_alpha=ate_alpha, control_name=control_name, )
[docs] class MLPTRegressor(BaseTRegressor): def __init__(self, ate_alpha=0.05, control_name=0, *args, **kwargs): """Initialize a T-learner with two MLP models.""" super().__init__( learner=MLPRegressor(*args, **kwargs), ate_alpha=ate_alpha, control_name=control_name, )