Source code for causalml.propensity

from abc import ABCMeta, abstractmethod
import logging
import numpy as np
from pygam import LogisticGAM, s
from sklearn.metrics import roc_auc_score as auc
from sklearn.linear_model import LogisticRegressionCV
from sklearn.model_selection import StratifiedKFold, train_test_split
import xgboost as xgb


logger = logging.getLogger('causalml')


class PropensityModel(metaclass=ABCMeta):
    def __init__(self, clip_bounds=(1e-3, 1 - 1e-3), **model_kwargs):
        """
        Args:
            clip_bounds (tuple): lower and upper bounds for clipping propensity scores. Bounds should be implemented
                such that: 0 < lower < upper < 1, to avoid division by zero in BaseRLearner.fit_predict() step.
            model_kwargs: Keyword arguments to be passed to the underlying classification model.
        """
        self.clip_bounds = clip_bounds
        self.model_kwargs = model_kwargs
        self.model = self._model

    @property
    @abstractmethod
    def _model(self):
        pass

    def __repr__(self):
        return self.model.__repr__()

    def fit(self, X, y):
        """
        Fit a propensity model.

        Args:
            X (numpy.ndarray): a feature matrix
            y (numpy.ndarray): a binary target vector
        """
        self.model.fit(X, y)

    def predict(self, X):
        """
        Predict propensity scores.

        Args:
            X (numpy.ndarray): a feature matrix

        Returns:
            (numpy.ndarray): Propensity scores between 0 and 1.
        """
        return np.clip(self.model.predict_proba(X)[:, 1], *self.clip_bounds)

    def fit_predict(self, X, y):
        """
        Fit a propensity model and predict propensity scores.

        Args:
            X (numpy.ndarray): a feature matrix
            y (numpy.ndarray): a binary target vector

        Returns:
            (numpy.ndarray): Propensity scores between 0 and 1.
        """
        self.fit(X, y)
        propensity_scores = self.predict(X)
        logger.info('AUC score: {:.6f}'.format(auc(y, propensity_scores)))

        return propensity_scores
class LogisticRegressionPropensityModel(PropensityModel):
    """
    Propensity regression model based on the LogisticRegression algorithm.
    """

    @property
    def _model(self):
        kwargs = {
            'penalty': 'elasticnet',
            'solver': 'saga',
            'Cs': np.logspace(1e-3, 1 - 1e-3, 4),
            'l1_ratios': np.linspace(1e-3, 1 - 1e-3, 4),
            'cv': StratifiedKFold(
                n_splits=self.model_kwargs.pop('n_fold') if 'n_fold' in self.model_kwargs else 4,
                shuffle=True,
                random_state=self.model_kwargs.get('random_state', 42)
            ),
            'random_state': 42,
        }
        kwargs.update(self.model_kwargs)

        return LogisticRegressionCV(**kwargs)
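# Example (a sketch, not part of the library): any LogisticRegressionCV
# argument passed through model_kwargs overrides the defaults above, since
# kwargs.update(self.model_kwargs) is applied last, and 'n_fold' is popped
# to set the number of StratifiedKFold splits:
#
#     pm = LogisticRegressionPropensityModel(n_fold=5, random_state=0)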
class ElasticNetPropensityModel(LogisticRegressionPropensityModel):
    pass
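# Example usage (a minimal sketch on synthetic data, not part of the library;
# fit_predict() clips the scores to clip_bounds and logs the in-sample AUC):
#
#     import numpy as np
#     rng = np.random.default_rng(0)
#     X = rng.normal(size=(1000, 5))        # feature matrix
#     w = rng.binomial(1, 0.5, size=1000)   # binary treatment vector
#     pm = ElasticNetPropensityModel()
#     ps = pm.fit_predict(X, w)             # scores clipped to (1e-3, 1 - 1e-3)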
class GradientBoostedPropensityModel(PropensityModel):
    """
    Gradient boosted propensity score model with optional early stopping.

    Notes
    -----
    Please see the xgboost documentation for more information on gradient boosting tuning parameters:
    https://xgboost.readthedocs.io/en/latest/python/python_api.html
    """

    def __init__(self, early_stop=False, clip_bounds=(1e-3, 1 - 1e-3), **model_kwargs):
        super(GradientBoostedPropensityModel, self).__init__(clip_bounds, **model_kwargs)
        self.early_stop = early_stop

    @property
    def _model(self):
        kwargs = {
            'max_depth': 8,
            'learning_rate': 0.1,
            'n_estimators': 100,
            'objective': 'binary:logistic',
            'nthread': -1,
            'colsample_bytree': 0.8,
            'random_state': 42,
        }
        kwargs.update(self.model_kwargs)

        return xgb.XGBClassifier(**kwargs)
    def fit(self, X, y, early_stopping_rounds=10, stop_val_size=0.2):
        """
        Fit a propensity model, with optional early stopping.

        Args:
            X (numpy.ndarray): a feature matrix
            y (numpy.ndarray): a binary target vector
            early_stopping_rounds (int): the number of rounds without improvement on the validation set
                before training stops (used only when early_stop is True)
            stop_val_size (float): the fraction of data held out as the early-stopping validation set
                (used only when early_stop is True)
        """
        if self.early_stop:
            X_train, X_val, y_train, y_val = train_test_split(
                X, y, test_size=stop_val_size
            )

            self.model.fit(
                X_train,
                y_train,
                eval_set=[(X_val, y_val)],
                early_stopping_rounds=early_stopping_rounds
            )
        else:
            super(GradientBoostedPropensityModel, self).fit(X, y)
    def predict(self, X):
        """
        Predict propensity scores.

        Args:
            X (numpy.ndarray): a feature matrix

        Returns:
            (numpy.ndarray): Propensity scores between 0 and 1.
        """
        if self.early_stop:
            return np.clip(
                self.model.predict_proba(
                    X,
                    ntree_limit=self.model.best_ntree_limit
                )[:, 1],
                *self.clip_bounds
            )
        else:
            return super(GradientBoostedPropensityModel, self).predict(X)
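# Example usage (a sketch, reusing X and w from the example above; with
# early_stop=True, fit() holds out stop_val_size of the data for early
# stopping and predict() scores with the best iteration found):
#
#     gbm = GradientBoostedPropensityModel(early_stop=True, n_estimators=500)
#     gbm.fit(X, w, early_stopping_rounds=10, stop_val_size=0.2)
#     ps = gbm.predict(X)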
def calibrate(ps, treatment):
    """Calibrate propensity scores with a logistic GAM.

    Ref: https://pygam.readthedocs.io/en/latest/api/logisticgam.html

    Args:
        ps (numpy.array): a propensity score vector
        treatment (numpy.array): a binary treatment vector (0: control, 1: treated)

    Returns:
        (numpy.array): a calibrated propensity score vector
    """
    gam = LogisticGAM(s(0)).fit(ps, treatment)

    return gam.predict_proba(ps)
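# Example usage (a sketch, reusing w and ps from the examples above): the raw
# scores are refit against the observed treatment with a univariate smooth
# s(0), and the GAM's predicted probabilities become the calibrated scores:
#
#     ps_calibrated = calibrate(ps, w)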
def compute_propensity_score(X, treatment, p_model=None, X_pred=None, treatment_pred=None, calibrate_p=True):
    """Generate propensity scores if the user didn't provide them.

    Args:
        X (np.matrix): features for training
        treatment (np.array or pd.Series): a treatment vector for training
        p_model (propensity model object, optional): ElasticNetPropensityModel (default) / GradientBoostedPropensityModel
        X_pred (np.matrix, optional): features for prediction
        treatment_pred (np.array or pd.Series, optional): a treatment vector for prediction
        calibrate_p (bool, optional): whether to calibrate the propensity score

    Returns:
        (tuple)
            - p (numpy.ndarray): propensity score
            - p_model (PropensityModel): a trained PropensityModel object
    """
    if treatment_pred is None:
        treatment_pred = treatment.copy()
    if p_model is None:
        p_model = ElasticNetPropensityModel()

    p_model.fit(X, treatment)

    if X_pred is None:
        p = p_model.predict(X)
    else:
        p = p_model.predict(X_pred)

    if calibrate_p:
        logger.info('Calibrating propensity scores.')
        p = calibrate(p, treatment_pred)

    # force the p values within the range
    eps = np.finfo(float).eps
    p = np.where(p < 0 + eps, 0 + eps * 1.001, p)
    p = np.where(p > 1 - eps, 1 - eps * 1.001, p)

    return p, p_model
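# Example usage (a sketch, reusing X and w from the examples above): with no
# p_model given, this trains the default ElasticNetPropensityModel, calibrates
# the scores, and returns both the scores and the fitted model:
#
#     p, p_model = compute_propensity_score(X, w)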