import logging
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from scipy.special import expit, logit
from scipy.stats import norm
from sklearn.preprocessing import MinMaxScaler
from causalml.inference.meta.utils import check_treatment_vector, check_p_conditions, convert_pd_to_np
from causalml.propensity import calibrate
logger = logging.getLogger('causalml')
def logit_tmle(x, y, a, h0, h1):
    """Negative log-likelihood of the two-parameter logistic fluctuation model.

    Args:
        x (numpy.array): the two fluctuation coefficients for h0 and h1
        y (numpy.array): a (scaled) outcome vector with values in [0, 1]
        a (numpy.array): an offset (intercept) term on the logit scale
        h0 (numpy.array): the clever covariate for control units
        h1 (numpy.array): the clever covariate for treated units

    Returns:
        (float): the mean negative log-likelihood
    """
    eps0, eps1 = x
    prob = expit(a + eps0 * h0 + eps1 * h1)
    likelihood = np.power(prob, y) * np.power(1 - prob, 1 - y)
    return np.mean(-np.log(likelihood))
def logit_tmle_grad(x, y, a, h0, h1):
    """Gradient of ``logit_tmle`` with respect to the two fluctuation coefficients.

    Args:
        x (numpy.array): the two fluctuation coefficients for h0 and h1
        y (numpy.array): a (scaled) outcome vector with values in [0, 1]
        a (numpy.array): an offset (intercept) term on the logit scale
        h0 (numpy.array): the clever covariate for control units
        h1 (numpy.array): the clever covariate for treated units

    Returns:
        (numpy.array): the length-2 gradient vector
    """
    prob = expit(a + x[0] * h0 + x[1] * h1)
    resid = y - prob
    # Score equations of the logistic fluctuation: -E[(y - p) * h].
    return -np.array([np.mean(resid * h0), np.mean(resid * h1)])
def logit_tmle_hess(x, y, a, h0, h1):
    """Hessian of ``logit_tmle`` with respect to the two fluctuation coefficients.

    Args:
        x (numpy.array): the two fluctuation coefficients for h0 and h1
        y (numpy.array): unused; kept so the signature matches ``logit_tmle``
        a (numpy.array): an offset (intercept) term on the logit scale
        h0 (numpy.array): the clever covariate for control units
        h1 (numpy.array): the clever covariate for treated units

    Returns:
        (numpy.array): the symmetric 2x2 Hessian matrix
    """
    prob = expit(a + x[0] * h0 + x[1] * h1)
    # Bernoulli variance p(1-p) weights each covariate product.
    weight = prob * (1 - prob)
    d00 = np.mean(weight * h0 * h0)
    d01 = np.mean(weight * h0 * h1)
    d11 = np.mean(weight * h1 * h1)
    return np.array([[d00, d01],
                     [d01, d11]])
def simple_tmle(y, w, q0w, q1w, p, alpha=0.0001):
    """Calculate the ATE and variances with the simplified TMLE method.

    The outcome and its predictions are min-max scaled onto [0, 1], a
    two-parameter logistic fluctuation is fit with Newton-CG, and the targeted
    predictions are mapped back to the original outcome scale. The standard
    error comes from the empirical variance of the influence curve.

    Args:
        y (numpy.array): an outcome vector
        w (numpy.array): a treatment vector
        q0w (numpy.array): an outcome prediction vector given no treatment
        q1w (numpy.array): an outcome prediction vector given treatment
        p (numpy.array): a propensity score vector
        alpha (float, optional): a clipping threshold for predictions

    Returns:
        (tuple)
            - ate (float): ATE
            - se (float): The standard error of ATE
    """
    # Fit the scaler on the observed outcome; reuse it for both predictions.
    scaler = MinMaxScaler().fit(y.reshape(-1, 1))

    def to_unit(v):
        return scaler.transform(v.reshape(-1, 1)).flatten()

    def from_unit(v):
        return scaler.inverse_transform(v.reshape(-1, 1)).flatten()

    y_scaled = to_unit(y)
    q0 = np.clip(to_unit(q0w), alpha, 1 - alpha)
    q1 = np.clip(to_unit(q1w), alpha, 1 - alpha)

    # Prediction for the treatment actually received, on the logit scale.
    offset = logit(q0 * (1 - w) + q1 * w)

    # Clever covariates for control and treated units.
    h_control = (1 - w) / (1 - p)
    h_treated = w / p

    # Fit the two fluctuation coefficients by Newton-CG.
    sol = minimize(logit_tmle, np.zeros(2),
                   args=(y_scaled, offset, h_control, h_treated),
                   method="Newton-CG", jac=logit_tmle_grad, hess=logit_tmle_hess)
    eps0, eps1 = sol.x

    # Targeted predictions, mapped back to the original outcome scale.
    qawstar = from_unit(expit(offset + eps0 * h_control + eps1 * h_treated))
    q0star = from_unit(expit(logit(q0) + eps0 / (1 - p)))
    q1star = from_unit(expit(logit(q1) + eps1 / p))

    ate = np.mean(q1star - q0star)
    # Influence curve of the ATE; its variance yields the standard error.
    ic = (w / p - (1 - w) / (1 - p)) * (y - qawstar) + q1star - q0star - ate
    return ate, np.sqrt(np.var(ic) / np.size(y))
