# Source code for causalml.inference.tree.utils

"""
Utility functions for uplift trees.
"""

import functools
import time
from typing import Callable

import numpy as np
import pandas as pd


def cat_group(dfx, kpix, n_group=10):
    """
    Category Reduction for Categorical Variables

    Collapse all but the ``n_group`` most frequent values of a categorical
    column into a single "Other" bucket.

    Args
    ----
    dfx : dataframe
        The inputs data dataframe.
        NOTE: modified in place when the column has more than ``n_group``
        distinct values (rare levels are overwritten with "Other").

    kpix : string
        The column of the feature.

    n_group : int, optional (default = 10)
        The number of top category values to be remained, other category
        values will be put into "Other".

    Returns
    -------
    The transformed categorical feature value array.
    """
    if dfx[kpix].nunique() > n_group:
        # Keep the n_group most frequent levels; relabel everything else.
        top = dfx[kpix].isin(dfx[kpix].value_counts().index[:n_group])
        dfx.loc[~top, kpix] = "Other"
    # The original if/else returned the identical expression on both
    # branches; a single return covers both cases.
    return dfx[kpix].values
def cat_transform(dfx, kpix, kpi1):
    """
    Encoding string features.

    One-hot encodes the column ``kpix`` and swaps the original feature name
    for the generated dummy-column names in the feature list.

    Args
    ----
    dfx : dataframe
        The inputs data dataframe.

    kpix : string
        The column of the feature.

    kpi1 : list
        The list of feature names.

    Returns
    -------
    dfx : DataFrame
        The updated dataframe containing the encoded data.

    kpi1 : list
        The updated feature names containing the new dummy feature names.
    """
    dummies = pd.get_dummies(dfx[kpix].values)
    # Prefix each dummy column with the source feature name.
    dummies.columns = [f"{kpix}_{level}" for level in dummies.columns]
    dfx = pd.concat([dfx, dummies], axis=1)

    # Register the new dummy columns in the feature list (no duplicates),
    # then drop the original feature name that they replace.
    for col in dummies.columns:
        if col not in kpi1:
            kpi1.append(col)
    if kpix in kpi1:
        kpi1.remove(kpix)

    return dfx, kpi1
def cv_fold_index(n, i, k, random_seed=2018):
    """
    Compute the index positions of the i-th of k random cross-validation folds.

    Each of the ``n`` samples is assigned to one of ``k`` folds uniformly at
    random (seeded for reproducibility); the indices assigned to fold ``i``
    are returned. The original docstring was copy-pasted from
    ``cat_transform`` and described the wrong parameters entirely.

    Args
    ----
    n : int
        Total number of samples.

    i : int
        The fold to select (0 <= i < k).

    k : int
        Total number of folds.

    random_seed : int, optional (default = 2018)
        Seed for the fold assignment.
        NOTE: seeds NumPy's *global* RNG as a side effect.

    Returns
    -------
    fold_i_index : np.ndarray
        Array of index positions belonging to fold ``i``.
    """
    np.random.seed(random_seed)
    rlist = np.random.choice(a=range(k), size=n, replace=True)
    fold_i_index = np.where(rlist == i)[0]
    return fold_i_index
# Categorize continuous variable
def cat_continuous(x, granularity="Medium"):
    """
    Categorize (bin) continuous variable based on percentile.

    Args
    ----
    x : list
        Feature values.

    granularity : string, optional, (default = 'Medium')
        Control the granularity of the bins, optional values are:
        'High', 'Medium', 'Low'. Any other value falls back to 'Low'.

    Returns
    -------
    res : list
        List of percentile bin labels, one per feature value.
    """
    if granularity == "High":
        # 5%-wide bins: p5, p10, ..., p95.
        # BUG FIX: the original computed twenty percentiles (p5..p99) but
        # only compared against the first nine (p5..p45) while labeling
        # them "p10".."p90", so labels disagreed with the actual
        # thresholds. Labels and thresholds are now consistent.
        return _percentile_bin_labels(x, list(range(5, 100, 5)))
    elif granularity == "Medium":
        # Decile bins: p10, p20, ..., p90 (labels unchanged from before).
        return _percentile_bin_labels(x, list(range(10, 100, 10)))
    else:
        # Coarse bins with descriptive labels (behavior unchanged).
        cut_points = [np.percentile(x, p) for p in (15, 50, 85)]
        return [
            "1-Very Low"
            if z < cut_points[0]
            else "2-Low"
            if z < cut_points[1]
            else "3-High"
            if z < cut_points[2]
            else "4-Very High"
            for z in x
        ]


