Source code for causalml.match

import argparse
import logging
import sys

import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state

logger = logging.getLogger('causalml')


def smd(feature, treatment):
    """Calculate the standard mean difference (SMD) of a feature between the
    treatment and control groups.

    The definition is available at
    https://www.ncbi.nlm.nih.gov/pmc/articles/PMC3144483/#s11title

    Args:
        feature (pandas.Series): a column of a feature to calculate SMD for
        treatment (pandas.Series): a column that indicates whether a row is
            in the treatment group or not

    Returns:
        (float): The SMD of the feature
    """
    t = feature[treatment == 1]
    c = feature[treatment == 0]
    return (t.mean() - c.mean()) / np.sqrt(.5 * (t.var() + c.var()))

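# A minimal usage sketch for smd() (hypothetical data, not part of the
# module). The values are chosen so the pooled variance is exactly 1:
#
#   >>> feature = pd.Series([0., 1., 2., 2., 3., 4.])
#   >>> treatment = pd.Series([0, 0, 0, 1, 1, 1])
#   >>> float(smd(feature, treatment))  # (3 - 1) / sqrt(.5 * (1 + 1))
#   2.0
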
def create_table_one(data, treatment_col, features):
    """Report balance in input features between the treatment and control
    groups.

    References:
        R's tableone at CRAN: https://github.com/kaz-yos/tableone
        Python's tableone at PyPi: https://github.com/tompollard/tableone

    Args:
        data (pandas.DataFrame): total or matched sample data
        treatment_col (str): the column name for the treatment
        features (list of str): the column names of features

    Returns:
        (pandas.DataFrame): A table with the means and standard deviations in
            the treatment and control groups, and the SMD between the two
            groups for the features.
    """
    t1 = pd.pivot_table(data[features + [treatment_col]],
                        columns=treatment_col,
                        aggfunc=[lambda x: '{:.2f} ({:.2f})'.format(x.mean(),
                                                                    x.std())])
    t1.columns = t1.columns.droplevel(level=0)
    t1['SMD'] = data[features].apply(
        lambda x: smd(x, data[treatment_col])
    ).round(4)

    n_row = pd.pivot_table(data[[features[0], treatment_col]],
                           columns=treatment_col,
                           aggfunc=['count'])
    n_row.columns = n_row.columns.droplevel(level=0)
    n_row['SMD'] = ''
    n_row.index = ['n']

    t1 = pd.concat([n_row, t1], axis=0)
    t1.columns.name = ''
    t1.columns = ['Control', 'Treatment', 'SMD']
    t1.index.name = 'Variable'

    return t1

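# A usage sketch for create_table_one() (synthetic data; column names are
# illustrative). The 'n' row reports group sizes and carries no SMD:
#
#   >>> df = pd.DataFrame({'is_treatment': [0, 0, 1, 1],
#   ...                    'age': [30., 40., 31., 41.]})
#   >>> create_table_one(df, 'is_treatment', ['age'])  # doctest: +SKIP
#                  Control     Treatment     SMD
#   Variable
#   n                    2             2
#   age       35.00 (7.07)  36.00 (7.07)  0.1414
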
class NearestNeighborMatch(object):
    """Propensity score matching based on the nearest neighbor algorithm.

    Attributes:
        caliper (float): threshold to be considered as a match.
        replace (bool): whether to match with replacement or not
        ratio (int): ratio of control / treatment to be matched. used only if
            replace=True.
        shuffle (bool): whether to shuffle the treatment group data before
            matching
        random_state (numpy.random.RandomState or int): RandomState or an int
            seed
        n_jobs (int): The number of parallel jobs to run for neighbors search.
            None means 1 unless in a joblib.parallel_backend context. -1 means
            using all processors
    """

    def __init__(self, caliper=.2, replace=False, ratio=1, shuffle=True,
                 random_state=None, n_jobs=-1):
        """Initialize a propensity score matching model.

        Args:
            caliper (float): threshold to be considered as a match.
            replace (bool): whether to match with replacement or not
            ratio (int): ratio of control / treatment to be matched. used
                only if replace=True.
            shuffle (bool): whether to shuffle the treatment group data
                before matching or not
            random_state (numpy.random.RandomState or int): RandomState or
                an int seed
            n_jobs (int): The number of parallel jobs to run for neighbors
                search. None means 1 unless in a joblib.parallel_backend
                context. -1 means using all processors
        """
        self.caliper = caliper
        self.replace = replace
        self.ratio = ratio
        self.shuffle = shuffle
        self.random_state = check_random_state(random_state)
        self.n_jobs = n_jobs

    def match(self, data, treatment_col, score_cols):
        """Find matches from the control group by matching on specified
        columns (propensity preferred).

        Args:
            data (pandas.DataFrame): total input data
            treatment_col (str): the column name for the treatment
            score_cols (list): list of column names for matching (propensity
                column should be included)

        Returns:
            (pandas.DataFrame): The subset of data consisting of matched
                treatment and control group data.
        """
        assert type(score_cols) is list, 'score_cols must be a list'
        treatment = data.loc[data[treatment_col] == 1, score_cols]
        # Copy so that the 'unmatched' helper column added below does not
        # mutate a view of `data` (avoids pandas' SettingWithCopyWarning)
        control = data.loc[data[treatment_col] == 0, score_cols].copy()

        sdcal = self.caliper * np.std(data[score_cols].values)

        if self.replace:
            scaler = StandardScaler()
            scaler.fit(data[score_cols])
            treatment_scaled = pd.DataFrame(scaler.transform(treatment),
                                            index=treatment.index)
            control_scaled = pd.DataFrame(scaler.transform(control),
                                          index=control.index)

            # SD is the same as caliper because we use a StandardScaler above
            sdcal = self.caliper

            matching_model = NearestNeighbors(n_neighbors=self.ratio,
                                              n_jobs=self.n_jobs)
            matching_model.fit(control_scaled)
            distances, indices = matching_model.kneighbors(treatment_scaled)

            # distances and indices are (n_obs, self.ratio) matrices.
            # To index easily, reshape distances, indices and treatment into
            # the (n_obs * self.ratio, 1) matrices and data frame.
            distances = distances.T.flatten()
            indices = indices.T.flatten()
            treatment_scaled = pd.concat([treatment_scaled] * self.ratio,
                                         axis=0)

            cond = (distances / np.sqrt(len(score_cols))) < sdcal
            # Deduplicate the indices of the treatment group
            t_idx_matched = np.unique(treatment_scaled.loc[cond].index)
            # XXX: Should we deduplicate the indices of the control group too?
            c_idx_matched = np.array(control_scaled.iloc[indices[cond]].index)
        else:
            assert len(score_cols) == 1, (
                'Matching on multiple columns is only supported using the '
                'replacement method (if matching on multiple columns, set '
                'replace=True).'
            )
            # unpack score_cols for the single-variable matching case
            score_col = score_cols[0]

            if self.shuffle:
                t_indices = self.random_state.permutation(treatment.index)
            else:
                t_indices = treatment.index

            t_idx_matched = []
            c_idx_matched = []
            control['unmatched'] = True

            for t_idx in t_indices:
                dist = np.abs(control.loc[control.unmatched, score_col]
                              - treatment.loc[t_idx, score_col])
                c_idx_min = dist.idxmin()
                if dist[c_idx_min] <= sdcal:
                    t_idx_matched.append(t_idx)
                    c_idx_matched.append(c_idx_min)
                    control.loc[c_idx_min, 'unmatched'] = False

        return data.loc[np.concatenate([np.array(t_idx_matched),
                                        np.array(c_idx_matched)])]

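    # A usage sketch for match() (assumes a DataFrame `df` with a binary
    # 'treatment' column and a propensity score column 'pihat'; both names
    # are illustrative):
    #
    #   >>> psm = NearestNeighborMatch(caliper=0.2, replace=True, ratio=2,
    #   ...                            random_state=42)
    #   >>> matched = psm.match(data=df, treatment_col='treatment',
    #   ...                     score_cols=['pihat'])  # doctest: +SKIP
    #
    # With replace=True each treatment row draws up to `ratio` controls and
    # a control row may be reused, so the returned control indices can
    # contain duplicates (see the XXX note above).
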
    def match_by_group(self, data, treatment_col, score_cols, groupby_col):
        """Find matches from the control group stratified by groupby_col, by
        matching on specified columns (propensity preferred).

        Args:
            data (pandas.DataFrame): total sample data
            treatment_col (str): the column name for the treatment
            score_cols (list): list of column names for matching (propensity
                column should be included)
            groupby_col (str): the column name to be used for stratification

        Returns:
            (pandas.DataFrame): The subset of data consisting of matched
                treatment and control group data.
        """
        matched = data.groupby(groupby_col).apply(
            lambda x: self.match(data=x, treatment_col=treatment_col,
                                 score_cols=score_cols)
        )
        return matched.reset_index(level=0, drop=True)

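# A stratified matching sketch (hypothetical 'region' column): matching runs
# within each stratum, so the matched sample is exactly balanced on the
# groupby key.
#
#   >>> matched = psm.match_by_group(data=df, treatment_col='treatment',
#   ...                              score_cols=['pihat'],
#   ...                              groupby_col='region')  # doctest: +SKIP
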
class MatchOptimizer(object):
    def __init__(self, treatment_col='is_treatment', ps_col='pihat',
                 user_col=None, matching_covariates=['pihat'], max_smd=0.1,
                 max_deviation=0.1, caliper_range=(0.01, 0.5),
                 max_pihat_range=(0.95, 0.999), max_iter_per_param=5,
                 min_users_per_group=1000, smd_cols=['pihat'],
                 dev_cols_transformations={'pihat': np.mean},
                 dev_factor=1., verbose=True):
        """Finds the set of parameters that gives the best matching result.

        Score = (number of features with SMD > max_smd)
                + (sum of deviations for important variables
                   * deviation factor)

        The logic behind the scoring is that we are most concerned with
        minimizing the number of features whose SMD exceeds a certain
        threshold (max_smd). However, we would also like the matched dataset
        not to deviate too much from the original dataset in terms of key
        variable(s), so that we still retain a similar userbase.

        Args:
            - treatment_col (str): name of the treatment column
            - ps_col (str): name of the propensity score column
            - max_smd (float): maximum acceptable SMD
            - max_deviation (float): maximum acceptable deviation for
                important variables
            - caliper_range (tuple): low and high bounds for caliper search
                range
            - max_pihat_range (tuple): low and high bounds for max pihat
                search range
            - max_iter_per_param (int): maximum number of search values per
                parameter
            - min_users_per_group (int): minimum number of users per group
                in the matched set
            - smd_cols (list): the score is more sensitive to these features
                exceeding max_smd
            - dev_factor (float): importance weight factor for dev_cols
                (e.g. dev_factor=1 means a 10% deviation leads to a penalty
                of 1 in the score)
            - dev_cols_transformations (dict): dict of transformations to be
                made on dev_cols
            - verbose (bool): boolean flag for printing statements

        Returns:
            The best matched dataset (pandas.DataFrame)
        """
        self.treatment_col = treatment_col
        self.ps_col = ps_col
        self.user_col = user_col
        self.matching_covariates = matching_covariates
        self.max_smd = max_smd
        self.max_deviation = max_deviation
        self.caliper_range = np.linspace(*caliper_range,
                                         num=max_iter_per_param)
        self.max_pihat_range = np.linspace(*max_pihat_range,
                                           num=max_iter_per_param)
        self.max_iter_per_param = max_iter_per_param
        self.min_users_per_group = min_users_per_group
        self.smd_cols = smd_cols
        self.dev_factor = dev_factor
        self.dev_cols_transformations = dev_cols_transformations
        self.best_params = {}
        self.best_score = 1e7  # ideal score is 0
        self.verbose = verbose
        self.pass_all = False

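    # A worked example of the score (illustrative numbers): with max_smd=0.1
    # and dev_factor=1., suppose 2 features exceed max_smd, one of them is in
    # smd_cols, and 'pihat' deviates 5% from the original treatment mean.
    # Then, following check_table_one() below:
    #
    #   score = 2 (features over max_smd)
    #         + 1 (smd_cols violation)
    #         + 0.05 * 10 * 1. (deviation penalty)
    #         = 3.5
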
    def single_match(self, score_cols, pihat_threshold, caliper):
        matcher = NearestNeighborMatch(caliper=caliper, replace=True)
        df_matched = matcher.match(
            data=self.df[self.df[self.ps_col] < pihat_threshold],
            treatment_col=self.treatment_col,
            score_cols=score_cols
        )
        return df_matched

    def check_table_one(self, tableone, matched, score_cols, pihat_threshold,
                        caliper):
        # check if better than past runs
        smd_values = np.abs(
            tableone[tableone.index != 'n']['SMD'].astype(float)
        )
        num_cols_over_smd = (smd_values >= self.max_smd).sum()
        self.cols_to_fix = smd_values[
            smd_values >= self.max_smd
        ].sort_values(ascending=False).index.values

        if self.user_col is None:
            num_users_per_group = matched.reset_index().groupby(
                self.treatment_col
            )['index'].count().min()
        else:
            num_users_per_group = matched.groupby(
                self.treatment_col
            )[self.user_col].count().min()

        deviations = [
            np.abs(self.original_stats[col]
                   / matched[matched[self.treatment_col] == 1][col].mean()
                   - 1)
            for col in self.dev_cols_transformations.keys()
        ]

        score = num_cols_over_smd
        score += len([col for col in self.smd_cols
                      if smd_values.loc[col] >= self.max_smd])
        score += np.sum([dev * 10 * self.dev_factor for dev in deviations])

        # check if can be considered as best score
        if (score < self.best_score
                and num_users_per_group > self.min_users_per_group):
            self.best_score = score
            self.best_params = {'score_cols': score_cols.copy(),
                                'pihat': pihat_threshold,
                                'caliper': caliper}
            self.best_matched = matched.copy()
        if self.verbose:
            logger.info('\tScore: {:.03f} (Best Score: {:.03f})\n'.format(
                score, self.best_score))

        # check if passes all criteria
        self.pass_all = (
            (num_users_per_group > self.min_users_per_group)
            and (num_cols_over_smd == 0)
            and all(dev < self.max_deviation for dev in deviations)
        )

    def match_and_check(self, score_cols, pihat_threshold, caliper):
        if self.verbose:
            logger.info('Preparing match for: caliper={:.03f}, '
                        'pihat_threshold={:.03f}, '
                        'score_cols={}'.format(caliper, pihat_threshold,
                                               score_cols))
        df_matched = self.single_match(score_cols=score_cols,
                                       pihat_threshold=pihat_threshold,
                                       caliper=caliper)
        tableone = create_table_one(df_matched, self.treatment_col,
                                    self.matching_covariates)
        self.check_table_one(tableone, df_matched, score_cols,
                             pihat_threshold, caliper)

    def search_best_match(self, df):
        self.df = df

        self.original_stats = {}
        for col, trans in self.dev_cols_transformations.items():
            self.original_stats[col] = trans(
                self.df[self.df[self.treatment_col] == 1][col]
            )

        # search best max pihat
        if self.verbose:
            logger.info('SEARCHING FOR BEST PIHAT')
        score_cols = [self.ps_col]
        caliper = self.caliper_range[-1]
        for pihat_threshold in self.max_pihat_range:
            self.match_and_check(score_cols, pihat_threshold, caliper)

        # search best score_cols
        if self.verbose:
            logger.info('SEARCHING FOR BEST SCORE_COLS')
        pihat_threshold = self.best_params['pihat']
        caliper = self.caliper_range[int(self.caliper_range.shape[0] / 2)]
        score_cols = [self.ps_col]
        while not self.pass_all:
            if len(self.cols_to_fix) == 0:
                break
            elif np.intersect1d(self.cols_to_fix, score_cols).shape[0] > 0:
                break
            else:
                score_cols.append(self.cols_to_fix[0])
                self.match_and_check(score_cols, pihat_threshold, caliper)

        # search best caliper
        if self.verbose:
            logger.info('SEARCHING FOR BEST CALIPER')
        score_cols = self.best_params['score_cols']
        pihat_threshold = self.best_params['pihat']
        for caliper in self.caliper_range:
            self.match_and_check(score_cols, pihat_threshold, caliper)

        # summarize
        if self.verbose:
            logger.info('\n-----\nBest params are:\n{}'.format(
                self.best_params))

        return self.best_matched

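# An end-to-end optimizer sketch (assumes `df` holds a fitted propensity
# score in 'pihat' and a binary 'is_treatment' column; 'age' and the
# min_users_per_group value are illustrative):
#
#   >>> optimizer = MatchOptimizer(treatment_col='is_treatment',
#   ...                            ps_col='pihat',
#   ...                            matching_covariates=['pihat', 'age'],
#   ...                            min_users_per_group=100)
#   >>> best_matched = optimizer.search_best_match(df)  # doctest: +SKIP
#
# The three passes above build on each other: the pihat search fixes the
# widest caliper, the score_cols search fixes the median caliper and the
# best pihat, and the final sweep varies only the caliper.
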
if __name__ == '__main__':
    from .features import (TREATMENT_COL, SCORE_COL, GROUPBY_COL,
                           PROPENSITY_FEATURES)
    from .features import (PROPENSITY_FEATURE_TRANSFORMATIONS,
                           MATCHING_COVARIATES)
    from .features import load_data
    from .propensity import ElasticNetPropensityModel

    parser = argparse.ArgumentParser()
    parser.add_argument('--input-file', required=True, dest='input_file')
    parser.add_argument('--output-file', required=True, dest='output_file')
    parser.add_argument('--treatment-col', default=TREATMENT_COL,
                        dest='treatment_col')
    parser.add_argument('--groupby-col', default=GROUPBY_COL,
                        dest='groupby_col')
    parser.add_argument('--feature-cols', nargs='+',
                        default=PROPENSITY_FEATURES, dest='feature_cols')
    parser.add_argument('--caliper', type=float, default=.2)
    parser.add_argument('--replace', default=False, action='store_true')
    parser.add_argument('--ratio', type=int, default=1)
    args = parser.parse_args()

    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

    logger.info('Loading data from {}'.format(args.input_file))
    df = pd.read_csv(args.input_file)
    df[args.treatment_col] = df[args.treatment_col].astype(int)
    logger.info('shape: {}\n{}'.format(df.shape, df.head()))

    pm = ElasticNetPropensityModel(random_state=42)
    w = df[args.treatment_col].values
    X = load_data(data=df,
                  features=args.feature_cols,
                  transformations=PROPENSITY_FEATURE_TRANSFORMATIONS)

    logger.info('Scoring with a propensity model: {}'.format(pm))
    df[SCORE_COL] = pm.fit_predict(X, w)

    logger.info('Balance before matching:\n{}'.format(
        create_table_one(data=df,
                         treatment_col=args.treatment_col,
                         features=MATCHING_COVARIATES)
    ))

    logger.info('Matching based on the propensity score with the nearest '
                'neighbor model')
    # pass the parsed caliper through; previously --caliper was accepted
    # but never used
    psm = NearestNeighborMatch(caliper=args.caliper,
                               replace=args.replace,
                               ratio=args.ratio,
                               random_state=42)
    matched = psm.match_by_group(data=df,
                                 treatment_col=args.treatment_col,
                                 score_cols=[SCORE_COL],
                                 groupby_col=args.groupby_col)
    logger.info('shape: {}\n{}'.format(matched.shape, matched.head()))

    logger.info('Balance after matching:\n{}'.format(
        create_table_one(data=matched,
                         treatment_col=args.treatment_col,
                         features=MATCHING_COVARIATES)
    ))

    matched.to_csv(args.output_file, index=False)
    logger.info('Matched data saved as {}'.format(args.output_file))
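
# Example invocation (requires the companion features and propensity modules
# and must be run with package context for the relative imports; file names
# are hypothetical):
#
#   python -m causalml.match --input-file users.csv --output-file matched.csv \
#       --treatment-col is_treatment --replace --ratio 2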