Flow and FlowBuilder

Introduction

FlowBuilder and Flow are the primary interfaces for constructing and running Bionic flows. Either of them can be used to represent the collection of interdependent entities that make up a single analysis. The difference is that a FlowBuilder is a mutable object which can be updated, while a Flow is an immutable object which can perform computation.

The typical pattern is to start with an empty FlowBuilder, incrementally add entity definitions to it, then use FlowBuilder.build() to generate a Flow. This Flow can be used immediately to compute entity values, or passed to other code, which might reconfigure or extend it.

Although Flow objects are immutable, there is a mechanism for modifying them: instead of a method like set that mutates the Flow, there is a setting method that returns a new copy with the requested change. This allows Flows to be easily customized without worrying about shared state. However, this API can only be used to update existing entities; if you want to define new entities, you’ll need to convert the Flow back to a FlowBuilder using to_builder.

See the Concepts documentation for more details.
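
The copy-on-update behavior described above can be illustrated without Bionic itself. The sketch below is a plain-Python model, not Bionic's implementation: TinyFlow and its values field are hypothetical names, and the class only mimics the rule that setting returns a modified copy and works only for entities that already exist.

```python
from dataclasses import dataclass, field
from types import MappingProxyType


@dataclass(frozen=True)
class TinyFlow:
    """Hypothetical stand-in for an immutable flow (not Bionic's Flow class)."""

    values: MappingProxyType = field(default_factory=lambda: MappingProxyType({}))

    def setting(self, name, value):
        # Mirror the documented restriction: only existing entities can be updated.
        if name not in self.values:
            raise KeyError(f"entity {name!r} is not defined")
        # Build a new object instead of mutating this one.
        return TinyFlow(MappingProxyType({**self.values, name: value}))


base = TinyFlow(MappingProxyType({"greeting": "hello"}))
custom = base.setting("greeting", "hi")

print(base.values["greeting"])    # hello
print(custom.values["greeting"])  # hi
```

Because every update yields a new object, code that received base keeps seeing the original value no matter how many customized copies are derived from it.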

FlowBuilder API

class bionic.FlowBuilder(name, _config=None)[source]

A mutable builder for Flows.

Allows Flow objects to be constructed incrementally. Use declare, assign, set, and/or __call__ to add entities to the builder, then use build to convert it into a Flow.

Parameters

name (String) – Identifies the flow and provides a namespace for cached data.

add_case(*name_values)[source]

Adds a “case”: a collection of associated values for a set of entities.

Assigning entity values by case is an alternative to set (or assign). If set is used to set multiple values for some entities, then every combination of those values will be considered for downstream entities. On the other hand, if add_case is used, only the specified combinations will be considered.

Example using assign:

from bionic import FlowBuilder

# FlowBuilder requires a name; 'my_flow' is a placeholder.
builder = FlowBuilder('my_flow')

builder.assign('first_name', values=['Alice', 'Bob'])
builder.assign('last_name', values=['Smith', 'Jones'])

@builder
def full_name(first_name, last_name):
    return first_name + ' ' + last_name

# Prints: {'Alice Jones', 'Alice Smith', 'Bob Jones', 'Bob Smith'}
print(builder.build().get('full_name', set))

Example using add_case:

from bionic import FlowBuilder

# FlowBuilder requires a name; 'my_flow' is a placeholder.
builder = FlowBuilder('my_flow')

builder.declare('first_name')
builder.declare('last_name')

builder.add_case('first_name', 'Alice', 'last_name', 'Jones')
builder.add_case('first_name', 'Alice', 'last_name', 'Smith')
builder.add_case('first_name', 'Bob', 'last_name', 'Smith')

@builder
def full_name(first_name, last_name):
    return first_name + ' ' + last_name

print(builder.build().get('full_name', set))
# Prints: {'Alice Jones', 'Alice Smith', 'Bob Smith'}

All entities must already exist. They may have existing values, but those values must have been set case-by-case with the same structure as this call.
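
The difference between the two examples above comes down to which combinations are enumerated. The following plain-Python sketch does not use Bionic, and its variable names are illustrative only. It reproduces both result sets: itertools.product models the every-combination behavior of multiple values, while an explicit list of pairs models cases.

```python
from itertools import product

first_names = ["Alice", "Bob"]
last_names = ["Smith", "Jones"]

# Multiple values per entity: every combination reaches downstream entities.
all_combinations = {
    f"{first} {last}" for first, last in product(first_names, last_names)
}

# Cases: only the listed combinations are considered.
cases = [("Alice", "Jones"), ("Alice", "Smith"), ("Bob", "Smith")]
case_combinations = {f"{first} {last}" for first, last in cases}

print(sorted(all_combinations))
# ['Alice Jones', 'Alice Smith', 'Bob Jones', 'Bob Smith']
print(sorted(case_combinations))
# ['Alice Jones', 'Alice Smith', 'Bob Smith']
```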

Parameters

name_values (String/Object) – Alternating entity names and values.

Returns

An object which can be used to set values on additional entities with this case.

Return type

FlowCase

assign(name, value=None, values=None, protocol=None, doc=None, docstring=None, persist=None)[source]

Creates a new entity and assigns it a value.

Exactly one of value or values must be provided. The entity must not already exist.

Parameters
  • name (String) – The name of the new entity.

  • value (Object, optional) – A single value for the entity.

  • values (Sequence, optional) – A sequence of values for the entity.

  • protocol (Protocol, optional) – The entity’s protocol. The default is a smart type-detecting protocol.

  • doc (String, optional) – Description of the new entity.

  • persist (Boolean, optional) – Whether this entity’s values should be cached persistently. The only reason to set this to False is if an internal Bionic entity depends on it; in this case, persistence is impossible because Bionic’s cache won’t be constructed by the time this entity is calculated. The downside of setting this to False is that it won’t be possible to retrieve a serialized file for this entity using the mode argument to Flow.get.

build()[source]

Constructs a Flow object from this builder’s configuration.

The returned flow is immutable and will not be affected by future changes to this builder’s configuration.

clear_cases(*names)[source]

Removes all values assigned to one or more entities.

The entities will still exist, but will not have any values, as if they had just been created with declare. If any of the entities were set in a group using add_case, they must all be cleared together.

Parameters

names (Sequence of strings) – The entities whose values should be cleared.

declare(name, protocol=None, doc=None, docstring=None, persist=None)[source]

Creates a new entity but does not assign it a value.

The entity must not already exist.

Parameters
  • name (String) – The name of the new entity.

  • protocol (Protocol, optional) – The entity’s protocol. The default is a smart type-detecting protocol.

  • doc (String, optional) – Description of the new entity.

  • persist (Boolean, optional) – Whether this entity’s values should be cached persistently. The only reason to set this to False is if an internal Bionic entity depends on it; in this case, persistence is impossible because Bionic’s cache won’t be constructed by the time this entity is calculated. The downside of setting this to False is that it won’t be possible to retrieve a serialized file for this entity using the mode argument to Flow.get.

delete(*names)[source]

Deletes one or more entities.

If any of the entities were set in a group using add_case, they must all be deleted together.

Parameters

names (Sequence of strings) – The entities to be deleted.

derive(func)[source]

(Deprecated) An alias for __call__; use that instead.

merge(flow, keep='error', allow_name_match=False)[source]

Updates this builder by importing all entities from another flow.

If any incoming entity has the same name as an existing entity, the conflict is resolved by applying the following rules, in order:

  1. The name (core__flow_name) of this builder is never changed; the original value is always kept.

  2. Entities that were set by default (not explicitly set by the user) are never imported and can be overwritten.

  3. Assignments (definitions with values) take precedence over declarations (definitions with no values).

  4. Otherwise, the keep parameter can be used to specify which entity to keep.

Parameters
  • flow (Flow) – Any Bionic Flow.

  • keep ('error', 'self', or 'arg' (default: 'error')) –

    How to handle conflicting entity names. Options:

    • 'error': throw an AlreadyDefinedEntityError

    • 'self' or 'old': use the definition from this builder

    • 'arg' or 'new': use the definition from flow

  • allow_name_match (boolean (default: False)) – Allows the incoming flow to have the same name as this builder. (If this is False, we handle duplicate names by throwing an exception. It’s technically possible to share a name between flows, but it’s generally not good practice.)
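
One way to read the precedence rules above is as a small decision function. The sketch below is a plain-Python model under that reading, not Bionic's code: the Definition record, the resolve function, and the ConflictError class (standing in for AlreadyDefinedEntityError) are all hypothetical.

```python
from dataclasses import dataclass


class ConflictError(Exception):
    """Hypothetical stand-in for Bionic's AlreadyDefinedEntityError."""


@dataclass(frozen=True)
class Definition:
    """Hypothetical summary of one entity definition."""

    has_value: bool           # True for an assignment, False for a declaration
    is_default: bool = False  # True if set by default rather than by the user


def resolve(name, mine, theirs, keep="error"):
    """Return 'self' or 'arg' to say which definition wins a name conflict."""
    # Rule 1: the builder's own flow name is always kept.
    if name == "core__flow_name":
        return "self"
    # Rule 2: defaults are never imported, and existing defaults can be overwritten.
    if theirs.is_default:
        return "self"
    if mine.is_default:
        return "arg"
    # Rule 3: assignments beat declarations.
    if mine.has_value != theirs.has_value:
        return "self" if mine.has_value else "arg"
    # Rule 4: fall back to the keep parameter.
    if keep in ("self", "old"):
        return "self"
    if keep in ("arg", "new"):
        return "arg"
    if keep == "error":
        raise ConflictError(f"entity {name!r} is defined in both flows")
    raise ValueError(f"unknown keep option: {keep!r}")


declared = Definition(has_value=False)
assigned = Definition(has_value=True)

print(resolve("x", declared, assigned))              # arg
print(resolve("x", assigned, assigned, keep="old"))  # self
```

Note that keep only matters once the first three rules fail to pick a winner, which is why the first call succeeds even with the default keep='error'.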

set(name, value=None, values=None)[source]

Sets the value of an existing entity.

Exactly one of value or values must be provided. The entity must already exist and may already have a value (which will be overwritten).

Parameters
  • name (String) – The name of the entity.

  • value (Object, optional) – A single value for the entity.

  • values (Sequence, optional) – A sequence of values for the entity.

FlowCase API

class bionic.flow.FlowCase(builder, key)[source]

A specific case for which entities can have associated values.

These should be constructed by the FlowBuilder object, not by users.

then_set(name, value)[source]

Sets a single value for an entity for this case.

Flow API

class bionic.Flow(config, _official=False)[source]

An immutable workflow object. You can use get() to compute any entity in the workflow, or setting() to create a new workflow with modifications. Not all modifications are possible with this interface, but to_builder() can be used to get a mutable FlowBuilder version of a Flow.

adding_case(*name_values)[source]

Like FlowBuilder.add_case, but returns a new copy of this flow.

all_entity_names(include_core=False)[source]

Returns a list of all declared entity names in this flow.

Parameters

include_core (Boolean, optional (default false)) – Include internal entities used for Bionic infrastructure.

assigning(name, value=None, values=None, protocol=None)[source]

Like FlowBuilder.assign, but returns a new copy of this flow.

clearing_cases(*names)[source]

Like FlowBuilder.clear_cases, but returns a new copy of this flow.

declaring(name, protocol=None)[source]

Like FlowBuilder.declare, but returns a new copy of this flow.

entity_doc(name)[source]

Returns the doc for the named entity if one is defined; otherwise returns None.

Parameters

name (String) – The name of an entity.

entity_docstring(name)[source]

(Deprecated in favor of entity_doc.) Returns the doc for the named entity if one is defined; otherwise returns None.

Parameters

name (String) – The name of an entity.

entity_protocol(name)[source]

Returns the protocol for a given entity.

Parameters

name (String) – The name of an entity.

export(name, file_path=None, dir_path=None)[source]

Provides access to the persisted file corresponding to an entity. Note: this method is deprecated; the same functionality is available through Flow.get.

Can be called in three ways:

# Returns a path to the persisted file.
export(name)

# Copies the persisted file to the specified file path.
export(name, file_path=path)

# Copies the persisted file to the specified directory.
export(name, dir_path=path)

The entity must be persisted and have only one instance. The dir_path and file_path options support paths on GCS, specified like: gs://mybucket/subdir/

get(name, collection=None, fmt=None, mode=<class 'object'>)[source]

Computes the value(s) associated with an entity.

If the entity has multiple values, the collection parameter indicates how to handle them. It can have any of the following values:

  • None: return a single value or throw an exception

  • list or 'list': return a list of values

  • set or 'set': return a set of values

  • pandas.Series or 'series': return a series whose index is the root cases distinguishing the different values

The mode parameter specifies the type of object to return in the collection, which implicitly determines whether in-memory or persisted data is returned. It can have any of the following values:

  • object or 'object': return the in-memory value

  • 'FileCopier': return a wrapper for a path to the persisted file for the computed entity

  • Path or 'path': return a path to the persisted file

  • 'filename': return a string representing a path to the persisted file

Parameters
  • name (String) – The name of an entity.

  • collection (String or type, optional, default is None) – The data structure to use if the entity has multiple values.

  • fmt (String or type, optional, default is None) – The data structure to use if the entity has multiple values. Deprecated in favor of collection and will be removed in a future release.

  • mode (String or type, optional, default is object) – The type of object to return in the collection.

Returns

The value of the entity, or a collection containing its values.
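
The handling of the collection parameter can be modeled in a few lines. This sketch is a plain-Python approximation of the behavior described above, not Bionic's implementation: the collect function is hypothetical, it raises a plain ValueError because the exception type Bionic uses is not stated here, and the pandas.Series option is left out to keep the example dependency-free.

```python
def collect(values, collection=None):
    """Package an entity's computed values the way ``collection`` requests."""
    values = list(values)
    if collection is None:
        # A single value is returned bare; anything else is an error.
        if len(values) != 1:
            raise ValueError(f"expected exactly one value, found {len(values)}")
        return values[0]
    if collection in (list, "list"):
        return values
    if collection in (set, "set"):
        return set(values)
    raise ValueError(f"unsupported collection: {collection!r}")


print(collect([42]))               # 42
print(collect([1, 2, 2], "list"))  # [1, 2, 2]
print(collect([1, 2, 2], set))     # {1, 2}
```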

merging(flow, keep='error')[source]

Like FlowBuilder.merge, but returns a new copy of this flow.

property name

Returns the name of this flow.

reload()[source]

Attempts to reload all modules used directly by this flow, updates this flow instance in place, and then returns the flow instance.

For safety, this only works if this flow meets the following requirements:

  • is the first Flow built by its FlowBuilder

  • has never been modified (i.e., isn’t derived from another Flow)

  • is assigned to a top-level variable in a module that one of its functions is defined in

You will need to use versioning to ensure that any code changes are detected properly. Otherwise, the flow may keep using cached values from previous versions of the code.

The most straightforward way to meet these requirements is to define your flow in a module as:

builder = ...

@builder
def ...

...

flow = builder.build()

and then import in the notebook like so:

from mymodule import flow
...
flow.reload()
flow.get('my_entity')

This will update the flow instance to use the reloaded modules before doing the get().

This resets the flow’s in-memory cache. Entities that have not been persisted to disk, or that have been marked with @changes_per_run, will be recomputed.

reloading()[source]

Returns a new copy of this flow in which all the modules used directly by the flow are reloaded.

This method is similar to the reload() method, but the existing flow instance remains unchanged.

Please see the comments on reload() method for safety requirements.

render_dag(include_core=False, vertical=False, curvy_lines=False, _include_detail=False)[source]

Returns a FlowImage with a visualization of this flow’s DAG. This object behaves similarly to a Pillow Image object.

Will fail if Graphviz is not installed on the system.

setting(name, value=None, values=None)[source]

Like FlowBuilder.set, but returns a new copy of this flow.

then_setting(name, value)[source]

Like FlowCase.then_set, but returns a new copy of this flow.

Use after calling Flow.adding_case.

to_builder()[source]

Returns a FlowBuilder with a copy of this Flow’s configuration.

Since this flow is immutable, it won’t be affected by any changes to the returned builder.

Cache API

class bionic.cache_api.Cache(deriver)[source]

A programmatic interface to Bionic’s persistent cache.

Accessible as an attribute named cache on a Flow object. Use get_entries to iterate through the set of cache entries.

get_entries()[source]

Returns a sequence of CacheEntry objects, one for each artifact in Bionic’s persistent cache.

Cached artifacts are stored by flow name, so this will include any artifacts generated by a flow with the same name as this one; this typically includes the current Flow object, as well as any older or modified versions.

Artifacts are returned for all cache tiers that are enabled for the current flow. For example, if GCS caching is enabled, this method will return entries from both the “local” (on-disk) and “cloud” (GCS) tiers.

CacheEntry API

class bionic.cache_api.CacheEntry(cache, inv_item)[source]

Represents an artifact in Bionic’s persistent cache.

Has the following fields:

  • tier: “local” or “cloud”, depending on which tier of the cache the artifact is in.

  • entity: the name of the cached entity, or None if the artifact does not correspond to an entity

  • artifact_url: a URL to the cached artifact file or blob

  • metadata_url: a URL to the metadata file or blob describing the artifact

  • artifact_path: a Path object locating the artifact file (if it’s a local file) or None (if it’s a cloud blob)

  • metadata_path: a Path object locating the metadata file (if it’s a local file) or None (if it’s a cloud blob)

delete()[source]

Safely deletes the artifact and its metadata from the cache.

Returns True if the artifact was deleted and False if it was not found. Throws a CacheEntryDeletionFailureError if the deletion fails.

(Note that if two entries refer to the same artifact and delete is called on both, the first call will return True and the second will return False.)
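
The Cache and CacheEntry interfaces above can be combined into a cleanup helper. The sketch below uses only the documented get_entries, entity, tier, and delete members; the delete_entity_entries function is a hypothetical helper, and the demo objects are stand-ins built with SimpleNamespace so that the example runs without a real cache.

```python
from types import SimpleNamespace


def delete_entity_entries(cache, entity_name, tier=None):
    """Delete cache entries for one entity; return how many were removed."""
    removed = 0
    for entry in cache.get_entries():
        if entry.entity != entity_name:
            continue
        if tier is not None and entry.tier != tier:
            continue
        # delete() returns False if the artifact was already gone.
        if entry.delete():
            removed += 1
    return removed


# Stand-in objects so the sketch runs without a real cache.
def _fake_entry(entity, tier, present=True):
    return SimpleNamespace(entity=entity, tier=tier, delete=lambda: present)


demo_cache = SimpleNamespace(
    get_entries=lambda: [
        _fake_entry("full_name", "local"),
        _fake_entry("full_name", "cloud", present=False),
        _fake_entry("first_name", "local"),
    ]
)

print(delete_entity_entries(demo_cache, "full_name"))           # 1
print(delete_entity_entries(demo_cache, "full_name", "cloud"))  # 0
```

With a real flow, pass flow.cache as the first argument. Counting only the calls that return True avoids double-counting when two entries refer to the same artifact.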