Source code for featurehub.evaluation.client

import sys
import os
import json
import pandas as pd
import requests
import collections.abc
import dill
import traceback
from urllib.parse import quote_from_bytes

from featurehub.util import (
    compute_dataset_hash, run_isolated, get_source, possibly_talking_action,
    myhash
)
from featurehub.admin.sqlalchemy_declarative import Problem, Feature
from featurehub.evaluation import EvaluationResponse
from featurehub.modeling import Model

class EvaluatorClient(object):
    def __init__(self, problem_id, username, orm, dataset=None,
                 target=None, entities_featurized=None):
        self.problem_id = problem_id
        self.username = username
        self.orm = orm
        # Avoid a mutable default argument: the dataset dict is mutated when
        # tables are loaded, so a shared default {} would leak state across
        # instances.
        self.dataset = dataset if dataset is not None else {}
        self.target = target
        self.entities_featurized = entities_featurized

        if self.dataset:
            self.__dataset_hash = compute_dataset_hash(self.dataset)
        else:
            self.__dataset_hash = None
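    # A minimal construction sketch (hypothetical names: assumes an ORM
    # manager `orm` connected to the feature database and a known
    # `problem_id`):
    #
    #     client = EvaluatorClient(problem_id, "alice", orm)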
    def check_if_registered(self, feature, verbose=False):
        """Check if feature is registered.

        Extracts source code, then looks for the identical source code in the
        feature database.

        Parameters
        ----------
        feature : function
        verbose : bool, optional (default=False)
            Whether to print output.
        """
        code = get_source(feature)
        return self._check_if_registered(code, verbose=verbose)
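    # Example (sketch; the feature below and its table/column names are
    # hypothetical):
    #
    #     def num_events(dataset):
    #         return dataset["events"].groupby("entity_id").size()
    #
    #     client.check_if_registered(num_events)  # -> True or False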
    def _check_if_registered(self, code, verbose=False):
        md5 = myhash(code)
        with self.orm.session_scope() as session:
            filters = (
                Feature.problem_id == self.problem_id,
                Feature.md5 == md5,
            )
            query = session.query(Feature).filter(*filters)
            result = query.scalar()

        if result:
            if verbose:
                print("Feature already registered.")
            return True

        return False
    def submit(self, feature, description):
        """Submit feature to server for evaluation on test data.

        If successful, registers feature in feature database and returns key
        performance metrics.

        Runs the feature in an isolated environment to extract the feature
        values. Validates the feature values. Then, builds a model on that
        one feature, performs cross validation, and returns key performance
        metrics.

        Parameters
        ----------
        feature : function
            Feature to evaluate
        description : str
            Feature description
        """
        from featurehub.user.session import Session
        feature_dill = quote_from_bytes(dill.dumps(feature))
        code = get_source(feature)
        data = {
            "database": self.orm.database,
            "problem_id": self.problem_id,
            "feature_dill": feature_dill,
            "code": code,
            "description": description,
        }
        response = Session._eval_server_post("submit", data)
        if response.ok:
            try:
                eval_response = EvaluationResponse.from_string(response.text)
                print(eval_response)
            except Exception:
                # TODO
                print("response failed with exception")
                print(traceback.format_exc(), file=sys.stderr)
                try:
                    print(response, file=sys.stderr)
                    print(response.text, file=sys.stderr)
                except Exception:
                    pass
        else:
            # TODO
            print("response failed with bad status code")
            try:
                print(response, file=sys.stderr)
                print(response.text, file=sys.stderr)
            except Exception:
                pass
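    # Example submission (sketch, continuing the hypothetical `client` and
    # `num_events` above); prints the server's EvaluationResponse on success:
    #
    #     client.submit(num_events, "Number of events recorded per entity")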
    def evaluate(self, feature):
        """Evaluate feature on training dataset and return key performance
        metrics.

        Runs the feature in an isolated environment to extract the feature
        values. Validates the feature values. Then, builds a model on that
        one feature and computes key cross-validated metrics. Prints results
        and returns a dictionary with (metric => value) entries. If the
        feature is invalid, prints the reason and returns an empty
        dictionary.

        Parameters
        ----------
        feature : function
            Feature to evaluate
        """
        try:
            metrics = self._evaluate(feature, verbose=True)
            metrics_str = metrics.to_string(kind="user")
            metrics_user = metrics.convert(kind="user")
            print(metrics_str)
        except ValueError as e:
            print("Feature is not valid: {}".format(str(e)), file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
            metrics_user = {}

        try:
            # TODO this can be an async procedure
            self._log_evaluation_attempt(feature)
        except Exception:
            pass

        return metrics_user
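    # Example (sketch): on success this prints the metrics and returns a
    # plain dict of (metric name => value) entries; for an invalid feature
    # it prints the reason and returns {}:
    #
    #     metrics = client.evaluate(num_events)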
    def _log_evaluation_attempt(self, feature):
        from featurehub.user.session import Session
        code = get_source(feature)
        data = {
            "database": self.orm.database,
            "problem_id": self.problem_id,
            "code": code,
        }
        Session._eval_server_post("log-evaluation-attempt", data)

    def _evaluate(self, feature, verbose=False):
        with possibly_talking_action("Obtaining dataset...", verbose):
            self._load_dataset()

        with possibly_talking_action("Extracting features...", verbose):
            feature_values = self._extract_features(feature)

        # confirm dataset has not been changed
        with possibly_talking_action("Verifying dataset integrity...",
                                     verbose):
            self._verify_dataset_integrity()

        # validate
        with possibly_talking_action("Validating feature values...", verbose):
            result = self._validate_feature_values(feature_values)

        # full feature matrix
        with possibly_talking_action("Building full feature matrix...",
                                     verbose):
            X = self._build_feature_matrix(feature_values)

        # target values
        # with possibly_talking_action("Extracting target values...", verbose):
        Y = self._extract_label()

        # compute metrics
        with possibly_talking_action("Fitting model and computing metrics...",
                                     verbose):
            metrics = self._compute_metrics(X, Y)

        return metrics

    #
    # The rest of these methods are subroutines within _evaluate, or utility
    # functions of those subroutines.
    #

    def _create_model(self):
        with self.orm.session_scope() as session:
            problem = session.query(Problem)\
                .filter(Problem.id == self.problem_id).one()
            problem_type = problem.problem_type
        return Model(problem_type)

    def _compute_metrics(self, X, Y):
        model = self._create_model()
        metrics = model.compute_metrics_cv(X, Y)
        return metrics

    def _extract_features(self, feature):
        assert isinstance(feature, collections.abc.Callable), \
            "feature must be a function!"
        return run_isolated(feature, self.dataset)

    def _extract_label(self):
        if pd.DataFrame(self.target).empty:
            self._load_dataset()
        return self.target

    def _load_dataset_split(self, split="train", dataset=None,
                            entities_featurized=None, target=None,
                            dataset_hash=None, compute_hash=True):
        # avoid a shared mutable default argument
        if dataset is None:
            dataset = {}

        # query db for import parameters to load files
        is_present_dataset = bool(dataset)
        is_present_entities_featurized = \
            not pd.DataFrame(entities_featurized).empty
        is_present_target = not pd.DataFrame(target).empty
        is_anything_missing = not all(
            [is_present_dataset, is_present_entities_featurized,
             is_present_target])
        if is_anything_missing:
            with self.orm.session_scope() as session:
                problem = session.query(Problem)\
                    .filter(Problem.id == self.problem_id).one()
                problem_data_dir = getattr(
                    problem, "data_dir_{}".format(split))
                problem_files = json.loads(problem.files)
                problem_table_names = json.loads(problem.table_names)
                problem_entities_featurized_table_name = \
                    problem.entities_featurized_table_name
                problem_target_table_name = problem.target_table_name

        # load entities and other tables
        if not is_present_dataset:
            # load other tables
            for (table_name, filename) in zip(problem_table_names,
                                              problem_files):
                if table_name == problem_entities_featurized_table_name or \
                   table_name == problem_target_table_name:
                    continue
                abs_filename = os.path.join(problem_data_dir, filename)
                dataset[table_name] = pd.read_csv(
                    abs_filename, low_memory=False, header=0)

            # compute/recompute hash
            if compute_hash:
                dataset_hash = compute_dataset_hash(dataset)
            else:
                dataset_hash = None

        # Recompute dataset hash. This condition is only met if the dataset
        # was already loaded but its hash had not been computed (otherwise,
        # we just computed the hash several lines above).
        if compute_hash:
            if not dataset_hash:
                dataset_hash = compute_dataset_hash(dataset)

        # load entities featurized
        if not is_present_entities_featurized:
            # if empty string, we simply don't have any features to add
            if problem_entities_featurized_table_name:
                cols = list(problem_table_names)
                ind_features = cols.index(
                    problem_entities_featurized_table_name)
                abs_filename = os.path.join(problem_data_dir,
                                            problem_files[ind_features])
                entities_featurized = pd.read_csv(
                    abs_filename, low_memory=False, header=0)

        # load target
        if not is_present_target:
            cols = list(problem_table_names)
            ind_target = cols.index(problem_target_table_name)
            abs_filename = os.path.join(problem_data_dir,
                                        problem_files[ind_target])
            # target might not exist if we are making predictions on unseen
            # test data
            if os.path.exists(abs_filename):
                target = pd.read_csv(abs_filename, low_memory=False, header=0)
            else:
                target = None

        return dataset, entities_featurized, target, dataset_hash

    def _load_dataset(self):
        """Load dataset if not present.

        Also computes/re-computes dataset hash.
        """
        # TODO check for dtypes file, facilitating lower memory usage
        self.dataset, self.entities_featurized, self.target, \
            self.__dataset_hash = self._load_dataset_split(
                split="train",
                dataset=self.dataset,
                entities_featurized=self.entities_featurized,
                target=self.target,
                dataset_hash=self.__dataset_hash)

    def _reload_dataset(self):
        """Force reload of dataset.

        Doesn't reload entities_featurized or target, because we only call
        this routine when the dataset hash has changed.
        """
        self.dataset = {}
        self._load_dataset()

    def _validate_feature_values(self, feature_values):
        """Check whether feature values are valid.

        Currently checks whether the feature values are a DataFrame of the
        correct dimensions. If the feature is valid, returns an empty string.
        Otherwise, raises ValueError with a message containing a
        semicolon-delimited list of reasons that the feature is invalid.

        Parameters
        ----------
        feature_values : np array-like

        Returns
        -------
        Empty string if feature values are valid.

        Raises
        ------
        ValueError with message of a str containing semicolon-delimited list
        of reasons that the feature is invalid.
        """
        problems = []

        # must be coerced to DataFrame
        try:
            feature_values_df = pd.DataFrame(feature_values)
        except Exception:
            problems.append("cannot be coerced to DataFrame")
            problems = "; ".join(problems)
            raise ValueError(problems)

        if pd.DataFrame(self.target).empty:
            self._load_dataset()

        # must have the right shape
        expected_shape = (self.target.shape[0], 1)  # pylint: disable=no-member
        if feature_values_df.shape != expected_shape:
            problems.append(
                "returns DataFrame of invalid shape "
                "(actual {}, expected {})".format(
                    feature_values_df.shape, expected_shape)
            )

        problems = "; ".join(problems)
        if problems:
            raise ValueError(problems)

        # problems must be an empty string
        return problems

    def _verify_dataset_integrity(self):
        new_hash = compute_dataset_hash(self.dataset)
        if self.__dataset_hash != new_hash:
            print("Old hash: {}".format(self.__dataset_hash), file=sys.stderr)
            print("New hash: {}".format(new_hash), file=sys.stderr)
            # TODO exception handling
            self._reload_dataset()

    def _build_feature_matrix(self, feature_values):
        values_df = pd.DataFrame(feature_values)
        if not pd.DataFrame(self.entities_featurized).empty:
            X = pd.concat([self.entities_featurized, values_df], axis=1)
        else:
            X = values_df
        return X
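# The feature contract implied by _validate_feature_values: a feature maps
# the dataset dict to values that can be coerced to a single-column DataFrame
# with one row per row of the target. A sketch (table and column names are
# hypothetical):
#
#     def users_age(dataset):
#         return dataset["users"]["age"]          # shape (n,) -> valid
#
#     def bad_feature(dataset):
#         return dataset["users"][["age", "id"]]  # shape (n, 2) -> ValueError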
class EvaluatorServer(EvaluatorClient):
    def __init__(self, problem_id, username, orm):
        super().__init__(problem_id, username, orm)

        # separate training and testing datasets
        self.dataset_train = {}
        self.target_train = None
        self.entities_featurized_train = None
        self.dataset_test = {}
        self.target_test = None
        self.entities_featurized_test = None
    def check_if_registered(self, code, verbose=False):
        """Check if feature is registered.

        Overwrites the client method by expecting code to be passed directly.
        This is because on the server we cannot round-trip
        code -> function -> code.

        Parameters
        ----------
        code : str
        verbose : bool, optional (default=False)
            Whether to print output.
        """
        return self._check_if_registered(code, verbose=verbose)
    def evaluate(self, feature):
        """Evaluate feature.

        Returns a dictionary with (metric => value) entries. If the feature
        is invalid, re-raises the ValueError.

        Parameters
        ----------
        feature : function
            Feature to evaluate
        """
        try:
            metrics = self._evaluate(feature, verbose=False)
            return metrics
        except ValueError:
            raise
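    # Hypothetical server-side flow (sketch; in practice `feature` is
    # reconstructed from the submitted dill payload rather than defined
    # locally):
    #
    #     evaluator = EvaluatorServer(problem_id, username, orm)
    #     try:
    #         metrics = evaluator.evaluate(feature)
    #     except ValueError:
    #         ...  # report "feature is not valid" back to the client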
    def submit(self, feature, description):
        """Does nothing.

        This class is instantiated at the server, where the feature is
        already being registered, so there is nothing to submit.
        """
        pass
    def _compute_metrics(self, X, Y):
        model = self._create_model()
        # split X and Y into train and test
        n = len(self.target_train)
        metrics = model.compute_metrics_train_test(X, Y, n)
        return metrics

    def _verify_dataset_integrity(self):
        """Does nothing.

        Don't need to verify dataset integrity on server, because we re-load
        the dataset for every new feature.
        """
        pass

    def _load_dataset(self):
        # load dataset for train data
        self.dataset_train, self.entities_featurized_train, \
            self.target_train, _ = self._load_dataset_split(
                split="train",
                dataset=self.dataset_train,
                entities_featurized=self.entities_featurized_train,
                target=self.target_train,
                compute_hash=False)

        # load dataset for test data
        self.dataset_test, self.entities_featurized_test, \
            self.target_test, _ = self._load_dataset_split(
                split="test",
                dataset=self.dataset_test,
                entities_featurized=self.entities_featurized_test,
                target=self.target_test,
                compute_hash=False)

        # make a copy of the dataset
        self.dataset = {}
        for key in self.dataset_train:
            self.dataset[key] = self.dataset_train[key].copy()

        # concatenate as applicable
        with self.orm.session_scope() as session:
            problem = session.query(Problem)\
                .filter(Problem.id == self.problem_id).one()
            problem_entities_table_name = problem.entities_table_name

        self.dataset[problem_entities_table_name] = \
            pd.concat([self.dataset_train[problem_entities_table_name],
                       self.dataset_test[problem_entities_table_name]],
                      axis=0)

        try:
            self.target = pd.concat([self.target_train, self.target_test],
                                    axis=0)
        except Exception:
            # TODO
            self.target = self.target_train

        try:
            self.entities_featurized = pd.concat(
                [self.entities_featurized_train,
                 self.entities_featurized_test],
                axis=0)
        except ValueError:
            # if there are no preprocessed features in the first place, all
            # will be None, and pd.concat fails
            self.entities_featurized = None

        # Reset indices. Otherwise, since we did a vertical concatenation, we
        # have duplicate values in our indices.
        self.dataset[problem_entities_table_name].reset_index(drop=True,
                                                              inplace=True)
        self.target.reset_index(drop=True, inplace=True)
        if self.entities_featurized is not None:
            self.entities_featurized.reset_index(drop=True, inplace=True)

    def _evaluate(self, feature, verbose=False):
        metrics = super()._evaluate(feature, verbose)
        return metrics