from abc import ABCMeta, abstractmethod
import logging
import numpy as np
from pygam import LogisticGAM, s
from sklearn.metrics import roc_auc_score as auc
from sklearn.linear_model import LogisticRegressionCV
from sklearn.model_selection import StratifiedKFold, train_test_split
import xgboost as xgb
logger = logging.getLogger('causalml')
class PropensityModel(metaclass=ABCMeta):
    """Abstract base class for propensity score models.

    A subclass supplies the underlying classifier through the ``_model``
    property; this base class handles fitting, probability prediction, and
    clipping of the resulting propensity scores.
    """

    def __init__(self, clip_bounds=(1e-3, 1 - 1e-3), **model_kwargs):
        """
        Args:
            clip_bounds (tuple): lower and upper bounds for clipping propensity scores. Bounds should be implemented
                such that: 0 < lower < upper < 1, to avoid division by zero in BaseRLearner.fit_predict() step.
            model_kwargs: Keyword arguments to be passed to the underlying classification model.
        """
        self.clip_bounds = clip_bounds
        self.model_kwargs = model_kwargs
        # Instantiate the concrete classifier defined by the subclass.
        self.model = self._model

    @property
    @abstractmethod
    def _model(self):
        """Return an unfitted classifier exposing ``fit`` and ``predict_proba``."""
        pass

    def __repr__(self):
        # Delegate to the wrapped model so the repr shows its configuration.
        return self.model.__repr__()

    def fit(self, X, y):
        """
        Fit a propensity model.

        Args:
            X (numpy.ndarray): a feature matrix
            y (numpy.ndarray): a binary target vector
        """
        self.model.fit(X, y)

    def predict(self, X):
        """
        Predict propensity scores.

        Args:
            X (numpy.ndarray): a feature matrix

        Returns:
            (numpy.ndarray): Propensity scores between 0 and 1, clipped to
                ``self.clip_bounds``.
        """
        # P(treatment=1 | X) is the second column of predict_proba.
        return np.clip(
            self.model.predict_proba(X)[:, 1], *self.clip_bounds
        )

    def fit_predict(self, X, y):
        """
        Fit a propensity model and predict propensity scores.

        Args:
            X (numpy.ndarray): a feature matrix
            y (numpy.ndarray): a binary target vector

        Returns:
            (numpy.ndarray): Propensity scores between 0 and 1.
        """
        self.fit(X, y)
        propensity_scores = self.predict(X)
        # In-sample AUC as a quick diagnostic of model fit.
        logger.info('AUC score: {:.6f}'.format(auc(y, propensity_scores)))
        return propensity_scores
class LogisticRegressionPropensityModel(PropensityModel):
    """
    Propensity regression model based on the LogisticRegression algorithm.
    """

    @property
    def _model(self):
        """Build an elastic-net ``LogisticRegressionCV`` with default settings.

        Defaults may be overridden through ``model_kwargs``; the special key
        ``n_fold`` (default 4) sets the number of stratified CV splits and is
        consumed here so it is not forwarded to ``LogisticRegressionCV``.
        """
        kwargs = {
            'penalty': 'elasticnet',
            'solver': 'saga',
            'Cs': np.logspace(1e-3, 1 - 1e-3, 4),
            'l1_ratios': np.linspace(1e-3, 1 - 1e-3, 4),
            'cv': StratifiedKFold(
                # pop() removes 'n_fold' so kwargs.update() below cannot
                # pass it through to LogisticRegressionCV.
                n_splits=self.model_kwargs.pop('n_fold', 4),
                shuffle=True,
                random_state=self.model_kwargs.get('random_state', 42)
            ),
            'random_state': 42,
        }
        # User-supplied kwargs take precedence over the defaults above.
        kwargs.update(self.model_kwargs)

        return LogisticRegressionCV(**kwargs)
class ElasticNetPropensityModel(LogisticRegressionPropensityModel):
    """Elastic-net propensity model.

    Identical to ``LogisticRegressionPropensityModel`` (whose default penalty
    is elastic-net); presumably kept as a separate name for backward
    compatibility with existing callers.
    """
    pass
