# Source code for causalml.optimize.unit_selection

import numpy as np
import pandas as pd

from sklearn.base import clone

import warnings


class CounterfactualUnitSelector:
    r'''A highly experimental implementation of the counterfactual unit
    selection model proposed by Li and Pearl (2019).

    Parameters
    ----------
    learner : object
        The base learner used to estimate the segment probabilities.

    nevertaker_payoff : float
        The payoff from targeting a never-taker

    alwaystaker_payoff : float
        The payoff from targeting an always-taker

    complier_payoff : float
        The payoff from targeting a complier

    defier_payoff : float
        The payoff from targeting a defier

    organic_conversion : float, optional (default=None)
        The organic conversion rate in the population without an
        intervention. If None, the organic conversion rate is obtained
        from the control group.

        NB: The organic conversion in the control group is not always
        the same as the organic conversion rate without treatment.

    data : DataFrame
        A pandas DataFrame containing the features, treatment assignment
        indicator and the outcome of interest.

    treatment : string
        A string corresponding to the name of the treatment column. The
        assumed coding in the column is 1 for treatment and 0 for control.

    outcome : string
        A string corresponding to the name of the outcome column. The
        assumed coding in the column is 1 for conversion and 0 for no
        conversion.

    References
    ----------
    Li, Ang, and Judea Pearl. 2019. "Unit Selection Based on
    Counterfactual Logic." https://ftp.cs.ucla.edu/pub/stat_ser/r488.pdf.
    '''

    def __init__(self, learner, nevertaker_payoff, alwaystaker_payoff,
                 complier_payoff, defier_payoff, organic_conversion=None):

        self.learner = learner
        self.nevertaker_payoff = nevertaker_payoff
        self.alwaystaker_payoff = alwaystaker_payoff
        self.complier_payoff = complier_payoff
        self.defier_payoff = defier_payoff
        self.organic_conversion = organic_conversion

    def fit(self, data, treatment, outcome):
        '''
        Fits the class.

        The segment model is always needed. The conditional-probability
        models are only used by the bound-based objective, which applies
        when gain equality does not hold, so they are fitted only then.
        '''
        self._fit_segment_model(data, treatment, outcome)

        if not self._gain_equality_check():
            self._fit_condprob_models(data, treatment, outcome)

    def predict(self, data, treatment, outcome):
        '''
        Predicts an individual-level payoff. If gain equality is satisfied,
        uses the exact function; if not, uses the midpoint between bounds.
        '''
        if self._gain_equality_check():
            est_payoff = self._get_exact_benefit(data, treatment, outcome)
        else:
            est_payoff = self._obj_func_midp(data, treatment, outcome)

        return est_payoff

    def _gain_equality_check(self):
        '''
        Checks if gain equality is satisfied. If so, the optimization
        task can be simplified.
        '''
        return self.complier_payoff + self.defier_payoff == \
            self.alwaystaker_payoff + self.nevertaker_payoff

    @staticmethod
    def _make_segments(data, treatment, outcome):
        r'''
        Constructs the following segments:

        * AC = Pr(Y = 1, W = 1 \mid X)
        * AD = Pr(Y = 1, W = 0 \mid X)
        * ND = Pr(Y = 0, W = 1 \mid X)
        * NC = Pr(Y = 0, W = 0 \mid X)

        where the names of the outcomes correspond to the combinations of
        the relevant segments, eg AC = Always-taker or Complier.
        '''
        segments = np.empty(data.shape[0], dtype='object')

        segments[(data[treatment] == 1) & (data[outcome] == 1)] = 'AC'
        segments[(data[treatment] == 0) & (data[outcome] == 1)] = 'AD'
        segments[(data[treatment] == 1) & (data[outcome] == 0)] = 'ND'
        segments[(data[treatment] == 0) & (data[outcome] == 0)] = 'NC'

        return segments

    def _fit_segment_model(self, data, treatment, outcome):
        '''
        Fits a classifier for estimating the probabilities for the unit
        segment combinations.
        '''
        model = clone(self.learner)

        X = data.drop([treatment, outcome], axis=1)
        y = self._make_segments(data, treatment, outcome)

        self.segment_model = model.fit(X, y)

    def _fit_condprob_models(self, data, treatment, outcome):
        '''
        Fits two classifiers to estimate conversion probabilities
        conditional on the treatment.
        '''
        trt_learner = clone(self.learner)
        ctr_learner = clone(self.learner)

        treated = data[treatment] == 1

        X = data.drop([treatment, outcome], axis=1)
        # Bug fix: use the `outcome` argument instead of the previously
        # hard-coded column name 'outcome'.
        y = data[outcome]

        self.trt_model = trt_learner.fit(X[treated], y[treated])
        self.ctr_model = ctr_learner.fit(X[~treated], y[~treated])

    def _get_exact_benefit(self, data, treatment, outcome):
        '''
        Calculates the exact benefit function of Theorem 4 in Li and
        Pearl (2019). Returns the exact benefit.
        '''
        beta = self.complier_payoff
        gamma = self.alwaystaker_payoff
        theta = self.nevertaker_payoff

        X = data.drop([treatment, outcome], axis=1)

        segment_prob = self.segment_model.predict_proba(X)
        segment_name = self.segment_model.classes_

        # Theorem 4: benefit = (beta - theta) P(AC|X)
        #                      + (gamma - beta) P(AD|X) + theta
        benefit = (beta - theta) * segment_prob[:, segment_name == 'AC'] + \
            (gamma - beta) * segment_prob[:, segment_name == 'AD'] + theta

        return benefit

    def _obj_func_midp(self, data, treatment, outcome):
        '''
        Calculates bounds for the objective function. Returns the midpoint
        between bounds.

        Parameters
        ----------
        pr_y1_w1 : float
            The probability of conversion given treatment assignment.

        pr_y1_w0 : float
            The probability of conversion given control assignment.

        pr_y0_w1 : float
            The probability of no conversion given treatment assignment
            (1 - pr_y1_w1).

        pr_y0_w0 : float
            The probability of no conversion given control assignment
            (1 - pr_y1_w0).

        pr_y1w1_x : float
            Probability of complier or always-taker given X.

        pr_y0w0_x : float
            Probability of complier or never-taker given X.

        pr_y1w0_x : float
            Probability of defier or always-taker given X.

        pr_y0w1_x : float
            Probability of never-taker or defier given X.

        pr_y_x : float
            Organic probability of conversion.
        '''
        X = data.drop([treatment, outcome], axis=1)

        beta = self.complier_payoff
        gamma = self.alwaystaker_payoff
        theta = self.nevertaker_payoff
        delta = self.defier_payoff

        # NOTE(review): assumes binary classifiers with classes_ == [0, 1],
        # so the first column is Pr(Y=0|.) and the second Pr(Y=1|.) —
        # confirm against the supplied learner.
        pr_y0_w1, pr_y1_w1 = np.split(self.trt_model.predict_proba(X),
                                      indices_or_sections=2, axis=1)
        pr_y0_w0, pr_y1_w0 = np.split(self.ctr_model.predict_proba(X),
                                      indices_or_sections=2, axis=1)

        segment_prob = self.segment_model.predict_proba(X)
        segment_name = self.segment_model.classes_

        pr_y1w1_x = segment_prob[:, segment_name == 'AC']
        pr_y0w0_x = segment_prob[:, segment_name == 'NC']
        pr_y1w0_x = segment_prob[:, segment_name == 'AD']
        pr_y0w1_x = segment_prob[:, segment_name == 'ND']

        if self.organic_conversion is not None:
            pr_y_x = self.organic_conversion
        else:
            pr_y_x = pr_y1_w0
            warnings.warn(
                'Probability of organic conversion estimated from control observations.')

        # Bound components from Theorem 5 in Li and Pearl (2019).
        p1 = (beta - theta) * pr_y1_w1 + delta * pr_y1_w0 + theta * pr_y0_w0
        p2 = gamma * pr_y1_w1 + delta * pr_y0_w1 + (beta - gamma) * pr_y0_w0
        p3 = (gamma - delta) * pr_y1_w1 + delta * pr_y1_w0 + theta * \
            pr_y0_w0 + (beta - gamma - theta + delta) * (pr_y1w1_x + pr_y0w0_x)
        p4 = (beta - theta) * pr_y1_w1 - (beta - gamma - theta) * pr_y1_w0 + \
            theta * pr_y0_w0 + (beta - gamma - theta + delta) * \
            (pr_y1w0_x + pr_y0w1_x)
        p5 = (gamma - delta) * pr_y1_w1 + delta * pr_y1_w0 + theta * pr_y0_w0
        p6 = (beta - theta) * pr_y1_w1 - (beta - gamma - theta) * pr_y1_w0 + \
            theta * pr_y0_w0
        p7 = (gamma - delta) * pr_y1_w1 - (beta - gamma - theta) * pr_y1_w0 + \
            theta * pr_y0_w0 + (beta - gamma - theta + delta) * pr_y_x
        p8 = (beta - theta) * pr_y1_w1 + delta * pr_y1_w0 + theta * \
            pr_y0_w0 - (beta - gamma - theta + delta) * pr_y_x

        params_1 = np.concatenate((p1, p2, p3, p4), axis=1)
        params_2 = np.concatenate((p5, p6, p7, p8), axis=1)

        sigma = beta - gamma - theta + delta

        # The sign of sigma determines which set of expressions gives the
        # lower vs the upper bound.
        if sigma < 0:
            lower_bound = np.max(params_1, axis=1)
            upper_bound = np.min(params_2, axis=1)

        elif sigma > 0:
            lower_bound = np.max(params_2, axis=1)
            upper_bound = np.min(params_1, axis=1)

        else:
            # sigma == 0 is exactly the gain-equality case, which is handled
            # by _get_exact_benefit; previously this path fell through to an
            # UnboundLocalError.
            raise ValueError(
                'Gain equality holds (sigma == 0); use the exact benefit '
                'function instead of the bound-based objective.')

        return (lower_bound + upper_bound) / 2