#!/usr/bin/python2.7
from __future__ import absolute_import, unicode_literals
import datetime
import imp
import logging
import os
import re
import socket
import traceback
import warnings
from builtins import object, str
from collections import defaultdict
import boto3
import numpy as np
from atm.classifier import Model
from atm.constants import CUSTOM_CLASS_REGEX, SELECTORS, TUNERS
from atm.database import ClassifierStatus, DBSession
from atm.utilities import ensure_directory, get_instance, save_metrics, save_model, update_params
# shhh
warnings.filterwarnings('ignore')
# for garrays
os.environ['GNUMPY_IMPLICIT_CONVERSION'] = 'allow'
# load the library-wide logger
LOGGER = logging.getLogger('atm')
# Local hostname, for logging.
HOSTNAME = socket.gethostname()
# Exception raised when something goes wrong for the worker, but the error has
# already been handled (logged and, where relevant, recorded in the database).
class ClassifierError(Exception):
    pass


class Worker(object):
def __init__(self, database, datarun, save_files=True, cloud_mode=False,
aws_access_key=None, aws_secret_key=None, s3_bucket=None, s3_folder=None,
models_dir='models', metrics_dir='metrics', verbose_metrics=False):
self.db = database
self.datarun = datarun
self.save_files = save_files
self.cloud_mode = cloud_mode
self.aws_access_key = aws_access_key
self.aws_secret_key = aws_secret_key
self.s3_bucket = s3_bucket
self.s3_folder = s3_folder
self.models_dir = models_dir
self.metrics_dir = metrics_dir
self.verbose_metrics = verbose_metrics
ensure_directory(self.models_dir)
ensure_directory(self.metrics_dir)
# load the Dataset from the database
self.dataset = self.db.get_dataset(self.datarun.dataset_id)
# load the Selector and Tuner classes specified by our datarun
self.load_selector()
self.load_tuner()

    def load_selector(self):
"""
Load and initialize the BTB class which will be responsible for
selecting hyperpartitions.
"""
# selector will either be a key into SELECTORS or a path to
# a file that defines a class called CustomSelector.
if self.datarun.selector in SELECTORS:
Selector = SELECTORS[self.datarun.selector]
else:
path, classname = re.match(CUSTOM_CLASS_REGEX,
self.datarun.selector).groups()
mod = imp.load_source('btb.selection.custom', path)
Selector = getattr(mod, classname)
LOGGER.info('Selector: %s' % Selector)
# generate the arguments we need to initialize the selector
hyperpartitions = self.db.get_hyperpartitions(datarun_id=self.datarun.id)
hp_by_method = defaultdict(list)
for hp in hyperpartitions:
hp_by_method[hp.method].append(hp.id)
hyperpartition_ids = [hp.id for hp in hyperpartitions]
        # Selector classes accept (and may ignore) extra keyword arguments, so
        # we can pass everything any selector implementation might need.
self.selector = get_instance(Selector,
choices=hyperpartition_ids,
k=self.datarun.k_window,
by_algorithm=dict(hp_by_method))
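
    # A hedged illustration (not part of this module): a custom selector file,
    # referenced by path and class name via CUSTOM_CLASS_REGEX, only needs to
    # provide the interface used here: an __init__ that accepts the keyword
    # arguments passed through get_instance() and a select() method like the
    # one called in select_hyperpartition() below. Roughly:
    #
    #     # my_selector.py (hypothetical file)
    #     class CustomSelector(object):
    #         def __init__(self, choices, **kwargs):
    #             self.choices = choices
    #
    #         def select(self, choice_scores):
    #             # choice_scores: {hyperpartition_id: [float, ...]}
    #             return max(self.choices,
    #                        key=lambda c: len(choice_scores.get(c, [])))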

    def load_tuner(self):
"""
Load, but don't initialize, the BTB class which will be responsible for
choosing non-hyperpartition hyperparameter values (a subclass of Tuner). The
tuner must be initialized with information about the hyperpartition, so it
cannot be created until later.
"""
# tuner will either be a key into TUNERS or a path to
# a file that defines a class called CustomTuner.
if self.datarun.tuner in TUNERS:
self.Tuner = TUNERS[self.datarun.tuner]
else:
path, classname = re.match(CUSTOM_CLASS_REGEX, self.datarun.tuner).groups()
mod = imp.load_source('btb.tuning.custom', path)
self.Tuner = getattr(mod, classname)
LOGGER.info('Tuner: %s' % self.Tuner)

    def select_hyperpartition(self):
"""
Use the hyperpartition selection method specified by our datarun to choose a
hyperpartition of hyperparameters from the ModelHub. Only consider
partitions for which gridding is not complete.
"""
hyperpartitions = self.db.get_hyperpartitions(datarun_id=self.datarun.id)
# load classifiers and build scores lists
# make sure all hyperpartitions are present in the dict, even ones that
# don't have any classifiers. That way the selector can choose hyperpartitions
# that haven't been scored yet.
        hyperpartition_scores = {hp.id: [] for hp in hyperpartitions}
classifiers = self.db.get_classifiers(datarun_id=self.datarun.id)
for c in classifiers:
            # skip classifiers whose hyperpartition is not in the current set
            # (e.g. because gridding for it is already done)
if c.hyperpartition_id not in hyperpartition_scores:
continue
# the cast to float is necessary because the score is a Decimal;
# doing Decimal-float arithmetic throws errors later on.
score = float(getattr(c, self.datarun.score_target) or 0)
hyperpartition_scores[c.hyperpartition_id].append(score)
hyperpartition_id = self.selector.select(hyperpartition_scores)
return self.db.get_hyperpartition(hyperpartition_id)
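
    # For illustration only (ids and scores are made up), the structure handed
    # to the selector looks like:
    #
    #     hyperpartition_scores = {1: [0.81, 0.83], 2: [0.78], 3: []}
    #     hyperpartition_id = self.selector.select(hyperpartition_scores)
    #
    # Empty lists keep unscored hyperpartitions eligible for selection.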

    def tune_hyperparameters(self, hyperpartition):
"""
Use the hyperparameter tuning method specified by our datarun to choose
a set of hyperparameters from the potential space.
"""
# Get parameter metadata for this hyperpartition
tunables = hyperpartition.tunables
# If there aren't any tunable parameters, we're done. Return the vector
# of values in the hyperpartition and mark the set as finished.
        if not tunables:
LOGGER.warning('No tunables for hyperpartition %d' % hyperpartition.id)
self.db.mark_hyperpartition_gridding_done(hyperpartition.id)
return update_params(params=[],
tunables=tunables,
categoricals=hyperpartition.categoricals,
constants=hyperpartition.constants)
# Get previously-used parameters: every classifier should either be
# completed or have thrown an error
all_clfs = self.db.get_classifiers(hyperpartition_id=hyperpartition.id)
classifiers = [c for c in all_clfs if c.status == ClassifierStatus.COMPLETE]
X = [c.hyperparameter_values for c in classifiers]
y = np.array([float(getattr(c, self.datarun.score_target)) for c in classifiers])
# Initialize the tuner and propose a new set of parameters
# this has to be initialized with information from the hyperpartition, so we
# need to do it fresh for each classifier (not in load_tuner)
tuner = get_instance(self.Tuner,
tunables=tunables,
gridding=self.datarun.gridding,
r_minimum=self.datarun.r_minimum)
if len(X) > 0:
tuner.add(X, y)
params = tuner.propose()
if params is None and self.datarun.gridding:
LOGGER.info('Gridding done for hyperpartition %d' % hyperpartition.id)
self.db.mark_hyperpartition_gridding_done(hyperpartition.id)
return None
# Append categorical and constants to the params.
return update_params(params=params,
categoricals=hyperpartition.categoricals,
constants=hyperpartition.constants)
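
    # The returned value is a plain mapping of hyperparameter names to values:
    # the tuner's proposals plus the hyperpartition's categoricals and
    # constants. A made-up example for a random forest hyperpartition:
    #
    #     {'n_estimators': 200, 'max_depth': 10, 'criterion': 'gini'}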

    def test_classifier(self, method, params):
"""
Given a set of fully-qualified hyperparameters, create and test a
classifier model.
Returns: Model object and metrics dictionary
"""
model = Model(method=method, params=params,
judgment_metric=self.datarun.metric,
class_column=self.dataset.class_column,
verbose_metrics=self.verbose_metrics)
train_path, test_path = self.dataset.load()
metrics = model.train_test(self.dataset)
target = self.datarun.score_target
def metric_string(model):
if 'cv' in target or 'mu_sigma' in target:
return '%.3f +- %.3f' % (model.cv_judgment_metric,
2 * model.cv_judgment_metric_stdev)
else:
return '%.3f' % model.test_judgment_metric
LOGGER.info('Judgment metric (%s, %s): %s' % (self.datarun.metric,
target[:-len('_judgment_metric')],
metric_string(model)))
old_best = self.db.get_best_classifier(datarun_id=self.datarun.id,
score_target=target)
if old_best is not None:
if getattr(model, target) > getattr(old_best, target):
LOGGER.info('New best score! Previous best (classifier %s): %s',
old_best.id, metric_string(old_best))
else:
LOGGER.info('Best so far (classifier %s): %s',
old_best.id, metric_string(old_best))
return model, metrics

    def save_classifier(self, classifier_id, model, metrics):
"""
Update a classifier with metrics and model information and mark it as
"complete"
classifier_id: ID of the classifier to save
model: Model object containing a serializable representation of the
final model generated by this classifier.
metrics: Dictionary containing cross-validation and test metrics data
for the model.
"""
# whether to save model and metrics data to the filesystem
if self.save_files:
# keep a database session open so that the utility functions can
# access the linked hyperpartitions and dataruns
with DBSession(self.db):
classifier = self.db.get_classifier(classifier_id)
model_path = save_model(classifier, self.models_dir, model)
metric_path = save_metrics(classifier, self.metrics_dir, metrics)
# if necessary, save model and metrics to Amazon S3 bucket
if self.cloud_mode:
try:
model_path, metric_path = self.save_classifier_cloud(model_path, metric_path)
except Exception:
msg = traceback.format_exc()
LOGGER.error('Error in save_classifier_cloud()')
self.db.mark_classifier_errored(classifier_id, error_message=msg)
else:
model_path = None
metric_path = None
# update the classifier in the database
self.db.complete_classifier(classifier_id=classifier_id,
model_location=model_path,
metrics_location=metric_path,
cv_score=model.cv_judgment_metric,
cv_stdev=model.cv_judgment_metric_stdev,
test_score=model.test_judgment_metric)
        LOGGER.info('Saved classifier %d.' % classifier_id)

    def save_classifier_cloud(self, local_model_path, local_metric_path, delete_local=False):
"""
Save a classifier to the S3 bucket supplied on __init__. Saves a
        serialized representation of the model as well as a detailed set
of metrics.
local_model_path: path to serialized model in the local file system
local_metric_path: path to serialized metrics in the local file system
"""
client = boto3.client(
's3',
aws_access_key_id=self.aws_access_key,
aws_secret_access_key=self.aws_secret_key,
)
if self.s3_folder:
aws_model_path = os.path.join(self.s3_folder, local_model_path)
aws_metric_path = os.path.join(self.s3_folder, local_metric_path)
else:
aws_model_path = local_model_path
aws_metric_path = local_metric_path
LOGGER.info('Uploading model at %s to s3://%s/%s',
local_model_path, self.s3_bucket, aws_model_path)
client.upload_file(local_model_path, self.s3_bucket, aws_model_path)
LOGGER.info('Uploading metric at %s to s3://%s/%s',
local_metric_path, self.s3_bucket, aws_metric_path)
client.upload_file(local_metric_path, self.s3_bucket, aws_metric_path)
if delete_local:
LOGGER.info('Deleting local copies of %s and %s',
local_model_path, local_metric_path)
os.remove(local_model_path)
os.remove(local_metric_path)
return (
's3://{}/{}'.format(self.s3_bucket, aws_model_path),
's3://{}/{}'.format(self.s3_bucket, aws_metric_path)
)

    def is_datarun_finished(self):
"""
Check to see whether the datarun is finished. This could be due to the
budget being exhausted or due to hyperparameter gridding being done.
"""
hyperpartitions = self.db.get_hyperpartitions(datarun_id=self.datarun.id)
if not hyperpartitions:
LOGGER.warning('No incomplete hyperpartitions for datarun %d present in database.'
% self.datarun.id)
return True
if self.datarun.budget_type == 'classifier':
            # this counts every classifier created for the datarun, including
            # running and errored ones, so all of them count against the budget.
            n_classifiers = len(self.db.get_classifiers(datarun_id=self.datarun.id))
            if n_classifiers >= self.datarun.budget:
LOGGER.warning('Classifier budget has run out!')
return True
elif self.datarun.budget_type == 'walltime':
deadline = self.datarun.deadline
if datetime.datetime.now() > deadline:
LOGGER.warning('Walltime budget has run out!')
return True
return False
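
    # For example (numbers are illustrative): with budget_type='classifier' and
    # budget=100, the datarun ends once 100 classifiers exist for it, whether
    # running, errored, or complete; with budget_type='walltime', it ends as
    # soon as datarun.deadline is in the past.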

    def run_classifier(self, hyperpartition_id=None):
"""
Choose hyperparameters, then use them to test and save a Classifier.
"""
# check to see if our work is done
if self.is_datarun_finished():
            # mark the run as complete
self.db.mark_datarun_complete(self.datarun.id)
LOGGER.warning('Datarun %d has ended.' % self.datarun.id)
return
try:
LOGGER.debug('Choosing hyperparameters...')
if hyperpartition_id is not None:
hyperpartition = self.db.get_hyperpartition(hyperpartition_id)
if hyperpartition.datarun_id != self.datarun.id:
LOGGER.error('Hyperpartition %d is not a part of datarun %d'
% (hyperpartition_id, self.datarun.id))
return
else:
# use the multi-arm bandit to choose which hyperpartition to use next
hyperpartition = self.select_hyperpartition()
# use tuner to choose a set of parameters for the hyperpartition
params = self.tune_hyperparameters(hyperpartition)
except Exception:
LOGGER.error('Error choosing hyperparameters: datarun=%s' % str(self.datarun))
LOGGER.error(traceback.format_exc())
raise ClassifierError()
if params is None:
LOGGER.warning('No parameters chosen: hyperpartition %d is finished.'
% hyperpartition.id)
return
param_info = 'Chose parameters for method "%s":' % hyperpartition.method
for k in sorted(params.keys()):
param_info += '\n\t%s = %s' % (k, params[k])
LOGGER.info(param_info)
LOGGER.debug('Creating classifier...')
classifier = self.db.start_classifier(hyperpartition_id=hyperpartition.id,
datarun_id=self.datarun.id,
host=HOSTNAME,
hyperparameter_values=params)
try:
LOGGER.debug('Testing classifier...')
model, metrics = self.test_classifier(hyperpartition.method, params)
LOGGER.debug('Saving classifier...')
self.save_classifier(classifier.id, model, metrics)
except Exception:
msg = traceback.format_exc()
LOGGER.error('Error testing classifier: datarun=%s' % str(self.datarun))
LOGGER.error(msg)
self.db.mark_classifier_errored(classifier.id, error_message=msg)
raise ClassifierError()
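

# A minimal usage sketch, assuming `db` is an ATM database wrapper and
# `datarun` is a datarun record fetched from it (setup code is omitted and the
# surrounding loop is illustrative, not an exact ATM entry point):
#
#     worker = Worker(database=db, datarun=datarun, save_files=True)
#     while not worker.is_datarun_finished():
#         try:
#             worker.run_classifier()
#         except ClassifierError:
#             continue  # the failure is already logged and recorded; keep going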