Source code for causalml.dataset.classification

import random
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from scipy.interpolate import UnivariateSpline
from scipy.optimize import fsolve
from scipy.special import expit, logit


# ------ Define a list of functions for feature transformation
# @staticmethod
def _f_linear(x):
    """
    Linear transformation (in fact, the identity transformation)
    """
    return np.array(x)


# @staticmethod
def _f_quadratic(x):
    """
    Quadratic transformation
    """
    return np.array(x) * np.array(x)


# @staticmethod
def _f_cubic(x):
    """
    Cubic transformation
    """
    return np.array(x) * np.array(x) * np.array(x)


# @staticmethod
def _f_relu(x):
    """
    ReLU transformation
    """
    x = np.array(x)
    return np.maximum(x, 0)


# @staticmethod
def _f_sin(x):
    """
    Sine transformation
    """
    return np.sin(np.array(x) * np.pi)


# @staticmethod
def _f_cos(x):
    """
    Cosine transformation
    """
    return np.cos(np.array(x) * np.pi)
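

# ------ Illustrative usage of the elementary transformations ---------------
# A minimal sketch: each _f_* helper above maps a raw feature vector
# element-wise. The _demo_* function is an illustrative addition, not part of
# the causalml API.
def _demo_feature_transforms():
    x = np.linspace(-1, 1, 5)
    for f in (_f_linear, _f_quadratic, _f_cubic, _f_relu, _f_sin, _f_cos):
        # print each transform's name next to its element-wise output
        print(f.__name__, f(x))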


# ------ Generating non-linear splines as feature transformation functions
# @staticmethod
def _generate_splines(
    n_functions=10,
    n_initial_points=10,
    s=0.01,
    x_min=-3,
    x_max=3,
    y_min=0,
    y_max=1,
    random_seed=2019,
):
    """
    Generate a list of spline functions for feature
    transformation.

    Parameters
    ----------
    n_functions : int, optional
        Number of spline functions to be created.
    n_initial_points: int, optional
        Number of initial random points to be placed on a 2D plot to fit a spline.
    s : float or None, optional
        Positive smoothing factor used to choose the number of knots (the s argument of
        scipy.interpolate.UnivariateSpline).
    x_min: int or float, optional
        The minimum value of the X range.
    x_max:  int or float, optional
        The maximum value of the X range.
    y_min: int or float, optional
        The minimum value of the Y range.
    y_max: int or float, optional
        The maximum value of the Y range.
    random_seed: int, optional
        Random seed.

    Returns
    -------
    spls: list
        List of spline functions.
    """
    np.random.seed(random_seed)
    spls = []
    for i in range(n_functions):
        x = np.linspace(x_min, x_max, n_initial_points)
        y = np.random.uniform(y_min, y_max, n_initial_points)
        spl = UnivariateSpline(x, y, s=s)
        spls.append(spl)
    return spls
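

# ------ Illustrative usage of _generate_splines ----------------------------
# A minimal sketch: each generated spline is a callable that maps feature
# values to transformed values. The _demo_* function is an illustrative
# addition, not part of the causalml API.
def _demo_generate_splines():
    spls = _generate_splines(n_functions=3, random_seed=2019)
    grid = np.linspace(-3, 3, 7)
    for i, spl in enumerate(spls):
        # evaluate the i-th spline on a small grid of feature values
        print("spline %d:" % i, spl(grid))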


# @staticmethod
def _standardize(x):
    """
    Standardize a vector to have mean 0 and standard deviation 1.
    """
    return (np.array(x) - np.mean(x)) / np.std(x)


# @staticmethod
def _fixed_transformation(fs, x, f_index=0):
    """
    Transform and standardize a vector with a transformation function.
    If the given index is within the function list (f_index < len(fs)), fs[f_index] is used as
    the transformation function. Otherwise, a function is chosen from the list at random.

    Parameters
    ----------
    fs : list
        A collection of functions for transformation.
    x : list
        Feature values to be transformed.
    f_index : int, optional
        The function index to be used to select a transformation function.
    """
    try:
        y = fs[f_index](x)
    except IndexError:
        # np.asscalar was removed in NumPy 1.23; draw a scalar index directly
        y = fs[np.random.choice(len(fs))](x)
    y = _standardize(y)
    return y


# @staticmethod
def _random_transformation(fs, x):
    """
    Transform and standardize a vector by a function randomly chosen from
    the function collection.

    Parameters
    ----------
    fs : list
        A collection of functions (splines) for transformation.
    x : list
        Feature values to be transformed.
    """
    fi = np.random.choice(range(len(fs)), 1)
    y = fs[fi[0]](x)
    y = _standardize(y)
    return y
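

# ------ Illustrative usage of the transformation helpers -------------------
# A minimal sketch: _fixed_transformation picks fs[f_index] when the index is
# in range, while _random_transformation always picks at random; both
# standardize the output to mean 0 and std 1. The _demo_* function is an
# illustrative addition, not part of the causalml API.
def _demo_transformations():
    fs = [_f_linear, _f_quadratic, _f_cubic]
    x = np.random.normal(0, 1, 1000)
    y_fixed = _fixed_transformation(fs, x, f_index=1)  # uses _f_quadratic
    y_random = _random_transformation(fs, x)  # picks one of fs at random
    # both outputs should have mean ~0 and std ~1
    print(np.mean(y_fixed), np.std(y_fixed))
    print(np.mean(y_random), np.std(y_random))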


# @staticmethod
def _softmax(z, p, xb):
    """
    Sigmoid (expit) residual function. Despite its name, this computes the logistic sigmoid
    rather than the multiclass softmax. It is passed to a root finder (scipy.optimize.fsolve)
    to solve for the constant term in the linear part so that the mean of the sigmoid output
    equals a given value.

    Parameters
    ----------
    z : float
        Constant value in the linear part.
    p : float
        The target output mean value.
    xb : list
        An array whose elements are the linear-predictor values, i.e. sums of products of
        coefficients and feature values, excluding the constant term.
    """
    sm_arr = expit(z + np.array(xb))
    res = p - np.mean(sm_arr)
    return res
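

# ------ Illustrative usage of _softmax with a root finder ------------------
# A minimal sketch: find the intercept a such that mean(expit(a + xb)) == p,
# mirroring how the generator below calibrates group-level conversion rates.
# The _demo_* function is an illustrative addition, not part of the causalml
# API.
def _demo_solve_intercept(p=0.1):
    xb = np.random.normal(0, 1, 10000)
    a0 = logit(p)  # starting guess: the exact intercept when xb is all zeros
    a = fsolve(_softmax, a0, args=(p, xb))[0]
    # the second printed value should be close to the target rate p
    print(a, np.mean(expit(a + xb)))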


# ------ Data generation function (V2) using logistic regression as the underlying model
def make_uplift_classification_logistic(
    n_samples=10000,
    treatment_name=["control", "treatment1", "treatment2", "treatment3"],
    y_name="conversion",
    n_classification_features=10,
    n_classification_informative=5,
    n_classification_redundant=0,
    n_classification_repeated=0,
    n_uplift_dict={"treatment1": 2, "treatment2": 2, "treatment3": 3},
    n_mix_informative_uplift_dict={"treatment1": 1, "treatment2": 1, "treatment3": 0},
    delta_uplift_dict={"treatment1": 0.02, "treatment2": 0.05, "treatment3": -0.05},
    positive_class_proportion=0.1,
    random_seed=20200101,
    feature_association_list=["linear", "quadratic", "cubic", "relu", "sin", "cos"],
    random_select_association=True,
    error_std=0.05,
):
    """Generate a synthetic dataset for a classification uplift modeling problem.

    Parameters
    ----------
    n_samples : int, optional (default=10000)
        The number of samples to be generated for each treatment group.
    treatment_name: list, optional (default = ['control','treatment1','treatment2','treatment3'])
        The list of treatment names. The first element must be 'control' as the control group,
        and the rest are treated as treatment groups.
    y_name: string, optional (default = 'conversion')
        The name of the outcome variable to be used as a column in the output dataframe.
    n_classification_features: int, optional (default = 10)
        Total number of features for base classification.
    n_classification_informative: int, optional (default = 5)
        Total number of informative features for base classification.
    n_classification_redundant: int, optional (default = 0)
        Total number of redundant features for base classification.
    n_classification_repeated: int, optional (default = 0)
        Total number of repeated features for base classification.
    n_uplift_dict: dictionary, optional (default: {'treatment1': 2, 'treatment2': 2, 'treatment3': 3})
        Number of features for generating heterogeneous treatment effects for the corresponding
        treatment group. Dictionary of {treatment_key: number_of_features_for_uplift}.
    n_mix_informative_uplift_dict: dictionary, optional (default: {'treatment1': 1, 'treatment2': 1, 'treatment3': 0})
        Number of mix features for each treatment. A mix feature is defined as a linear
        combination of a randomly selected informative classification feature and a randomly
        selected uplift feature. The mixture is made by a weighted sum (p*feature1 +
        (1-p)*feature2), where the weight p is drawn from a uniform distribution between 0 and 1.
    delta_uplift_dict: dictionary, optional (default: {'treatment1': .02, 'treatment2': .05, 'treatment3': -.05})
        Treatment effect (delta), can be positive or negative. Dictionary of {treatment_key: delta}.
    positive_class_proportion: float, optional (default = 0.1)
        The proportion of the positive label (1) in the control group, i.e. the mean of the
        outcome variable for the control group.
    random_seed : int, optional (default = 20200101)
        The random seed to be used in the data generation process.
    feature_association_list : list, optional (default = ['linear','quadratic','cubic','relu','sin','cos'])
        List of uplift feature association patterns to the treatment effect. For example, if the
        feature pattern is 'quadratic', then the treatment effect will increase or decrease
        quadratically with the feature. The values in the list must be one of ('linear',
        'quadratic','cubic','relu','sin','cos'), but the same value can appear multiple times.
    random_select_association : boolean, optional (default = True)
        How the feature patterns are selected from feature_association_list during data
        generation. If True, a random feature association pattern is selected from the list for
        every uplift feature. If False, the patterns are taken from the list in turn and applied
        to the features one by one.
    error_std : float, optional (default = 0.05)
        Standard deviation of the error term of the logistic regression. The error is drawn from
        a normal distribution with mean 0 and the standard deviation specified in this argument.

    Returns
    -------
    df1 : DataFrame
        A data frame containing the treatment label, features, and outcome variable.
    x_name : list
        The list of feature names generated.
    """
    # Set the outcome mean for each experiment group
    mean_dict = {}
    mean_dict[treatment_name[0]] = positive_class_proportion
    for treatment_key_i in treatment_name[1:]:
        mean_dict[treatment_key_i] = positive_class_proportion
        if treatment_key_i in delta_uplift_dict:
            mean_dict[treatment_key_i] += delta_uplift_dict[treatment_key_i]

    # create data frame
    df1 = pd.DataFrame()
    n = n_samples

    # set seed
    np.random.seed(seed=random_seed)

    # define feature association function list -------------------------------#
    feature_association_pattern_dict = {
        "linear": _f_linear,
        "quadratic": _f_quadratic,
        "cubic": _f_cubic,
        "relu": _f_relu,
        "sin": _f_sin,
        "cos": _f_cos,
    }
    f_list = []
    for fi in feature_association_list:
        f_list.append(feature_association_pattern_dict[fi])

    # generate treatment key --------------------------------------------------#
    treatment_list = []
    for ti in treatment_name:
        treatment_list += [ti] * n
    treatment_list = np.random.permutation(treatment_list)
    df1["treatment_group_key"] = treatment_list

    # feature name lists
    x_name = []
    x_informative_name = []
    x_informative_transformed = []

    # generate informative features --------------------------------------------#
    for xi in range(n_classification_informative):
        # observed feature
        x = np.random.normal(0, 1, df1.shape[0])
        x_name_i = "x" + str(len(x_name) + 1) + "_informative"
        x_name.append(x_name_i)
        x_informative_name.append(x_name_i)
        df1[x_name_i] = x
        # transformed feature that takes effect in the model
        x_name_i = x_name_i + "_transformed"
        df1[x_name_i] = _fixed_transformation(f_list, x, xi)
        x_informative_transformed.append(x_name_i)

    # generate redundant features (linear) --------------------------------------#
    # linearly combine informative ones
    for xi in range(n_classification_redundant):
        nx = (
            np.random.choice(n_classification_informative, size=1, replace=False)[0]
            + 1
        )
        bx = np.random.normal(0, 1, size=nx)
        fx = np.random.choice(
            n_classification_informative, size=nx, replace=False, p=None
        )
        x_name_i = "x" + str(len(x_name) + 1) + "_redundant_linear"
        for xxi in range(nx):
            x_name_i += "_x" + str(fx[xxi] + 1)
        x_name.append(x_name_i)
        x = np.zeros(df1.shape[0])
        for xxi in range(nx):
            x += bx[xxi] * df1[x_name[fx[xxi]]]
        x = _standardize(x)
        df1[x_name_i] = x

    # generate repeated features ------------------------------------------------#
    # randomly select from informative ones
    for xi in range(n_classification_repeated):
        # [N] sklearn.datasets.make_classification may also draw repeated
        # features from redundant ones
        fx = np.random.choice(
            n_classification_informative, size=1, replace=False, p=None
        )
        x_name_i = "x" + str(len(x_name) + 1) + "_repeated" + "_x" + str(fx[0] + 1)
        x_name.append(x_name_i)
        df1[x_name_i] = df1[x_name[fx[0]]]

    # generate irrelevant features ----------------------------------------------#
    for xi in range(
        n_classification_features
        - n_classification_informative
        - n_classification_redundant
        - n_classification_repeated
    ):
        x_name_i = "x" + str(len(x_name) + 1) + "_irrelevant"
        x_name.append(x_name_i)
        df1[x_name_i] = np.random.normal(0, 1, df1.shape[0])

    # generate uplift features ----------------------------------------------------#
    x_name_uplift_transformed_dict = dict()
    for treatment_key_i in treatment_name:
        treatment_index = df1.index[
            df1["treatment_group_key"] == treatment_key_i
        ].tolist()
        if treatment_key_i in n_uplift_dict and n_uplift_dict[treatment_key_i] > 0:
            x_name_uplift_transformed = []
            x_name_uplift = []
            for xi in range(n_uplift_dict[treatment_key_i]):
                # observed feature
                x = np.random.normal(0, 1, df1.shape[0])
                x_name_i = "x" + str(len(x_name) + 1) + "_uplift"
                x_name.append(x_name_i)
                x_name_uplift.append(x_name_i)
                df1[x_name_i] = x
                # transformed feature that takes effect in the model
                x_name_i = x_name_i + "_transformed"
                if random_select_association:
                    df1[x_name_i] = _fixed_transformation(
                        f_list, x, random.randint(0, len(f_list) - 1)
                    )
                else:
                    df1[x_name_i] = _fixed_transformation(f_list, x, xi % len(f_list))
                x_name_uplift_transformed.append(x_name_i)
            x_name_uplift_transformed_dict[treatment_key_i] = x_name_uplift_transformed

    # generate mixed informative and uplift features
    for treatment_key_i in treatment_name:
        if (
            treatment_key_i in n_mix_informative_uplift_dict
            and n_mix_informative_uplift_dict[treatment_key_i] > 0
        ):
            for xi in range(n_mix_informative_uplift_dict[treatment_key_i]):
                x_name_i = "x" + str(len(x_name) + 1) + "_mix"
                x_name.append(x_name_i)
                p_weight = np.random.uniform(0, 1)
                df1[x_name_i] = (
                    p_weight * df1[np.random.choice(x_informative_name)]
                    + (1 - p_weight) * df1[np.random.choice(x_name_uplift)]
                )

    # generate conversion probability ----------------------------------------------#
    # baseline conversion
    coef_classify = []
    for ci in range(n_classification_informative):
        rcoef = [0]
        while np.abs(rcoef) < 0.1:
            rcoef = np.random.randn(1) * np.sqrt(1.0 / n_classification_informative)
        coef_classify.append(rcoef[0])
    x_classify = df1[x_informative_transformed].values
    p1 = positive_class_proportion
    a10 = logit(p1)
    err = np.random.normal(0, error_std, df1.shape[0])
    xb_array = (x_classify * coef_classify).sum(axis=1) + err
    # solve for the constant term so that the mean of the sigmoid output equals
    # positive_class_proportion
    a1 = fsolve(_softmax, a10, args=(p1, xb_array))[0]
    df1["conversion_prob_linear"] = a1 + xb_array
    df1["control_conversion_prob_linear"] = df1["conversion_prob_linear"].values

    # uplift conversion
    for treatment_key_i in treatment_name:
        if (
            treatment_key_i in delta_uplift_dict
            and np.abs(delta_uplift_dict[treatment_key_i]) > 0.0
        ):
            treatment_index = df1.index[
                df1["treatment_group_key"] == treatment_key_i
            ].tolist()
            # coefficient
            coef_uplift = []
            for ci in range(n_uplift_dict[treatment_key_i]):
                coef_uplift.append(0.5)
            x_uplift = df1.loc[
                :, x_name_uplift_transformed_dict[treatment_key_i]
            ].values
            p2 = mean_dict[treatment_key_i]
            a20 = np.log(p2 / (1.0 - p2)) - a1
            xb_array = df1["conversion_prob_linear"].values + (
                x_uplift * coef_uplift
            ).sum(axis=1)
            xb_array_treatment = xb_array[treatment_index]
            a2 = fsolve(_softmax, a20, args=(p2, xb_array_treatment))[0]
            df1["%s_conversion_prob_linear" % (treatment_key_i)] = a2 + xb_array
            df1.loc[treatment_index, "conversion_prob_linear"] = df1.loc[
                treatment_index, "%s_conversion_prob_linear" % (treatment_key_i)
            ].values
        else:
            df1["%s_conversion_prob_linear" % (treatment_key_i)] = df1[
                "conversion_prob_linear"
            ].values

    # generate conversion probability and true treatment effect ---------------------#
    df1["conversion_prob"] = 1 / (1 + np.exp(-df1["conversion_prob_linear"].values))
    df1["control_conversion_prob"] = 1 / (
        1 + np.exp(-df1["control_conversion_prob_linear"].values)
    )
    for treatment_key_i in treatment_name:
        df1["%s_conversion_prob" % (treatment_key_i)] = 1 / (
            1 + np.exp(-df1["%s_conversion_prob_linear" % (treatment_key_i)].values)
        )
        df1["%s_true_effect" % (treatment_key_i)] = (
            df1["%s_conversion_prob" % (treatment_key_i)].values
            - df1["control_conversion_prob"].values
        )

    # generate Y ----------------------------------------------------------------------#
    df1["conversion_prob"] = np.clip(df1["conversion_prob"].values, 0, 1)
    df1[y_name] = np.random.binomial(1, df1["conversion_prob"].values)

    return df1, x_name
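

# ------ Illustrative usage of make_uplift_classification_logistic ----------
# A minimal sketch: generate a small dataset and check that the per-group
# conversion rates roughly match positive_class_proportion plus the configured
# deltas. The _demo_* function and its parameter values are an illustrative
# addition, not part of the causalml API.
def _demo_make_uplift_classification_logistic():
    df, x_names = make_uplift_classification_logistic(
        n_samples=5000,
        treatment_name=["control", "treatment1"],
        n_uplift_dict={"treatment1": 2},
        n_mix_informative_uplift_dict={"treatment1": 1},
        delta_uplift_dict={"treatment1": 0.05},
        positive_class_proportion=0.1,
    )
    # control mean should be near 0.1, treatment1 mean near 0.15
    print(df.groupby("treatment_group_key")["conversion"].mean())
    print(len(x_names), "features generated")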


def make_uplift_classification(
    n_samples=1000,
    treatment_name=["control", "treatment1", "treatment2", "treatment3"],
    y_name="conversion",
    n_classification_features=10,
    n_classification_informative=5,
    n_classification_redundant=0,
    n_classification_repeated=0,
    n_uplift_increase_dict={"treatment1": 2, "treatment2": 2, "treatment3": 2},
    n_uplift_decrease_dict={"treatment1": 0, "treatment2": 0, "treatment3": 0},
    delta_uplift_increase_dict={
        "treatment1": 0.02,
        "treatment2": 0.05,
        "treatment3": 0.1,
    },
    delta_uplift_decrease_dict={
        "treatment1": 0.0,
        "treatment2": 0.0,
        "treatment3": 0.0,
    },
    n_uplift_increase_mix_informative_dict={
        "treatment1": 1,
        "treatment2": 1,
        "treatment3": 1,
    },
    n_uplift_decrease_mix_informative_dict={
        "treatment1": 0,
        "treatment2": 0,
        "treatment3": 0,
    },
    positive_class_proportion=0.5,
    random_seed=20190101,
):
    """Generate a synthetic dataset for a classification uplift modeling problem.

    Parameters
    ----------
    n_samples : int, optional (default=1000)
        The number of samples to be generated for each treatment group.
    treatment_name: list, optional (default = ['control','treatment1','treatment2','treatment3'])
        The list of treatment names.
    y_name: string, optional (default = 'conversion')
        The name of the outcome variable to be used as a column in the output dataframe.
    n_classification_features: int, optional (default = 10)
        Total number of features for base classification.
    n_classification_informative: int, optional (default = 5)
        Total number of informative features for base classification.
    n_classification_redundant: int, optional (default = 0)
        Total number of redundant features for base classification.
    n_classification_repeated: int, optional (default = 0)
        Total number of repeated features for base classification.
    n_uplift_increase_dict: dictionary, optional (default: {'treatment1': 2, 'treatment2': 2, 'treatment3': 2})
        Number of features for generating positive treatment effects for the corresponding
        treatment group. Dictionary of {treatment_key: number_of_features_for_increase_uplift}.
    n_uplift_decrease_dict: dictionary, optional (default: {'treatment1': 0, 'treatment2': 0, 'treatment3': 0})
        Number of features for generating negative treatment effects for the corresponding
        treatment group. Dictionary of {treatment_key: number_of_features_for_decrease_uplift}.
    delta_uplift_increase_dict: dictionary, optional (default: {'treatment1': .02, 'treatment2': .05, 'treatment3': .1})
        Positive treatment effect created by the positive uplift features on the base
        classification label. Dictionary of {treatment_key: increase_delta}.
    delta_uplift_decrease_dict: dictionary, optional (default: {'treatment1': 0., 'treatment2': 0., 'treatment3': 0.})
        Negative treatment effect created by the negative uplift features on the base
        classification label. Dictionary of {treatment_key: decrease_delta}.
    n_uplift_increase_mix_informative_dict: dictionary, optional (default: {'treatment1': 1, 'treatment2': 1, 'treatment3': 1})
        Number of positive mix features for each treatment. A positive mix feature is defined as
        a linear combination of a randomly selected informative classification feature and a
        randomly selected positive uplift feature. The linear combination is made by two
        coefficients sampled from a uniform distribution between -1 and 1.
    n_uplift_decrease_mix_informative_dict: dictionary, optional (default: {'treatment1': 0, 'treatment2': 0, 'treatment3': 0})
        Number of negative mix features for each treatment. A negative mix feature is defined as
        a linear combination of a randomly selected informative classification feature and a
        randomly selected negative uplift feature. The linear combination is made by two
        coefficients sampled from a uniform distribution between -1 and 1.
    positive_class_proportion: float, optional (default = 0.5)
        The proportion of the positive label (1) in the control group.
    random_seed : int, optional (default = 20190101)
        The random seed to be used in the data generation process.

    Returns
    -------
    df_res : DataFrame
        A data frame containing the treatment label, features, and outcome variable.
    x_name : list
        The list of feature names generated.

    Notes
    -----
    The algorithm for generating the base classification dataset is adapted from the
    make_classification method in the sklearn package, which uses the algorithm in Guyon [1]
    designed to generate the "Madelon" dataset.

    References
    ----------
    .. [1] I. Guyon, "Design of experiments for the NIPS 2003 variable
           selection benchmark", 2003.
    """
    # set seed
    np.random.seed(seed=random_seed)

    # create data frame
    df_res = pd.DataFrame()

    # generate treatment key
    n_all = n_samples * len(treatment_name)
    treatment_list = []
    for ti in treatment_name:
        treatment_list += [ti] * n_samples
    treatment_list = np.random.permutation(treatment_list)
    df_res["treatment_group_key"] = treatment_list

    # generate features and labels
    X1, Y1 = make_classification(
        n_samples=n_all,
        n_features=n_classification_features,
        n_informative=n_classification_informative,
        n_redundant=n_classification_redundant,
        n_repeated=n_classification_repeated,
        n_clusters_per_class=1,
        weights=[1 - positive_class_proportion, positive_class_proportion],
    )

    x_name = []
    x_informative_name = []
    for xi in range(n_classification_informative):
        x_name_i = "x" + str(len(x_name) + 1) + "_informative"
        x_name.append(x_name_i)
        x_informative_name.append(x_name_i)
        df_res[x_name_i] = X1[:, xi]
    for xi in range(n_classification_redundant):
        x_name_i = "x" + str(len(x_name) + 1) + "_redundant"
        x_name.append(x_name_i)
        df_res[x_name_i] = X1[:, n_classification_informative + xi]
    for xi in range(n_classification_repeated):
        x_name_i = "x" + str(len(x_name) + 1) + "_repeated"
        x_name.append(x_name_i)
        df_res[x_name_i] = X1[
            :, n_classification_informative + n_classification_redundant + xi
        ]

    for xi in range(
        n_classification_features
        - n_classification_informative
        - n_classification_redundant
        - n_classification_repeated
    ):
        x_name_i = "x" + str(len(x_name) + 1) + "_irrelevant"
        x_name.append(x_name_i)
        df_res[x_name_i] = np.random.normal(0, 1, n_all)

    # default treatment effects
    Y = Y1.copy()
    Y_increase = np.zeros_like(Y1)
    Y_decrease = np.zeros_like(Y1)

    # generate uplift (positive)
    for treatment_key_i in treatment_name:
        treatment_index = df_res.index[
            df_res["treatment_group_key"] == treatment_key_i
        ].tolist()
        if (
            treatment_key_i in n_uplift_increase_dict
            and n_uplift_increase_dict[treatment_key_i] > 0
        ):
            x_uplift_increase_name = []
            adjust_class_proportion = (delta_uplift_increase_dict[treatment_key_i]) / (
                1 - positive_class_proportion
            )
            X_increase, Y_increase = make_classification(
                n_samples=n_all,
                n_features=n_uplift_increase_dict[treatment_key_i],
                n_informative=n_uplift_increase_dict[treatment_key_i],
                n_redundant=0,
                n_clusters_per_class=1,
                weights=[1 - adjust_class_proportion, adjust_class_proportion],
            )
            for xi in range(n_uplift_increase_dict[treatment_key_i]):
                x_name_i = "x" + str(len(x_name) + 1) + "_uplift_increase"
                x_name.append(x_name_i)
                x_uplift_increase_name.append(x_name_i)
                df_res[x_name_i] = X_increase[:, xi]
            Y[treatment_index] = Y[treatment_index] + Y_increase[treatment_index]
            if n_uplift_increase_mix_informative_dict[treatment_key_i] > 0:
                for xi in range(
                    n_uplift_increase_mix_informative_dict[treatment_key_i]
                ):
                    x_name_i = "x" + str(len(x_name) + 1) + "_increase_mix"
                    x_name.append(x_name_i)
                    df_res[x_name_i] = (
                        np.random.uniform(-1, 1)
                        * df_res[np.random.choice(x_informative_name)]
                        + np.random.uniform(-1, 1)
                        * df_res[np.random.choice(x_uplift_increase_name)]
                    )

    # generate uplift (negative)
    for treatment_key_i in treatment_name:
        treatment_index = df_res.index[
            df_res["treatment_group_key"] == treatment_key_i
        ].tolist()
        if (
            treatment_key_i in n_uplift_decrease_dict
            and n_uplift_decrease_dict[treatment_key_i] > 0
        ):
            x_uplift_decrease_name = []
            adjust_class_proportion = (delta_uplift_decrease_dict[treatment_key_i]) / (
                1 - positive_class_proportion
            )
            X_decrease, Y_decrease = make_classification(
                n_samples=n_all,
                n_features=n_uplift_decrease_dict[treatment_key_i],
                n_informative=n_uplift_decrease_dict[treatment_key_i],
                n_redundant=0,
                n_clusters_per_class=1,
                weights=[1 - adjust_class_proportion, adjust_class_proportion],
            )
            for xi in range(n_uplift_decrease_dict[treatment_key_i]):
                x_name_i = "x" + str(len(x_name) + 1) + "_uplift_decrease"
                x_name.append(x_name_i)
                x_uplift_decrease_name.append(x_name_i)
                df_res[x_name_i] = X_decrease[:, xi]
            Y[treatment_index] = Y[treatment_index] - Y_decrease[treatment_index]
            if n_uplift_decrease_mix_informative_dict[treatment_key_i] > 0:
                for xi in range(
                    n_uplift_decrease_mix_informative_dict[treatment_key_i]
                ):
                    x_name_i = "x" + str(len(x_name) + 1) + "_decrease_mix"
                    x_name.append(x_name_i)
                    df_res[x_name_i] = (
                        np.random.uniform(-1, 1)
                        * df_res[np.random.choice(x_informative_name)]
                        + np.random.uniform(-1, 1)
                        * df_res[np.random.choice(x_uplift_decrease_name)]
                    )

    # truncate Y
    Y = np.clip(Y, 0, 1)

    df_res[y_name] = Y
    df_res["treatment_effect"] = Y - Y1

    return df_res, x_name
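

# ------ Illustrative usage of make_uplift_classification -------------------
# A minimal sketch: generate the default dataset and inspect per-group outcome
# means and the realized per-row treatment effects. The _demo_* function is an
# illustrative addition, not part of the causalml API.
def _demo_make_uplift_classification():
    df, x_names = make_uplift_classification(n_samples=1000)
    # per-group outcome means reflect the configured positive deltas
    print(df.groupby("treatment_group_key")["conversion"].mean())
    # treatment_effect is Y - Y1, so values are -1, 0, or 1 per row
    print(df["treatment_effect"].value_counts())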