import sys
import os
import dill
import inspect
import importlib.util
from multiprocessing import Pool
from textwrap import dedent
from xxhash import xxh64
from tempfile import TemporaryDirectory
from hashlib import md5
from types import ModuleType
from contextlib import contextmanager
from pandas import concat
# Fixed integer constant. NOTE(review): not referenced in the visible part of
# this file; presumably used elsewhere as a random seed -- confirm with callers.
RANDOM_STATE = 1754
# Canned message fragments. NOTE(review): not referenced in the visible part
# of this file; presumably appended to user-facing error messages -- confirm.
TRY_AGAIN_LATER = "Please try again later or contact administrator."
TRY_AGAIN = "Please contact administrator."
def _get_function_and_execute(f_dill, *args):
    """Rebuild a dill-serialized callable and call it.

    Parameters
    ----------
    f_dill : bytes
        Payload previously produced by ``dill.dumps``.
    *args
        Positional arguments forwarded to the rebuilt callable.

    Returns
    -------
    Whatever the rebuilt callable returns.
    """
    # NOTE: deserializing with dill can execute arbitrary code; only feed it
    # payloads that were produced by trusted code.
    return dill.loads(f_dill)(*args)
def run_isolated(f, *args):
    """Execute ``f(*args)`` in a separate worker process and return the result.

    The function is serialized with dill before being handed to the worker,
    because pickle (which multiprocessing would otherwise use) cannot
    serialize some functions. The worker deserializes it again and calls it.

    Parameters
    ----------
    f : callable
        Function to run.
    *args
        Positional arguments passed to ``f``.

    Returns
    -------
    The value returned by ``f(*args)``.
    """
    payload = dill.dumps(f)
    call_args = (payload,) + args
    # A single-worker pool; leaving the ``with`` block shuts the worker down.
    with Pool(1) as workers:
        return workers.apply(_get_function_and_execute, call_args)
def get_source(function):
    """Extract the source code from a given function.

    Walks the global names referenced by `function`; every such name that
    resolves (in the function's globals) to a Python function has its source
    extracted recursively and placed before the source of `function` itself.
    The combined source is dedented and encoded as utf-8.

    Functions that refer to each other (mutual recursion) or to themselves
    through an alias are handled: a function that is already being expanded
    further up the call chain is not expanded again.

    Limitations: Cannot use `get_source` on function defined interactively in
    normal Python terminal. Functions defined interactively in IPython are
    still okay.

    Parameters
    ----------
    function : function

    Returns
    -------
    bytes
        utf-8 encoded source code.

    Raises
    ------
    ValueError
        If `function` (or a function it references) was defined on stdin.
    """
    # Functions currently being expanded somewhere up the recursion stack.
    # Skipping these breaks reference cycles without changing the output for
    # acyclic call graphs.
    in_progress = set()

    # Use nested function to allow us to ultimately encode as utf-8 string.
    def _get_source(func):
        func_code = func.__code__
        func_globals = func.__globals__
        func_name = func.__name__

        # known limitation: cannot use from stdin
        if func_code.co_filename == '<stdin>':
            raise ValueError("Cannot use `get_source` on function defined interactively.")

        pieces = []
        in_progress.add(func)
        try:
            for name in func_code.co_names:
                if name == func_name:
                    # direct self-reference
                    continue
                obj = func_globals.get(name)
                if inspect.isfunction(obj) and obj not in in_progress:
                    pieces.append(_get_source(obj))
            pieces.append(inspect.getsource(func))
        finally:
            in_progress.discard(func)

        # Drop exact duplicates while keeping first-seen order.
        return "\n".join(dict.fromkeys(pieces))

    code = _get_source(function)

    # post-processing
    code = dedent(code)
    return code.encode("utf-8")
def get_function(source):
    """Build a function object from source code.

    Typically used on source produced by `get_source`, which contains the
    top-level function together with the other functions it references. The
    source is executed in a fresh namespace and the top-level function (as
    determined by `get_top_level_function_name`) is returned.

    Parameters
    ----------
    source : str or bytes
        Source code; bytes are decoded as utf-8.

    Returns
    -------
    function

    Raises
    ------
    ValueError
        If `source` is neither str nor bytes.
    SyntaxError
        If the source does not compile; the source is printed first.
    """
    # normalize to str
    if isinstance(source, str):
        code = source
    elif isinstance(source, bytes):
        code = source.decode("utf-8")
    else:
        raise ValueError

    # NOTE: exec runs arbitrary code -- only pass source from trusted origins.
    namespace = {}
    try:
        exec(code, namespace)
    except SyntaxError as exc:  # IndentationError is a SyntaxError subclass
        print(code)
        raise exc

    # pick the top-level function out of everything that got defined
    top_level = get_top_level_function_name(namespace)
    return namespace[top_level]
