Source code for bionic.decorators

"""
These are the decorators we expose to Bionic users.  They are used as follows:

    @builder
    @bionic.decorator1
    @bionic.decorator2
    ...
    def entity_name(arg1, arg2, ...):
        ...

"""

from .aip.task import TaskConfig as AipTaskConfig
from .datatypes import CodeVersion, CodeVersioningPolicy
from .decoration import decorator_updating_accumulator
from .descriptors.parsing import (
    nondraft_dnode_from_descriptor,
    entity_dnode_from_descriptor,
)
from .provider import (
    GatherProvider,
    AttrUpdateProvider,
    PyplotProvider,
    RenamingProvider,
    ArgDescriptorSubstitutionProvider,
    NewOutputDescriptorProvider,
)
from .utils.misc import oneline
from . import interpret


# TODO: Our current bytecode analysis emits warnings for any complex
# variable in any code, user-defined or library code. I'm going to
# suppress the warning by default for now. There are a few options for
# us to consider before we can turn it on by default.
# - Warn only if the complex variable is directly referenced from an
# entity function. The user will be able to make code changes when they
# see the warning.
# - Similar to Streamlit, give users more control over what code is
# analyzed. Streamlit currently watches everything in PYTHONPATH and
# gives an option to the users to blacklist certain files or modules.
def version(
    major=None,
    minor=None,
    ignore_bytecode=None,
    suppress_bytecode_warnings=None,
):
    """
    Identifies the version of a Python function.

    A version is made of two components, a major version and a minor
    version.  Each one may be an integer or a string, and defaults to ``0``.

    Whenever the implementation of an entity function changes, its version
    should be updated so that Bionic knows whether to invalidate any cached
    values of that function and re-compute them.

    A new *major* version signals a *functional* change: Bionic will assume
    the function can return different output, and won't use any cached
    artifacts created by a previous version of the function.

    A new *minor* version signals a *nonfunctional* change, such as a
    refactoring or performance optimization: Bionic will assume the function
    behaves the same for all inputs, and will keep using cached artifacts as
    long as the major version still matches.  Updating the minor version is
    only required when using Bionic's "assisted versioning" mode.

    You may also want to update the major version when there are changes in
    functions or libraries that the entity function calls, or in any external
    data source (like a database) that the function accesses.

    Parameters
    ----------

    major: Integer or string (default 0)
        An arbitrary identifier for a function's behavior.
    minor: Integer or string (default 0)
        An arbitrary identifier for a function's nonfunctional
        characteristics.
    ignore_bytecode: Boolean (default False)
        Whether this entity's bytecode should be ignored.
    suppress_bytecode_warnings: Boolean (default False)
        Whether warnings from this entity's bytecode analysis should be
        ignored.

    Returns
    -------
    Function:
        A decorator which can be applied to an entity function.

    Raises
    ------
    ValueError:
        If ``ignore_bytecode`` or ``suppress_bytecode_warnings`` is neither
        a boolean nor ``None``.
    """

    # None values are deliberately passed through untouched: the CodeVersion
    # and CodeVersioningPolicy constructors apply their own defaults, which
    # keeps this decorator's defaults identical to those of an undecorated
    # function. (The documentation above has to be kept in sync with those
    # defaults by hand.)

    if ignore_bytecode is not None and not isinstance(ignore_bytecode, bool):
        raise ValueError(
            f"Argument ignore_bytecode must be a boolean; got {ignore_bytecode!r}"
        )

    # The public flag is "ignore"; the value handed to CodeVersion is its
    # negation, with None (unspecified) carried through unchanged.
    if ignore_bytecode is None:
        includes_bytecode = None
    else:
        includes_bytecode = not ignore_bytecode

    if suppress_bytecode_warnings is not None and not isinstance(
        suppress_bytecode_warnings, bool
    ):
        # The multi-line literal is passed through oneline() before raising.
        message = f"""
        Argument suppress_bytecode_warnings must be a boolean;
        got {suppress_bytecode_warnings!r}
        """
        raise ValueError(oneline(message))

    def _attach_versioning_policy(acc):
        # Built inside the callback, so the policy objects are created each
        # time the callback is invoked rather than when @version is called.
        code_version = CodeVersion(
            major=major,
            minor=minor,
            includes_bytecode=includes_bytecode,
        )
        policy = CodeVersioningPolicy(
            version=code_version,
            suppress_bytecode_warnings=suppress_bytecode_warnings,
        )
        return acc.wrap_provider(
            AttrUpdateProvider, "code_versioning_policy", policy
        )

    return decorator_updating_accumulator(_attach_versioning_policy)
