Source code for causalml.optimize.unit_selection

import warnings

import numpy as np
from sklearn.base import clone


class CounterfactualUnitSelector:
    """
    A highly experimental implementation of the counterfactual unit selection
    model proposed by Li and Pearl (2019).

    Parameters
    ----------
    learner : object
        The base learner used to estimate the segment probabilities.

    nevertaker_payoff : float
        The payoff from targeting a never-taker

    alwaystaker_payoff : float
        The payoff from targeting an always-taker

    complier_payoff : float
        The payoff from targeting a complier

    defier_payoff : float
        The payoff from targeting a defier

    organic_conversion : float, optional (default=None)
        The organic conversion rate in the population without an intervention.
        If None, the organic conversion rate is obtained from the control group.

        NB: The organic conversion in the control group is not always the same
        as the organic conversion rate without treatment.

    data : DataFrame
        A pandas DataFrame containing the features, treatment assignment
        indicator and the outcome of interest.

    treatment : string
        A string corresponding to the name of the treatment column. The
        assumed coding in the column is 1 for treatment and 0 for control.

    outcome : string
        A string corresponding to the name of the outcome column. The assumed
        coding in the column is 1 for conversion and 0 for no conversion.

    References
    ----------
    Li, Ang, and Judea Pearl. 2019. “Unit Selection Based on Counterfactual
    Logic.” https://ftp.cs.ucla.edu/pub/stat_ser/r488.pdf.
    """

    def __init__(
        self,
        learner,
        nevertaker_payoff,
        alwaystaker_payoff,
        complier_payoff,
        defier_payoff,
        organic_conversion=None,
    ):
        self.learner = learner
        self.nevertaker_payoff = nevertaker_payoff
        self.alwaystaker_payoff = alwaystaker_payoff
        self.complier_payoff = complier_payoff
        self.defier_payoff = defier_payoff
        self.organic_conversion = organic_conversion
    def fit(self, data, treatment, outcome):
        """
        Fits the segment model and, if gain equality does not hold, the
        conditional conversion probability models.
        """

        if self._gain_equality_check():
            self._fit_segment_model(data, treatment, outcome)

        else:
            self._fit_segment_model(data, treatment, outcome)
            self._fit_condprob_models(data, treatment, outcome)
    def predict(self, data, treatment, outcome):
        """
        Predicts an individual-level payoff. If gain equality is satisfied,
        uses the exact function; if not, uses the midpoint between bounds.
        """

        if self._gain_equality_check():
            est_payoff = self._get_exact_benefit(data, treatment, outcome)

        else:
            est_payoff = self._obj_func_midp(data, treatment, outcome)

        return est_payoff
    def _gain_equality_check(self):
        """
        Checks if gain equality is satisfied. If so, the optimization task can
        be simplified.
        """

        return (
            self.complier_payoff + self.defier_payoff
            == self.alwaystaker_payoff + self.nevertaker_payoff
        )

    @staticmethod
    def _make_segments(data, treatment, outcome):
        """
        Constructs the following segments:

        * AC = Pr(Y = 1, W = 1 | X)
        * AD = Pr(Y = 1, W = 0 | X)
        * ND = Pr(Y = 0, W = 1 | X)
        * NC = Pr(Y = 0, W = 0 | X)

        where the names of the outcomes correspond to the combinations of the
        relevant segments, e.g., AC = Always-taker or Complier.
        """

        segments = np.empty(data.shape[0], dtype="object")

        segments[(data[treatment] == 1) & (data[outcome] == 1)] = "AC"
        segments[(data[treatment] == 0) & (data[outcome] == 1)] = "AD"
        segments[(data[treatment] == 1) & (data[outcome] == 0)] = "ND"
        segments[(data[treatment] == 0) & (data[outcome] == 0)] = "NC"

        return segments

    def _fit_segment_model(self, data, treatment, outcome):
        """
        Fits a classifier for estimating the probabilities for the unit
        segment combinations.
        """

        model = clone(self.learner)

        X = data.drop([treatment, outcome], axis=1)
        y = self._make_segments(data, treatment, outcome)

        self.segment_model = model.fit(X, y)

    def _fit_condprob_models(self, data, treatment, outcome):
        """
        Fits two classifiers to estimate conversion probabilities conditional
        on the treatment.
        """

        trt_learner = clone(self.learner)
        ctr_learner = clone(self.learner)

        treated = data[treatment] == 1

        X = data.drop([treatment, outcome], axis=1)
        y = data[outcome]

        self.trt_model = trt_learner.fit(X[treated], y[treated])
        self.ctr_model = ctr_learner.fit(X[~treated], y[~treated])

    def _get_exact_benefit(self, data, treatment, outcome):
        """
        Calculates the exact benefit function of Theorem 4 in Li and Pearl
        (2019). Returns the exact benefit.
        """

        beta = self.complier_payoff
        gamma = self.alwaystaker_payoff
        theta = self.nevertaker_payoff

        X = data.drop([treatment, outcome], axis=1)

        segment_prob = self.segment_model.predict_proba(X)
        segment_name = self.segment_model.classes_

        benefit = (
            (beta - theta) * segment_prob[:, segment_name == "AC"]
            + (gamma - beta) * segment_prob[:, segment_name == "AD"]
            + theta
        )

        return benefit

    def _obj_func_midp(self, data, treatment, outcome):
        """
        Calculates bounds for the objective function. Returns the midpoint
        between bounds.

        Parameters
        ----------
        pr_y1_w1 : float
            The probability of conversion given treatment assignment.

        pr_y1_w0 : float
            The probability of conversion given control assignment.

        pr_y0_w1 : float
            The probability of no conversion given treatment assignment
            (1 - pr_y1_w1).

        pr_y0_w0 : float
            The probability of no conversion given control assignment
            (1 - pr_y1_w0).

        pr_y1w1_x : float
            Probability of complier or always-taker given X.

        pr_y0w0_x : float
            Probability of complier or never-taker given X.

        pr_y1w0_x : float
            Probability of defier or always-taker given X.

        pr_y0w1_x : float
            Probability of never-taker or defier given X.

        pr_y_x : float
            Organic probability of conversion.
        """

        X = data.drop([treatment, outcome], axis=1)

        beta = self.complier_payoff
        gamma = self.alwaystaker_payoff
        theta = self.nevertaker_payoff
        delta = self.defier_payoff

        pr_y0_w1, pr_y1_w1 = np.split(
            self.trt_model.predict_proba(X), indices_or_sections=2, axis=1
        )
        pr_y0_w0, pr_y1_w0 = np.split(
            self.ctr_model.predict_proba(X), indices_or_sections=2, axis=1
        )

        segment_prob = self.segment_model.predict_proba(X)
        segment_name = self.segment_model.classes_

        pr_y1w1_x = segment_prob[:, segment_name == "AC"]
        pr_y0w0_x = segment_prob[:, segment_name == "NC"]
        pr_y1w0_x = segment_prob[:, segment_name == "AD"]
        pr_y0w1_x = segment_prob[:, segment_name == "ND"]

        if self.organic_conversion is not None:
            pr_y_x = self.organic_conversion

        else:
            pr_y_x = pr_y1_w0
            warnings.warn(
                "Probability of organic conversion estimated from control observations."
            )

        p1 = (beta - theta) * pr_y1_w1 + delta * pr_y1_w0 + theta * pr_y0_w0
        p2 = gamma * pr_y1_w1 + delta * pr_y0_w1 + (beta - gamma) * pr_y0_w0
        p3 = (
            (gamma - delta) * pr_y1_w1
            + delta * pr_y1_w0
            + theta * pr_y0_w0
            + (beta - gamma - theta + delta) * (pr_y1w1_x + pr_y0w0_x)
        )
        p4 = (
            (beta - theta) * pr_y1_w1
            - (beta - gamma - theta) * pr_y1_w0
            + theta * pr_y0_w0
            + (beta - gamma - theta + delta) * (pr_y1w0_x + pr_y0w1_x)
        )
        p5 = (gamma - delta) * pr_y1_w1 + delta * pr_y1_w0 + theta * pr_y0_w0
        p6 = (
            (beta - theta) * pr_y1_w1
            - (beta - gamma - theta) * pr_y1_w0
            + theta * pr_y0_w0
        )
        p7 = (
            (gamma - delta) * pr_y1_w1
            - (beta - gamma - theta) * pr_y1_w0
            + theta * pr_y0_w0
            + (beta - gamma - theta + delta) * pr_y_x
        )
        p8 = (
            (beta - theta) * pr_y1_w1
            + delta * pr_y1_w0
            + theta * pr_y0_w0
            - (beta - gamma - theta + delta) * pr_y_x
        )

        params_1 = np.concatenate((p1, p2, p3, p4), axis=1)
        params_2 = np.concatenate((p5, p6, p7, p8), axis=1)

        sigma = beta - gamma - theta + delta

        if sigma < 0:
            lower_bound = np.max(params_1, axis=1)
            upper_bound = np.min(params_2, axis=1)

        elif sigma > 0:
            lower_bound = np.max(params_2, axis=1)
            upper_bound = np.min(params_1, axis=1)

        return (lower_bound + upper_bound) / 2