Source code for featurehub.modeling.model

from collections import defaultdict
import os
import traceback
import sys
import sklearn.metrics

import numpy as np
from sklearn.externals import joblib
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.preprocessing import label_binarize, LabelEncoder
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor

from featurehub.modeling.metrics import Metric, MetricList
from featurehub.util import RANDOM_STATE

class Model(object):
    """Versatile modeling object.

    Handles classification and regression problems and computes a variety of
    performance metrics.

    Parameters
    ----------
    problem_type : str
        One of "classification" or "regression"
    """

    CLASSIFICATION = "classification"
    REGRESSION = "regression"

    CLASSIFICATION_SCORING = [
        {"name": "Accuracy", "scoring": "accuracy"},
        {"name": "Precision", "scoring": "precision"},
        {"name": "Recall", "scoring": "recall"},
        {"name": "ROC AUC", "scoring": "roc_auc"},
    ]
    REGRESSION_SCORING = [
        {"name": "Root Mean Squared Error", "scoring": "root_mean_squared_error"},
        {"name": "R-squared", "scoring": "r2"},
    ]

    BINARY_METRIC_AGGREGATION = "micro"
    MULTICLASS_METRIC_AGGREGATION = "micro"

    def __init__(self, problem_type):
        self.problem_type = problem_type

        if self._is_classification():
            self.model = Model._get_default_classifier()
        elif self._is_regression():
            self.model = Model._get_default_regressor()
        else:
            raise NotImplementedError

    def compute_metrics(self, X, Y, kind="cv", **kwargs):
        if kind == "cv":
            return self.compute_metrics_cv(X, Y, **kwargs)
        elif kind == "train_test":
            return self.compute_metrics_train_test(X, Y, **kwargs)
        else:
            raise ValueError("Bad kind: {}".format(kind))

    def compute_metrics_cv(self, X, Y):
        """Compute cross-validated metrics.

        Trains this model on data X with labels Y.

        Returns a MetricList with the name, scoring type, and value for each
        Metric. Note that these values may be numpy floating points, and
        should be converted prior to insertion in a database.

        Parameters
        ----------
        X : numpy array-like or pd.DataFrame
            data
        Y : numpy array-like or pd.DataFrame or pd.Series
            labels
        """

        scorings, scorings_ = self._get_scorings()

        # compute scores
        scores = self.cv_score_mean(X, Y, scorings_)

        # unpack into MetricList
        metric_list = self.scores_to_metriclist(scorings, scores)

        return metric_list

    def compute_metrics_train_test(self, X, Y, n):
        """Compute metrics on test set.

        Uses the first n rows of X and Y for training and the remaining rows
        for testing.
        """

        X, Y = Model._format_matrices(X, Y)

        X_train, Y_train = X[:n], Y[:n]
        X_test, Y_test = X[n:], Y[n:]

        scorings, scorings_ = self._get_scorings()

        # Determine binary/multiclass classification
        classes = np.unique(Y)
        params = self._get_params(classes)

        # fit model on entire training set
        self.model.fit(X_train, Y_train)

        scores = {}
        for scoring in scorings_:
            scores[scoring] = self._do_scoring(scoring, params, self.model,
                                               X_test, Y_test)

        metric_list = self.scores_to_metriclist(scorings, scores)
        return metric_list
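
    # Illustrative call shapes for the two public metric entry points. The
    # arrays X, Y and the value n=800 below are assumptions made only for this
    # sketch, not values taken from this module:
    #
    #   model = Model(Model.CLASSIFICATION)
    #   cv_metrics = model.compute_metrics_cv(X, Y)
    #   holdout_metrics = model.compute_metrics_train_test(X, Y, n=800)
    #
    # Both calls return a MetricList; compute_metrics(kind=...) simply
    # dispatches to one or the other.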

    def _do_scoring(self, scoring, params, model, X_test, Y_test,
                    failure_value=None):
        # Make and evaluate predictions. Note that ROC AUC may raise an
        # exception if somehow we only have examples from one class in a
        # given fold.
        Y_test_transformed = params[scoring]["pred_transformer"](Y_test)
        Y_test_pred = params[scoring]["predictor"](model, X_test)
        try:
            score = params[scoring]["scorer"](Y_test_transformed, Y_test_pred)
        except ValueError:
            # Fall back to failure_value (e.g. np.nan from cv_score_mean) so
            # that a single bad fold does not abort the whole evaluation.
            score = failure_value
            print(traceback.format_exc(), file=sys.stderr)

        return score

    def cv_score_mean(self, X, Y, scorings):
        """Compute mean score across cross validation folds.

        Split data and labels into cross validation folds and fit the model
        for each fold. Then, for each scoring type in scorings, compute the
        score. Finally, average the scores across folds. Returns a dictionary
        mapping scoring to score.

        Parameters
        ----------
        X : numpy array-like
            data
        Y : numpy array-like
            labels
        scorings : list of str
            scoring types
        """

        X, Y = Model._format_matrices(X, Y)

        scorings = list(scorings)

        # Determine binary/multiclass classification
        classes = np.unique(Y)
        params = self._get_params(classes)

        if self._is_classification():
            kf = StratifiedKFold(shuffle=True, random_state=RANDOM_STATE + 3)
        else:
            kf = KFold(shuffle=True, random_state=RANDOM_STATE + 4)

        # Split data, train model, and evaluate metric. We fit the model just
        # once per fold.
        scoring_outputs = defaultdict(list)
        for train_inds, test_inds in kf.split(X, Y):
            X_train, X_test = X[train_inds], X[test_inds]
            Y_train, Y_test = Y[train_inds], Y[test_inds]

            self.model.fit(X_train, Y_train)

            for scoring in scorings:
                score = self._do_scoring(scoring, params, self.model, X_test,
                                         Y_test, failure_value=np.nan)
                scoring_outputs[scoring].append(score)

        for scoring in scoring_outputs:
            score_mean = np.nanmean(scoring_outputs[scoring])
            if np.isnan(score_mean):
                score_mean = None
            scoring_outputs[scoring] = score_mean

        return scoring_outputs
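
    # The mapping returned by cv_score_mean is keyed by the "scoring" strings
    # from CLASSIFICATION_SCORING / REGRESSION_SCORING; values are
    # fold-averaged floats, or None when every fold failed. For a
    # classification problem the keys are:
    #
    #   {"accuracy": ..., "precision": ..., "recall": ..., "roc_auc": ...}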

    def scores_to_metriclist(self, scorings, scores):
        metric_list = MetricList()
        for v in scorings:
            name = v["name"]
            scoring = v["scoring"]

            if scoring in scores:
                value = scores[scoring]
            else:
                value = None

            metric_list.append(Metric(name, scoring, value))

        return metric_list

    def _is_classification(self):
        return self.problem_type == "classification"

    def _is_regression(self):
        return self.problem_type == "regression"

    def _get_params(self, classes):
        n_classes = len(classes)
        is_binary = n_classes == 2

        if is_binary:
            metric_aggregation = Model.BINARY_METRIC_AGGREGATION
        else:
            metric_aggregation = Model.MULTICLASS_METRIC_AGGREGATION

        # Determine predictor (labels, label probabilities, or values) and
        # scoring function.

        # predictors
        def predict(model, X_test):
            return model.predict(X_test)

        def predict_prob(model, X_test):
            return model.predict_proba(X_test)

        # transformers
        def noop(y_true):
            return y_true

        def transformer_binarize(y_true):
            return label_binarize(y_true, classes=classes)

        # scorers
        # nothing here

        params = {
            "accuracy": {
                "predictor": predict,
                "pred_transformer": noop,
                "scorer": sklearn.metrics.accuracy_score,
            },
            "precision": {
                "predictor": predict,
                "pred_transformer": noop,
                "scorer": lambda y_true, y_pred: sklearn.metrics.precision_score(
                    y_true, y_pred, average=metric_aggregation),
            },
            "recall": {
                "predictor": predict,
                "pred_transformer": noop,
                "scorer": lambda y_true, y_pred: sklearn.metrics.recall_score(
                    y_true, y_pred, average=metric_aggregation),
            },
            "roc_auc": {
                "predictor": predict if is_binary else predict_prob,
                "pred_transformer": noop if is_binary else transformer_binarize,
                "scorer": lambda y_true, y_pred: sklearn.metrics.roc_auc_score(
                    y_true, y_pred, average=metric_aggregation),
            },
            "root_mean_squared_error": {
                "predictor": predict,
                "pred_transformer": noop,
                "scorer": lambda y_true, y_pred: np.sqrt(
                    sklearn.metrics.mean_squared_error(y_true, y_pred)),
            },
            "r2": {
                "predictor": predict,
                "pred_transformer": noop,
                "scorer": sklearn.metrics.r2_score,
            },
        }

        return params

    def _get_scorings(self):
        """Get scorings for this problem type.

        Returns
        -------
        scorings : list of dict
            Information on metric name and associated "scoring" as defined in
            sklearn.metrics
        scorings_ : list
            List of "scoring" as defined in sklearn.metrics. This is a
            "utility variable" that can be used where we just need the names
            of the scoring functions and not the more complete information.
        """
        # scoring_types maps user-readable name to `scoring`, as argument to
        # cross_val_score
        # See also http://scikit-learn.org/stable/modules/model_evaluation.html#scoring-parameter
        if self._is_classification():
            scorings = Model.CLASSIFICATION_SCORING
            scorings_ = [s["scoring"] for s in scorings]
        elif self._is_regression():
            scorings = Model.REGRESSION_SCORING
            scorings_ = [s["scoring"] for s in scorings]
        else:
            raise NotImplementedError

        return scorings, scorings_

    @staticmethod
    def _format_matrices(X, Y):
        X = Model._formatX(X)
        Y = Model._formatY(Y)
        return X, Y

    @staticmethod
    def _formatX(X):
        # ensure that we use np for everything
        # use np.float64 for all elements
        # *don't* use 1d array for X
        X = np.asfarray(X)
        if X.ndim == 1:
            X = X.reshape(-1, 1)
        return X

    @staticmethod
    def _formatY(Y):
        # TODO: detect if we need to use a LabelEncoder for Y
        # ensure that we use np for everything
        # use np.float64 for all elements
        # *do* use 1d array for Y
        Y = np.asfarray(Y)
        if Y.ndim > 1 and Y.shape[1] > 1:
            raise ValueError("Target matrix has too many columns: {}"
                             .format(Y.shape[1]))
        Y = Y.ravel()
        return Y

    @staticmethod
    def _get_default_classifier():
        return DecisionTreeClassifier(random_state=RANDOM_STATE + 1)

    @staticmethod
    def _get_default_regressor():
        return DecisionTreeRegressor(random_state=RANDOM_STATE + 2)
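
# ---------------------------------------------------------------------------
# Usage sketch (not part of the featurehub package itself). It exercises the
# public API above on synthetic data; the dataset shape, class balance, and
# the train/test split size are assumptions made purely for illustration.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    rng = np.random.RandomState(0)

    # Synthetic binary classification problem: 1000 rows, 5 features.
    X = rng.normal(size=(1000, 5))
    Y = (X[:, 0] + 0.5 * X[:, 1] > 0).astype(int)

    clf = Model(Model.CLASSIFICATION)

    # Cross-validated metrics (accuracy, precision, recall, ROC AUC).
    print(clf.compute_metrics(X, Y, kind="cv"))

    # Train on the first 800 rows, evaluate on the remaining 200.
    print(clf.compute_metrics(X, Y, kind="train_test", n=800))

    # Synthetic regression problem reusing the same features
    # (RMSE and R-squared, averaged across folds).
    Y_reg = X @ np.array([1.0, -2.0, 0.5, 0.0, 3.0]) + rng.normal(scale=0.1, size=1000)
    reg = Model(Model.REGRESSION)
    print(reg.compute_metrics_cv(X, Y_reg))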