"""
==============================================
Composite, Composer, and MetaComposer Classes
==============================================
"""
import abc
import copy
from typing import Dict, Any, Optional, Iterable, List
from vivarium.core.process import (
_override_schemas, assoc_in, _get_parameters, Process)
from vivarium.core.store import (
Store, generate_state)
from vivarium.core.types import (
Processes, Topology, HierarchyPath, State, Schema, Steps, Flow)
from vivarium.library.datum import Datum
from vivarium.library.dict_utils import (
deep_merge, deep_merge_check, deep_copy_internal)
from vivarium.library.topology import inverse_topology
def _get_composite_state_recur(
processes: Processes,
steps: Steps,
topology: Any,
state_type: Optional[str] = 'initial',
path: Optional[HierarchyPath] = None,
config: Optional[dict] = None,
) -> Optional[State]:
path = path or tuple()
config = config or {}
processes = processes or {}
steps = steps or {}
topology = topology or {}
state: dict = {}
all_keys = set(processes.keys() | steps.keys())
for key in all_keys:
sub_path: HierarchyPath = path + (key,)
sub_topology: Any = topology.get(key)
sub_processes: Any = processes.get(key)
sub_steps: Any = steps.get(key)
sub_state: Any = None
if isinstance(sub_processes, dict) or isinstance(sub_steps, dict):
sub_state = _get_composite_state_recur(
processes=sub_processes,
steps=sub_steps,
topology=sub_topology,
state_type=state_type,
path=sub_path,
config=config.get(key),
)
elif isinstance(sub_processes, Process) or \
isinstance(sub_steps, Process):
node: Process = sub_processes or sub_steps
if state_type == 'initial':
process_state = node.initial_state(config.get(node.name))
elif state_type == 'default':
process_state = node.default_state()
# Prevent multiupdates from forming when a single process has
# multiple ports to the same stores holding a dictionary
sub_state = inverse_topology(
path, process_state, sub_topology, multi_updates=False)
else:
Exception(f'invalid processes {sub_processes} or steps {sub_steps}')
state = deep_merge(state, sub_state)
return state
def _get_composite_state(
processes: Processes,
steps: Steps,
topology: Any,
state_type: Optional[str] = 'initial',
path: Optional[HierarchyPath] = None,
initial_state: Optional[State] = None,
config: Optional[dict] = None,
) -> Optional[State]:
state = _get_composite_state_recur(
processes=processes,
steps=steps,
topology=topology,
state_type=state_type,
path=path,
config=config,
)
state = deep_merge(state, initial_state)
return state
[docs]class Composite(Datum):
"""Composite parent class.
Contains keys for processes and topology
"""
processes: Processes = {}
steps: Steps = {}
flow: Flow = {}
topology: Topology = {}
state: State = {}
defaults: Dict[str, Any] = {
'processes': {},
'steps': {},
'flow': {},
'topology': {},
'state': {},
}
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
store: Optional[Store] = None,
processes: Optional[Processes] = None,
steps: Optional[Steps] = None,
flow: Optional[Flow] = None,
topology: Optional[Topology] = None,
state: Optional[State] = None,
) -> None:
if not config:
if store:
composite = get_composite_from_store(store)
processes = composite.processes
topology = composite.topology
steps = composite.steps
flow = composite.flow
state = composite.state
config = {
'processes': processes or {},
'topology': topology or {},
'steps': steps or {},
'flow': flow or {},
'state': state or {}
}
super().__init__(config)
self._schema = config.get('_schema', {})
processes_and_steps = deep_copy_internal(self.processes)
deep_merge_check(processes_and_steps, self.steps)
_override_schemas(self._schema, processes_and_steps)
[docs] def generate_store(self, config: Optional[dict] = None) -> Store:
config = config or {}
initial_state = self.initial_state(config)
return generate_state(
processes=self.processes,
steps=self.steps,
topology=self.topology,
flow=self.flow,
initial_state=initial_state)
[docs] def initial_state(self, config: Optional[dict] = None) -> Optional[State]:
""" Merge all processes' initial states
Arguments:
config (dict): A dictionary of configuration options. All
subclass implementation must accept this parameter, but
some may ignore it.
Returns:
(dict): Subclass implementations must return a dictionary
mapping state paths to initial values.
"""
config = config or {}
initial_state = config.get('initial_state', {})
initial_state = deep_merge(copy.deepcopy(self.state), initial_state)
return _get_composite_state(
processes=self.processes,
steps=self.steps,
topology=self.topology,
state_type='initial',
initial_state=initial_state,
config=config)
[docs] def default_state(self, config: Optional[dict] = None) -> Optional[State]:
""" Merge all processes' default states
Arguments:
config (dict): A dictionary of configuration options. All
subclass implementation must accept this parameter, but
some may ignore it.
Returns:
(dict): Subclass implementations must return a dictionary
mapping state paths to default values.
"""
return _get_composite_state(
processes=self.processes,
steps=self.steps,
topology=self.topology,
state_type='default',
config=config)
[docs] def merge(
self,
composite: Optional['Composite'] = None,
processes: Optional[Dict[str, 'Process']] = None,
topology: Optional[Topology] = None,
steps: Optional[Steps] = None,
flow: Optional[Flow] = None,
state: Optional[State] = None,
path: Optional[HierarchyPath] = None,
schema_override: Optional[Schema] = None,
) -> None:
composite = composite or Composite({})
processes = processes or {}
topology = topology or {}
steps = steps or {}
flow = flow or {}
path = path or tuple()
state = state or {}
schema_override = schema_override or {}
# get the processes and topology to merge
merge_processes = {}
merge_topology = {}
merge_steps = {}
merge_flow = {}
merge_state = {}
if composite:
merge_processes.update(composite['processes'])
merge_topology.update(composite['topology'])
merge_steps.update(composite['steps'])
merge_flow.update(composite['flow'])
merge_state.update(composite.get('state', {}))
deep_merge(merge_processes, processes)
deep_merge(merge_topology, topology)
deep_merge(merge_steps, steps)
deep_merge(merge_flow, flow)
deep_merge(merge_state, state)
merge_processes = assoc_in({}, path, merge_processes)
merge_topology = assoc_in({}, path, merge_topology)
merge_steps = assoc_in({}, path, merge_steps)
merge_flow = assoc_in({}, path, merge_flow)
merge_state = assoc_in({}, path, merge_state)
# merge with instance processes and topology
deep_merge(self.processes, merge_processes)
deep_merge(self.topology, merge_topology)
deep_merge(self.steps, merge_steps)
deep_merge(self.flow, merge_flow)
deep_merge(self.state, merge_state)
self._schema.update(schema_override)
processes_and_steps = deep_copy_internal(self.processes)
deep_merge_check(processes_and_steps, self.steps)
_override_schemas(self._schema, processes_and_steps)
[docs] def get_parameters(self) -> Dict:
"""Get the parameters for all :term:`processes`.
Returns:
A map from process names to parameters.
"""
return _get_parameters(self.processes)
[docs]def get_composite_from_store(store: Store) -> Composite:
"""Make a :term:`Composite` from a :term:`Store`"""
return Composite(
processes=store.get_processes(),
topology=store.get_topology(),
steps=store.get_steps(),
flow=store.get_flow(),
state=store.get_value(),
)
[docs]class Composer(metaclass=abc.ABCMeta):
defaults: Dict[str, Any] = {}
def __init__(self, config: Optional[dict] = None) -> None:
"""Base class for :term:`composer` classes.
Composers generate :term:`composites`.
All :term:`composer` classes must inherit from this class.
Args:
config: Dictionary of configuration options that can
override the class defaults.
"""
config = config or {}
if 'name' in config:
self.name = config['name']
elif not hasattr(self, 'name'):
self.name = self.__class__.__name__
self.config = copy.deepcopy(self.defaults)
self.config = deep_merge(self.config, config)
self.schema_override = self.config.pop('_schema', {})
[docs] def generate_store(self, config: Optional[dict] = None) -> Store:
composite = self.generate()
return composite.generate_store(config)
[docs] @abc.abstractmethod
def generate_processes(
self,
config: Optional[dict]) -> Processes:
"""Generate processes dictionary.
Every subclass must override this method. For backwards
compatibility, :py:class:`vivarium.core.process.Step` objects
may be included in the returned dictionary, but this practice is
discouraged and may be disallowed in a future release.
Args:
config: A dictionary of configuration options. All
subclass implementation must accept this parameter, but
some may ignore it.
Returns:
Subclass implementations must return a dictionary
mapping process names to instantiated and configured
:py:class:`vivarium.core.process.Process` objects.
"""
return {} # pragma: no cover
[docs] def generate_steps(self, config: Optional[dict]) -> Steps:
'''Generate the steps dictionary.
Subclasses that want to include :term:`steps` should override
this method. This method is the preferred way to specify steps,
though they may also be returned by
:py:meth:`generate_processes`.
Args:
config: A dictionary of configuration options. All
subclass implementation must accept this parameter, but
some may ignore it.
Returns:
Subclass implementations should return a dictionary mapping
step names to instantiated and configured
:py:class:`vivarium.core.process.Step` objects.
'''
_ = config
return {} # pragma: no cover
[docs] def generate_flow(self, config: Optional[dict]) -> Flow:
'''Generate the flow of :term:`step` dependencies.
Args:
config: A dictionary of configuration options. All
subclass implementation must accept this parameter, but
some may ignore it.
Returns:
Subclass implementations should return a dictionary mapping
step names to sequences (e.g. lists or tuples) of
:term:`paths`. **Steps with no dependencies must be
included,** but they should be mapped to an empty sequence.
Any steps returned by :py:meth:`generate_steps` or
:py:meth:`generate_processes` that are not included in the
flow will be treated as if they depend on every step
previously added to the :term:`engine`.
'''
_ = config
return {} # pragma: no cover
[docs] @abc.abstractmethod
def generate_topology(self, config: Optional[dict]) -> Topology:
"""Generate topology dictionary.
Every subclass must override this method.
Args:
config: A dictionary of configuration options. All
subclass implementation must accept this parameter, but
some may ignore it.
Returns:
Subclass implementations must return a :term:`topology`
dictionary.
"""
return {} # pragma: no cover
[docs] def generate(
self,
config: Optional[dict] = None,
path: HierarchyPath = ()) -> Composite:
"""Generate processes and topology dictionaries.
Args:
config: Updates values in the configuration declared
in the constructor.
path: Tuple with ('path', 'to', 'level') associates
the processes and topology at this level.
Returns:
Dictionary with the following keys
* ``processes``: Generated by :py:meth:`generate_processes`.
* ``steps``: Generated by :py:meth:`generate_steps`.
* ``flow``: Generated by :py:meth:`generate_flow`.
* ``topology``: Generated by :py:meth:`generate_topology`.
The values of these keys are all dictionaries suitable to be
passed to the constructor for
:py:class:`vivarium.core.engine.Engine`.
"""
if config is None:
config = self.config
else:
default = copy.deepcopy(self.config)
config = deep_merge(default, config)
processes = self.generate_processes(config)
steps = self.generate_steps(config)
flow = self.generate_flow(config)
topology = self.generate_topology(config)
processes_and_steps = deep_copy_internal(processes)
deep_merge_check(processes_and_steps, steps)
_override_schemas(self.schema_override, processes_and_steps)
return Composite({
'processes': assoc_in({}, path, processes),
'steps': assoc_in({}, path, steps),
'flow': assoc_in({}, path, flow),
'topology': assoc_in({}, path, topology),
})
[docs] def initial_state(self, config: Optional[dict] = None) -> Optional[State]:
""" Merge all processes' initial states
Every subclass may override this method.
Arguments:
config (dict): A dictionary of configuration options. All
subclass implementation must accept this parameter, but
some may ignore it.
Returns:
dict: Subclass implementations must return a dictionary
mapping state paths to initial values.
"""
composite = self.generate(config)
return composite.initial_state(config)
[docs] def get_parameters(self) -> dict:
"""Get the parameters for all :term:`processes`.
Returns:
A map from process names to dictionaries of those processes'
parameters.
"""
composite = self.generate()
return composite.get_parameters()