def _percentile_bin_labels(x, pctl_points):
    """Label each value of x with the lowest percentile bin containing it.

    ``pctl_points`` is an increasing list of percentile ranks; a value gets
    the label of the first threshold it does not exceed, or a "> pN" label
    if it is above all of them.
    """
    thresholds = [np.percentile(x, p) for p in pctl_points]
    labels = []
    for z in x:
        for p, t in zip(pctl_points, thresholds):
            if z <= t:
                labels.append("<= p%d (%s)" % (p, t))
                break
        else:
            # Larger than the highest threshold.
            labels.append("> p%d (%s)" % (pctl_points[-1], thresholds[-1]))
    return labels
def kpi_transform(dfx, kpi_combo, kpi_combo_new):
    """
    Feature transformation from continuous feature to binned features
    for a list of features

    Args
    ----
    dfx : DataFrame
        DataFrame containing the features.

    kpi_combo : list of string
        List of feature names to be transformed

    kpi_combo_new : list of string
        List of new feature names to be assigned to the transformed features.

    Returns
    -------
    dfx : DataFrame
        Updated DataFrame containing the new features.
    """
    # zip replaces the original C-style `for j in range(len(...))` indexing.
    for src_col, new_col in zip(kpi_combo, kpi_combo_new):
        # isinstance replaces the original `type(...) is str` check
        # (idiomatic and subclass-safe; same result for plain str values).
        if isinstance(dfx[src_col].values[0], str):
            # Categorical feature: copy, then collapse rare levels.
            dfx[new_col] = dfx[src_col].values
            dfx[new_col] = cat_group(dfx=dfx, kpix=new_col)
        else:
            # Continuous feature: coarser bins when transforming several
            # features at once, finer bins for a single feature.
            granularity = "Low" if len(kpi_combo) > 1 else "High"
            dfx[new_col] = cat_continuous(
                dfx[src_col].values, granularity=granularity
            )
    return dfx
def get_tree_leaves_mask(tree) -> np.ndarray:
    """
    Get mask array for tree leaves

    Walks the fitted tree depth-first from the root and marks every node
    whose two child pointers coincide (the sklearn convention for leaves).

    Args:
        tree: CausalTreeRegressor
              Tree object

    Returns: np.ndarray
             Boolean mask, True at the positions of leaf nodes
    """
    total_nodes = tree.tree_.node_count
    left = tree.tree_.children_left
    right = tree.tree_.children_right

    depths = np.zeros(shape=total_nodes, dtype=np.int64)
    leaf_mask = np.zeros(shape=total_nodes, dtype=bool)

    # Depth-first traversal starting at the root (node 0, depth 0).
    pending = [(0, 0)]
    while pending:
        node, depth = pending.pop()
        depths[node] = depth
        if left[node] == right[node]:
            # Identical child pointers mark a leaf.
            leaf_mask[node] = True
        else:
            # Internal node: descend into both children.
            pending.append((left[node], depth + 1))
            pending.append((right[node], depth + 1))

    return leaf_mask
def timeit(exclude_kwargs: tuple = ()) -> Callable:
    """
    timeit decorator

    Prints the wrapped function's name, its (filtered) keyword arguments,
    and the elapsed wall-clock time after every call, then returns the
    function's result unchanged.

    Args:
        exclude_kwargs: (tuple), keyword arguments that should be excluded
                        from display

    Returns: Callable
    """

    def wrapper(f: Callable):
        # functools.wraps preserves f.__name__ / __doc__ on the wrapped
        # function; the original decorator lost them.
        @functools.wraps(f)
        def wrapped(*args, **kw):
            ts = time.time()
            result = f(*args, **kw)
            te = time.time()
            # Hide sensitive / noisy kwargs from the printed report.
            display_kw = {k: v for k, v in kw.items() if k not in exclude_kwargs}
            print(
                "Function: {} Kwargs: {} Elapsed time: {:2.4f}".format(
                    f.__name__, display_kw, te - ts
                )
            )
            return result

        return wrapped

    return wrapper