Source code for causalml.dataset.synthetic

from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
from sklearn.metrics import auc
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from xgboost import XGBRegressor
from scipy.stats import entropy
import warnings

from causalml.inference.meta import (
    BaseXRegressor,
    BaseRRegressor,
    BaseSRegressor,
    BaseTRegressor,
)
from causalml.inference.tree.causal.causaltree import CausalTreeRegressor
from causalml.propensity import ElasticNetPropensityModel
from causalml.metrics import plot_gain, get_cumgain


plt.style.use("fivethirtyeight")
warnings.filterwarnings("ignore")

KEY_GENERATED_DATA = "generated_data"
KEY_ACTUAL = "Actuals"

RANDOM_SEED = 42


def get_synthetic_preds(synthetic_data_func, n=1000, estimators={}):
    """Generate predictions for synthetic data using specified function (single simulation)

    Args:
        synthetic_data_func (function): synthetic data generation function
        n (int, optional): number of samples
        estimators (dict of object): dict of names and objects of treatment effect estimators

    Returns:
        (dict): dict of the actual and estimates of treatment effects
    """
    y, X, w, tau, b, e = synthetic_data_func(n=n)

    preds_dict = {}
    preds_dict[KEY_ACTUAL] = tau
    preds_dict[KEY_GENERATED_DATA] = {
        "y": y,
        "X": X,
        "w": w,
        "tau": tau,
        "b": b,
        "e": e,
    }

    # Predict p_hat because e would not be directly observed in real-life
    p_model = ElasticNetPropensityModel()
    p_hat = p_model.fit_predict(X, w)

    if estimators:
        for name, learner in estimators.items():
            try:
                preds_dict[name] = learner.fit_predict(
                    X=X, treatment=w, y=y, p=p_hat
                ).flatten()
            except TypeError:
                preds_dict[name] = learner.fit_predict(X=X, treatment=w, y=y).flatten()
    else:
        for base_learner, label_l in zip(
            [BaseSRegressor, BaseTRegressor, BaseXRegressor, BaseRRegressor],
            ["S", "T", "X", "R"],
        ):
            for model, label_m in zip([LinearRegression, XGBRegressor], ["LR", "XGB"]):
                learner = base_learner(model())
                model_name = "{} Learner ({})".format(label_l, label_m)
                try:
                    preds_dict[model_name] = learner.fit_predict(
                        X=X, treatment=w, y=y, p=p_hat
                    ).flatten()
                except TypeError:
                    preds_dict[model_name] = learner.fit_predict(
                        X=X, treatment=w, y=y
                    ).flatten()

        learner = CausalTreeRegressor(random_state=RANDOM_SEED)
        preds_dict["Causal Tree"] = learner.fit_predict(X=X, treatment=w, y=y).flatten()

    return preds_dict
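
# Illustrative usage (a sketch, not part of the module): benchmark the default
# learners on one simulated dataset. The data generator
# simulate_nuisance_and_easy_treatment is assumed to be importable from
# causalml.dataset, as in the package's example notebooks.
#
#     from causalml.dataset import simulate_nuisance_and_easy_treatment
#
#     preds = get_synthetic_preds(simulate_nuisance_and_easy_treatment, n=10000)
#     preds["Actuals"].mean()          # true ATE in this simulation
#     preds["S Learner (XGB)"].mean()  # estimated ATE from one of the learners
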
def get_synthetic_summary(synthetic_data_func, n=1000, k=1, estimators={}):
    """Generate a summary for predictions on synthetic data using specified function

    Args:
        synthetic_data_func (function): synthetic data generation function
        n (int, optional): number of samples per simulation
        k (int, optional): number of simulations
        estimators (dict of object): dict of names and objects of treatment effect estimators

    Returns:
        (pandas.DataFrame): evaluation metrics by learner, averaged over k simulations
    """
    summaries = []

    for i in range(k):
        synthetic_preds = get_synthetic_preds(
            synthetic_data_func, n=n, estimators=estimators
        )
        actuals = synthetic_preds[KEY_ACTUAL]
        synthetic_summary = pd.DataFrame(
            {
                label: [preds.mean(), mse(preds, actuals)]
                for label, preds in synthetic_preds.items()
                if label != KEY_GENERATED_DATA
            },
            index=["ATE", "MSE"],
        ).T
        synthetic_summary["Abs % Error of ATE"] = np.abs(
            (synthetic_summary["ATE"] / synthetic_summary.loc[KEY_ACTUAL, "ATE"]) - 1
        )

        for label in synthetic_summary.index:
            stacked_values = np.hstack((synthetic_preds[label], actuals))
            stacked_low = np.percentile(stacked_values, 0.1)
            stacked_high = np.percentile(stacked_values, 99.9)
            bins = np.linspace(stacked_low, stacked_high, 100)

            distr = np.histogram(synthetic_preds[label], bins=bins)[0]
            distr = np.clip(distr / distr.sum(), 0.001, 0.999)
            true_distr = np.histogram(actuals, bins=bins)[0]
            true_distr = np.clip(true_distr / true_distr.sum(), 0.001, 0.999)

            kl = entropy(distr, true_distr)
            synthetic_summary.loc[label, "KL Divergence"] = kl

        summaries.append(synthetic_summary)

    summary = sum(summaries) / k
    return summary[["Abs % Error of ATE", "MSE", "KL Divergence"]]
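
# Illustrative usage (a sketch): average the benchmark over k simulations; the
# resulting frame has one row per learner (plus "Actuals") and the columns
# "Abs % Error of ATE", "MSE", and "KL Divergence".
#
#     summary = get_synthetic_summary(
#         simulate_nuisance_and_easy_treatment, n=10000, k=10
#     )
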
def scatter_plot_summary(synthetic_summary, k, drop_learners=[], drop_cols=[]):
    """Generates a scatter plot comparing learner performance. Each learner's performance
    is plotted as a point in the (Abs % Error of ATE, MSE) space.

    Args:
        synthetic_summary (pd.DataFrame): summary generated by get_synthetic_summary()
        k (int): number of simulations (used only for plot title text)
        drop_learners (list, optional): list of learners (str) to omit when plotting
        drop_cols (list, optional): list of metrics (str) to omit when plotting
    """
    plot_data = synthetic_summary.drop(drop_learners).drop(drop_cols, axis=1)

    fig, ax = plt.subplots()
    fig.set_size_inches(12, 8)

    xs = plot_data["Abs % Error of ATE"]
    ys = plot_data["MSE"]

    ax.scatter(xs, ys)

    ylim = ax.get_ylim()
    xlim = ax.get_xlim()

    for i, txt in enumerate(plot_data.index):
        ax.annotate(
            txt,
            (
                xs[i] - np.random.binomial(1, 0.5) * xlim[1] * 0.04,
                ys[i] - ylim[1] * 0.03,
            ),
        )

    ax.set_xlabel("Abs % Error of ATE")
    ax.set_ylabel("MSE")
    ax.set_title("Learner Performance (averaged over k={} simulations)".format(k))
def bar_plot_summary(
    synthetic_summary,
    k,
    drop_learners=[],
    drop_cols=[],
    sort_cols=["MSE", "Abs % Error of ATE"],
):
    """Generates a bar plot comparing learner performance.

    Args:
        synthetic_summary (pd.DataFrame): summary generated by get_synthetic_summary()
        k (int): number of simulations (used only for plot title text)
        drop_learners (list, optional): list of learners (str) to omit when plotting
        drop_cols (list, optional): list of metrics (str) to omit when plotting
        sort_cols (list, optional): list of metrics (str) to sort on when plotting
    """
    plot_data = synthetic_summary.sort_values(sort_cols, ascending=True)
    plot_data = plot_data.drop(drop_learners + [KEY_ACTUAL]).drop(drop_cols, axis=1)

    plot_data.plot(kind="bar", figsize=(12, 8))
    plt.xticks(rotation=30)
    plt.title("Learner Performance (averaged over k={} simulations)".format(k))
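
# Illustrative usage (a sketch): visualize a summary produced above. The learner
# and metric names passed to drop_learners/drop_cols are examples only.
#
#     scatter_plot_summary(summary, k=10, drop_learners=["Causal Tree"])
#     bar_plot_summary(summary, k=10, drop_cols=["KL Divergence"])
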
def distr_plot_single_sim(
    synthetic_preds,
    kind="kde",
    drop_learners=[],
    bins=50,
    histtype="step",
    alpha=1,
    linewidth=1,
    bw_method=1,
):
    """Plots the distribution of each learner's predictions (for a single simulation).
    Kernel Density Estimation (kde) and actual histogram plots supported.

    Args:
        synthetic_preds (dict): dictionary of predictions generated by get_synthetic_preds()
        kind (str, optional): 'kde' or 'hist'
        drop_learners (list, optional): list of learners (str) to omit when plotting
        bins (int, optional): number of bins to plot if kind set to 'hist'
        histtype (str, optional): histogram type if kind set to 'hist'
        alpha (float, optional): alpha (transparency) for plotting
        linewidth (int, optional): line width for plotting
        bw_method (float, optional): parameter for kde
    """
    preds_for_plot = synthetic_preds.copy()

    # drop the raw generated data; only the actuals and learner predictions are plotted
    del preds_for_plot[KEY_GENERATED_DATA]

    global_lower = np.percentile(np.hstack(list(preds_for_plot.values())), 1)
    global_upper = np.percentile(np.hstack(list(preds_for_plot.values())), 99)

    learners = list(preds_for_plot.keys())
    learners = [learner for learner in learners if learner not in drop_learners]

    # Plotting
    plt.figure(figsize=(12, 8))
    colors = [
        "black",
        "red",
        "blue",
        "green",
        "cyan",
        "brown",
        "grey",
        "pink",
        "orange",
        "yellow",
    ]

    for i, (k, v) in enumerate(preds_for_plot.items()):
        if k in learners:
            if kind == "kde":
                v = pd.Series(v.flatten())
                v = v[v.between(global_lower, global_upper)]
                v.plot(
                    kind="kde",
                    bw_method=bw_method,
                    label=k,
                    linewidth=linewidth,
                    color=colors[i],
                )
            elif kind == "hist":
                plt.hist(
                    v,
                    bins=np.linspace(global_lower, global_upper, bins),
                    label=k,
                    histtype=histtype,
                    alpha=alpha,
                    linewidth=linewidth,
                    color=colors[i],
                )
            else:
                pass

    plt.xlim(global_lower, global_upper)
    plt.legend(loc="center left", bbox_to_anchor=(1, 0.5))
    plt.title("Distribution from a Single Simulation")
def scatter_plot_single_sim(synthetic_preds):
    """Creates a grid of scatter plots comparing each learner's predictions with the truth
    (for a single simulation).

    Args:
        synthetic_preds (dict): dictionary of predictions generated by get_synthetic_preds()
            or get_synthetic_preds_holdout()
    """
    preds_for_plot = synthetic_preds.copy()

    # drop the raw generated data; only the actuals and learner predictions are plotted
    del preds_for_plot[KEY_GENERATED_DATA]

    n_row = int(np.ceil(len(preds_for_plot.keys()) / 3))
    fig, axes = plt.subplots(n_row, 3, figsize=(5 * n_row, 15))
    axes = np.ravel(axes)

    for i, (label, preds) in enumerate(preds_for_plot.items()):
        axes[i].scatter(preds_for_plot[KEY_ACTUAL], preds, s=2, label="Predictions")
        axes[i].set_title(label, size=12)
        axes[i].set_xlabel("Actual", size=10)
        axes[i].set_ylabel("Prediction", size=10)

        # the perfect-model reference line is y = x over the current x-range
        xlim = axes[i].get_xlim()
        axes[i].plot(
            [xlim[0], xlim[1]],
            [xlim[0], xlim[1]],
            label="Perfect Model",
            linewidth=1,
            color="grey",
        )
        axes[i].legend(loc=2, prop={"size": 10})
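
# Illustrative usage (a sketch): inspect a single simulation's predictions with
# the two plots above; the keyword values are examples only.
#
#     preds = get_synthetic_preds(simulate_nuisance_and_easy_treatment, n=10000)
#     distr_plot_single_sim(preds, kind="kde", linewidth=2, bw_method=0.5)
#     scatter_plot_single_sim(preds)
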
def get_synthetic_preds_holdout(
    synthetic_data_func, n=1000, valid_size=0.2, estimators={}
):
    """Generate predictions for synthetic data using specified function (single simulation)
    for train and holdout

    Args:
        synthetic_data_func (function): synthetic data generation function
        n (int, optional): number of samples
        valid_size (float, optional): validation/holdout data size
        estimators (dict of object): dict of names and objects of treatment effect estimators

    Returns:
        (tuple): synthetic training and validation data dictionaries:

          - preds_dict_train (dict): synthetic training data dictionary
          - preds_dict_valid (dict): synthetic validation data dictionary
    """
    y, X, w, tau, b, e = synthetic_data_func(n=n)

    (
        X_train,
        X_val,
        y_train,
        y_val,
        w_train,
        w_val,
        tau_train,
        tau_val,
        b_train,
        b_val,
        e_train,
        e_val,
    ) = train_test_split(
        X, y, w, tau, b, e, test_size=valid_size, random_state=RANDOM_SEED, shuffle=True
    )

    preds_dict_train = {}
    preds_dict_valid = {}

    preds_dict_train[KEY_ACTUAL] = tau_train
    preds_dict_valid[KEY_ACTUAL] = tau_val

    preds_dict_train["generated_data"] = {
        "y": y_train,
        "X": X_train,
        "w": w_train,
        "tau": tau_train,
        "b": b_train,
        "e": e_train,
    }
    preds_dict_valid["generated_data"] = {
        "y": y_val,
        "X": X_val,
        "w": w_val,
        "tau": tau_val,
        "b": b_val,
        "e": e_val,
    }

    # Predict p_hat because e would not be directly observed in real-life
    p_model = ElasticNetPropensityModel()
    p_hat_train = p_model.fit_predict(X_train, w_train)
    p_hat_val = p_model.fit_predict(X_val, w_val)

    for base_learner, label_l in zip(
        [BaseSRegressor, BaseTRegressor, BaseXRegressor, BaseRRegressor],
        ["S", "T", "X", "R"],
    ):
        for model, label_m in zip([LinearRegression, XGBRegressor], ["LR", "XGB"]):
            # RLearner will need to fit on the p_hat
            if label_l != "R":
                learner = base_learner(model())
                # fit the model on training data only
                learner.fit(X=X_train, treatment=w_train, y=y_train)
                try:
                    preds_dict_train["{} Learner ({})".format(label_l, label_m)] = (
                        learner.predict(X=X_train, p=p_hat_train).flatten()
                    )
                    preds_dict_valid["{} Learner ({})".format(label_l, label_m)] = (
                        learner.predict(X=X_val, p=p_hat_val).flatten()
                    )
                except TypeError:
                    preds_dict_train["{} Learner ({})".format(label_l, label_m)] = (
                        learner.predict(
                            X=X_train, treatment=w_train, y=y_train
                        ).flatten()
                    )
                    preds_dict_valid["{} Learner ({})".format(label_l, label_m)] = (
                        learner.predict(X=X_val, treatment=w_val, y=y_val).flatten()
                    )
            else:
                learner = base_learner(model())
                learner.fit(X=X_train, p=p_hat_train, treatment=w_train, y=y_train)
                preds_dict_train["{} Learner ({})".format(label_l, label_m)] = (
                    learner.predict(X=X_train).flatten()
                )
                preds_dict_valid["{} Learner ({})".format(label_l, label_m)] = (
                    learner.predict(X=X_val).flatten()
                )

    return preds_dict_train, preds_dict_valid
def get_synthetic_summary_holdout(synthetic_data_func, n=1000, valid_size=0.2, k=1):
    """Generate a summary for predictions on synthetic data for train and holdout using
    specified function

    Args:
        synthetic_data_func (function): synthetic data generation function
        n (int, optional): number of samples per simulation
        valid_size (float, optional): validation/holdout data size
        k (int, optional): number of simulations

    Returns:
        (tuple): summary evaluation metrics of predictions for train and validation:

          - summary_train (pandas.DataFrame): training data evaluation summary
          - summary_validation (pandas.DataFrame): validation data evaluation summary
    """
    summaries_train = []
    summaries_validation = []

    for i in range(k):
        preds_dict_train, preds_dict_valid = get_synthetic_preds_holdout(
            synthetic_data_func, n=n, valid_size=valid_size
        )
        actuals_train = preds_dict_train[KEY_ACTUAL]
        actuals_validation = preds_dict_valid[KEY_ACTUAL]

        synthetic_summary_train = pd.DataFrame(
            {
                label: [preds.mean(), mse(preds, actuals_train)]
                for label, preds in preds_dict_train.items()
                if KEY_GENERATED_DATA not in label.lower()
            },
            index=["ATE", "MSE"],
        ).T
        synthetic_summary_train["Abs % Error of ATE"] = np.abs(
            (
                synthetic_summary_train["ATE"]
                / synthetic_summary_train.loc[KEY_ACTUAL, "ATE"]
            )
            - 1
        )

        synthetic_summary_validation = pd.DataFrame(
            {
                label: [preds.mean(), mse(preds, actuals_validation)]
                for label, preds in preds_dict_valid.items()
                if KEY_GENERATED_DATA not in label.lower()
            },
            index=["ATE", "MSE"],
        ).T
        synthetic_summary_validation["Abs % Error of ATE"] = np.abs(
            (
                synthetic_summary_validation["ATE"]
                / synthetic_summary_validation.loc[KEY_ACTUAL, "ATE"]
            )
            - 1
        )

        # calculate kl divergence for training
        for label in synthetic_summary_train.index:
            stacked_values = np.hstack((preds_dict_train[label], actuals_train))
            stacked_low = np.percentile(stacked_values, 0.1)
            stacked_high = np.percentile(stacked_values, 99.9)
            bins = np.linspace(stacked_low, stacked_high, 100)

            distr = np.histogram(preds_dict_train[label], bins=bins)[0]
            distr = np.clip(distr / distr.sum(), 0.001, 0.999)
            true_distr = np.histogram(actuals_train, bins=bins)[0]
            true_distr = np.clip(true_distr / true_distr.sum(), 0.001, 0.999)

            kl = entropy(distr, true_distr)
            synthetic_summary_train.loc[label, "KL Divergence"] = kl

        # calculate kl divergence for validation
        for label in synthetic_summary_validation.index:
            stacked_values = np.hstack((preds_dict_valid[label], actuals_validation))
            stacked_low = np.percentile(stacked_values, 0.1)
            stacked_high = np.percentile(stacked_values, 99.9)
            bins = np.linspace(stacked_low, stacked_high, 100)

            distr = np.histogram(preds_dict_valid[label], bins=bins)[0]
            distr = np.clip(distr / distr.sum(), 0.001, 0.999)
            true_distr = np.histogram(actuals_validation, bins=bins)[0]
            true_distr = np.clip(true_distr / true_distr.sum(), 0.001, 0.999)

            kl = entropy(distr, true_distr)
            synthetic_summary_validation.loc[label, "KL Divergence"] = kl

        summaries_train.append(synthetic_summary_train)
        summaries_validation.append(synthetic_summary_validation)

    summary_train = sum(summaries_train) / k
    summary_validation = sum(summaries_validation) / k

    return (
        summary_train[["Abs % Error of ATE", "MSE", "KL Divergence"]],
        summary_validation[["Abs % Error of ATE", "MSE", "KL Divergence"]],
    )
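
# Illustrative usage (a sketch): train/holdout benchmark averaged over k
# simulations; the two frames feed the holdout plotting helpers defined below.
#
#     train_summary, validation_summary = get_synthetic_summary_holdout(
#         simulate_nuisance_and_easy_treatment, n=10000, valid_size=0.2, k=10
#     )
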
def scatter_plot_summary_holdout(
    train_summary,
    validation_summary,
    k,
    label=["Train", "Validation"],
    drop_learners=[],
    drop_cols=[],
):
    """Generates a scatter plot comparing learner performance by training and validation.

    Args:
        train_summary (pd.DataFrame): summary for training synthetic data generated by
            get_synthetic_summary_holdout()
        validation_summary (pd.DataFrame): summary for validation synthetic data generated by
            get_synthetic_summary_holdout()
        label (list of str, optional): legend labels for the train and validation groups
        k (int): number of simulations (used only for plot title text)
        drop_learners (list, optional): list of learners (str) to omit when plotting
        drop_cols (list, optional): list of metrics (str) to omit when plotting
    """
    train_summary = train_summary.drop(drop_learners).drop(drop_cols, axis=1)
    validation_summary = validation_summary.drop(drop_learners).drop(drop_cols, axis=1)

    plot_data = pd.concat([train_summary, validation_summary])
    plot_data["label"] = [i.replace("Train", "") for i in plot_data.index]
    plot_data["label"] = [i.replace("Validation", "") for i in plot_data.label]

    fig, ax = plt.subplots()
    fig.set_size_inches(12, 8)

    xs = plot_data["Abs % Error of ATE"]
    ys = plot_data["MSE"]
    group = np.array(
        [label[0]] * train_summary.shape[0] + [label[1]] * validation_summary.shape[0]
    )
    cdict = {label[0]: "red", label[1]: "blue"}

    for g in np.unique(group):
        ix = np.where(group == g)[0].tolist()
        ax.scatter(xs[ix], ys[ix], c=cdict[g], label=g, s=100)

    for i, txt in enumerate(plot_data.label[:10]):
        ax.annotate(txt, (xs[i] + 0.005, ys[i]))

    ax.set_xlabel("Abs % Error of ATE")
    ax.set_ylabel("MSE")
    ax.set_title("Learner Performance (averaged over k={} simulations)".format(k))
    ax.legend(loc="center left", bbox_to_anchor=(1.1, 0.5))
    plt.show()
def bar_plot_summary_holdout(
    train_summary, validation_summary, k, drop_learners=[], drop_cols=[]
):
    """Generates a bar plot comparing learner performance by training and validation

    Args:
        train_summary (pd.DataFrame): summary for training synthetic data generated by
            get_synthetic_summary_holdout()
        validation_summary (pd.DataFrame): summary for validation synthetic data generated by
            get_synthetic_summary_holdout()
        k (int): number of simulations (used only for plot title text)
        drop_learners (list, optional): list of learners (str) to omit when plotting
        drop_cols (list, optional): list of metrics (str) to omit when plotting
    """
    train_summary = train_summary.drop([KEY_ACTUAL])
    train_summary["Learner"] = train_summary.index

    validation_summary = validation_summary.drop([KEY_ACTUAL])
    validation_summary["Learner"] = validation_summary.index

    for metric in ["Abs % Error of ATE", "MSE", "KL Divergence"]:
        plot_data_sub = pd.DataFrame(train_summary.Learner).reset_index(drop=True)
        plot_data_sub["train"] = train_summary[metric].values
        plot_data_sub["validation"] = validation_summary[metric].values
        plot_data_sub = plot_data_sub.set_index("Learner")
        plot_data_sub = plot_data_sub.drop(drop_learners).drop(drop_cols, axis=1)
        plot_data_sub = plot_data_sub.sort_values("train", ascending=True)

        plot_data_sub.plot(kind="bar", color=["red", "blue"], figsize=(12, 8))
        plt.xticks(rotation=30)
        plt.title(
            "Learner Performance of {} (averaged over k={} simulations)".format(
                metric, k
            )
        )
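
# Illustrative usage (a sketch): compare train vs. validation performance using
# the summaries produced by get_synthetic_summary_holdout() above.
#
#     scatter_plot_summary_holdout(train_summary, validation_summary, k=10)
#     bar_plot_summary_holdout(train_summary, validation_summary, k=10)
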
def get_synthetic_auuc(
    synthetic_preds,
    drop_learners=[],
    outcome_col="y",
    treatment_col="w",
    treatment_effect_col="tau",
    plot=True,
):
    """Get auuc values for cumulative gains of model estimates in quantiles.

    For details, reference get_cumgain() and plot_gain()

    Args:
        synthetic_preds (dict): dictionary of predictions generated by get_synthetic_preds()
            or get_synthetic_preds_holdout()
        drop_learners (list, optional): list of learners (str) to omit
        outcome_col (str, optional): the column name for the actual outcome
        treatment_col (str, optional): the column name for the treatment indicator (0 or 1)
        treatment_effect_col (str, optional): the column name for the true treatment effect
        plot (boolean, optional): plot the cumulative gain chart or not

    Returns:
        (pandas.DataFrame): auuc values by learner for cumulative gains of model estimates
    """
    synthetic_preds_df = synthetic_preds.copy()
    generated_data = synthetic_preds_df.pop(KEY_GENERATED_DATA)
    synthetic_preds_df = pd.DataFrame(synthetic_preds_df)
    synthetic_preds_df = synthetic_preds_df.drop(drop_learners, axis=1)

    synthetic_preds_df["y"] = generated_data[outcome_col]
    synthetic_preds_df["w"] = generated_data[treatment_col]
    if treatment_effect_col in generated_data.keys():
        synthetic_preds_df["tau"] = generated_data[treatment_effect_col]

    assert (
        (outcome_col in synthetic_preds_df.columns)
        and (treatment_col in synthetic_preds_df.columns)
        or treatment_effect_col in synthetic_preds_df.columns
    )

    cumlift = get_cumgain(
        synthetic_preds_df,
        outcome_col="y",
        treatment_col="w",
        treatment_effect_col="tau",
    )
    auuc_df = pd.DataFrame(cumlift.columns)
    auuc_df.columns = ["Learner"]
    auuc_df["cum_gain_auuc"] = [
        auc(cumlift.index.values / 100, cumlift[learner].values)
        for learner in cumlift.columns
    ]
    auuc_df = auuc_df.sort_values("cum_gain_auuc", ascending=False)

    if plot:
        plot_gain(
            synthetic_preds_df,
            outcome_col=outcome_col,
            treatment_col=treatment_col,
            treatment_effect_col=treatment_effect_col,
        )

    return auuc_df
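
# Illustrative usage (a sketch): rank learners by AUUC on a single simulation's
# predictions and draw the cumulative gain chart.
#
#     preds = get_synthetic_preds(simulate_nuisance_and_easy_treatment, n=10000)
#     auuc_df = get_synthetic_auuc(preds, plot=True)
#     auuc_df  # one row per column of the cumulative gain frame, sorted by cum_gain_auuc
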