Source code for causalml.metrics.sensitivity

import logging
import numpy as np
import pandas as pd
from collections import defaultdict
import matplotlib.pyplot as plt
from importlib import import_module

logger = logging.getLogger('sensitivity')


def one_sided(alpha, p, treatment):
    """One sided confounding function.
    Reference:  Blackwell, Matthew. "A selection bias approach to sensitivity analysis
    for causal effects." Political Analysis 22.2 (2014): 169-182.
    https://www.mattblackwell.org/files/papers/causalsens.pdf

    Args:
        alpha (np.array): a confounding values vector
        p (np.array): a propensity score vector between 0 and 1
        treatment (np.array): a treatment vector (1 if treated, otherwise 0)
    """
    assert p.shape[0] == treatment.shape[0]
    adj = alpha * (1 - p) * treatment - alpha * p * (1 - treatment)
    return adj


def alignment(alpha, p, treatment):
    """Alignment confounding function.
    Reference:  Blackwell, Matthew. "A selection bias approach to sensitivity analysis
    for causal effects." Political Analysis 22.2 (2014): 169-182.
    https://www.mattblackwell.org/files/papers/causalsens.pdf

    Args:
        alpha (np.array): a confounding values vector
        p (np.array): a propensity score vector between 0 and 1
        treatment (np.array): a treatment vector (1 if treated, otherwise 0)
    """

    assert p.shape[0] == treatment.shape[0]
    adj = alpha * (1 - p) * treatment + alpha * p * (1 - treatment)
    return adj


def one_sided_att(alpha, p, treatment):
    """One sided confounding function for the average effect of the treatment among the treated units (ATT)

    Reference:  Blackwell, Matthew. "A selection bias approach to sensitivity analysis
    for causal effects." Political Analysis 22.2 (2014): 169-182.
    https://www.mattblackwell.org/files/papers/causalsens.pdf

    Args:
        alpha (np.array): a confounding values vector
        p (np.array): a propensity score vector between 0 and 1
        treatment (np.array): a treatment vector (1 if treated, otherwise 0)
    """
    assert p.shape[0] == treatment.shape[0]
    adj = alpha * (1 - treatment)
    return adj


def alignment_att(alpha, p, treatment):
    """Alignment confounding function for the average effect of the treatment among the treated units (ATT)

    Reference:  Blackwell, Matthew. "A selection bias approach to sensitivity analysis
    for causal effects." Political Analysis 22.2 (2014): 169-182.
    https://www.mattblackwell.org/files/papers/causalsens.pdf

    Args:
        alpha (np.array): a confounding values vector
        p (np.array): a propensity score vector between 0 and 1
        treatment (np.array): a treatment vector (1 if treated, otherwise 0)
    """
    assert p.shape[0] == treatment.shape[0]
    adj = alpha * (1 - treatment)
    return adj


