Source code for causalml.inference.iv.drivlearner

import logging
from copy import deepcopy

import numpy as np
import pandas as pd
from causalml.inference.meta.explainer import Explainer
from causalml.inference.meta.utils import (
    check_treatment_vector,
    check_p_conditions,
    convert_pd_to_np,
)
from causalml.metrics import regression_metrics
from causalml.propensity import compute_propensity_score
from scipy.stats import norm
from sklearn.model_selection import KFold
from tqdm import tqdm
from xgboost import XGBRegressor

logger = logging.getLogger("causalml")


class BaseDRIVLearner:
    """A parent class for DRIV-learner regressor classes.

    A DRIV-learner estimates endogenous treatment effects for compliers with machine learning models.

    Details of the DR-learner are available at `Kennedy (2020) <https://arxiv.org/abs/2004.14497>`_.
    The DR moment condition for LATE comes from
    `Chernozhukov et al (2018) <https://academic.oup.com/ectj/article/21/1/C1/5056401>`_.
    """

    def __init__(
        self,
        learner=None,
        control_outcome_learner=None,
        treatment_outcome_learner=None,
        treatment_effect_learner=None,
        ate_alpha=0.05,
        control_name=0,
    ):
        """Initialize a DRIV-learner.

        Args:
            learner (optional): a model to estimate outcomes and treatment effects in both the control and
                treatment groups
            control_outcome_learner (optional): a model to estimate outcomes in the control group
            treatment_outcome_learner (optional): a model to estimate outcomes in the treatment group
            treatment_effect_learner (optional): a model to estimate treatment effects in the treatment group.
                It needs to take `sample_weight` as an input argument in `fit()`.
            ate_alpha (float, optional): the confidence level alpha of the ATE estimate
            control_name (str or int, optional): name of control group
        """
        assert (learner is not None) or (
            (control_outcome_learner is not None)
            and (treatment_outcome_learner is not None)
            and (treatment_effect_learner is not None)
        )

        if control_outcome_learner is None:
            self.model_mu_c = deepcopy(learner)
        else:
            self.model_mu_c = control_outcome_learner

        if treatment_outcome_learner is None:
            self.model_mu_t = deepcopy(learner)
        else:
            self.model_mu_t = treatment_outcome_learner

        if treatment_effect_learner is None:
            self.model_tau = deepcopy(learner)
        else:
            self.model_tau = treatment_effect_learner

        self.ate_alpha = ate_alpha
        self.control_name = control_name

        self.propensity_1 = None
        self.propensity_0 = None
        self.propensity_assign = None

    def __repr__(self):
        return (
            "{}(control_outcome_learner={},\n"
            "\ttreatment_outcome_learner={},\n"
            "\ttreatment_effect_learner={})".format(
                self.__class__.__name__,
                self.model_mu_c.__repr__(),
                self.model_mu_t.__repr__(),
                self.model_tau.__repr__(),
            )
        )
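
    # Editorial usage note (not in the original source): the base learners can be
    # supplied either as a single `learner` that is deep-copied into each role, or
    # individually, e.g. (hypothetical):
    #
    #   driv = BaseDRIVLearner(learner=XGBRegressor())
    #   driv = BaseDRIVLearner(
    #       control_outcome_learner=XGBRegressor(),
    #       treatment_outcome_learner=XGBRegressor(),
    #       treatment_effect_learner=XGBRegressor(),  # must accept sample_weight in fit()
    #   )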

    def fit(
        self, X, assignment, treatment, y, p=None, pZ=None, seed=None, calibrate=True
    ):
        """Fit the inference model.

        Args:
            X (np.matrix or np.array or pd.Dataframe): a feature matrix
            assignment (np.array or pd.Series): a (0,1)-valued assignment vector. The assignment is the
                instrumental variable that does not depend on unknown confounders. The assignment status
                influences treatment in a monotonic way, i.e. one can only be more likely to take the
                treatment if assigned.
            treatment (np.array or pd.Series): a treatment vector
            y (np.array or pd.Series): an outcome vector
            p (2-tuple of np.ndarray or pd.Series or dict, optional): The first (second) element corresponds to
                unassigned (assigned) units. Each is an array of propensity scores of float (0,1) in the
                single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
                float (0,1). If None will run ElasticNetPropensityModel() to generate the propensity scores.
            pZ (np.array or pd.Series, optional): an array of assignment probability of float (0,1); if None
                will run ElasticNetPropensityModel() to generate the assignment probability score.
            seed (int): random seed for cross-fitting
            calibrate (bool, optional): whether to calibrate the assignment propensity score
        """
        X, treatment, assignment, y = convert_pd_to_np(X, treatment, assignment, y)
        check_treatment_vector(treatment, self.control_name)
        self.t_groups = np.unique(treatment[treatment != self.control_name])
        self.t_groups.sort()

        self._classes = {group: i for i, group in enumerate(self.t_groups)}

        # The estimator splits the data into 3 partitions for cross-fit on the propensity score estimation,
        # the outcome regression, and the treatment regression on the doubly robust estimates. The use of
        # the partitions is rotated so we do not lose on the sample size. We do not cross-fit the assignment
        # score estimation as the assignment process is usually simple.
        cv = KFold(n_splits=3, shuffle=True, random_state=seed)
        split_indices = [index for _, index in cv.split(y)]

        self.models_mu_c = {
            group: [
                deepcopy(self.model_mu_c),
                deepcopy(self.model_mu_c),
                deepcopy(self.model_mu_c),
            ]
            for group in self.t_groups
        }
        self.models_mu_t = {
            group: [
                deepcopy(self.model_mu_t),
                deepcopy(self.model_mu_t),
                deepcopy(self.model_mu_t),
            ]
            for group in self.t_groups
        }
        self.models_tau = {
            group: [
                deepcopy(self.model_tau),
                deepcopy(self.model_tau),
                deepcopy(self.model_tau),
            ]
            for group in self.t_groups
        }

        if p is None:
            self.propensity_1 = {
                group: np.zeros(y.shape[0]) for group in self.t_groups
            }  # propensity scores for those assigned
            self.propensity_0 = {
                group: np.zeros(y.shape[0]) for group in self.t_groups
            }  # propensity scores for those not assigned

        if pZ is None:
            self.propensity_assign, _ = compute_propensity_score(
                X=X,
                treatment=assignment,
                X_pred=X,
                treatment_pred=assignment,
                calibrate_p=calibrate,
            )
        else:
            self.propensity_assign = pZ

        for ifold in range(3):
            treatment_idx = split_indices[ifold]
            outcome_idx = split_indices[(ifold + 1) % 3]
            tau_idx = split_indices[(ifold + 2) % 3]

            treatment_treat, treatment_out, treatment_tau = (
                treatment[treatment_idx],
                treatment[outcome_idx],
                treatment[tau_idx],
            )
            assignment_treat, assignment_out, assignment_tau = (
                assignment[treatment_idx],
                assignment[outcome_idx],
                assignment[tau_idx],
            )
            y_out, y_tau = y[outcome_idx], y[tau_idx]
            X_treat, X_out, X_tau = X[treatment_idx], X[outcome_idx], X[tau_idx]
            pZ_tau = self.propensity_assign[tau_idx]

            if p is None:
                logger.info("Generating propensity score")
                cur_p_1 = dict()
                cur_p_0 = dict()

                for group in self.t_groups:
                    mask = (treatment_treat == group) | (
                        treatment_treat == self.control_name
                    )
                    mask_1, mask_0 = (
                        mask & (assignment_treat == 1),
                        mask & (assignment_treat == 0),
                    )
                    cur_p_1[group], _ = compute_propensity_score(
                        X=X_treat[mask_1],
                        treatment=(treatment_treat[mask_1] == group).astype(int),
                        X_pred=X_tau,
                        treatment_pred=(treatment_tau == group).astype(int),
                    )
                    if (treatment_treat[mask_0] == group).sum() == 0:
                        cur_p_0[group] = np.zeros(X_tau.shape[0])
                    else:
                        cur_p_0[group], _ = compute_propensity_score(
                            X=X_treat[mask_0],
                            treatment=(treatment_treat[mask_0] == group).astype(int),
                            X_pred=X_tau,
                            treatment_pred=(treatment_tau == group).astype(int),
                        )
                    self.propensity_1[group][tau_idx] = cur_p_1[group]
                    self.propensity_0[group][tau_idx] = cur_p_0[group]
            else:
                cur_p_1 = dict()
                cur_p_0 = dict()

                if isinstance(p[0], (np.ndarray, pd.Series)):
                    cur_p_0 = {self.t_groups[0]: convert_pd_to_np(p[0][tau_idx])}
                else:
                    cur_p_0 = {g: prop[tau_idx] for g, prop in p[0].items()}
                check_p_conditions(cur_p_0, self.t_groups)

                if isinstance(p[1], (np.ndarray, pd.Series)):
                    cur_p_1 = {self.t_groups[0]: convert_pd_to_np(p[1][tau_idx])}
                else:
                    cur_p_1 = {g: prop[tau_idx] for g, prop in p[1].items()}
                check_p_conditions(cur_p_1, self.t_groups)

            logger.info("Generate outcome regressions")
            for group in self.t_groups:
                mask = (treatment_out == group) | (treatment_out == self.control_name)
                mask_1, mask_0 = (
                    mask & (assignment_out == 1),
                    mask & (assignment_out == 0),
                )
                self.models_mu_c[group][ifold].fit(X_out[mask_0], y_out[mask_0])
                self.models_mu_t[group][ifold].fit(X_out[mask_1], y_out[mask_1])

            logger.info("Fit pseudo outcomes from the DR formula")
            for group in self.t_groups:
                mask = (treatment_tau == group) | (treatment_tau == self.control_name)
                treatment_filt = treatment_tau[mask]
                X_filt = X_tau[mask]
                y_filt = y_tau[mask]
                w_filt = (treatment_filt == group).astype(int)
                p_1_filt = cur_p_1[group][mask]
                p_0_filt = cur_p_0[group][mask]
                z_filt = assignment_tau[mask]
                pZ_filt = pZ_tau[mask]
                mu_t = self.models_mu_t[group][ifold].predict(X_filt)
                mu_c = self.models_mu_c[group][ifold].predict(X_filt)
                dr = (
                    z_filt * (y_filt - mu_t) / pZ_filt
                    - (1 - z_filt) * (y_filt - mu_c) / (1 - pZ_filt)
                    + mu_t
                    - mu_c
                )
                weight = (
                    z_filt * (w_filt - p_1_filt) / pZ_filt
                    - (1 - z_filt) * (w_filt - p_0_filt) / (1 - pZ_filt)
                    + p_1_filt
                    - p_0_filt
                )
                dr /= weight
                self.models_tau[group][ifold].fit(X_filt, dr, sample_weight=weight**2)
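
    # Editorial note (not in the original source): the pseudo-outcome above is a
    # plug-in estimate of the DR moment for the local average treatment effect,
    #
    #   tau(X) = [Z(Y - mu_1(X))/pZ(X) - (1 - Z)(Y - mu_0(X))/(1 - pZ(X)) + mu_1(X) - mu_0(X)]
    #          / [Z(W - p_1(X))/pZ(X) - (1 - Z)(W - p_0(X))/(1 - pZ(X)) + p_1(X) - p_0(X)],
    #
    # i.e. a DR estimate of the effect of assignment Z on outcome Y divided by a DR
    # estimate of the effect of Z on treatment uptake W. Fitting `dr` with
    # `sample_weight=weight**2` makes the final-stage regression minimize
    # sum_i weight_i^2 * (dr_i - tau(X_i))^2 = sum_i (numerator_i - weight_i * tau(X_i))^2,
    # which is plausibly why the squared weight is used: it undoes the division by
    # `weight` inside the squared loss.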

    def predict(self, X, treatment=None, y=None, return_components=False, verbose=True):
        """Predict treatment effects.

        Args:
            X (np.matrix or np.array or pd.Dataframe): a feature matrix
            treatment (np.array or pd.Series, optional): a treatment vector
            y (np.array or pd.Series, optional): an outcome vector
            return_components (bool, optional): whether to return outcomes for treatment and control separately
            verbose (bool, optional): whether to output progress logs

        Returns:
            (numpy.ndarray): Predictions of treatment effects for compliers, i.e. those individuals
                who take the treatment only if they are assigned.
        """
        X, treatment, y = convert_pd_to_np(X, treatment, y)

        te = np.zeros((X.shape[0], self.t_groups.shape[0]))
        yhat_cs = {}
        yhat_ts = {}

        for i, group in enumerate(self.t_groups):
            models_tau = self.models_tau[group]
            _te = np.r_[[model.predict(X) for model in models_tau]].mean(axis=0)
            te[:, i] = np.ravel(_te)
            yhat_cs[group] = np.r_[
                [model.predict(X) for model in self.models_mu_c[group]]
            ].mean(axis=0)
            yhat_ts[group] = np.r_[
                [model.predict(X) for model in self.models_mu_t[group]]
            ].mean(axis=0)

            if (y is not None) and (treatment is not None) and verbose:
                mask = (treatment == group) | (treatment == self.control_name)
                treatment_filt = treatment[mask]
                y_filt = y[mask]
                w = (treatment_filt == group).astype(int)

                yhat = np.zeros_like(y_filt, dtype=float)
                yhat[w == 0] = yhat_cs[group][mask][w == 0]
                yhat[w == 1] = yhat_ts[group][mask][w == 1]

                logger.info("Error metrics for group {}".format(group))
                regression_metrics(y_filt, yhat, w)

        if not return_components:
            return te
        else:
            return te, yhat_cs, yhat_ts

    def fit_predict(
        self,
        X,
        assignment,
        treatment,
        y,
        p=None,
        pZ=None,
        return_ci=False,
        n_bootstraps=1000,
        bootstrap_size=10000,
        return_components=False,
        verbose=True,
        seed=None,
        calibrate=True,
    ):
        """Fit the treatment effect and outcome models of the DRIV-learner and predict treatment effects.

        Args:
            X (np.matrix or np.array or pd.Dataframe): a feature matrix
            assignment (np.array or pd.Series): a (0,1)-valued assignment vector. The assignment is the
                instrumental variable that does not depend on unknown confounders. The assignment status
                influences treatment in a monotonic way, i.e. one can only be more likely to take the
                treatment if assigned.
            treatment (np.array or pd.Series): a treatment vector
            y (np.array or pd.Series): an outcome vector
            p (2-tuple of np.ndarray or pd.Series or dict, optional): The first (second) element corresponds to
                unassigned (assigned) units. Each is an array of propensity scores of float (0,1) in the
                single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
                float (0,1). If None will run ElasticNetPropensityModel() to generate the propensity scores.
            pZ (np.array or pd.Series, optional): an array of assignment probability of float (0,1); if None
                will run ElasticNetPropensityModel() to generate the assignment probability score.
            return_ci (bool): whether to return confidence intervals
            n_bootstraps (int): number of bootstrap iterations
            bootstrap_size (int): number of samples per bootstrap
            return_components (bool, optional): whether to return outcomes for treatment and control separately
            verbose (bool): whether to output progress logs
            seed (int): random seed for cross-fitting
            calibrate (bool, optional): whether to calibrate the assignment propensity score

        Returns:
            (numpy.ndarray): Predictions of treatment effects for compliers, i.e. those individuals who take
                the treatment only if they are assigned. Output dim: [n_samples, n_treatment].
                If return_ci, returns CATE [n_samples, n_treatment], LB [n_samples, n_treatment],
                UB [n_samples, n_treatment]
        """
        X, assignment, treatment, y = convert_pd_to_np(X, assignment, treatment, y)
        self.fit(X, assignment, treatment, y, p, pZ, seed, calibrate)

        if p is None:
            p = (self.propensity_0, self.propensity_1)
        else:
            check_p_conditions(p[0], self.t_groups)
            check_p_conditions(p[1], self.t_groups)

        if isinstance(p[0], (np.ndarray, pd.Series)):
            treatment_name = self.t_groups[0]
            p = (
                {treatment_name: convert_pd_to_np(p[0])},
                {treatment_name: convert_pd_to_np(p[1])},
            )
        elif isinstance(p[0], dict):
            p = (
                {
                    treatment_name: convert_pd_to_np(_p)
                    for treatment_name, _p in p[0].items()
                },
                {
                    treatment_name: convert_pd_to_np(_p)
                    for treatment_name, _p in p[1].items()
                },
            )

        if pZ is None:
            pZ = self.propensity_assign

        te = self.predict(
            X, treatment=treatment, y=y, return_components=return_components
        )

        if not return_ci:
            return te
        else:
            t_groups_global = self.t_groups
            _classes_global = self._classes
            models_mu_c_global = deepcopy(self.models_mu_c)
            models_mu_t_global = deepcopy(self.models_mu_t)
            models_tau_global = deepcopy(self.models_tau)
            te_bootstraps = np.zeros(
                shape=(X.shape[0], self.t_groups.shape[0], n_bootstraps)
            )

            logger.info("Bootstrap Confidence Intervals")
            for i in tqdm(range(n_bootstraps)):
                te_b = self.bootstrap(
                    X, assignment, treatment, y, p, pZ, size=bootstrap_size, seed=seed
                )
                te_bootstraps[:, :, i] = te_b

            te_lower = np.percentile(te_bootstraps, (self.ate_alpha / 2) * 100, axis=2)
            te_upper = np.percentile(
                te_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=2
            )

            # set member variables back to global (currently last bootstrapped outcome)
            self.t_groups = t_groups_global
            self._classes = _classes_global
            self.models_mu_c = deepcopy(models_mu_c_global)
            self.models_mu_t = deepcopy(models_mu_t_global)
            self.models_tau = deepcopy(models_tau_global)

            return (te, te_lower, te_upper)

    def estimate_ate(
        self,
        X,
        assignment,
        treatment,
        y,
        p=None,
        pZ=None,
        bootstrap_ci=False,
        n_bootstraps=1000,
        bootstrap_size=10000,
        seed=None,
        calibrate=True,
    ):
        """Estimate the Average Treatment Effect (ATE) for compliers.

        Args:
            X (np.matrix or np.array or pd.Dataframe): a feature matrix
            assignment (np.array or pd.Series): an assignment vector. The assignment is the instrumental
                variable that does not depend on unknown confounders. The assignment status influences
                treatment in a monotonic way, i.e. one can only be more likely to take the treatment if
                assigned.
            treatment (np.array or pd.Series): a treatment vector
            y (np.array or pd.Series): an outcome vector
            p (2-tuple of np.ndarray or pd.Series or dict, optional): The first (second) element corresponds to
                unassigned (assigned) units. Each is an array of propensity scores of float (0,1) in the
                single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of
                float (0,1). If None will run ElasticNetPropensityModel() to generate the propensity scores.
            pZ (np.array or pd.Series, optional): an array of assignment probability of float (0,1); if None
                will run ElasticNetPropensityModel() to generate the assignment probability score.
            bootstrap_ci (bool): whether to run bootstrap for confidence intervals
            n_bootstraps (int): number of bootstrap iterations
            bootstrap_size (int): number of samples per bootstrap
            seed (int): random seed for cross-fitting
            calibrate (bool, optional): whether to calibrate the assignment propensity score

        Returns:
            The mean and confidence interval (LB, UB) of the ATE estimate.
        """
        te, yhat_cs, yhat_ts = self.fit_predict(
            X,
            assignment,
            treatment,
            y,
            p,
            pZ,
            return_components=True,
            seed=seed,
            calibrate=calibrate,
        )
        X, assignment, treatment, y = convert_pd_to_np(X, assignment, treatment, y)

        if p is None:
            p = (self.propensity_0, self.propensity_1)
        else:
            check_p_conditions(p[0], self.t_groups)
            check_p_conditions(p[1], self.t_groups)

        if isinstance(p[0], (np.ndarray, pd.Series)):
            treatment_name = self.t_groups[0]
            p = (
                {treatment_name: convert_pd_to_np(p[0])},
                {treatment_name: convert_pd_to_np(p[1])},
            )
        elif isinstance(p[0], dict):
            p = (
                {
                    treatment_name: convert_pd_to_np(_p)
                    for treatment_name, _p in p[0].items()
                },
                {
                    treatment_name: convert_pd_to_np(_p)
                    for treatment_name, _p in p[1].items()
                },
            )

        # fall back to the assignment score estimated in fit() for the bootstrap below
        if pZ is None:
            pZ = self.propensity_assign

        ate = np.zeros(self.t_groups.shape[0])
        ate_lb = np.zeros(self.t_groups.shape[0])
        ate_ub = np.zeros(self.t_groups.shape[0])

        for i, group in enumerate(self.t_groups):
            _ate = te[:, i].mean()

            mask = (treatment == group) | (treatment == self.control_name)
            mask_1, mask_0 = mask & (assignment == 1), mask & (assignment == 0)
            Gamma = (treatment[mask_1] == group).mean() - (
                treatment[mask_0] == group
            ).mean()

            y_filt_1, y_filt_0 = y[mask_1], y[mask_0]
            yhat_0 = yhat_cs[group][mask_0]
            yhat_1 = yhat_ts[group][mask_1]
            treatment_filt_1, treatment_filt_0 = treatment[mask_1], treatment[mask_0]
            prob_treatment_1, prob_treatment_0 = (
                p[1][group][mask_1],
                p[0][group][mask_0],
            )
            w = (assignment[mask]).mean()

            part_1 = (
                (y_filt_1 - yhat_1).var()
                + _ate**2 * (treatment_filt_1 - prob_treatment_1).var()
                - 2
                * _ate
                * (y_filt_1 * treatment_filt_1 - yhat_1 * prob_treatment_1).mean()
            )
            part_0 = (
                (y_filt_0 - yhat_0).var()
                + _ate**2 * (treatment_filt_0 - prob_treatment_0).var()
                - 2
                * _ate
                * (y_filt_0 * treatment_filt_0 - yhat_0 * prob_treatment_0).mean()
            )
            part_2 = np.mean(
                (
                    yhat_ts[group][mask]
                    - yhat_cs[group][mask]
                    - _ate * (p[1][group][mask] - p[0][group][mask])
                )
                ** 2
            )

            # SE formula is based on the lower bound formula (9) from Frölich, Markus. 2006.
            # "Nonparametric IV estimation of local average treatment effects with covariates."
            # Journal of Econometrics.
            se = np.sqrt((part_1 / w + part_0 / (1 - w)) + part_2) / Gamma

            _ate_lb = _ate - se * norm.ppf(1 - self.ate_alpha / 2)
            _ate_ub = _ate + se * norm.ppf(1 - self.ate_alpha / 2)

            ate[i] = _ate
            ate_lb[i] = _ate_lb
            ate_ub[i] = _ate_ub

        if not bootstrap_ci:
            return ate, ate_lb, ate_ub
        else:
            t_groups_global = self.t_groups
            _classes_global = self._classes
            models_mu_c_global = deepcopy(self.models_mu_c)
            models_mu_t_global = deepcopy(self.models_mu_t)
            models_tau_global = deepcopy(self.models_tau)

            logger.info("Bootstrap Confidence Intervals for ATE")
            ate_bootstraps = np.zeros(shape=(self.t_groups.shape[0], n_bootstraps))

            for n in tqdm(range(n_bootstraps)):
                cate_b = self.bootstrap(
                    X, assignment, treatment, y, p, pZ, size=bootstrap_size, seed=seed
                )
                ate_bootstraps[:, n] = cate_b.mean(axis=0)

            ate_lower = np.percentile(
                ate_bootstraps, (self.ate_alpha / 2) * 100, axis=1
            )
            ate_upper = np.percentile(
                ate_bootstraps, (1 - self.ate_alpha / 2) * 100, axis=1
            )

            # set member variables back to global (currently last bootstrapped outcome)
            self.t_groups = t_groups_global
            self._classes = _classes_global
            self.models_mu_c = deepcopy(models_mu_c_global)
            self.models_mu_t = deepcopy(models_mu_t_global)
            self.models_tau = deepcopy(models_tau_global)

            return ate, ate_lower, ate_upper

    def bootstrap(self, X, assignment, treatment, y, p, pZ, size=10000, seed=None):
        """Runs a single bootstrap.

        Fits on a bootstrapped sample, then predicts on the whole population.
        """
        idxs = np.random.choice(np.arange(0, X.shape[0]), size=size)
        X_b = X[idxs]

        if isinstance(p[0], (np.ndarray, pd.Series)):
            p0_b = {self.t_groups[0]: convert_pd_to_np(p[0][idxs])}
        else:
            p0_b = {g: prop[idxs] for g, prop in p[0].items()}
        if isinstance(p[1], (np.ndarray, pd.Series)):
            p1_b = {self.t_groups[0]: convert_pd_to_np(p[1][idxs])}
        else:
            p1_b = {g: prop[idxs] for g, prop in p[1].items()}

        pZ_b = pZ[idxs]
        assignment_b = assignment[idxs]
        treatment_b = treatment[idxs]
        y_b = y[idxs]
        self.fit(
            X=X_b,
            assignment=assignment_b,
            treatment=treatment_b,
            y=y_b,
            p=(p0_b, p1_b),
            pZ=pZ_b,
            seed=seed,
        )
        te_b = self.predict(X=X)
        return te_b

    def get_importance(
        self,
        X=None,
        tau=None,
        model_tau_feature=None,
        features=None,
        method="auto",
        normalize=True,
        test_size=0.3,
        random_state=None,
    ):
        """
        Builds a model (using X to predict estimated/actual tau), and then calculates feature importances
        based on a specified method.

        Currently supported methods are:
            - auto (calculates importance based on estimator's default implementation of feature importance;
                    estimator must be tree-based)
                    Note: if none provided, it uses lightgbm's LGBMRegressor as estimator, and "gain" as
                    importance type
            - permutation (calculates importance based on mean decrease in accuracy when a feature column is
                           permuted; estimator can be any form)
        Hint: for permutation, downsample data for better performance especially if X.shape[1] is large

        Args:
            X (np.matrix or np.array or pd.Dataframe): a feature matrix
            tau (np.array): a treatment effect vector (estimated/actual)
            model_tau_feature (sklearn/lightgbm/xgboost model object): an unfitted model object
            features (np.array): list/array of feature names. If None, an enumerated list will be used
            method (str): auto, permutation
            normalize (bool): normalize by sum of importances if method=auto (defaults to True)
            test_size (float/int): if float, represents the proportion of the dataset to include in the test split.
                If int, represents the absolute number of test samples (used for estimating permutation importance)
            random_state (int/RandomState instance/None): random state used in permutation importance estimation
        """
        explainer = Explainer(
            method=method,
            control_name=self.control_name,
            X=X,
            tau=tau,
            model_tau=model_tau_feature,
            features=features,
            classes=self._classes,
            normalize=normalize,
            test_size=test_size,
            random_state=random_state,
        )
        return explainer.get_importance()

    def get_shap_values(self, X=None, model_tau_feature=None, tau=None, features=None):
        """
        Builds a model (using X to predict estimated/actual tau), and then calculates shapley values.

        Args:
            X (np.matrix or np.array or pd.Dataframe): a feature matrix
            tau (np.array): a treatment effect vector (estimated/actual)
            model_tau_feature (sklearn/lightgbm/xgboost model object): an unfitted model object
            features (optional, np.array): list/array of feature names. If None, an enumerated list will be used.
        """
        explainer = Explainer(
            method="shapley",
            control_name=self.control_name,
            X=X,
            tau=tau,
            model_tau=model_tau_feature,
            features=features,
            classes=self._classes,
        )
        return explainer.get_shap_values()

    def plot_importance(
        self,
        X=None,
        tau=None,
        model_tau_feature=None,
        features=None,
        method="auto",
        normalize=True,
        test_size=0.3,
        random_state=None,
    ):
        """
        Builds a model (using X to predict estimated/actual tau), and then plots feature importances
        based on a specified method.

        Currently supported methods are:
            - auto (calculates importance based on estimator's default implementation of feature importance;
                    estimator must be tree-based)
                    Note: if none provided, it uses lightgbm's LGBMRegressor as estimator, and "gain" as
                    importance type
            - permutation (calculates importance based on mean decrease in accuracy when a feature column is
                           permuted; estimator can be any form)
        Hint: for permutation, downsample data for better performance especially if X.shape[1] is large

        Args:
            X (np.matrix or np.array or pd.Dataframe): a feature matrix
            tau (np.array): a treatment effect vector (estimated/actual)
            model_tau_feature (sklearn/lightgbm/xgboost model object): an unfitted model object
            features (np.array): list/array of feature names. If None, an enumerated list will be used
            method (str): auto, permutation
            normalize (bool): normalize by sum of importances if method=auto (defaults to True)
            test_size (float/int): if float, represents the proportion of the dataset to include in the test split.
                If int, represents the absolute number of test samples (used for estimating permutation importance)
            random_state (int/RandomState instance/None): random state used in permutation importance estimation
        """
        explainer = Explainer(
            method=method,
            control_name=self.control_name,
            X=X,
            tau=tau,
            model_tau=model_tau_feature,
            features=features,
            classes=self._classes,
            normalize=normalize,
            test_size=test_size,
            random_state=random_state,
        )
        explainer.plot_importance()

    def plot_shap_values(
        self,
        X=None,
        tau=None,
        model_tau_feature=None,
        features=None,
        shap_dict=None,
        **kwargs,
    ):
        """
        Plots distribution of shapley values.

        If shapley values have been pre-computed, pass them through the shap_dict parameter.
        If shap_dict is not provided, this builds a new model (using X to predict estimated/actual tau),
        and then calculates shapley values.

        Args:
            X (np.matrix or np.array or pd.Dataframe): a feature matrix. Required if shap_dict is None.
            tau (np.array): a treatment effect vector (estimated/actual)
            model_tau_feature (sklearn/lightgbm/xgboost model object): an unfitted model object
            features (optional, np.array): list/array of feature names. If None, an enumerated list will be used.
            shap_dict (optional, dict): a dict of shapley value matrices. If None, shap_dict will be computed.
        """
        override_checks = False if shap_dict is None else True
        explainer = Explainer(
            method="shapley",
            control_name=self.control_name,
            X=X,
            tau=tau,
            model_tau=model_tau_feature,
            features=features,
            override_checks=override_checks,
            classes=self._classes,
        )
        explainer.plot_shap_values(shap_dict=shap_dict)

    def plot_shap_dependence(
        self,
        treatment_group,
        feature_idx,
        X,
        tau,
        model_tau_feature=None,
        features=None,
        shap_dict=None,
        interaction_idx="auto",
        **kwargs,
    ):
        """
        Plots dependency of shapley values for a specified feature, colored by an interaction feature.

        If shapley values have been pre-computed, pass them through the shap_dict parameter.
        If shap_dict is not provided, this builds a new model (using X to predict estimated/actual tau),
        and then calculates shapley values.

        This plots the value of the feature on the x-axis and the SHAP value of the same feature
        on the y-axis. This shows how the model depends on the given feature, and is like a richer
        extension of the classical partial dependence plots. Vertical dispersion of the data points
        represents interaction effects.

        Args:
            treatment_group (str or int): name of treatment group to create dependency plot on
            feature_idx (str or int): feature index / name to create dependency plot on
            X (np.matrix or np.array or pd.Dataframe): a feature matrix
            tau (np.array): a treatment effect vector (estimated/actual)
            model_tau_feature (sklearn/lightgbm/xgboost model object): an unfitted model object
            features (optional, np.array): list/array of feature names. If None, an enumerated list will be used.
            shap_dict (optional, dict): a dict of shapley value matrices. If None, shap_dict will be computed.
            interaction_idx (optional, str or int): feature index / name used in coloring scheme as interaction
                feature. If "auto" then shap.common.approximate_interactions is used to pick what seems to be the
                strongest interaction (note that to find the true strongest interaction you need to compute the
                SHAP interaction values).
        """
        override_checks = False if shap_dict is None else True
        explainer = Explainer(
            method="shapley",
            control_name=self.control_name,
            X=X,
            tau=tau,
            model_tau=model_tau_feature,
            features=features,
            override_checks=override_checks,
            classes=self._classes,
        )
        explainer.plot_shap_dependence(
            treatment_group=treatment_group,
            feature_idx=feature_idx,
            shap_dict=shap_dict,
            interaction_idx=interaction_idx,
            **kwargs,
        )


class BaseDRIVRegressor(BaseDRIVLearner):
    """
    A parent class for DRIV-learner regressor classes.
    """

    def __init__(
        self,
        learner=None,
        control_outcome_learner=None,
        treatment_outcome_learner=None,
        treatment_effect_learner=None,
        ate_alpha=0.05,
        control_name=0,
    ):
        """Initialize a DRIV-learner regressor.

        Args:
            learner (optional): a model to estimate outcomes and treatment effects in both the control and
                treatment groups
            control_outcome_learner (optional): a model to estimate outcomes in the control group
            treatment_outcome_learner (optional): a model to estimate outcomes in the treatment group
            treatment_effect_learner (optional): a model to estimate treatment effects in the treatment group.
                It needs to take `sample_weight` as an input argument in `fit()`.
            ate_alpha (float, optional): the confidence level alpha of the ATE estimate
            control_name (str or int, optional): name of control group
        """
        super().__init__(
            learner=learner,
            control_outcome_learner=control_outcome_learner,
            treatment_outcome_learner=treatment_outcome_learner,
            treatment_effect_learner=treatment_effect_learner,
            ate_alpha=ate_alpha,
            control_name=control_name,
        )


class XGBDRIVRegressor(BaseDRIVRegressor):
    def __init__(self, ate_alpha=0.05, control_name=0, *args, **kwargs):
        """Initialize a DRIV-learner with an XGBoost model as the base learner."""
        super().__init__(
            learner=XGBRegressor(*args, **kwargs),
            ate_alpha=ate_alpha,
            control_name=control_name,
        )
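

# Editorial usage sketch (not part of the original module): a minimal synthetic-data
# smoke test for XGBDRIVRegressor. The data-generating process below is an
# illustrative assumption, not from the causalml source: the instrument Z is
# randomized, compliance is one-sided, and the true complier effect is 1.0.
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    n = 5000
    X = rng.normal(size=(n, 5))
    assignment = rng.binomial(1, 0.5, size=n)  # instrument: random assignment
    compliance = rng.binomial(1, 0.7, size=n)  # latent complier indicator
    treatment = assignment * compliance  # only assigned units can take the treatment
    y = X[:, 0] + 1.0 * treatment + rng.normal(size=n)

    learner = XGBDRIVRegressor()
    ate, lb, ub = learner.estimate_ate(X, assignment, treatment, y)
    print(f"LATE estimate: {ate[0]:.3f}, 95% CI: [{lb[0]:.3f}, {ub[0]:.3f}]")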