class TMLELearner(object):
    """Targeted maximum likelihood estimation.

    Ref: Gruber, S., & Van Der Laan, M. J. (2009). Targeted maximum likelihood estimation: A gentle introduction.
    """

    def __init__(self, learner, ate_alpha=.05, control_name=0, cv=None, calibrate_propensity=True):
        """Initialize a TMLE learner.

        Args:
            learner: a model to estimate the outcome
            ate_alpha (float, optional): the confidence level alpha of the ATE estimate
            control_name (str or int, optional): the name of the control group
            cv (sklearn.model_selection._BaseKFold, optional): sklearn CV object
            calibrate_propensity (bool, optional): whether to calibrate the propensity scores
                against the observed treatment assignment before the TMLE step
        """
        self.model_tau = learner
        self.ate_alpha = ate_alpha
        self.control_name = control_name
        self.cv = cv
        self.calibrate_propensity = calibrate_propensity

    def __repr__(self):
        return '{}(model={}, cv={})'.format(self.__class__.__name__, self.model_tau.__repr__(), self.cv)

    def estimate_ate(self, X, treatment, y, p, segment=None, return_ci=False):
        """Estimate the Average Treatment Effect (ATE).

        Args:
            X (np.matrix or np.array or pd.Dataframe): a feature matrix
            treatment (np.array or pd.Series): a treatment vector
            y (np.array or pd.Series): an outcome vector
            p (np.ndarray or pd.Series or dict): an array of propensity scores of float (0,1) in the single-treatment
                case; or, a dictionary of treatment groups that map to propensity vectors of float (0,1)
            segment (np.array, optional): An optional segment vector of int. If given, the ATE and its CI will be
                estimated for each segment.
            return_ci (bool, optional): currently unused; the confidence interval is always returned
                alongside the ATE. Kept for interface compatibility.

        Returns:
            (tuple): The ATE and its confidence interval (LB, UB) for each treatment, t and segment, s
        """
        X, treatment, y = convert_pd_to_np(X, treatment, y)
        check_treatment_vector(treatment, self.control_name)

        self.t_groups = np.unique(treatment[treatment != self.control_name])
        self.t_groups.sort()

        check_p_conditions(p, self.t_groups)
        # Normalize the propensity input to a {treatment_name: np.array} dict.
        if isinstance(p, (np.ndarray, pd.Series)):
            treatment_name = self.t_groups[0]
            p = {treatment_name: convert_pd_to_np(p)}
        elif isinstance(p, dict):
            p = {treatment_name: convert_pd_to_np(_p) for treatment_name, _p in p.items()}

        ate = []
        ate_lb = []
        ate_ub = []

        # Half-width multiplier of the (1 - ate_alpha) CI; hoisted out of the loops.
        z = norm.ppf(1 - self.ate_alpha / 2)

        for group in self.t_groups:
            logger.info('Estimating ATE for group {}.'.format(group))
            w_group = (treatment == group).astype(int)
            p_group = p[group]

            if self.calibrate_propensity:
                logger.info('Calibrating propensity scores.')
                p_group = calibrate(p_group, w_group)

            yhat_c = np.zeros_like(y, dtype=float)
            yhat_t = np.zeros_like(y, dtype=float)
            if self.cv:
                # Out-of-fold outcome predictions: the treatment indicator is
                # appended as an extra feature, then toggled to 0/1 at predict time.
                for i_fold, (i_trn, i_val) in enumerate(self.cv.split(X, y), 1):
                    logger.info('Training an outcome model for CV #{}'.format(i_fold))
                    self.model_tau.fit(np.hstack((X[i_trn], w_group[i_trn].reshape(-1, 1))), y[i_trn])

                    yhat_c[i_val] = self.model_tau.predict(np.hstack((X[i_val], np.zeros((len(i_val), 1)))))
                    yhat_t[i_val] = self.model_tau.predict(np.hstack((X[i_val], np.ones((len(i_val), 1)))))
            else:
                self.model_tau.fit(np.hstack((X, w_group.reshape(-1, 1))), y)

                yhat_c = self.model_tau.predict(np.hstack((X, np.zeros((len(y), 1)))))
                yhat_t = self.model_tau.predict(np.hstack((X, np.ones((len(y), 1)))))

            if segment is None:
                logger.info('Training the TMLE learner.')
                _ate, se = simple_tmle(y, w_group, yhat_c, yhat_t, p_group)
                _ate_lb = _ate - se * z
                _ate_ub = _ate + se * z
            else:
                assert segment.shape[0] == X.shape[0] and segment.ndim == 1, 'Segment must be the 1-d np.array of int.'
                segments = np.unique(segment)

                _ate = []
                _ate_lb = []
                _ate_ub = []
                for s in sorted(segments):
                    logger.info('Training the TMLE learner for segment {}.'.format(s))
                    # Restrict to the segment and drop the top 1% of control-outcome
                    # predictions to guard the TMLE fit against extreme predictions.
                    filt = (segment == s) & (yhat_c < np.quantile(yhat_c, q=.99))
                    _ate_s, se = simple_tmle(y[filt], w_group[filt], yhat_c[filt], yhat_t[filt], p_group[filt])
                    _ate.append(_ate_s)
                    _ate_lb.append(_ate_s - se * z)
                    _ate_ub.append(_ate_s + se * z)

            ate.append(_ate)
            ate_lb.append(_ate_lb)
            ate_ub.append(_ate_ub)

        return np.array(ate), np.array(ate_lb), np.array(ate_ub)