import logging
import numpy as np
import pandas as pd
from collections import defaultdict
import matplotlib.pyplot as plt
from importlib import import_module
logger = logging.getLogger("sensitivity")
SUMMARY_COLS = ["Method", "ATE", "New ATE", "New ATE LB", "New ATE UB"]
def one_sided(alpha, p, treatment):
"""One sided confounding function.
Reference: Blackwell, Matthew. "A selection bias approach to sensitivity analysis
for causal effects." Political Analysis 22.2 (2014): 169-182.
https://www.mattblackwell.org/files/papers/causalsens.pdf
Args:
alpha (np.array): a confounding values vector
p (np.array): a propensity score vector between 0 and 1
treatment (np.array): a treatment vector (1 if treated, otherwise 0)
"""
assert p.shape[0] == treatment.shape[0]
adj = alpha * (1 - p) * treatment - alpha * p * (1 - treatment)
return adj
def alignment(alpha, p, treatment):
"""Alignment confounding function.
Reference: Blackwell, Matthew. "A selection bias approach to sensitivity analysis
for causal effects." Political Analysis 22.2 (2014): 169-182.
https://www.mattblackwell.org/files/papers/causalsens.pdf
Args:
alpha (np.array): a confounding values vector
p (np.array): a propensity score vector between 0 and 1
treatment (np.array): a treatment vector (1 if treated, otherwise 0)
"""
assert p.shape[0] == treatment.shape[0]
adj = alpha * (1 - p) * treatment + alpha * p * (1 - treatment)
return adj
def one_sided_att(alpha, p, treatment):
"""One sided confounding function for the average effect of the treatment among the treated units (ATT)
Reference: Blackwell, Matthew. "A selection bias approach to sensitivity analysis
for causal effects." Political Analysis 22.2 (2014): 169-182.
https://www.mattblackwell.org/files/papers/causalsens.pdf
Args:
alpha (np.array): a confounding values vector
p (np.array): a propensity score vector between 0 and 1
treatment (np.array): a treatment vector (1 if treated, otherwise 0)
"""
assert p.shape[0] == treatment.shape[0]
adj = alpha * (1 - treatment)
return adj
def alignment_att(alpha, p, treatment):
"""Alignment confounding function for the average effect of the treatment among the treated units (ATT)
Reference: Blackwell, Matthew. "A selection bias approach to sensitivity analysis
for causal effects." Political Analysis 22.2 (2014): 169-182.
https://www.mattblackwell.org/files/papers/causalsens.pdf
Args:
alpha (np.array): a confounding values vector
p (np.array): a propensity score vector between 0 and 1
treatment (np.array): a treatment vector (1 if treated, otherwise 0)
"""
assert p.shape[0] == treatment.shape[0]
adj = alpha * (1 - treatment)
return adj
[docs]class Sensitivity(object):
"""A Sensitivity Check class to support Placebo Treatment, Irrelevant Additional Confounder
and Subset validation refutation methods to verify causal inference.
Reference: https://github.com/microsoft/dowhy/blob/master/dowhy/causal_refuters/
"""
def __init__(
self,
df,
inference_features,
p_col,
treatment_col,
outcome_col,
learner,
*args,
**kwargs,
):
"""Initialize.
Args:
df (pd.DataFrame): input data frame
inferenece_features (list of str): a list of columns that used in learner for inference
p_col (str): column name of propensity score
treatment_col (str): column name of whether in treatment of control
outcome_col (str): column name of outcome
learner (model): a model to estimate outcomes and treatment effects
"""
self.df = df
self.inference_features = inference_features
self.p_col = p_col
self.treatment_col = treatment_col
self.outcome_col = outcome_col
self.learner = learner
[docs] def get_prediction(self, X, p, treatment, y):
"""Return the treatment effects prediction.
Args:
X (np.matrix): a feature matrix
p (np.array): a propensity score vector between 0 and 1
treatment (np.array): a treatment vector (1 if treated, otherwise 0)
y (np.array): an outcome vector
Returns:
(numpy.ndarray): Predictions of treatment effects
"""
learner = self.learner
try:
preds = learner.fit_predict(X=X, p=p, treatment=treatment, y=y).flatten()
except TypeError:
preds = learner.fit_predict(X=X, treatment=treatment, y=y).flatten()
return preds
[docs] def get_ate_ci(self, X, p, treatment, y):
"""Return the confidence intervals for treatment effects prediction.
Args:
X (np.matrix): a feature matrix
p (np.array): a propensity score vector between 0 and 1
treatment (np.array): a treatment vector (1 if treated, otherwise 0)
y (np.array): an outcome vector
Returns:
(numpy.ndarray): Mean and confidence interval (LB, UB) of the ATE estimate.
"""
learner = self.learner
from ..inference.meta.tlearner import BaseTLearner
if isinstance(learner, BaseTLearner):
ate, ate_lower, ate_upper = learner.estimate_ate(
X=X, treatment=treatment, y=y
)
else:
try:
ate, ate_lower, ate_upper = learner.estimate_ate(
X=X, p=p, treatment=treatment, y=y
)
except TypeError:
ate, ate_lower, ate_upper = learner.estimate_ate(
X=X, treatment=treatment, y=y, return_ci=True
)
return ate[0], ate_lower[0], ate_upper[0]
[docs] @staticmethod
def get_class_object(method_name, *args, **kwargs):
"""Return class object based on input method
Args:
method_name (list of str): a list of sensitivity analysis method
Returns:
(class): Sensitivy Class
"""
method_list = [
"Placebo Treatment",
"Random Cause",
"Subset Data",
"Random Replace",
"Selection Bias",
]
class_name = "Sensitivity" + method_name.replace(" ", "")
try:
getattr(import_module("causalml.metrics.sensitivity"), class_name)
return getattr(import_module("causalml.metrics.sensitivity"), class_name)
except AttributeError:
raise AttributeError(
"{} is not an existing method for sensitiviy analysis.".format(
method_name
)
+ " Select one of {}".format(method_list)
)
[docs] def sensitivity_analysis(
self, methods, sample_size=None, confound="one_sided", alpha_range=None
):
"""Return the sensitivity data by different method
Args:
method (list of str): a list of sensitivity analysis method
sample_size (float, optional): ratio for subset the original data
confound (string, optional): the name of confouding function
alpha_range (np.array, optional): a parameter to pass the confounding function
Returns:
X (np.matrix): a feature matrix
p (np.array): a propensity score vector between 0 and 1
treatment (np.array): a treatment vector (1 if treated, otherwise 0)
y (np.array): an outcome vector
"""
if alpha_range is None:
y = self.df[self.outcome_col]
iqr = y.quantile(0.75) - y.quantile(0.25)
alpha_range = np.linspace(-iqr / 2, iqr / 2, 11)
if 0 not in alpha_range:
alpha_range = np.append(alpha_range, 0)
else:
alpha_range = alpha_range
alpha_range.sort()
summary = []
for method in methods:
sens = self.get_class_object(method)
sens = sens(
self.df,
self.inference_features,
self.p_col,
self.treatment_col,
self.outcome_col,
self.learner,
sample_size=sample_size,
confound=confound,
alpha_range=alpha_range,
)
if method == "Subset Data":
method = method + "(sample size @{})".format(sample_size)
sens_df = sens.summary(method=method)
summary.append(sens_df.values.tolist()[0])
summary_df = pd.DataFrame(summary, columns=SUMMARY_COLS)
return summary_df
[docs] def summary(self, method):
"""Summary report
Args:
method_name (str): sensitivity analysis method
Returns:
(pd.DataFrame): a summary dataframe
"""
method_name = method
X = self.df[self.inference_features].values
p = self.df[self.p_col].values
treatment = self.df[self.treatment_col].values
y = self.df[self.outcome_col].values
preds = self.get_prediction(X, p, treatment, y)
ate = preds.mean()
ate_new, ate_new_lower, ate_new_upper = self.sensitivity_estimate()
sensitivity_summary = pd.DataFrame(
[method_name, ate, ate_new, ate_new_lower, ate_new_upper]
).T
sensitivity_summary.columns = SUMMARY_COLS
return sensitivity_summary
[docs] def sensitivity_estimate(self):
raise NotImplementedError
[docs]class SensitivityPlaceboTreatment(Sensitivity):
"""Replaces the treatment variable with a new variable randomly generated."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
[docs] def sensitivity_estimate(self):
"""Summary report
Args:
return_ci (str): sensitivity analysis method
Returns:
(pd.DataFrame): a summary dataframe
"""
num_rows = self.df.shape[0]
X = self.df[self.inference_features].values
p = self.df[self.p_col].values
treatment_new = np.random.randint(2, size=num_rows)
y = self.df[self.outcome_col].values
ate_new, ate_new_lower, ate_new_upper = self.get_ate_ci(X, p, treatment_new, y)
return ate_new, ate_new_lower, ate_new_upper
[docs]class SensitivityRandomCause(Sensitivity):
"""Adds an irrelevant random covariate to the dataframe."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
[docs] def sensitivity_estimate(self):
num_rows = self.df.shape[0]
new_data = np.random.randn(num_rows)
X = self.df[self.inference_features].values
p = self.df[self.p_col].values
treatment = self.df[self.treatment_col].values
y = self.df[self.outcome_col].values
X_new = np.hstack((X, new_data.reshape((-1, 1))))
ate_new, ate_new_lower, ate_new_upper = self.get_ate_ci(X_new, p, treatment, y)
return ate_new, ate_new_lower, ate_new_upper
[docs]class SensitivityRandomReplace(Sensitivity):
"""Replaces a random covariate with an irrelevant variable."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if "replaced_feature" not in kwargs:
replaced_feature_index = np.random.randint(len(self.inference_features))
self.replaced_feature = self.inference_features[replaced_feature_index]
else:
self.replaced_feature = kwargs["replaced_feature"]
[docs] def sensitivity_estimate(self):
"""Replaces a random covariate with an irrelevant variable."""
logger.info(
"Replace feature {} with an random irrelevant variable".format(
self.replaced_feature
)
)
df_new = self.df.copy()
num_rows = self.df.shape[0]
df_new[self.replaced_feature] = np.random.randn(num_rows)
X_new = df_new[self.inference_features].values
p_new = df_new[self.p_col].values
treatment_new = df_new[self.treatment_col].values
y_new = df_new[self.outcome_col].values
ate_new, ate_new_lower, ate_new_upper = self.get_ate_ci(
X_new, p_new, treatment_new, y_new
)
return ate_new, ate_new_lower, ate_new_upper
[docs]class SensitivitySubsetData(Sensitivity):
"""Takes a random subset of size sample_size of the data."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.sample_size = kwargs["sample_size"]
assert self.sample_size is not None
[docs] def sensitivity_estimate(self):
df_new = self.df.sample(frac=self.sample_size).copy()
X_new = df_new[self.inference_features].values
p_new = df_new[self.p_col].values
treatment_new = df_new[self.treatment_col].values
y_new = df_new[self.outcome_col].values
ate_new, ate_new_lower, ate_new_upper = self.get_ate_ci(
X_new, p_new, treatment_new, y_new
)
return ate_new, ate_new_lower, ate_new_upper
[docs]class SensitivitySelectionBias(Sensitivity):
"""Reference:
[1] Blackwell, Matthew. "A selection bias approach to sensitivity analysis
for causal effects." Political Analysis 22.2 (2014): 169-182.
https://www.mattblackwell.org/files/papers/causalsens.pdf
[2] Confouding parameter alpha_range using the same range as in:
https://github.com/mattblackwell/causalsens/blob/master/R/causalsens.R
"""
def __init__(
self,
*args,
confound="one_sided",
alpha_range=None,
sensitivity_features=None,
**kwargs,
):
super().__init__(*args, **kwargs)
"""Initialize.
Args:
confound (string): the name of confouding function
alpha_range (np.array): a parameter to pass the confounding function
sensitivity_features (list of str): ): a list of columns that to check each individual partial r-square
"""
logger.info("Only works for linear outcome models right now. Check back soon.")
confounding_functions = {
"one_sided": one_sided,
"alignment": alignment,
"one_sided_att": one_sided_att,
"alignment_att": alignment_att,
}
try:
confound_func = confounding_functions[confound]
except KeyError:
raise NotImplementedError(
f"Confounding function, {confound} is not implemented. \
Use one of {confounding_functions.keys()}"
)
self.confound = confound_func
if sensitivity_features is None:
self.sensitivity_features = self.inference_features
else:
self.sensitivity_features = sensitivity_features
if alpha_range is None:
y = self.df[self.outcome_col]
iqr = y.quantile(0.75) - y.quantile(0.25)
self.alpha_range = np.linspace(-iqr / 2, iqr / 2, 11)
if 0 not in self.alpha_range:
self.alpha_range = np.append(self.alpha_range, 0)
else:
self.alpha_range = alpha_range
self.alpha_range.sort()
[docs] def causalsens(self):
alpha_range = self.alpha_range
confound = self.confound
df = self.df
X = df[self.inference_features].values
p = df[self.p_col].values
treatment = df[self.treatment_col].values
y = df[self.outcome_col].values
preds = self.get_prediction(X, p, treatment, y)
sens_df = pd.DataFrame()
sens = []
for a in alpha_range:
adj = confound(a, p, treatment)
preds_adj = y - adj
s_preds = self.get_prediction(X, p, treatment, preds_adj)
ate, ate_lb, ate_ub = self.get_ate_ci(X, p, treatment, preds_adj)
s_preds_residul = preds_adj - s_preds
rsqs = a**2 * np.var(treatment) / np.var(s_preds_residul)
sens.append([a, rsqs, ate, ate_lb, ate_ub])
sens_df = pd.DataFrame(
sens, columns=["alpha", "rsqs", "New ATE", "New ATE LB", "New ATE UB"]
)
rss = np.sum(np.square(y - preds))
partial_rsqs = []
for feature in self.sensitivity_features:
df_new = df.copy()
X_new = df_new[self.inference_features].drop(feature, axis=1).copy()
y_new_preds = self.get_prediction(X_new, p, treatment, y)
rss_new = np.sum(np.square(y - y_new_preds))
partial_rsqs.append(((rss_new - rss) / rss))
partial_rsqs_df = pd.DataFrame([self.sensitivity_features, partial_rsqs]).T
partial_rsqs_df.columns = ["feature", "partial_rsqs"]
return sens_df, partial_rsqs_df
[docs] def summary(self, method="Selection Bias"):
"""Summary report for Selection Bias Method
Args:
method_name (str): sensitivity analysis method
Returns:
(pd.DataFrame): a summary dataframe
"""
method_name = method
sensitivity_summary = self.causalsens()[0]
sensitivity_summary["Method"] = [
method_name + " (alpha@" + str(round(i, 5)) + ", with r-sqaure:"
for i in sensitivity_summary.alpha
]
sensitivity_summary["Method"] = sensitivity_summary[
"Method"
] + sensitivity_summary["rsqs"].round(5).astype(str)
sensitivity_summary["ATE"] = sensitivity_summary[
sensitivity_summary.alpha == 0
]["New ATE"]
return sensitivity_summary[SUMMARY_COLS]
[docs] @staticmethod
def plot(sens_df, partial_rsqs_df=None, type="raw", ci=False, partial_rsqs=False):
"""Plot the results of a sensitivity analysis against unmeasured
Args:
sens_df (pandas.DataFrame): a data frame output from causalsens
partial_rsqs_d (pandas.DataFrame) : a data frame output from causalsens including partial rsqure
type (str, optional): the type of plot to draw, 'raw' or 'r.squared' are supported
ci (bool, optional): whether plot confidence intervals
partial_rsqs (bool, optional): whether plot partial rsquare results
"""
if type == "raw" and not ci:
fig, ax = plt.subplots()
y_max = round(sens_df["New ATE UB"].max() * 1.1, 4)
y_min = round(sens_df["New ATE LB"].min() * 0.9, 4)
x_max = round(sens_df.alpha.max() * 1.1, 4)
x_min = round(sens_df.alpha.min() * 0.9, 4)
plt.ylim(y_min, y_max)
plt.xlim(x_min, x_max)
ax.plot(sens_df.alpha, sens_df["New ATE"])
elif type == "raw" and ci:
fig, ax = plt.subplots()
y_max = round(sens_df["New ATE UB"].max() * 1.1, 4)
y_min = round(sens_df["New ATE LB"].min() * 0.9, 4)
x_max = round(sens_df.alpha.max() * 1.1, 4)
x_min = round(sens_df.alpha.min() * 0.9, 4)
plt.ylim(y_min, y_max)
plt.xlim(x_min, x_max)
ax.fill_between(
sens_df.alpha,
sens_df["New ATE LB"],
sens_df["New ATE UB"],
color="gray",
alpha=0.5,
)
ax.plot(sens_df.alpha, sens_df["New ATE"])
elif type == "r.squared" and ci:
fig, ax = plt.subplots()
y_max = round(sens_df["New ATE UB"].max() * 1.1, 4)
y_min = round(sens_df["New ATE LB"].min() * 0.9, 4)
plt.ylim(y_min, y_max)
ax.fill_between(
sens_df.rsqs,
sens_df["New ATE LB"],
sens_df["New ATE UB"],
color="gray",
alpha=0.5,
)
ax.plot(sens_df.rsqs, sens_df["New ATE"])
if partial_rsqs:
plt.scatter(
partial_rsqs_df.partial_rsqs,
list(sens_df[sens_df.alpha == 0]["New ATE"])
* partial_rsqs_df.shape[0],
marker="x",
color="red",
linewidth=10,
)
elif type == "r.squared" and not ci:
fig, ax = plt.subplots()
y_max = round(sens_df["New ATE UB"].max() * 1.1, 4)
y_min = round(sens_df["New ATE LB"].min() * 0.9, 4)
plt.ylim(y_min, y_max)
plt.plot(sens_df.rsqs, sens_df["New ATE"])
if partial_rsqs:
plt.scatter(
partial_rsqs_df.partial_rsqs,
list(sens_df[sens_df.alpha == 0]["New ATE"])
* partial_rsqs_df.shape[0],
marker="x",
color="red",
linewidth=10,
)
[docs] @staticmethod
def partial_rsqs_confounding(sens_df, feature_name, partial_rsqs_value, range=0.01):
"""Check partial rsqs values of feature corresponding confounding amonunt of ATE
Args:
sens_df (pandas.DataFrame): a data frame output from causalsens
feature_name (str): feature name to check
partial_rsqs_value (float) : partial rsquare value of feature
range (float) : range to search from sens_df
Return: min and max value of confounding amount
"""
rsqs_dict = []
for i in sens_df.rsqs:
if (
partial_rsqs_value - partial_rsqs_value * range
< i
< partial_rsqs_value + partial_rsqs_value * range
):
rsqs_dict.append(i)
if rsqs_dict:
confounding_min = sens_df[sens_df.rsqs.isin(rsqs_dict)].alpha.min()
confounding_max = sens_df[sens_df.rsqs.isin(rsqs_dict)].alpha.max()
logger.info(
"Only works for linear outcome models right now. Check back soon."
)
logger.info(
"For feature {} with partial rsquare {} confounding amount with possible values: {}, {}".format(
feature_name, partial_rsqs_value, confounding_min, confounding_max
)
)
return [confounding_min, confounding_max]
else:
logger.info(
"Cannot find correponding rsquare value within the range for input, please edit confounding",
"values vector or use a larger range and try again",
)