# -*- coding: utf-8 -*-
"""Core ATM module.
This module contains the ATM class, which is responsible for executing and
orchestrating the main ATM functionality.
"""
import logging
import random
import time
from datetime import datetime, timedelta
from operator import attrgetter
from tqdm import tqdm
from atm.constants import TIME_FMT, PartitionStatus, RunStatus
from atm.database import Database
from atm.method import Method
from atm.worker import ClassifierError, Worker
# Module-level logger, named after this module.
LOGGER = logging.getLogger(name=__name__)
class ATM(object):
    """Entry point of ATM.

    Registers Datasets and Dataruns in the ModelHub Database and runs Workers
    that train classifiers on the unfinished Dataruns.
    """

    # Seconds to sleep when no Datarun is available or a classifier fails.
    _LOOP_WAIT = 5

    # Methods tried when the caller does not give an explicit list.
    _DEFAULT_METHODS = ('logreg', 'dt', 'knn')

    def __init__(
        self,
        # SQL Conf
        dialect='sqlite',
        database='atm.db',
        username=None,
        password=None,
        host=None,
        port=None,
        query=None,
        # AWS Conf
        access_key=None,
        secret_key=None,
        s3_bucket=None,
        s3_folder=None,
        # Log Conf
        models_dir='models',
        metrics_dir='metrics',
        verbose_metrics=False,
    ):
        """Connect to the Database and store the AWS and output settings.

        Args:
            dialect (str):
                SQL dialect of the Database. Optional. Defaults to ``'sqlite'``.
            database (str):
                Database name. Optional. Defaults to ``'atm.db'``.
            username (str):
                Database user name. Optional.
            password (str):
                Database password. Optional.
            host (str):
                Database host. Optional.
            port (int):
                Database port. Optional.
            query (str):
                ``query`` argument forwarded to ``Database``. Optional.
            access_key (str):
                AWS access key, used when loading datasets and by the Workers.
            secret_key (str):
                AWS secret key, used when loading datasets and by the Workers.
            s3_bucket (str):
                S3 bucket handed to the Workers. Optional.
            s3_folder (str):
                S3 folder handed to the Workers. Optional.
            models_dir (str):
                Directory for the fitted models. Optional. Defaults to ``'models'``.
            metrics_dir (str):
                Directory for the metrics. Optional. Defaults to ``'metrics'``.
            verbose_metrics (bool):
                Forwarded to the Workers. Optional. Defaults to ``False``.
        """
        # ``password`` used to be dropped here, which shifted host/port/query one
        # position to the left. NOTE(review): assumes ``Database`` takes its
        # arguments in the same order as this signature -- confirm.
        self.db = Database(dialect, database, username, password, host, port, query)
        self.aws_access_key = access_key
        self.aws_secret_key = secret_key
        self.s3_bucket = s3_bucket
        self.s3_folder = s3_folder
        self.models_dir = models_dir
        self.metrics_dir = metrics_dir
        self.verbose_metrics = verbose_metrics

    def add_dataset(self, train_path, test_path=None, name=None,
                    description=None, class_column=None):
        """Add a new dataset to the Database.

        Args:
            train_path (str):
                Path to the training CSV file. It can be a local filesystem path,
                absolute or relative, or an HTTP or HTTPS URL, or an S3 path in the
                format ``s3://{bucket_name}/{key}``. Required.
            test_path (str):
                Path to the testing CSV file. It can be a local filesystem path,
                absolute or relative, or an HTTP or HTTPS URL, or an S3 path in the
                format ``s3://{bucket_name}/{key}``.
                Optional. If not given, the training CSV will be split in two parts,
                train and test.
            name (str):
                Name given to this dataset. Optional. If not given, a hash will be
                generated from the training_path and used as the Dataset name.
            description (str):
                Human friendly description of the Dataset. Optional.
            class_column (str):
                Name of the column that will be used as the target variable.
                Optional. Defaults to ``'class'``.

        Returns:
            Dataset:
                The created dataset.
        """
        return self.db.create_dataset(
            train_path=train_path,
            test_path=test_path,
            name=name,
            description=description,
            class_column=class_column,
            aws_access_key=self.aws_access_key,
            aws_secret_key=self.aws_secret_key,
        )

    def add_datarun(self, dataset_id, budget=100, budget_type='classifier',
                    gridding=0, k_window=3, metric='f1', methods=None,
                    r_minimum=2, run_per_partition=False, score_target='cv', priority=1,
                    selector='uniform', tuner='uniform', deadline=None):
        """Register one or more Dataruns to the Database.

        The methods hyperparameters will be analyzed and Hyperpartitions generated
        from them.
        If ``run_per_partition`` is ``True``, one Datarun will be created for each
        Hyperpartition. Otherwise, a single one will be created for all of them.

        Args:
            dataset_id (int):
                Id of the Dataset which this Datarun will belong to.
            budget (int):
                Budget amount. Optional. Defaults to ``100``.
            budget_type (str):
                Budget Type. Can be 'classifier' or 'walltime'.
                Optional. Defaults to ``'classifier'``.
            gridding (int):
                ``gridding`` setting for the Tuner. Optional. Defaults to ``0``.
            k_window (int):
                ``k`` setting for the Selector. Optional. Defaults to ``3``.
            metric (str):
                Metric to use for the tuning and selection. Optional. Defaults to ``'f1'``.
            methods (list):
                List of methods to try. Optional. Defaults to ``['logreg', 'dt', 'knn']``.
            r_minimum (int):
                ``r_minimum`` setting for the Tuner. Optional. Defaults to ``2``.
            run_per_partition (bool):
                whether to create a separated Datarun for each Hyperpartition or not.
                Optional. Defaults to ``False``.
            score_target (str):
                Which score to use for the tuning and selection process. It can be ``'cv'`` or
                ``'test'``. Optional. Defaults to ``'cv'``.
            priority (int):
                Priority of this Datarun. The higher the better. Optional. Defaults to ``1``.
            selector (str):
                Type of selector to use. Optional. Defaults to ``'uniform'``.
            tuner (str):
                Type of tuner to use. Optional. Defaults to ``'uniform'``.
            deadline (str):
                Time deadline. It must be a string representing a datetime in the format
                ``'%Y-%m-%d %H:%M'``. If given, ``budget_type`` will be set to ``'walltime'``.
                If not given and ``budget_type`` is ``'walltime'``, the deadline is
                ``budget`` minutes from now.

        Returns:
            Datarun:
                The created Datarun, or the list of created Dataruns when
                ``run_per_partition`` is ``True``.
        """
        if methods is None:
            # Fresh list per call instead of a shared mutable default argument.
            methods = list(self._DEFAULT_METHODS)

        if deadline:
            deadline = datetime.strptime(deadline, TIME_FMT)
            budget_type = 'walltime'
        elif budget_type == 'walltime':
            # Without an explicit deadline, a walltime budget counts minutes from now.
            deadline = datetime.now() + timedelta(minutes=budget)

        # Enumerate all combinations of categorical variables for each method.
        method_parts = {}
        for method in methods:
            method_parts[method] = Method(method).get_hyperpartitions()
            LOGGER.info('method %s has %d hyperpartitions', method, len(method_parts[method]))

        # Every Datarun created by this call shares the same settings.
        datarun_settings = dict(
            dataset_id=dataset_id,
            description='___'.join([tuner, selector]),
            tuner=tuner,
            selector=selector,
            gridding=gridding,
            priority=priority,
            budget_type=budget_type,
            budget=budget,
            deadline=deadline,
            metric=metric,
            score_target=score_target + '_judgment_metric',
            k_window=k_window,
            r_minimum=r_minimum,
        )

        dataruns = []
        if not run_per_partition:
            dataruns.append(self.db.create_datarun(**datarun_settings))

        for method, parts in method_parts.items():
            for part in parts:
                # One Datarun per hyperpartition; this setting is useful for debugging.
                if run_per_partition:
                    dataruns.append(self.db.create_datarun(**datarun_settings))

                # Each hyperpartition is attached to the most recently created Datarun.
                self.db.create_hyperpartition(
                    datarun_id=dataruns[-1].id,
                    method=method,
                    tunables=part.tunables,
                    constants=part.constants,
                    categoricals=part.categoricals,
                    status=PartitionStatus.INCOMPLETE,
                )

        dataset = self.db.get_dataset(dataset_id)
        LOGGER.info('Dataruns created. Summary:')
        LOGGER.info('\tDataset ID: %s', dataset.id)
        LOGGER.info('\tTraining data: %s', dataset.train_path)
        LOGGER.info('\tTest data: %s', dataset.test_path)
        if run_per_partition:
            LOGGER.info('\tDatarun IDs: %s', ', '.join(str(datarun.id) for datarun in dataruns))
        else:
            LOGGER.info('\tDatarun ID: %s', dataruns[0].id)

        first = dataruns[0]
        LOGGER.info('\tHyperpartition selection strategy: %s', first.selector)
        LOGGER.info('\tParameter tuning strategy: %s', first.tuner)
        LOGGER.info('\tBudget: %s (%s)', first.budget, first.budget_type)

        return dataruns if run_per_partition else first

    @staticmethod
    def _select_datarun(dataruns, choose_randomly):
        """Pick the next Datarun: a random one, or the one with the lowest ID."""
        if choose_randomly:
            return random.choice(dataruns)

        return min(dataruns, key=attrgetter('id'))

    @staticmethod
    def _progress_bar(run, verbose):
        """Build the tqdm bar for ``run``, or return ``None`` for an unknown budget type."""
        if run.budget_type == 'classifier':
            extra = {'total': run.budget}
        elif run.budget_type == 'walltime':
            extra = {'unit': ' Classifiers'}
        else:
            return None

        return tqdm(ascii=True, initial=run.completed_classifiers, disable=not verbose, **extra)

    def _work_on_datarun(self, run, save_files, cloud_mode, verbose):
        """Mark ``run`` as running and train classifiers until the Database marks it complete.

        A ``ClassifierError`` is logged and followed by a ``_LOOP_WAIT`` second sleep
        instead of being propagated.
        """
        # Say we've started working on this datarun, if we haven't already.
        self.db.mark_datarun_running(run.id)
        LOGGER.info('Computing on datarun %d', run.id)

        worker = Worker(self.db, run, save_files=save_files,
                        cloud_mode=cloud_mode, aws_access_key=self.aws_access_key,
                        aws_secret_key=self.aws_secret_key, s3_bucket=self.s3_bucket,
                        s3_folder=self.s3_folder, models_dir=self.models_dir,
                        metrics_dir=self.metrics_dir, verbose_metrics=self.verbose_metrics)

        pbar = self._progress_bar(run, verbose)
        if pbar is None:
            # Unknown budget types are not processed here.
            return

        try:
            # ``with`` closes the bar even when a ClassifierError escapes the loop.
            with pbar:
                while run.status != RunStatus.COMPLETE:
                    worker.run_classifier()
                    run = self.db.get_datarun(run.id)  # Refresh the datarun object.
                    # Compare against ``pbar.n`` (the counter), not ``last_print_n``,
                    # which lags behind between redraws and caused double counting.
                    if verbose and run.completed_classifiers > pbar.n:
                        pbar.update(run.completed_classifiers - pbar.n)

        except ClassifierError:
            # The exception has already been handled; just wait a sec so we
            # don't go out of control reporting errors.
            LOGGER.error('Something went wrong. Sleeping %d seconds.', self._LOOP_WAIT)
            time.sleep(self._LOOP_WAIT)

    def work(self, datarun_ids=None, save_files=True, choose_randomly=True,
             cloud_mode=False, total_time=None, wait=True, verbose=False):
        """Get unfinished Dataruns from the database and work on them.

        Check the ModelHub Database for unfinished Dataruns, and work on them
        as they are added. This process will continue to run until it exceeds
        total_time or there are no more Dataruns to process or it is killed.

        Args:
            datarun_ids (list):
                list of IDs of Dataruns to work on. If ``None``, this will work on any
                unfinished Dataruns found in the database. Optional. Defaults to ``None``.
            save_files (bool):
                Whether to save the fitted classifiers and their metrics or not.
                Optional. Defaults to True.
            choose_randomly (bool):
                If ``True``, pick the next Datarun at random among the unfinished
                ones returned by the Database. Otherwise, pick the one with the
                lowest ID. Optional. Defaults to ``True``.
            cloud_mode (bool):
                Save the models and metrics in AWS S3 instead of locally. This option
                works only if S3 configuration has been provided on initialization.
                Optional. Defaults to ``False``.
            total_time (int):
                Total time to run the work process, in seconds. If ``None``, continue to
                run until interrupted or there are no more Dataruns to process.
                The limit is also enforced while waiting for new Dataruns.
                Optional. Defaults to ``None``.
            wait (bool):
                If ``True``, wait for more Dataruns to be inserted into the Database
                once all have been processed. Otherwise, exit the worker loop
                when they run out.
                Optional. Defaults to ``True``.
            verbose (bool):
                Whether to be verbose about the process. Optional. Defaults to ``False``.
        """
        start_time = datetime.now()

        while True:
            # All pending and running dataruns, optionally restricted to ``datarun_ids``.
            dataruns = self.db.get_dataruns(include_ids=datarun_ids, ignore_complete=True)

            if dataruns:
                run = self._select_datarun(dataruns, choose_randomly)
                self._work_on_datarun(run, save_files, cloud_mode, verbose)
            elif wait:
                LOGGER.debug('No dataruns found. Sleeping %d seconds and trying again.',
                             self._LOOP_WAIT)
                time.sleep(self._LOOP_WAIT)
            else:
                LOGGER.info('No dataruns found. Exiting.')
                break

            # Checked on every iteration, including idle ones, so that ``total_time``
            # is honoured while waiting for new Dataruns.
            elapsed_time = (datetime.now() - start_time).total_seconds()
            if total_time is not None and elapsed_time >= total_time:
                LOGGER.info('Total run time for worker exceeded; exiting.')
                break

    def run(self, train_path, test_path=None, name=None, description=None,
            class_column='class', budget=100, budget_type='classifier', gridding=0, k_window=3,
            metric='f1', methods=None, r_minimum=2, run_per_partition=False,
            score_target='cv', selector='uniform', tuner='uniform', deadline=None, priority=1,
            save_files=True, choose_randomly=True, cloud_mode=False, total_time=None,
            verbose=True):
        """Create a Dataset and a Datarun and then work on it.

        Args:
            train_path (str):
                Path to the training CSV file. It can be a local filesystem path,
                absolute or relative, or an HTTP or HTTPS URL, or an S3 path in the
                format ``s3://{bucket_name}/{key}``. Required.
            test_path (str):
                Path to the testing CSV file. It can be a local filesystem path,
                absolute or relative, or an HTTP or HTTPS URL, or an S3 path in the
                format ``s3://{bucket_name}/{key}``.
                Optional. If not given, the training CSV will be split in two parts,
                train and test.
            name (str):
                Name given to this dataset. Optional. If not given, a hash will be
                generated from the training_path and used as the Dataset name.
            description (str):
                Human friendly description of the Dataset. Optional.
            class_column (str):
                Name of the column that will be used as the target variable.
                Optional. Defaults to ``'class'``.
            budget (int):
                Budget amount. Optional. Defaults to ``100``.
            budget_type (str):
                Budget Type. Can be 'classifier' or 'walltime'.
                Optional. Defaults to ``'classifier'``.
            gridding (int):
                ``gridding`` setting for the Tuner. Optional. Defaults to ``0``.
            k_window (int):
                ``k`` setting for the Selector. Optional. Defaults to ``3``.
            metric (str):
                Metric to use for the tuning and selection. Optional. Defaults to ``'f1'``.
            methods (list):
                List of methods to try. Optional. Defaults to ``['logreg', 'dt', 'knn']``.
            r_minimum (int):
                ``r_minimum`` setting for the Tuner. Optional. Defaults to ``2``.
            run_per_partition (bool):
                whether to create a separated Datarun for each Hyperpartition or not.
                Optional. Defaults to ``False``.
            score_target (str):
                Which score to use for the tuning and selection process. It can be ``'cv'`` or
                ``'test'``. Optional. Defaults to ``'cv'``.
            priority (int):
                Priority of this Datarun. The higher the better. Optional. Defaults to ``1``.
            selector (str):
                Type of selector to use. Optional. Defaults to ``'uniform'``.
            tuner (str):
                Type of tuner to use. Optional. Defaults to ``'uniform'``.
            deadline (str):
                Time deadline. It must be a string representing a datetime in the format
                ``'%Y-%m-%d %H:%M'``. If given, ``budget_type`` will be set to ``'walltime'``.
            save_files (bool):
                Whether to save the fitted classifiers and their metrics or not.
                Optional. Defaults to ``True``.
            choose_randomly (bool):
                Whether to pick the next Datarun at random (``True``) or by lowest ID.
                Optional. Defaults to ``True``.
            cloud_mode (bool):
                Save the models and metrics in AWS S3 instead of locally.
                Optional. Defaults to ``False``.
            total_time (int):
                Maximum time to work, in seconds. Optional. Defaults to ``None`` (no limit).
            verbose (bool):
                Whether to be verbose about the process. Optional. Defaults to ``True``.

        Returns:
            Datarun:
                The Datarun read back from the Database after the work finished, or
                the list of Dataruns when ``run_per_partition`` is ``True``. ``None``
                if the Database returns no matching Datarun.
        """
        dataset = self.add_dataset(
            train_path,
            test_path=test_path,
            name=name,
            description=description,
            class_column=class_column,
        )
        datarun = self.add_datarun(
            dataset.id,
            budget=budget,
            budget_type=budget_type,
            gridding=gridding,
            k_window=k_window,
            metric=metric,
            methods=methods,
            r_minimum=r_minimum,
            run_per_partition=run_per_partition,
            score_target=score_target,
            priority=priority,
            selector=selector,
            tuner=tuner,
            deadline=deadline,
        )

        if run_per_partition:
            datarun_ids = [_datarun.id for _datarun in datarun]
        else:
            datarun_ids = [datarun.id]

        if verbose:
            print('Processing dataset {}'.format(train_path))

        # Do not wait for new Dataruns: stop as soon as ours are finished.
        self.work(
            datarun_ids=datarun_ids,
            save_files=save_files,
            choose_randomly=choose_randomly,
            cloud_mode=cloud_mode,
            total_time=total_time,
            wait=False,
            verbose=verbose,
        )

        dataruns = self.db.get_dataruns(
            include_ids=datarun_ids,
            ignore_complete=False,
            ignore_pending=True
        )

        if run_per_partition:
            return dataruns

        # Guard against an empty (or None) result instead of failing on len().
        return dataruns[0] if dataruns else None

    def load_model(self, classifier_id):
        """Load a Model from the Database.

        Args:
            classifier_id (int):
                Id of the Model to load.

        Returns:
            Model:
                The loaded model instance.
        """
        return self.db.get_classifier(classifier_id).load_model()