def get_top_level_function_name(namespace, remove_names=("__builtins__",)):
    """Figure out which is the top-level function in a namespace.

    The top-level function is defined as the function whose name does not
    appear in the ``co_names`` of any other function in the namespace.
    Entries of the namespace that are not Python functions (modules,
    constants, classes, ...) are ignored.

    Parameters
    ----------
    namespace : dict or module
        Mapping of names to objects, or a module whose attributes are
        inspected.
    remove_names : iterable of str, optional
        Names to exclude from consideration. Names that are not present in
        the namespace are silently ignored.

    Returns
    -------
    str
        Name of the top-level function.

    Raises
    ------
    ValueError
        If `namespace` is neither a dict nor a module, if it contains no
        functions, or if there is not exactly one function that no other
        function references.
    """
    if isinstance(namespace, dict):
        names = list(namespace)
        lookup = namespace.__getitem__
    elif isinstance(namespace, ModuleType):
        names = dir(namespace)

        def lookup(name):
            return getattr(namespace, name)
    else:
        raise ValueError("Invalid argument")

    excluded = set(remove_names)

    # Only plain Python functions carry a ``__code__`` object to inspect.
    functions = {}
    for name in names:
        if name in excluded:
            continue
        obj = lookup(name)
        if inspect.isfunction(obj):
            functions[name] = obj

    if not functions:
        raise ValueError("No function was defined in source.")

    # Every function name referenced by some *other* function is not
    # top-level.
    referenced = set()
    for name, func in functions.items():
        for ref in func.__code__.co_names:
            if ref != name and ref in functions:
                referenced.add(ref)

    remaining = [name for name in functions if name not in referenced]

    # at this point, the only name remaining should be the top-level function
    if len(remaining) != 1:
        raise ValueError(
            "Could not determine a unique top-level function. "
            "names (original): {}; names (modified): {}".format(
                list(functions), remaining))
    return remaining[0]
def get_function2(source):
    """Return a function from given source code.

    This function is usually called on source code that was in turn produced
    by get_source. It differs from `get_function` in the method used: the
    source code is written to a file in a temporary directory and that file
    is then imported as a new module.

    Note that the source code produced by get_source includes the source for
    the top-level function as well as any other functions it references.
    Here, we return the top-level function directly.

    Caveat: This does not solve the problem of being able to re-extract
    source from the returned function. (Or, at least, as currently
    implemented.)

    Parameters
    ----------
    source : str, bytes
        Source code; bytes are decoded as utf-8.

    Returns
    -------
    function

    Raises
    ------
    ValueError
        If `source` is neither str nor bytes.
    """
    # decode into str
    if isinstance(source, bytes):
        code = source.decode("utf-8")
    elif isinstance(source, str):
        code = source
    else:
        raise ValueError(
            "source must be str or bytes, got {}".format(type(source).__name__))

    module_name = "temp"
    with TemporaryDirectory() as d:
        # first, write source to a file. Python reads source files as utf-8
        # by default, so write utf-8 explicitly instead of relying on the
        # locale encoding.
        file_name = os.path.join(d, module_name + ".py")
        with open(file_name, "w", encoding="utf-8") as f:
            f.write(code)

        # next, import/exec that file
        # NOTE: this executes arbitrary code -- only pass trusted source.
        spec = importlib.util.spec_from_file_location(module_name, file_name)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

    # Module bookkeeping attributes that must not be mistaken for user
    # definitions. Only pass along the ones this interpreter actually set,
    # since not every Python version defines all of them.
    present = set(dir(module))
    bookkeeping = ("__builtins__", "__cached__", "__doc__",
                   "__file__", "__loader__", "__name__",
                   "__package__", "__spec__")
    remove_names = [name for name in bookkeeping if name in present]

    top_level_name = get_top_level_function_name(module,
                                                 remove_names=remove_names)
    return getattr(module, top_level_name)
def compute_dataset_hash(dataset):
    """Return a hex digest summarizing the contents of a dataset.

    Tables are fed to the hasher in sorted key order. The xxhash.xxh64
    algorithm is used for performance; it should not be considered
    cryptographically secure.

    Parameters
    ----------
    dataset : dict mapping str to pd.DataFrame

    Returns
    -------
    str
        Hexadecimal digest.
    """
    # NOTE(review): ``DataFrame.to_msgpack`` was deprecated in pandas 0.25 and
    # removed in pandas 1.0, so this only runs against older pandas releases.
    hasher = xxh64()
    for table_name in sorted(dataset.keys()):
        table = dataset[table_name]
        hasher.update(table.to_msgpack())
    return hasher.hexdigest()
def myhash(obj):
    """Compute md5 checksum of string-like object.

    Parameters
    ----------
    obj : str or bytes
        Anything that is not ``bytes`` is encoded as utf-8 first.

    Returns
    -------
    str
        Hexadecimal md5 digest.
    """
    payload = obj if isinstance(obj, bytes) else obj.encode("utf-8")
    digest = md5(payload)
    return digest.hexdigest()
@contextmanager
def possibly_talking_action(action, verbose=True):
    """Wrap statements with description of their action.

    Prints `action` before executing the wrapped statements, without a
    trailing newline, and prints 'done' afterwards. If the wrapped statements
    raise an ``Exception``, prints 'error' instead and re-raises.

    Parameters
    ----------
    action : str
        description of action
    verbose : bool, optional (default=True)
        whether to print anything at all

    Examples
    --------
    >>> with possibly_talking_action("Calling foo...", True):
    ...     foo()
    Calling foo...done
    """
    def vprint(*args, **kwargs):
        if verbose:
            # Flush so the un-terminated action text is visible while the
            # wrapped statements run, even on buffered output.
            print(*args, flush=True, **kwargs)

    vprint(action, end='')
    try:
        yield
    except Exception:
        vprint("error")
        raise
    else:
        vprint("done")
def is_positive_env(value):
    """Tell whether `value` is one of the recognized affirmative settings.

    Parameters
    ----------
    value : object
        Candidate value; ``None`` is never affirmative.

    Returns
    -------
    bool
        True if `value` compares equal to one of the accepted spellings
        listed below, False otherwise.
    """
    if value is None:
        return False
    affirmative = ("yes", "Yes", "y", "Y", "true", "True", True, 1, "1",
                   "totally")
    return value in affirmative