from __future__ import absolute_import, division, unicode_literals

from builtins import range

import numpy as np
import pandas as pd
from sklearn.metrics import (
    accuracy_score, average_precision_score, cohen_kappa_score, f1_score, matthews_corrcoef,
    precision_recall_curve, roc_auc_score, roc_curve)
from sklearn.model_selection import StratifiedKFold

from atm.constants import METRICS_BINARY, METRICS_MULTICLASS, N_FOLDS_DEFAULT, Metrics


def rank_n_accuracy(y_true, y_prob_mat, n=0.33):
"""
Compute how often the true label is one of the top n predicted classes
for each training example.
If n is an integer, consider the top n predictions for each example.
If n is a float, it represents a proportion of the top predictions.
This metric is only really useful when the total number of classes is large.
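
    Example (illustrative; the true label is among the top 2 predictions for
    one of the two rows, so the score is 0.5)::

        >>> y_prob_mat = np.array([[0.1, 0.7, 0.2],
        ...                        [0.5, 0.3, 0.2]])
        >>> rank_n_accuracy(np.array([1, 2]), y_prob_mat, n=2)
        0.5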
"""
n_classes = y_prob_mat.shape[1]
if n < 1:
# round to nearest int before casting
n = int(round(n_classes * n))
# sort the rankings in descending order, then take the top n
rankings = np.argsort(-y_prob_mat)
rankings = rankings[:, :n]
num_samples = len(y_true)
correct_sample_count = 0.0 # force floating point math
for i in range(num_samples):
if y_true[i] in rankings[i, :]:
correct_sample_count += 1
    return correct_sample_count / num_samples


def get_per_class_matrix(y, classes=None):
"""
    Create a (num_examples x num_classes) binary matrix representation of the
    true and predicted y values.
If classes is None, class values will be extracted from y. Values that are
not present at all will not receive a column -- this is to allow computation
of per-class roc_auc scores without error.
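
    Example (illustrative; only classes 0 and 2 appear in y, so the result has
    two columns)::

        >>> get_per_class_matrix(np.array([0, 2, 2, 0])).shape
        (4, 2)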
"""
classes = classes or np.unique(y)
y_bin = np.zeros((len(y), len(classes)))
for i, cls in enumerate(classes):
y_bin[:, i] = (y == cls).astype(int)
return y_bin


def get_pr_roc_curves(y_true, y_pred_probs):
"""
Compute precision/recall and receiver operating characteristic metrics for a
binary class label.
y_true: series of true class labels (only 1 or 0)
y_pred_probs: series of probabilities generated by the model for the label class 1
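
    Example (illustrative)::

        >>> curves = get_pr_roc_curves(np.array([0, 1, 1, 0]),
        ...                            np.array([0.1, 0.9, 0.4, 0.6]))
        >>> sorted(curves[Metrics.PR_CURVE].keys())
        ['precisions', 'recalls', 'thresholds']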
"""
results = {}
roc = roc_curve(y_true, y_pred_probs, pos_label=1)
results[Metrics.ROC_CURVE] = {
'fprs': list(roc[0]),
'tprs': list(roc[1]),
'thresholds': list(roc[2]),
}
pr = precision_recall_curve(y_true, y_pred_probs, pos_label=1)
results[Metrics.PR_CURVE] = {
'precisions': list(pr[0]),
'recalls': list(pr[1]),
'thresholds': list(pr[2]),
}
return results


def get_metrics_binary(y_true, y_pred, y_pred_probs, include_curves=False):
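    """
    Compute standard classification metrics for a binary label.

    y_true: vector of true class labels (0 or 1)
    y_pred: vector of predicted class labels (0 or 1)
    y_pred_probs: (num_examples x 2) matrix of predicted class probabilities
    include_curves: if True, also include point-by-point precision/recall and
        ROC curve data (see get_pr_roc_curves)
    """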
results = {
Metrics.ACCURACY: accuracy_score(y_true, y_pred),
Metrics.COHEN_KAPPA: cohen_kappa_score(y_true, y_pred),
Metrics.F1: f1_score(y_true, y_pred),
Metrics.MCC: matthews_corrcoef(y_true, y_pred),
Metrics.ROC_AUC: np.nan,
Metrics.AP: np.nan,
}
# if possible, compute PR and ROC curve metrics
all_labels_same = len(np.unique(y_true)) == 1
any_probs_nan = np.any(np.isnan(y_pred_probs))
if not any_probs_nan:
# AP can be computed even if all labels are the same
y_true_bin = get_per_class_matrix(y_true, list(range(2)))
results[Metrics.AP] = average_precision_score(y_true_bin, y_pred_probs)
if not all_labels_same:
results[Metrics.ROC_AUC] = roc_auc_score(y_true_bin, y_pred_probs)
# if necessary, compute point-by-point precision/recall and ROC curve data
if include_curves:
results.update(get_pr_roc_curves(y_true, y_pred_probs[:, 1]))
return results


def get_metrics_multiclass(y_true, y_pred, y_pred_probs,
                           include_per_class=False, include_curves=False):
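    """
    Compute standard classification metrics for a multiclass label.

    y_true: vector of true class labels, encoded as integer indices into the
        columns of y_pred_probs
    y_pred: vector of predicted class labels
    y_pred_probs: (num_examples x num_classes) matrix of predicted class
        probabilities
    include_per_class: if True, compute binary metrics separately for each
        possible class and store them under the 'class_wise' key
    include_curves: if True, also include per-class precision/recall and ROC
        curve data (implies per-class metrics)
    """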
results = {
Metrics.ACCURACY: accuracy_score(y_true, y_pred),
Metrics.COHEN_KAPPA: cohen_kappa_score(y_true, y_pred),
Metrics.F1_MICRO: f1_score(y_true, y_pred, average='micro'),
Metrics.F1_MACRO: f1_score(y_true, y_pred, average='macro'),
Metrics.ROC_AUC_MICRO: np.nan,
Metrics.ROC_AUC_MACRO: np.nan,
Metrics.RANK_ACCURACY: np.nan,
}
    # rank accuracy is most relevant for datasets with high-cardinality
    # labels (lots of possible values)
# TODO: make the rank parameter configurable
results[Metrics.RANK_ACCURACY] = rank_n_accuracy(y_true=y_true,
y_prob_mat=y_pred_probs)
# if possible, compute multi-label AUC metrics
present_classes = np.unique(y_true)
all_labels_same = len(present_classes) == 1
any_probs_nan = np.any(np.isnan(y_pred_probs))
if not (all_labels_same or any_probs_nan):
# get binary label matrix, ignoring classes that aren't present
y_true_bin = get_per_class_matrix(y_true)
# filter out probabilities for classes that aren't in this sample
filtered_probs = y_pred_probs[:, present_classes]
# actually compute roc_auc score
results[Metrics.ROC_AUC_MICRO] = roc_auc_score(y_true_bin,
filtered_probs,
average='micro')
results[Metrics.ROC_AUC_MACRO] = roc_auc_score(y_true_bin,
filtered_probs,
average='macro')
# TODO: multi-label AP metrics?
    # include_per_class controls whether to compute separate metrics for each
    # possible label
if include_per_class or include_curves:
results['class_wise'] = {}
# create binary matrices, including classes that aren't actually present
all_classes = list(range(y_pred_probs.shape[1]))
y_true_bin = get_per_class_matrix(y_true, classes=all_classes)
y_pred_bin = get_per_class_matrix(y_pred, classes=all_classes)
# for each possible class, generate F1, precision-recall, and ROC scores
# using the binary metrics function.
for cls in all_classes:
class_pred_probs = np.column_stack((1 - y_pred_probs[:, cls],
y_pred_probs[:, cls]))
class_res = get_metrics_binary(y_true=y_true_bin[:, cls],
y_pred=y_pred_bin[:, cls],
y_pred_probs=class_pred_probs,
include_curves=include_curves)
results['class_wise'][cls] = class_res
return results


def test_pipeline(pipeline, X, y, binary, **kwargs):
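    """
    Generate predictions for the feature matrix X with a trained pipeline and
    score them against the true labels y.

    binary: whether the label is binary or multiclass; this determines which
        metrics function is used.
    Any extra keyword arguments are passed through to that metrics function.
    """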
if binary:
get_metrics = get_metrics_binary
else:
get_metrics = get_metrics_multiclass
# run the test data through the trained pipeline
y_pred = pipeline.predict(X)
# if necessary (i.e. if a pipeline does not produce probability scores by
# default), use class distance scores in lieu of probability scores
method = pipeline.steps[-1][0]
if method in ['sgd', 'pa']:
if binary:
class_1_distance = pipeline.decision_function(X)
class_0_distance = -class_1_distance
y_pred_probs = np.column_stack((class_0_distance, class_1_distance))
else:
y_pred_probs = pipeline.decision_function(X)
else:
y_pred_probs = pipeline.predict_proba(X)
return get_metrics(y, y_pred, y_pred_probs, **kwargs)


def cross_validate_pipeline(pipeline, X, y, binary=True,
                            n_folds=N_FOLDS_DEFAULT, **kwargs):
"""
Compute metrics for each of `n_folds` folds of the training data in (X, y).
pipeline: the sklearn Pipeline to train and test.
X: feature matrix.
y: series of labels corresponding to rows in X.
    binary: whether the label is binary or multiclass.
n_folds: number of non-overlapping "folds" of the data to make for cross-validation.
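
    Example (illustrative sketch; assumes X is a numpy feature matrix and y an
    aligned numpy label vector, and uses a simple scikit-learn pipeline)::

        from sklearn.pipeline import Pipeline
        from sklearn.tree import DecisionTreeClassifier

        pipeline = Pipeline([('dt', DecisionTreeClassifier())])
        df, fold_results = cross_validate_pipeline(pipeline, X, y, binary=True)
        fold_means = df.mean()  # average each metric across the folds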
"""
if binary:
metrics = METRICS_BINARY
else:
metrics = METRICS_MULTICLASS
    rows = []      # one dict of summary metric values per fold
    results = []   # the full results dict for each fold
# TODO: how to handle classes that are so uncommon that stratified sampling
# doesn't work? i.e. len([c for c in y if c == some_class]) < n_folds
skf = StratifiedKFold(n_splits=n_folds)
skf.get_n_splits(X, y)
for train_index, test_index in skf.split(X, y):
pipeline.fit(X[train_index], y[train_index])
split_results = test_pipeline(pipeline=pipeline,
X=X[test_index],
y=y[test_index],
binary=binary, **kwargs)
        rows.append({m: split_results.get(m) for m in metrics})
        results.append(split_results)

    # assemble one DataFrame with a row of metrics per fold
    df = pd.DataFrame(rows, columns=metrics)
    return df, results