def version_no_warnings(major=None, minor=None):
    """
    Same as the `@version` decorator, but it suppresses all bytecode warnings.

    May be used either with arguments (``@version_no_warnings(major=1)``) or
    bare (``@version_no_warnings``); in the bare form the decorated function
    arrives as ``major`` and is decorated with the default versions.
    """

    if not callable(major):
        return version(major, minor, suppress_bytecode_warnings=True)

    # Bare usage: ``major`` is really the function being decorated, so build
    # the default decorator and apply it straight away.
    decorate = version_no_warnings()
    return decorate(major)


# In the future I expect we'll have other caching options -- disabling in-memory
# caching, allowing caching for shorter periods, etc. -- but I'm not sure what
# the API should look like.
def persist(enabled):
    """
    Indicates whether computed values should be cached persistently.

    When set, this takes precedence over the value of
    `core__persist_by_default`.

    Parameters
    ----------

    enabled: Boolean
        Whether this entity's values should be persisted (e.g., to local
        disk).

    Returns
    -------
    Function:
        A decorator which can be applied to an entity function.

    Raises
    ------
    ValueError:
        If ``enabled`` is not a boolean.
    """

    if not isinstance(enabled, bool):
        raise ValueError(f"Argument must be a boolean; got {enabled!r}")

    def _set_should_persist(acc):
        return acc.update_attr("should_persist", enabled, "@persist")

    return decorator_updating_accumulator(_set_should_persist)
def memoize(enabled):
    """
    Indicates whether computed values should be cached in memory.

    When set, this takes precedence over the value of
    `core__memoize_by_default`.

    Parameters
    ----------

    enabled: Boolean
        Whether this entity's values should be memoized.

    Returns
    -------
    Function:
        A decorator which can be applied to an entity function.

    Raises
    ------
    ValueError:
        If ``enabled`` is not a boolean.
    """

    if not isinstance(enabled, bool):
        raise ValueError(f"Argument must be a boolean; got {enabled!r}")

    def _set_should_memoize(acc):
        return acc.update_attr("should_memoize", enabled, "@memoize")

    return decorator_updating_accumulator(_set_should_memoize)
def changes_per_run(enabled=None):
    """
    Indicates whether this function is non-deterministic: i.e., if it's
    called multiple times with the same inputs, can it return different
    outputs?

    When ``enabled`` is true, Bionic will recompute this function's value
    (and potentially the values of anything depending on it) each time this
    flow is instantiated, rather than reusing a value cached on disk.  For
    example, if the function queries data from an external database, the
    results may be different each time even if the query stays the same.

    However, for practical reasons, Bionic won't compute a new value more
    than once within a single run.  That is, once it's been computed for a
    particular Flow instance, that value will be saved in memory and reused.
    This is a compromise: logically it makes sense to recompute it every
    time, but it's much simpler to have a single fixed value for each entity
    within a given flow instance.  For this reason, when this decorator is
    enabled, memoization must not be disabled for this entity.

    Note that ``@changes_per_run`` is not the same as ``@persist(False)``.
    For example, the following code will not necessarily query the database
    each time:

    .. code-block:: python

        @builder
        @bn.persist(False)
        def current_data():
            return download_data()

        @builder
        def summary(current_data):
            return summarize(current_data)

    This fails because if we call ``flow.get('summary')`` and Bionic finds a
    cached value, it will return the cached value because it doesn't know
    that current_data ought to be recomputed.  On the other hand, if we
    replace ``persist(False)`` with ``changes_per_run`` -- as in the example
    below -- then ``current_data`` will be recomputed each time (and
    ``summary`` will be recomputed if ``current_data`` changes).

    Parameters
    ----------

    enabled: Boolean, optional (default ``True``)
        Whether this function's output changes per run.

    Returns
    -------
    Function:
        A decorator which can be applied to an entity function.

    Raises
    ------
    ValueError:
        If ``enabled`` is neither a boolean, ``None``, nor a callable.

    Example usage:

    .. code-block:: python

        @builder
        @bn.changes_per_run
        def current_data():
            return download_data()

        @builder
        def summary(current_data):
            return summarize(current_data)
    """

    # Bare usage (``@changes_per_run`` without parentheses): the decorated
    # function arrives in place of ``enabled``, so apply the default
    # decorator to it directly.
    if callable(enabled):
        return changes_per_run()(enabled)

    enabled = True if enabled is None else enabled
    if not isinstance(enabled, bool):
        raise ValueError(f"Argument must be a boolean; got {enabled!r}")

    def _mark_changes_per_run(acc):
        return acc.wrap_provider(AttrUpdateProvider, "changes_per_run", enabled)

    return decorator_updating_accumulator(_mark_changes_per_run)
