Source code for atm.classifier

# -*- coding: utf-8 -*-

"""
.. module:: wrapper
   :synopsis: Model around classification method.

"""
from __future__ import absolute_import, division, unicode_literals

import logging
import os
import pickle
import re
import time
from builtins import object
from collections import defaultdict
from importlib import import_module

import numpy as np
import pandas as pd
from sklearn import decomposition
from sklearn.gaussian_process.kernels import (
    RBF, ConstantKernel, ExpSineSquared, Matern, RationalQuadratic)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, StandardScaler

from atm.constants import Metrics
from atm.encoder import DataEncoder
from atm.method import Method
from atm.metrics import cross_validate_pipeline, test_pipeline

# load the library-wide logger
logger = logging.getLogger('atm')


class Model(object):
    """
    This class contains everything needed to run an end-to-end ATM classifier
    pipeline. It is initialized with a set of parameters and trained like a
    normal sklearn model. This class can be pickled and saved to disk, then
    unpickled outside of ATM and used to classify new datasets.
    """
    # these are special keys that are used for general purpose
    # things like scaling, normalization, PCA, etc
    SCALE = "_scale"
    WHITEN = "_whiten"
    MINMAX = "_scale_minmax"
    PCA = "_pca"
    PCA_DIMS = "_pca_dimensions"

    # list of all such keys
    ATM_KEYS = [SCALE, WHITEN, MINMAX, PCA, PCA_DIMS]

    # number of folds for cross-validation (arbitrary, for speed)
    N_FOLDS = 5

    def __init__(self, method, params, judgment_metric, class_column,
                 testing_ratio=0.3, verbose_metrics=False):
        """
        Parameters:
            method: the short method code (as defined in constants.py) or the
                path to a method json
            params: parameters passed to the sklearn classifier constructor
            judgment_metric: string indicating which metric should be optimized
            class_column: name of the column containing the class labels
            testing_ratio: fraction of the data to hold out for testing
            verbose_metrics: if True, compute extra metrics (curves and
                per-class scores)
        """
        # configuration & database
        self.method = method
        self.params = params
        self.judgment_metric = judgment_metric
        self.class_column = class_column
        self.testing_ratio = testing_ratio
        self.verbose_metrics = verbose_metrics

        # load the classifier method's class
        path = Method(method).class_path.split('.')
        mod_str, cls_str = '.'.join(path[:-1]), path[-1]
        mod = import_module(mod_str)
        self.class_ = getattr(mod, cls_str)

        # pipelining
        self.pipeline = None

        # persistent random state
        self.random_state = np.random.randint(1e7)

    def _make_pipeline(self):
        """
        Makes the classifier as well as scaling or dimension reduction steps.
        """
        # create the list of pipeline steps
        steps = []

        # create a classifier with specified parameters
        hyperparameters = {k: v for k, v in list(self.params.items())
                           if k not in Model.ATM_KEYS}
        atm_params = {k: v for k, v in list(self.params.items())
                      if k in Model.ATM_KEYS}

        # do special conversions
        hyperparameters = self._special_conversions(hyperparameters)
        classifier = self.class_(**hyperparameters)

        if Model.PCA in atm_params and atm_params[Model.PCA]:
            whiten = (Model.WHITEN in atm_params and atm_params[Model.WHITEN])
            pca_dims = atm_params[Model.PCA_DIMS]
            # the PCA dimension in atm_params is a float representing the
            # fraction of features to keep
            if pca_dims < 1:
                dimensions = int(pca_dims * float(self.num_features))
                logger.info("Using PCA to reduce %d features to %d dimensions" %
                            (self.num_features, dimensions))
                pca = decomposition.PCA(n_components=dimensions, whiten=whiten)
                steps.append(('pca', pca))

        # should we scale the data?
        if atm_params.get(Model.SCALE):
            steps.append(('standard_scale', StandardScaler()))
        elif self.params.get(Model.MINMAX):
            steps.append(('minmax_scale', MinMaxScaler()))

        # add the classifier as the final step in the pipeline
        steps.append((self.method, classifier))
        self.pipeline = Pipeline(steps)

    def _cross_validate(self, X, y):
        # TODO: this is hacky. See https://github.com/HDI-Project/ATM/issues/48
        binary = self.num_classes == 2
        kwargs = {}
        if self.verbose_metrics:
            kwargs['include_curves'] = True
            if not binary:
                kwargs['include_per_class'] = True

        df, cv_scores = cross_validate_pipeline(pipeline=self.pipeline, X=X,
                                                y=y, binary=binary,
                                                n_folds=self.N_FOLDS, **kwargs)
        self.cv_judgment_metric = np.mean(df[self.judgment_metric])
        self.cv_judgment_metric_stdev = np.std(df[self.judgment_metric])

        cv_stdev = (2 * self.cv_judgment_metric_stdev)
        self.mu_sigma_judgment_metric = self.cv_judgment_metric - cv_stdev

        return cv_scores

    def _test_final_model(self, X, y):
        """
        Test the (already trained) model pipeline on the provided test data
        (X and y). Store the test judgment metric and return the rest of the
        metrics as a hierarchical dictionary.
        """
        # time the prediction: run one prediction pass so the timing below
        # measures a real call to the pipeline
        start_time = time.time()
        self.pipeline.predict(X)
        total = time.time() - start_time
        self.avg_predict_time = total / float(len(y))

        # TODO: this is hacky. See https://github.com/HDI-Project/ATM/issues/48
        binary = self.num_classes == 2
        kwargs = {}
        if self.verbose_metrics:
            kwargs['include_curves'] = True
            if not binary:
                kwargs['include_per_class'] = True

        # compute the actual test scores!
        test_scores = test_pipeline(self.pipeline, X, y, binary, **kwargs)

        # save meta-metrics
        self.test_judgment_metric = test_scores.get(self.judgment_metric)

        return test_scores
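
    # Illustrative note on _make_pipeline() above (example values, not from
    # the original source): given params such as
    #     {'C': 1.0, '_scale': True, '_pca': True, '_pca_dimensions': 0.5}
    # the ATM-specific keys are stripped out, 'C' is passed to the classifier
    # constructor, and the resulting Pipeline steps are roughly
    #     [('pca', PCA(...)), ('standard_scale', StandardScaler()),
    #      (self.method, classifier)]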

    def train_test(self, dataset):
        """Train and test this model using Cross Validation and Holdout.

        Args:
            dataset (Dataset): Dataset object from the database.

        Returns:
            dict: Dictionary containing:

                * cv (list): The cross validation scores array
                * test (dict): The test scores dictionary
        """
        self.num_classes = dataset.k_classes
        self.num_features = dataset.d_features

        # if necessary, cast the judgment metric into its binary/multiclass equivalent
        if self.num_classes == 2:
            if self.judgment_metric in [Metrics.F1_MICRO, Metrics.F1_MACRO]:
                self.judgment_metric = Metrics.F1
            elif self.judgment_metric in [Metrics.ROC_AUC_MICRO, Metrics.ROC_AUC_MACRO]:
                self.judgment_metric = Metrics.ROC_AUC
        else:
            if self.judgment_metric == Metrics.F1:
                self.judgment_metric = Metrics.F1_MACRO
            elif self.judgment_metric == Metrics.ROC_AUC:
                self.judgment_metric = Metrics.ROC_AUC_MACRO

        # load training data
        train_data, test_data = dataset.load(self.testing_ratio, self.random_state)

        # extract feature matrix and labels from raw data
        self.encoder = DataEncoder(class_column=self.class_column)
        self.encoder.fit(train_data)
        X_train, y_train = self.encoder.transform(train_data)
        X_test, y_test = self.encoder.transform(test_data)

        # create and cross-validate pipeline
        self._make_pipeline()
        cv_scores = self._cross_validate(X_train, y_train)

        # train and test the final model
        self.pipeline.fit(X_train, y_train)
        test_scores = self._test_final_model(X_test, y_test)

        return {'cv': cv_scores, 'test': test_scores}

    def predict(self, data):
        """Generate predictions from new data.

        Args:
            data (pandas.DataFrame): Data for which to predict classes.

        Returns:
            pandas.Series: Vector of predictions.
        """
        X, _ = self.encoder.transform(data)
        predictions = self.pipeline.predict(X)
        labels = self.encoder.label_encoder.inverse_transform(predictions)
        labels = pd.Series(labels, index=data.index)

        return labels

    def _special_conversions(self, params):
        # TODO: replace this logic with something better

        # create list parameters
        lists = defaultdict(list)
        element_regex = re.compile(r'(.*)\[(\d)\]')
        for name, param in list(params.items()):
            # look for variables of the form "param_name[1]"
            match = element_regex.match(name)
            if match:
                # name of the list parameter
                lname = match.groups()[0]
                # index of the list item
                index = int(match.groups()[1])
                lists[lname].append((index, param))

                # drop the element parameter from our list
                del params[name]

        for lname, items in list(lists.items()):
            # drop the list size parameter
            del params['len(%s)' % lname]

            # sort the list by index
            params[lname] = [val for idx, val in sorted(items)]

        # Gaussian process classifier
        if self.method == "gp":
            if params["kernel"] == "constant":
                params["kernel"] = ConstantKernel()
            elif params["kernel"] == "rbf":
                params["kernel"] = RBF()
            elif params["kernel"] == "matern":
                params["kernel"] = Matern(nu=params["nu"])
                del params["nu"]
            elif params["kernel"] == "rational_quadratic":
                params["kernel"] = RationalQuadratic(length_scale=params["length_scale"],
                                                     alpha=params["alpha"])
                del params["length_scale"]
                del params["alpha"]
            elif params["kernel"] == "exp_sine_squared":
                params["kernel"] = ExpSineSquared(length_scale=params["length_scale"],
                                                  periodicity=params["periodicity"])
                del params["length_scale"]
                del params["periodicity"]

        # return the updated parameter vector
        return params
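
    # Illustrative note (hypothetical parameter name, not from the original
    # source): _special_conversions() rebuilds list-valued hyperparameters
    # from flattened entries. For example, a params dict such as
    #     {'len(hidden_layer_sizes)': 2,
    #      'hidden_layer_sizes[0]': 100,
    #      'hidden_layer_sizes[1]': 50}
    # is converted to {'hidden_layer_sizes': [100, 50]} before being passed
    # to the classifier constructor.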

    @classmethod
    def load(cls, path):
        """Loads a saved Model instance from a path.

        Args:
            path (str): path where the model is saved.

        Returns:
            Model: New model instance.
        """
        with open(path, 'rb') as classifier:
            return pickle.load(classifier)

    def save(self, path, force=False):
        """Save this Model using pickle.

        Args:
            path (str): Path where the model should be saved.
            force (bool): If True, overwrite the model if it already exists.
        """
        if os.path.exists(path) and not force:
            print('The indicated path already exists. Use `force=True` to overwrite.')
            return

        base_path = os.path.dirname(path)
        if base_path and not os.path.exists(base_path):
            os.makedirs(base_path)

        with open(path, 'wb') as pickle_file:
            pickle.dump(self, pickle_file)

        print("Model saved as {}".format(path))
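
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module). It
# assumes a `dataset` object has already been obtained from the ATM database,
# that 'dt' is one of the method codes defined in atm/constants.py, and that
# `new_dataframe` is a pandas DataFrame with the same feature columns; the
# hyperparameters and paths below are examples.
#
#     model = Model(method='dt',
#                   params={'criterion': 'gini', 'max_depth': 5},
#                   judgment_metric='f1',
#                   class_column='class')
#     scores = model.train_test(dataset)        # {'cv': [...], 'test': {...}}
#     predictions = model.predict(new_dataframe)
#
#     model.save('models/example.pkl', force=True)
#     restored = Model.load('models/example.pkl')
# ---------------------------------------------------------------------------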