[docs]class Sensitivity(object): """ A Sensitivity Check class to support Placebo Treatment, Irrelevant Additional Confounder and Subset validation refutation methods to verify causal inference. Reference: https://github.com/microsoft/dowhy/blob/master/dowhy/causal_refuters/ """ def __init__(self, df, inference_features, p_col, treatment_col, outcome_col, learner, *args, **kwargs): """Initialize. Args: df (pd.DataFrame): input data frame inferenece_features (list of str): a list of columns that used in learner for inference p_col (str): column name of propensity score treatment_col (str): column name of whether in treatment of control outcome_col (str): column name of outcome learner (model): a model to estimate outcomes and treatment effects """ self.df = df self.inference_features = inference_features self.p_col = p_col self.treatment_col = treatment_col self.outcome_col = outcome_col self.learner = learner
[docs] def get_prediction(self, X, p, treatment, y): """Return the treatment effects prediction. Args: X (np.matrix): a feature matrix p (np.array): a propensity score vector between 0 and 1 treatment (np.array): a treatment vector (1 if treated, otherwise 0) y (np.array): an outcome vector Returns: (numpy.ndarray): Predictions of treatment effects """ learner = self.learner try: preds = learner.fit_predict(X=X, p=p, treatment=treatment, y=y).flatten() except TypeError: preds = learner.fit_predict(X=X, treatment=treatment, y=y).flatten() return preds
[docs] def get_ate_ci(self, X, p, treatment, y): """Return the confidence intervals for treatment effects prediction. Args: X (np.matrix): a feature matrix p (np.array): a propensity score vector between 0 and 1 treatment (np.array): a treatment vector (1 if treated, otherwise 0) y (np.array): an outcome vector Returns: (numpy.ndarray): Mean and confidence interval (LB, UB) of the ATE estimate. """ learner = self.learner from ..inference.meta.tlearner import BaseTLearner if isinstance(learner, BaseTLearner): ate, ate_lower, ate_upper = learner.estimate_ate(X=X, treatment=treatment, y=y) else: try: ate, ate_lower, ate_upper = learner.estimate_ate(X=X, p=p, treatment=treatment, y=y) except TypeError: ate, ate_lower, ate_upper = learner.estimate_ate(X=X, treatment=treatment, y=y, return_ci=True) return ate[0], ate_lower[0], ate_upper[0]
[docs] @staticmethod def get_class_object(method_name, *args, **kwargs): """Return class object based on input method Args: method_name (list of str): a list of sensitivity analysis method Returns: (class): Sensitivy Class """ method_list = ['Placebo Treatment', 'Random Cause', 'Subset Data', 'Random Replace', 'Selection Bias'] class_name = 'Sensitivity' + method_name.replace(' ', '') try: getattr(import_module('causalml.metrics.sensitivity'), class_name) return getattr(import_module('causalml.metrics.sensitivity'), class_name) except AttributeError: raise AttributeError('{} is not an existing method for sensitiviy analysis.'.format(method_name) + ' Select one of {}'.format(method_list))
[docs] def sensitivity_analysis(self, methods, sample_size=None, confound='one_sided', alpha_range=None): """Return the sensitivity data by different method Args: method (list of str): a list of sensitivity analysis method sample_size (float, optional): ratio for subset the original data confound (string, optional): the name of confouding function alpha_range (np.array, optional): a parameter to pass the confounding function Returns: X (np.matrix): a feature matrix p (np.array): a propensity score vector between 0 and 1 treatment (np.array): a treatment vector (1 if treated, otherwise 0) y (np.array): an outcome vector """ if alpha_range is None: y = self.df[self.outcome_col] iqr = y.quantile(.75) - y.quantile(.25) alpha_range = np.linspace(-iqr/2, iqr/2, 11) if 0 not in alpha_range: alpha_range = np.append(alpha_range, 0) else: alpha_range = alpha_range alpha_range.sort() summary_df = pd.DataFrame(columns=['Method', 'ATE', 'New ATE', 'New ATE LB', 'New ATE UB']) for method in methods: sens = self.get_class_object(method) sens = sens(self.df, self.inference_features, self.p_col, self.treatment_col, self.outcome_col, self.learner, sample_size=sample_size, confound=confound, alpha_range=alpha_range) if method == 'Subset Data': method = method + '(sample size @{})'.format(sample_size) sens_df = sens.summary(method=method) summary_df = summary_df.append(sens_df) return summary_df
[docs] def summary(self, method): """Summary report Args: method_name (str): sensitivity analysis method Returns: (pd.DataFrame): a summary dataframe """ method_name = method X = self.df[self.inference_features].values p = self.df[self.p_col].values treatment = self.df[self.treatment_col].values y = self.df[self.outcome_col].values preds = self.get_prediction(X, p, treatment, y) ate = preds.mean() ate_new, ate_new_lower, ate_new_upper = self.sensitivity_estimate() sensitivity_summary = pd.DataFrame([method_name, ate, ate_new, ate_new_lower, ate_new_upper]).T sensitivity_summary.columns = ['Method', 'ATE', 'New ATE', 'New ATE LB', 'New ATE UB'] return sensitivity_summary
[docs] def sensitivity_estimate(self): raise NotImplementedError
[docs]class SensitivityPlaceboTreatment(Sensitivity): """Replaces the treatment variable with a new variable randomly generated. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] def sensitivity_estimate(self): """Summary report Args: return_ci (str): sensitivity analysis method Returns: (pd.DataFrame): a summary dataframe """ num_rows = self.df.shape[0] X = self.df[self.inference_features].values p = self.df[self.p_col].values treatment_new = np.random.randint(2, size=num_rows) y = self.df[self.outcome_col].values ate_new, ate_new_lower, ate_new_upper = self.get_ate_ci(X, p, treatment_new, y) return ate_new, ate_new_lower, ate_new_upper
[docs]class SensitivityRandomCause(Sensitivity): """Adds an irrelevant random covariate to the dataframe. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] def sensitivity_estimate(self): num_rows = self.df.shape[0] new_data = np.random.randn(num_rows) X = self.df[self.inference_features].values p = self.df[self.p_col].values treatment = self.df[self.treatment_col].values y = self.df[self.outcome_col].values X_new = np.hstack((X, new_data.reshape((-1, 1)))) ate_new, ate_new_lower, ate_new_upper = self.get_ate_ci(X_new, p, treatment, y) return ate_new, ate_new_lower, ate_new_upper
[docs]class SensitivityRandomReplace(Sensitivity): """Replaces a random covariate with an irrelevant variable. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if 'replaced_feature' not in kwargs: replaced_feature_index = np.random.randint(len(self.inference_features)) self.replaced_feature = self.inference_features[replaced_feature_index] else: self.replaced_feature = kwargs["replaced_feature"]
[docs] def sensitivity_estimate(self): """Replaces a random covariate with an irrelevant variable. """ logger.info('Replace feature {} with an random irrelevant variable'.format(self.replaced_feature)) df_new = self.df.copy() num_rows = self.df.shape[0] df_new[self.replaced_feature] = np.random.randn(num_rows) X_new = df_new[self.inference_features].values p_new = df_new[self.p_col].values treatment_new = df_new[self.treatment_col].values y_new = df_new[self.outcome_col].values ate_new, ate_new_lower, ate_new_upper = self.get_ate_ci(X_new, p_new, treatment_new, y_new) return ate_new, ate_new_lower, ate_new_upper
[docs]class SensitivitySubsetData(Sensitivity): """Takes a random subset of size sample_size of the data. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.sample_size = kwargs["sample_size"] assert (self.sample_size is not None)
[docs] def sensitivity_estimate(self): df_new = self.df.sample(frac=self.sample_size).copy() X_new = df_new[self.inference_features].values p_new = df_new[self.p_col].values treatment_new = df_new[self.treatment_col].values y_new = df_new[self.outcome_col].values ate_new, ate_new_lower, ate_new_upper = self.get_ate_ci(X_new, p_new, treatment_new, y_new) return ate_new, ate_new_lower, ate_new_upper
[docs]class SensitivitySelectionBias(Sensitivity): """Reference: [1] Blackwell, Matthew. "A selection bias approach to sensitivity analysis for causal effects." Political Analysis 22.2 (2014): 169-182. https://www.mattblackwell.org/files/papers/causalsens.pdf [2] Confouding parameter alpha_range using the same range as in: https://github.com/mattblackwell/causalsens/blob/master/R/causalsens.R """ def __init__(self, *args, confound='one_sided', alpha_range=None, sensitivity_features=None, **kwargs): super().__init__(*args, **kwargs) """Initialize. Args: confound (string): the name of confouding function alpha_range (np.array): a parameter to pass the confounding function sensitivity_features (list of str): ): a list of columns that to check each individual partial r-square """ logger.info('Only works for linear outcome models right now. Check back soon.') confounding_functions = {'one_sided': one_sided, 'alignment': alignment, 'one_sided_att': one_sided_att, 'alignment_att': alignment_att} try: confound_func = confounding_functions[confound] except KeyError: raise NotImplementedError(f'Confounding function, {confound} is not implemented. \ Use one of {confounding_functions.keys()}') self.confound = confound_func if sensitivity_features is None: self.sensitivity_features = self.inference_features else: self.sensitivity_features = sensitivity_features if alpha_range is None: y = self.df[self.outcome_col] iqr = y.quantile(.75) - y.quantile(.25) self.alpha_range = np.linspace(-iqr/2, iqr/2, 11) if 0 not in self.alpha_range: self.alpha_range = np.append(self.alpha_range, 0) else: self.alpha_range = alpha_range self.alpha_range.sort()
[docs] def causalsens(self): alpha_range = self.alpha_range confound = self.confound df = self.df X = df[self.inference_features].values p = df[self.p_col].values treatment = df[self.treatment_col].values y = df[self.outcome_col].values preds = self.get_prediction(X, p, treatment, y) sens_df = pd.DataFrame() for a in alpha_range: sens = defaultdict(list) sens['alpha'] = a adj = confound(a, p, treatment) preds_adj = y - adj s_preds = self.get_prediction(X, p, treatment, preds_adj) ate, ate_lb, ate_ub = self.get_ate_ci(X, p, treatment, preds_adj) s_preds_residul = preds_adj - s_preds sens['rsqs'] = a**2*np.var(treatment)/np.var(s_preds_residul) sens['New ATE'] = ate sens['New ATE LB'] = ate_lb sens['New ATE UB'] = ate_ub sens_df = sens_df.append(pd.DataFrame(sens, index=[0])) rss = np.sum(np.square(y - preds)) partial_rsqs = [] for feature in self.sensitivity_features: df_new = df.copy() X_new = df_new[self.inference_features].drop(feature, axis=1).copy() y_new_preds = self.get_prediction(X_new, p, treatment, y) rss_new = np.sum(np.square(y - y_new_preds)) partial_rsqs.append(((rss_new - rss)/rss)) partial_rsqs_df = pd.DataFrame([self.sensitivity_features, partial_rsqs]).T partial_rsqs_df.columns = ['feature', 'partial_rsqs'] return sens_df, partial_rsqs_df
[docs] def summary(self, method='Selection Bias'): """Summary report for Selection Bias Method Args: method_name (str): sensitivity analysis method Returns: (pd.DataFrame): a summary dataframe """ method_name = method sensitivity_summary = self.causalsens()[0] sensitivity_summary['Method'] = [method_name + ' (alpha@' + str(round(i, 5)) + ', with r-sqaure:' for i in sensitivity_summary.alpha] sensitivity_summary['Method'] = sensitivity_summary['Method'] + sensitivity_summary['rsqs'].round(5).astype(str) sensitivity_summary['ATE'] = sensitivity_summary[sensitivity_summary.alpha == 0]['New ATE'] return sensitivity_summary[['Method', 'ATE', 'New ATE', 'New ATE LB', 'New ATE UB']]
[docs] @staticmethod def plot(sens_df, partial_rsqs_df=None, type='raw', ci=False, partial_rsqs=False): """Plot the results of a sensitivity analysis against unmeasured Args: sens_df (pandas.DataFrame): a data frame output from causalsens partial_rsqs_d (pandas.DataFrame) : a data frame output from causalsens including partial rsqure type (str, optional): the type of plot to draw, 'raw' or 'r.squared' are supported ci (bool, optional): whether plot confidence intervals partial_rsqs (bool, optional): whether plot partial rsquare results """ if type == 'raw' and not ci: fig, ax = plt.subplots() y_max = round(sens_df['New ATE UB'].max()*1.1, 4) y_min = round(sens_df['New ATE LB'].min()*0.9, 4) x_max = round(sens_df.alpha.max()*1.1, 4) x_min = round(sens_df.alpha.min()*0.9, 4) plt.ylim(y_min, y_max) plt.xlim(x_min, x_max) ax.plot(sens_df.alpha, sens_df['New ATE']) elif type == 'raw' and ci: fig, ax = plt.subplots() y_max = round(sens_df['New ATE UB'].max()*1.1, 4) y_min = round(sens_df['New ATE LB'].min()*0.9, 4) x_max = round(sens_df.alpha.max()*1.1, 4) x_min = round(sens_df.alpha.min()*0.9, 4) plt.ylim(y_min, y_max) plt.xlim(x_min, x_max) ax.fill_between(sens_df.alpha, sens_df['New ATE LB'], sens_df['New ATE UB'], color='gray', alpha=0.5) ax.plot(sens_df.alpha, sens_df['New ATE']) elif type == 'r.squared' and ci: fig, ax = plt.subplots() y_max = round(sens_df['New ATE UB'].max()*1.1, 4) y_min = round(sens_df['New ATE LB'].min()*0.9, 4) plt.ylim(y_min, y_max) ax.fill_between(sens_df.rsqs, sens_df['New ATE LB'], sens_df['New ATE UB'], color='gray', alpha=0.5) ax.plot(sens_df.rsqs, sens_df['New ATE']) if partial_rsqs: plt.scatter(partial_rsqs_df.partial_rsqs, list(sens_df[sens_df.alpha == 0]['New ATE']) * partial_rsqs_df.shape[0], marker='x', color="red", linewidth=10) elif type == 'r.squared' and not ci: fig, ax = plt.subplots() y_max = round(sens_df['New ATE UB'].max()*1.1, 4) y_min = round(sens_df['New ATE LB'].min()*0.9, 4) plt.ylim(y_min, y_max) plt.plot(sens_df.rsqs, sens_df['New ATE']) if partial_rsqs: plt.scatter(partial_rsqs_df.partial_rsqs, list(sens_df[sens_df.alpha == 0]['New ATE']) * partial_rsqs_df.shape[0], marker='x', color="red", linewidth=10)
[docs] @staticmethod def partial_rsqs_confounding(sens_df, feature_name, partial_rsqs_value, range=0.01): """Check partial rsqs values of feature corresponding confounding amonunt of ATE Args: sens_df (pandas.DataFrame): a data frame output from causalsens feature_name (str): feature name to check partial_rsqs_value (float) : partial rsquare value of feature range (float) : range to search from sens_df Return: min and max value of confounding amount """ rsqs_dict = [] for i in sens_df.rsqs: if partial_rsqs_value - partial_rsqs_value*range < i < partial_rsqs_value + partial_rsqs_value*range: rsqs_dict.append(i) if rsqs_dict: confounding_min = sens_df[sens_df.rsqs.isin(rsqs_dict)].alpha.min() confounding_max = sens_df[sens_df.rsqs.isin(rsqs_dict)].alpha.max() logger.info('Only works for linear outcome models right now. Check back soon.') logger.info('For feature {} with partial rsquare {} confounding amount with possible values: {}, {}'.format( feature_name, partial_rsqs_value, confounding_min, confounding_max)) return [confounding_min, confounding_max] else: logger.info('Cannot find correponding rsquare value within the range for input, please edit confounding', 'values vector or use a larger range and try again')