def output(name):
    """
    Renames an entity.  The entity function must have a single value.

    When an entity function is decorated with this, the entity takes the
    provided name rather than the name of the function.

    Parameters
    ----------

    name: String
        The new name for the entity.

    Returns
    -------
    Function:
        A decorator which can be applied to an entity function.
    """

    def _rename_output(acc):
        return acc.wrap_provider(RenamingProvider, name)

    return decorator_updating_accumulator(_rename_output)
def outputs(*names):
    """
    Indicates that a result produces a (fixed-size) collection of values, and
    assigns a name to each value.

    When an entity function is decorated with this, the function actually
    defines multiple entities, one for each provided name.  The decorated
    function must return a sequence with exactly as many values as the
    provided list of names.

    Any other decorators which would normally modify the definition of the
    entity (such as protocols) will be applied to each of the final entities.

    Parameters
    ----------

    names: Sequence of strings
        The names of the defined entities.

    Returns
    -------
    Function:
        A decorator which can be applied to an entity function.
    """

    # Build a comma-separated descriptor with a trailing comma (e.g. "a,b,")
    # and delegate to ``returns``.
    joined_names = ",".join(names)
    out_descriptor = joined_names + ","
    return returns(out_descriptor)
def docs(*docs):
    """
    Assigns documentation strings to the entities defined by the decorated
    function.  Typically used in conjunction with ``@outputs`` for functions
    that return multiple entity values.  (In the more common case where your
    function returns a single entity value, you can just use a regular Python
    docstring.)

    Parameters
    ----------

    docs: Sequence of strings
        Documentation strings for each of the defined entities.

    Returns
    -------
    Function:
        A decorator which can be applied to an entity function.
    """

    # Inside this function the name ``docs`` refers to the argument tuple,
    # not to this decorator; give it a distinct local name for clarity.
    doc_strings = docs

    def _set_docs(acc):
        return acc.update_attr("docs", doc_strings, "@docs")

    return decorator_updating_accumulator(_set_docs)
def gather(over, also=None, into="gather_df"):
    """
    Gathers multiple instances of entities into a single dataframe.

    Gathers all values of the ``over`` entity (or entities) along with
    associated values of the ``also`` entity (or entities) into a single
    dataframe, which is provided to the decorated function as an argument
    whose name is determined by ``into``.

    Parameters
    ----------

    over: String or sequence of strings
        Primary names to collect.  Any cases that differ only in these names
        will be grouped together in the same frame.
    also: String or sequence of strings, optional
        Secondary names to include.  These entity values are added to the
        frame but don't affect the grouping.
    into: String, optional (default ``'gather_df'``)
        The argument name of the gathered frame.

    Returns
    -------
    Function:
        A decorator which can be applied to an entity function.

    Example usage:

    .. code-block:: python

        builder = FlowBuilder('my_flow')

        builder.assign('color', values=['red', 'blue'])
        builder.assign('shape', values=['square', 'circle'])

        @builder
        def colored_shape(color, shape):
            return color + ' ' + shape

        @builder
        @gather('color', 'colored_shape', 'df')
        def all_color_shapes(df):
            return ', '.join(df.colored_shape.sort_values())

        flow = builder.build()

        flow.get('colored_shape', set)
        # Returns {'red square', 'blue square', 'red circle', 'blue circle'}

        flow.get('all_color_shapes', set)
        # Returns {'blue square, red square', 'blue circle, red circle'}
        # Note that the colored shapes are gathered into two groups: within
        # each group, the color varies but the shape does not.
    """

    # Normalize both name arguments through the ``interpret`` helpers before
    # handing them to the provider.
    primary_names = interpret.str_or_seq_as_list(over)
    secondary_names = interpret.str_or_seq_or_none_as_list(also)

    def _wrap_with_gather(acc):
        return acc.wrap_provider(
            GatherProvider,
            primary_names=primary_names,
            secondary_names=secondary_names,
            gathered_dep_name=into,
        )

    return decorator_updating_accumulator(_wrap_with_gather)
