Source code for causalml.inference.tree.causal.causalforest

from typing import Union

import numpy as np
import forestci as fci
from joblib import Parallel, delayed
from warnings import catch_warnings, simplefilter, warn

from sklearn.exceptions import DataConversionWarning
from sklearn.utils.validation import check_random_state, _check_sample_weight
from sklearn.utils.multiclass import type_of_target
from sklearn import __version__ as sklearn_version
from sklearn.ensemble._forest import DOUBLE, DTYPE, MAX_INT
from sklearn.ensemble._forest import ForestRegressor
from sklearn.ensemble._forest import compute_sample_weight, issparse
from sklearn.ensemble._forest import _generate_sample_indices, _get_n_samples_bootstrap

from .causaltree import CausalTreeRegressor

try:
    from packaging.version import parse as Version
except ModuleNotFoundError:
    from distutils.version import LooseVersion as Version

# sklearn.utils.fixes._joblib_parallel_args was removed in scikit-learn 1.1.0;
# on newer versions the joblib keyword arguments are passed directly.
if Version(sklearn_version) >= Version("1.1.0"):
    _joblib_parallel_args = dict(prefer="threads")
else:
    from sklearn.utils.fixes import _joblib_parallel_args

    _joblib_parallel_args = _joblib_parallel_args(prefer="threads")


def _parallel_build_trees(
    tree,
    forest,
    X,
    y,
    sample_weight,
    tree_idx,
    n_trees,
    verbose=0,
    class_weight=None,
    n_samples_bootstrap=None,
):
    """
    Private function used to fit a single tree in parallel."""
    if verbose > 1:
        print("building tree %d of %d" % (tree_idx + 1, n_trees))

    if forest.bootstrap:
        n_samples = X.shape[0]
        indices = _generate_sample_indices(
            tree.random_state, n_samples, n_samples_bootstrap
        )
        X, y = X[indices].copy(), y[indices].copy()
        curr_sample_weight = sample_weight[indices].copy()

        if class_weight == "subsample":
            with catch_warnings():
                simplefilter("ignore", DeprecationWarning)
                curr_sample_weight *= compute_sample_weight("auto", y, indices=indices)
        elif class_weight == "balanced_subsample":
            curr_sample_weight *= compute_sample_weight("balanced", y, indices=indices)

        tree.fit(X, y, sample_weight=curr_sample_weight, check_input=False)
    else:
        tree.fit(X, y, sample_weight=sample_weight, check_input=False)

    return tree
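
# Illustrative note (not in the original source): when ``forest.bootstrap`` is
# True, each tree is fit on rows resampled with replacement. A minimal sketch,
# assuming sklearn's private helper keeps its current signature:
#
#     idx = _generate_sample_indices(
#         random_state=0, n_samples=100, n_samples_bootstrap=100
#     )
#     # idx holds 100 row indices drawn from range(100) with replacement;
#     # rows never drawn are that tree's out-of-bag samples.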


class CausalRandomForestRegressor(ForestRegressor):
    def __init__(
        self,
        n_estimators: int = 100,
        *,
        control_name: Union[int, str] = 0,
        criterion: str = "causal_mse",
        alpha: float = 0.05,
        max_depth: int = None,
        min_samples_split: int = 60,
        min_samples_leaf: int = 100,
        min_weight_fraction_leaf: float = 0.0,
        max_features: Union[int, float, str] = 1.0,
        max_leaf_nodes: int = None,
        min_impurity_decrease: float = float("-inf"),
        bootstrap: bool = True,
        oob_score: bool = False,
        n_jobs: int = None,
        random_state: int = None,
        verbose: int = 0,
        warm_start: bool = False,
        ccp_alpha: float = 0.0,
        groups_penalty: float = 0.5,
        max_samples: int = None,
        groups_cnt: bool = True,
    ):
        """
        Initialize Random Forest of CausalTreeRegressors

        Args:
            n_estimators: (int, default=100) Number of trees in the forest
            control_name: (str or int) Name of control group
            criterion: ({"causal_mse", "standard_mse"}, default="causal_mse")
                Function to measure the quality of a split.
            alpha: (float) The confidence level alpha of the ATE estimate and ITE bootstrap estimates
            max_depth: (int, default=None) The maximum depth of the tree.
            min_samples_split: (int or float, default=60) The minimum number of samples
                required to split an internal node.
            min_samples_leaf: (int or float, default=100) The minimum number of samples
                required to be at a leaf node.
            min_weight_fraction_leaf: (float, default=0.0) The minimum weighted fraction of the
                sum total of weights (of all the input samples) required to be at a leaf node.
            max_features: (int, float or {"auto", "sqrt", "log2"}, default=1.0)
                The number of features to consider when looking for the best split.
            max_leaf_nodes: (int, default=None) Grow a tree with ``max_leaf_nodes`` in best-first fashion.
            min_impurity_decrease: (float, default=float("-inf")) A node will be split if this split
                induces a decrease of the impurity greater than or equal to this value.
            bootstrap: (bool, default=True) Whether bootstrap samples are used when building trees.
            oob_score: (bool, default=False) Whether to use out-of-bag samples to estimate
                the generalization score.
            n_jobs: (int, default=None) The number of jobs to run in parallel.
            random_state: (int, RandomState instance or None, default=None) Controls both the
                randomness of the bootstrapping of the samples used when building trees
                (if ``bootstrap=True``) and the sampling of the features to consider when
                looking for the best split at each node (if ``max_features < n_features``).
            verbose: (int, default=0) Controls the verbosity when fitting and predicting.
            warm_start: (bool, default=False) When set to ``True``, reuse the solution of the
                previous call to fit and add more estimators to the ensemble; otherwise,
                just fit a whole new forest.
            ccp_alpha: (non-negative float, default=0.0) Complexity parameter used for
                Minimal Cost-Complexity Pruning.
            groups_penalty: (float, default=0.5) This penalty coefficient manages the node
                impurity increase in case of the difference between treatment and control
                samples sizes.
            max_samples: (int or float, default=None) If bootstrap is True, the number of
                samples to draw from X to train each base estimator.
            groups_cnt: (bool, default=True) Count treatment and control groups for each node/leaf.
        """
        self._estimator = CausalTreeRegressor(
            control_name=control_name, criterion=criterion, groups_cnt=groups_cnt
        )
        _estimator_key = (
            "estimator"
            if Version(sklearn_version) >= Version("1.2.0")
            else "base_estimator"
        )
        _parent_args = {
            _estimator_key: self._estimator,
            "n_estimators": n_estimators,
            "estimator_params": (
                "criterion",
                "control_name",
                "max_depth",
                "min_samples_split",
                "min_weight_fraction_leaf",
                "max_features",
                "max_leaf_nodes",
                "min_impurity_decrease",
                "ccp_alpha",
                "groups_penalty",
                "min_samples_leaf",
                "random_state",
            ),
            "bootstrap": bootstrap,
            "oob_score": oob_score,
            "n_jobs": n_jobs,
            "random_state": random_state,
            "verbose": verbose,
            "warm_start": warm_start,
            "max_samples": max_samples,
        }
        super().__init__(**_parent_args)

        self.criterion = criterion
        self.control_name = control_name
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.min_weight_fraction_leaf = min_weight_fraction_leaf
        self.max_features = max_features
        self.max_leaf_nodes = max_leaf_nodes
        self.min_impurity_decrease = min_impurity_decrease
        self.ccp_alpha = ccp_alpha
        self.groups_penalty = groups_penalty
        self.alpha = alpha
        self.groups_cnt = groups_cnt

    def _fit(self, X: np.ndarray, y: np.ndarray, sample_weight: np.ndarray = None):
        """
        Build a forest of trees from the training set (X, y),
        with modified _parallel_build_trees for Causal Trees used in BaseForest.fit()
        Source: https://github.com/scikit-learn/scikit-learn/blob/main/sklearn/ensemble/_forest.py

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            The training input samples. Internally, its dtype will be converted
            to ``dtype=np.float32``. If a sparse matrix is provided, it will be
            converted into a sparse ``csc_matrix``.

        y : array-like of shape (n_samples,) or (n_samples, n_outputs)
            The target values (class labels in classification, real numbers in
            regression).

        sample_weight : array-like of shape (n_samples,), default=None
            Sample weights. If None, then samples are equally weighted. Splits
            that would create child nodes with net zero or negative weight are
            ignored while searching for a split in each node. In the case of
            classification, splits are also ignored if they would result in any
            single class carrying a negative weight in either child node.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        # Validate or convert input data
        if issparse(y):
            raise ValueError("sparse multilabel-indicator for y is not supported.")
        X, y = self._validate_data(
            X, y, multi_output=True, accept_sparse="csc", dtype=DTYPE
        )
        if sample_weight is not None:
            sample_weight = _check_sample_weight(sample_weight, X)

        if issparse(X):
            # Pre-sort indices to avoid that each individual tree of the
            # ensemble sorts the indices.
            X.sort_indices()

        y = np.atleast_1d(y)
        if y.ndim == 2 and y.shape[1] == 1:
            warn(
                "A column-vector y was passed when a 1d array was"
                " expected. Please change the shape of y to "
                "(n_samples,), for example using ravel().",
                DataConversionWarning,
                stacklevel=2,
            )

        if y.ndim == 1:
            y = np.reshape(y, (-1, 1))

        if self.criterion == "poisson":
            if np.any(y < 0):
                raise ValueError(
                    "Some value(s) of y are negative which is "
                    "not allowed for Poisson regression."
                )
            if np.sum(y) <= 0:
                raise ValueError(
                    "Sum of y is not strictly positive which "
                    "is necessary for Poisson regression."
                )

        self.n_outputs_ = np.unique(sample_weight).astype(int).size + 1
        self.max_outputs_ = self.n_outputs_

        y, expanded_class_weight = self._validate_y_class_weight(y)

        if getattr(y, "dtype", None) != DOUBLE or not y.flags.contiguous:
            y = np.ascontiguousarray(y, dtype=DOUBLE)

        if expanded_class_weight is not None:
            if sample_weight is not None:
                sample_weight = sample_weight * expanded_class_weight
            else:
                sample_weight = expanded_class_weight

        if not self.bootstrap and self.max_samples is not None:
            raise ValueError(
                "`max_sample` cannot be set if `bootstrap=False`. "
                "Either switch to `bootstrap=True` or set "
                "`max_sample=None`."
            )
        elif self.bootstrap:
            n_samples_bootstrap = _get_n_samples_bootstrap(
                n_samples=X.shape[0], max_samples=self.max_samples
            )
        else:
            n_samples_bootstrap = None

        # Check parameters
        self._validate_estimator()

        if not self.bootstrap and self.oob_score:
            raise ValueError("Out of bag estimation only available if bootstrap=True")

        random_state = check_random_state(self.random_state)

        if not self.warm_start or not hasattr(self, "estimators_"):
            # Free allocated memory, if any
            self.estimators_ = []

        n_more_estimators = self.n_estimators - len(self.estimators_)

        if n_more_estimators < 0:
            raise ValueError(
                "n_estimators=%d must be larger or equal to "
                "len(estimators_)=%d when warm_start==True"
                % (self.n_estimators, len(self.estimators_))
            )
        elif n_more_estimators == 0:
            warn(
                "Warm-start fitting without increasing n_estimators does not "
                "fit new trees."
            )
        else:
            if self.warm_start and len(self.estimators_) > 0:
                # We draw from the random state to get the random state we
                # would have got if we hadn't used a warm_start.
                random_state.randint(MAX_INT, size=len(self.estimators_))

            trees = [
                self._make_estimator(append=False, random_state=random_state)
                for i in range(n_more_estimators)
            ]

            trees = Parallel(
                n_jobs=self.n_jobs,
                verbose=self.verbose,
                **_joblib_parallel_args,
            )(
                delayed(_parallel_build_trees)(
                    t,
                    self,
                    X,
                    y,
                    sample_weight,
                    i,
                    len(trees),
                    verbose=self.verbose,
                    class_weight=self.class_weight,
                    n_samples_bootstrap=n_samples_bootstrap,
                )
                for i, t in enumerate(trees)
            )

            self.estimators_.extend(trees)

        if self.oob_score:
            y_type = type_of_target(y)
            if y_type in ("multiclass-multioutput", "unknown"):
                raise ValueError(
                    "The type of target cannot be used to compute OOB "
                    f"estimates. Got {y_type} while only the following are "
                    "supported: continuous, continuous-multioutput, binary, "
                    "multiclass, multilabel-indicator."
                )
            self._set_oob_score_and_attributes(X, y)

        if hasattr(self, "classes_") and self.n_outputs_ == 1:
            self.n_classes_ = self.n_classes_[0]
            self.classes_ = self.classes_[0]

        return self
    def fit(self, X: np.ndarray, treatment: np.ndarray, y: np.ndarray):
        """
        Fit Causal RandomForest

        Args:
            X: (np.ndarray), feature matrix
            treatment: (np.ndarray), treatment vector
            y: (np.ndarray), outcome vector

        Returns:
            self
        """
        X, y, w = self._estimator._prepare_data(X=X, y=y, treatment=treatment)
        return self._fit(X=X, y=y, sample_weight=w)
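
    # Usage sketch (illustration, not part of the original source): ``X``,
    # ``w`` and ``y`` below are hypothetical numpy arrays of shapes
    # (n, p), (n,) and (n,), with ``w`` holding 0/1 group labels.
    #
    #     crf = CausalRandomForestRegressor(n_estimators=100, control_name=0)
    #     crf.fit(X=X, treatment=w, y=y)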
    def predict(self, X: np.ndarray, with_outcomes: bool = False) -> np.ndarray:
        """Predict individual treatment effects

        Args:
            X (np.matrix): a feature matrix
            with_outcomes (bool), default=False
                include outcomes Y_hat(X|T=0), Y_hat(X|T=1) along with individual treatment effect

        Returns:
            (np.matrix): individual treatment effect (ITE), dim=nx1
                         or ITE with outcomes [Y_hat(X|T=0), Y_hat(X|T=1), ITE], dim=nx3
        """
        if with_outcomes:
            self.n_outputs_ = self.max_outputs_
            for estimator in self.estimators_:
                estimator._with_outcomes = True
        else:
            self.n_outputs_ = 1
        y_pred = super().predict(X)
        return y_pred
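
    # Usage sketch (illustration, not part of the original source); ``X_new``
    # is a hypothetical feature matrix:
    #
    #     ite = crf.predict(X_new)                       # ITE only, dim=nx1
    #     full = crf.predict(X_new, with_outcomes=True)  # columns:
    #     # [Y_hat(X|T=0), Y_hat(X|T=1), ITE], dim=nx3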
    def calculate_error(
        self,
        X_train: np.ndarray,
        X_test: np.ndarray,
        inbag: np.ndarray = None,
        calibrate: bool = True,
        memory_constrained: bool = False,
        memory_limit: int = None,
    ) -> np.ndarray:
        """
        Calculate error bars from scikit-learn RandomForest estimators
        Source: https://github.com/scikit-learn-contrib/forest-confidence-interval

        Args:
            X_train: (np.ndarray), training subsample of feature matrix, (n_train_sample, n_features)
            X_test: (np.ndarray), test subsample of feature matrix, (n_test_sample, n_features)
            inbag: (ndarray, optional), The inbag matrix that fit the data. If set to `None` (default),
                it will be inferred from the forest. However, this only works for trees
                for which bootstrapping was set to `True`, that is, if sampling was
                done with replacement. Otherwise, users need to provide their own
                inbag matrix.
            calibrate: (boolean, optional, default=True)
                Whether to apply calibration to mitigate Monte Carlo noise.
                Some variance estimates may be negative due to Monte Carlo effects
                if the number of trees in the forest is too small.
            memory_constrained: (boolean, optional)
                Whether or not there is a restriction on memory. If False, it is assumed
                that an ndarray of shape (n_train_sample, n_test_sample) fits in main memory.
                Setting to True can actually provide a speedup if memory_limit is tuned
                to the optimal range.
            memory_limit: (int, optional)
                An upper bound for how much memory the intermediate matrices will take
                up in Megabytes. This must be provided if memory_constrained=True.

        Returns:
            (np.ndarray), An array with the unbiased sampling variance for a RandomForest object.
        """
        var = fci.random_forest_error(
            self,
            X_train,
            X_test,
            inbag=inbag,
            calibrate=calibrate,
            memory_constrained=memory_constrained,
            memory_limit=memory_limit,
        )
        return var
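

if __name__ == "__main__":
    # End-to-end usage sketch (illustration only, not part of the original
    # module): recover a known constant treatment effect from synthetic data.
    # The data-generating code below is hypothetical; only the
    # CausalRandomForestRegressor API defined above is assumed.
    rng = np.random.default_rng(0)
    n = 2000
    X = rng.normal(size=(n, 5))
    w = rng.integers(0, 2, size=n)  # binary treatment assignment
    tau = 1.0  # true (constant) treatment effect
    y = X[:, 0] + tau * w + rng.normal(scale=0.5, size=n)

    crf = CausalRandomForestRegressor(n_estimators=100, control_name=0)
    crf.fit(X=X, treatment=w, y=y)

    ite = crf.predict(X)  # individual treatment effects, dim=nx1
    print("mean ITE estimate:", ite.mean())  # expected to be close to tau

    # Variance of the ITE estimates via forestci; bootstrap=True (the default)
    # is required so that the inbag matrix can be inferred from the forest.
    var = crf.calculate_error(X_train=X, X_test=X)
    print("mean ITE variance:", var.mean())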