import sys
import os
import dill
import inspect
import importlib.util
from multiprocessing import Pool
from textwrap import dedent
from xxhash import xxh64
from tempfile import TemporaryDirectory
from hashlib import md5
from types import ModuleType
from contextlib import contextmanager
from pandas import concat
RANDOM_STATE = 1754
TRY_AGAIN_LATER = "Please try again later or contact administrator."
TRY_AGAIN = "Please contact administrator."
def _get_function_and_execute(f_dill, *args):
    f = dill.loads(f_dill)
    return f(*args)
[docs]def run_isolated(f, *args):
    """Execute `f(args)` in an isolated environment.
    First, uses dill to serialize the function. Unfortunately, pickle is unable
    to serialize some functions, so we must serialize and deserialize the
    function ourselves.
    """
    f_dill = dill.dumps(f)
    with Pool(1) as pool:
        return pool.apply(_get_function_and_execute, (f_dill, *args)) 
[docs]def get_source(function):
    """Extract the source code from a given function.
    Recursively extracts the source code for all local functions called by given
    function. The resulting source code is encoded in utf-8.
    Limitations: Cannot use `get_source` on function defined interactively in
    normal Python terminal. Functions defined interactively in IPython are still
    okay.
    Parameters
    ----------
    function : function
    """
    # Use nested function to allow us to ultimately encode as utf-8 string.
    def _get_source(function):
        out = []
        func_code    = function.__code__
        func_globals = function.__globals__
        func_name    = function.__name__
        # known limitation: cannot use from stdin
        if func_code.co_filename == '<stdin>':
            raise ValueError("Cannot use `get_source` on function defined interactively.")
        for name in func_code.co_names:
            if name != func_name:
                obj = func_globals.get(name)
                if obj and inspect.isfunction(obj):
                    out.append(_get_source(obj))
        out.append(inspect.getsource(function))
        seen = set()
        return "\n".join(x for x in out if not (x in seen or seen.add(x)))
    code = _get_source(function)
    # post-processing
    code = dedent(code)
    code = code.encode("utf-8")
    return code 
[docs]def get_function(source):
    """Return a function from given source code.
    This function is usually called on source code that was in turn produced by
    get_source. Note that the source code produced by get_source includes the
    source for the top-level function as well as any other local functions it
    calls. Here, we return the top-level function directly.
    Parameters
    ----------
    source : str or bytes
    """
    # decode into str
    if isinstance(source, bytes):
        code = source.decode("utf-8")
    elif isinstance(source, str):
        code = source
    else:
        raise ValueError
    # exec code in empty namespace
    try:
        namespace = {}
        exec(code, namespace)
    except (SyntaxError, IndentationError) as e:
        print(code)
        raise e
    # Get top-level function from list of functions
    name = get_top_level_function_name(namespace)
    return namespace[name] 
[docs]def get_top_level_function_name(namespace, remove_names=["__builtins__"]):
    """Figure out which is the top-level function in a namespace.
    The top-level function is defined as the function that is not a name in any
    other functions.  co_names is a tuple of local names.  We could make more
    efficient, using constant lookups of names, stopping when there is only name
    left, and confirming this name is not called by anyone; but hard to
    anticipate a situation where user defines function chain that is long enough
    that this efficiency is required.
    """
    if isinstance(namespace, dict):
        names = list(namespace.keys())
        def get_name(name):
            return namespace[name]
    elif isinstance(namespace, ModuleType):
        names = dir(namespace)
        def get_name(name):
            return getattr(namespace, name)
    else:
        raise ValueError("Invalid argument")
    for name in remove_names:
        names.remove(name)
    if not names:
        raise ValueError("No function was defined in source.")
    names_copy = list(names)
    for name in list(names):
        locals_ = get_name(name).__code__.co_names
        for local in locals_:
            if local != name and local in names:
                names.remove(local)
    # at this point, the only name remaining in names should be top-level
    # function
    if len(names) != 1:
        print("Something went wrong.", file=sys.stderr)
        print("\tnames (original): {}".format(names_copy), file=sys.stderr)
        print("\tnames (modified): {}".format(names), file=sys.stderr)
        raise ValueError
    return names[0] 
[docs]def get_function2(source):
    """Return a function from given source code.
    This function is usually called on source code that was in turn produced by
    get_source. This function differs from `get_function` in the method used is
    to write the source code to a file and then import that file as a new
    module.
    Note that the source code produced by get_source includes the source for the
    top-level function as well as any other local functions it calls. Here, we
    return the top-level function directly.
    Caveat: This does not solve the problem of being able to re-extract source
    from the returned function. (Or, at least, as currently implemented.)
    Parameters
    ----------
    source : str, bytes
    """
    # decode into str
    if isinstance(source, bytes):
        code = source.decode("utf-8")
    elif isinstance(source, str):
        code = source
    else:
        raise ValueError
    # first, write source to a file
    with TemporaryDirectory() as d:
        module_name = "temp"
        file_name = os.path.join(d, module_name + ".py")
        with open(file_name, "w") as f:
            f.write(code)
        # next, import/exec that file
        spec = importlib.util.spec_from_file_location(module_name, file_name)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        top_level_name = get_top_level_function_name(module,
                remove_names=["__builtins__", "__cached__", "__doc__",
                              "__file__", "__loader__", "__name__",
                              "__package__", "__spec__"])
        return getattr(module, top_level_name) 
[docs]def compute_dataset_hash(dataset):
    """Return hash value of dataset contents.
    Uses xxhash.xxh64 hash algorithm for performance, but this algorithm should
    not be considered cryptographically secure.
    Parameters
    ----------
    dataset : dict mapping str to pd.DataFrame
    """
    h = xxh64()
    for d in sorted(dataset.keys()):
        h.update(dataset[d].to_msgpack())
    return h.hexdigest() 
[docs]def myhash(obj):
    """Compute md5 checksum of string-like object."""
    if not isinstance(obj, bytes):
        obj_enc = obj.encode("utf-8")
    else:
        obj_enc = obj
    return md5(obj_enc).hexdigest() 
[docs]@contextmanager
def possibly_talking_action(action, verbose=True):
    """Wrap statements with description of their action.
    Simply prints action before executing statement, without a trailing
    newline, and prints 'done' afterwards.
    Parameters
    ----------
    action : str
        description of action
    verbose : bool, optional (default=True)
        whether to print anything at all
    Examples
    --------
    >>> with possibly_talking_action("Calling foo...", True):
            foo()
    Calling foo...done
    """
    if verbose:
        vprint = print
    else:
        def do_nothing(*args, **kwargs): pass
        vprint = do_nothing
    vprint(action, end='')
    try:
        yield
        vprint("done")
    except Exception:
        vprint("error")
        raise 
[docs]def is_positive_env(value):
    if value is not None:
        return value in ["yes", "Yes", "y", "Y", "true", "True", True, 1, "1",
            "totally"]
    return False