def pyplot(name=None, savefig_kwargs=None):
    """
    Provides a Matplotlib pyplot module to the decorated entity function.

    By default the module is provided as an argument named ``"pyplot"``, but
    this can be changed with the ``name`` argument.  The entity's Python
    function should use the pyplot module to create a plot, but should not
    return any values.  The output of the final entity will be a
    ``Pillow.Image`` containing the plot.

    Parameters
    ----------

    name: String, optional (default "pyplot")
        The argument name of the module provided to the decorated function.
    savefig_kwargs: Dict, optional
        Additional arguments to pass to `matplotlib.pyplot.savefig` when
        converting the plot to an image.  By default, passes ``format=png``
        and ``bbox_inches="tight"``; any arguments passed in this dict will
        override the default values.

    Returns
    -------
    Function:
        A decorator which can be applied to an entity function.
    """

    # Bare usage (``@pyplot`` without parentheses): the decorated function
    # arrives in place of ``name``, so apply the default decorator to it.
    if callable(name):
        return pyplot()(name)

    arg_name = "pyplot" if name is None else name

    def _wrap_with_pyplot(acc):
        return acc.wrap_provider(PyplotProvider, arg_name, savefig_kwargs)

    return decorator_updating_accumulator(_wrap_with_pyplot)
def accepts(**descriptors_by_arg_name):
    """
    Indicates that some of the decorated function's arguments should be
    supplied according to certain descriptors.

    Every keyword argument given to this decorator names an argument of the
    decorated function; the value passed for that argument will correspond
    to the descriptor given here.  For example:

        @builder
        @accepts(pair="x, y")
        def f(pair, z):

    `f` will be called with two arguments: the first (`pair`) will be a tuple
    containing the values of entities `x` and `y`, and the second (`z`) will
    just be the value of entity `z`.

    This decorator is currently experimental and does not have any additional
    user-facing documentation.  It may change in non-backwards-compatible
    ways.
    """

    # Map each argument's entity dnode (the "inner" view seen by the
    # function) to the dnode parsed from the descriptor supplied for it.
    outer_dnodes_by_inner = {}
    for arg_name, descriptor in descriptors_by_arg_name.items():
        inner_dnode = entity_dnode_from_descriptor(arg_name)
        outer_dnode = nondraft_dnode_from_descriptor(descriptor)
        outer_dnodes_by_inner[inner_dnode] = outer_dnode

    def _substitute_arg_descriptors(acc):
        return acc.wrap_provider(
            ArgDescriptorSubstitutionProvider, outer_dnodes_by_inner
        )

    return decorator_updating_accumulator(_substitute_arg_descriptors)


def returns(out_descriptor):
    """
    Indicates that the decorated function returns a value corresponding to the
    provided descriptor.

    This decorator is currently experimental and does not have any additional
    user-facing documentation.  It may change in non-backwards-compatible
    ways.
    """

    # The descriptor is parsed eagerly, when the decorator is created.
    nondraft_out_dnode = nondraft_dnode_from_descriptor(out_descriptor)

    def _set_output_descriptor(acc):
        return acc.wrap_provider(NewOutputDescriptorProvider, nondraft_out_dnode)

    return decorator_updating_accumulator(_set_output_descriptor)
def run_in_aip(machine, worker_count=None, worker_machine=None):
    """
    Indicates that the decorated function should be computed in AIP.

    This decorator requires AIP based distributed execution to be enabled,
    which can be done by setting ``core__aip_execution__enabled`` core
    entity.

    This decorator is currently experimental and does not have any additional
    user-facing documentation.  It may change in non-backwards-compatible
    ways.

    Parameters
    ----------

    machine: String
        The machine type that should be used to compute the function on AIP.
    worker_count: String, optional
        The number of workers that should be used to compute the function on
        AIP.
    worker_machine: String, optional
        The machine type that should be used by the worker nodes.

    Returns
    -------
    Function:
        A decorator which can be applied to an entity function.
    """

    # The task config is built once, when the decorator is created, and the
    # same object is attached on every application.
    aip_task_config = AipTaskConfig(
        machine=machine,
        worker_count=worker_count,
        worker_machine=worker_machine,
    )

    def _set_aip_task_config(acc):
        return acc.wrap_provider(
            AttrUpdateProvider, "aip_task_config", aip_task_config
        )

    return decorator_updating_accumulator(_set_aip_task_config)
# ``immediate`` is a ready-made decorator: the object returned by
# ``persist(False)``, created once when this module is imported.
immediate = persist(False)
# Attach user-facing documentation directly to that decorator object.
immediate.__doc__ = """ Guarantees that an entity can be computed during bootstrap resolution. Currently ``@immediate`` is equivalent to ``@persist(False)``. """