class GradientBoostedPropensityModel(PropensityModel):
    """
    Gradient boosted propensity score model with optional early stopping.

    Notes
    -----
    Please see the xgboost documentation for more information on gradient boosting tuning parameters:
    https://xgboost.readthedocs.io/en/latest/python/python_api.html
    """

    def __init__(
        self,
        early_stop=False,
        clip_bounds=(1e-3, 1 - 1e-3),
        **model_kwargs
    ):
        """
        Args:
            early_stop (bool): whether to stop boosting early based on a
                held-out validation split (see ``fit``).
            clip_bounds (tuple): lower and upper bounds for clipping propensity
                scores; must satisfy 0 < lower < upper < 1.
            model_kwargs: keyword arguments forwarded to ``xgb.XGBClassifier``.
        """
        super(GradientBoostedPropensityModel, self).__init__(clip_bounds, **model_kwargs)
        self.early_stop = early_stop

    @property
    def _model(self):
        """Build an ``XGBClassifier`` with default tuning parameters.

        Defaults may be overridden through ``model_kwargs``.
        """
        kwargs = {
            'max_depth': 8,
            'learning_rate': 0.1,
            'n_estimators': 100,
            'objective': 'binary:logistic',
            'nthread': -1,
            'colsample_bytree': 0.8,
            'random_state': 42,
        }
        # User-supplied kwargs take precedence over the defaults above.
        kwargs.update(self.model_kwargs)

        return xgb.XGBClassifier(**kwargs)

    def fit(self, X, y, early_stopping_rounds=10, stop_val_size=0.2):
        """
        Fit a propensity model.

        Args:
            X (numpy.ndarray): a feature matrix
            y (numpy.ndarray): a binary target vector
            early_stopping_rounds (int): stop boosting if the validation score
                does not improve for this many rounds (only used when
                ``self.early_stop`` is True).
            stop_val_size (float): fraction of the data held out as the
                validation set for early stopping (only used when
                ``self.early_stop`` is True).
        """
        if self.early_stop:
            # NOTE(review): this split is unseeded and unstratified, so fits
            # with early_stop=True are not reproducible run-to-run — confirm
            # whether a fixed random_state/stratify is desired.
            X_train, X_val, y_train, y_val = train_test_split(
                X, y, test_size=stop_val_size
            )
            self.model.fit(
                X_train,
                y_train,
                eval_set=[(X_val, y_val)],
                early_stopping_rounds=early_stopping_rounds
            )
        else:
            super(GradientBoostedPropensityModel, self).fit(X, y)

    def predict(self, X):
        """
        Predict propensity scores.

        Args:
            X (numpy.ndarray): a feature matrix

        Returns:
            (numpy.ndarray): Propensity scores between 0 and 1, clipped to
                ``self.clip_bounds``.
        """
        if self.early_stop:
            # Use only the trees up to the best early-stopping iteration.
            return np.clip(
                self.model.predict_proba(
                    X,
                    ntree_limit=self.model.best_ntree_limit
                )[:, 1],
                *self.clip_bounds
            )
        else:
            return super(GradientBoostedPropensityModel, self).predict(X)
def calibrate(ps, treatment):
    """Calibrate propensity scores with logistic GAM.

    Ref: https://pygam.readthedocs.io/en/latest/api/logisticgam.html

    Args:
        ps (numpy.array): a propensity score vector
        treatment (numpy.array): a binary treatment vector (0: control, 1: treated)

    Returns:
        (numpy.array): a calibrated propensity score vector
    """
    # Fit treatment ~ s(ps) with a single smooth term on the raw scores, then
    # use the GAM's predicted probability as the calibrated score.
    gam = LogisticGAM(s(0)).fit(ps, treatment)

    return gam.predict_proba(ps)
def compute_propensity_score(X, treatment, p_model=None, X_pred=None, treatment_pred=None, calibrate_p=True):
    """Generate propensity score if user didn't provide

    Args:
        X (np.matrix): features for training
        treatment (np.array or pd.Series): a treatment vector for training
        p_model (propensity model object, optional):
            ElasticNetPropensityModel (default) / GradientBoostedPropensityModel
        X_pred (np.matrix, optional): features for prediction
        treatment_pred (np.array or pd.Series, optional): a treatment vector for prediction
        calibrate_p (bool, optional): whether to calibrate the propensity score

    Returns:
        (tuple)
            - p (numpy.ndarray): propensity score
            - p_model (PropensityModel): a trained PropensityModel object
    """
    if treatment_pred is None:
        treatment_pred = treatment.copy()
    if p_model is None:
        p_model = ElasticNetPropensityModel()

    p_model.fit(X, treatment)

    # Score on the training features unless separate prediction features given.
    p = p_model.predict(X if X_pred is None else X_pred)

    if calibrate_p:
        logger.info('Calibrating propensity scores.')
        p = calibrate(p, treatment_pred)

    # Force p strictly inside (0, 1) so downstream estimators never divide by
    # zero; eps * 1.001 keeps the clamped values just off the boundary.
    eps = np.finfo(float).eps
    p = np.where(p < 0 + eps, 0 + eps * 1.001, p)
    p = np.where(p > 1 - eps, 1 - eps * 1.001, p)

    return p, p_model