Process Classes

vivarium.core.process.Deriver

Deriver is just an alias for Step now that Derivers have been deprecated.

alias of vivarium.core.process.Step

class vivarium.core.process.ParallelProcess(process: vivarium.core.process.Process, profile: bool = False, stats_objs: Optional[List[pstats.Stats]] = None)

Bases: vivarium.core.process.Process

Wraps a Process for multiprocessing.

To run a simulation distributed across multiple processors, we use Python’s multiprocessing tools. This object runs in the main (parent) process and manages communication between the parent and the child process that runs the Process managed by this object.

Most methods pass their name and arguments to Process.run_command.

Parameters
  • process – The Process to manage.

  • profile – Whether to use cProfile to profile the subprocess.

  • stats_objs – List to which cProfile stats objects are added when the process is deleted. Only used if profile is True.

calculate_timestep(states: Optional[Dict[str, Any]]) → float
property condition_path
end() → None

End the child process.

If profiling was enabled, then when the child process ends, it will compile its profiling stats and send those to the parent. The parent then saves those stats in self.stats.

generate_flow(config: Optional[dict] = None) → Dict[str, Sequence[Tuple[str, ...]]]
generate_processes(config: Optional[dict] = None) → Dict[str, Any]
generate_steps(config: Optional[dict] = None) → Dict[str, Any]
generate_topology(config: Optional[dict] = None) → Dict[str, Union[Tuple[str, ...], dict, object]]
get_command_result() → Dict[str, Any]

Get the result of a command sent to the parallel process.

Commands and their results work like a queue, so unlike Process, you can technically call this method multiple times and get different return values each time. This behavior is subject to change, so you should not rely on it.

Returns

The command result.

get_private_state() → Dict[str, Any]
initial_state(config: Optional[dict] = None) → Dict[str, Any]
is_step() → bool
merge_overrides(override: Dict[str, Any]) → None
next_update(timestep: float, states: Dict[str, Any]) → Dict[str, Any]
property parameters
ports_schema() → Dict[str, Any]
property schema
property schema_override
send_command(command: str, args: Optional[tuple] = None, kwargs: Optional[dict] = None, run_pre_check: bool = True) → None

Send a command to the parallel process.

See _handle_parallel_process() for details on how the command will be handled.

update_condition(timestep: float, states: Dict[str, Any]) → bool

class vivarium.core.process.Process(parameters: Optional[dict] = None)

Bases: object

Process parent class.

All process classes must inherit from this class. Each class can provide a defaults class variable to specify the process defaults as a dictionary.

Note that subclasses should call the superclass init function first. This allows the superclass to correctly save the initial parameters before they are mutated by subclass constructor code. We need access to the original parameters for serialization to work properly.
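The ordering requirement can be illustrated with a small sketch. BaseProcess below is a hypothetical stand-in for Process (it is not the vivarium implementation), and the Growth class, its rate parameter, and the _original_parameters attribute are invented for illustration.

```python
import copy
from typing import Any, Dict, Optional


class BaseProcess:
    """Hypothetical stand-in for Process; not the vivarium implementation."""

    defaults: Dict[str, Any] = {}

    def __init__(self, parameters: Optional[dict] = None) -> None:
        parameters = parameters or {}
        # Keep an untouched copy of what the caller passed in, e.g. for
        # serialization, before any subclass code can modify it.
        self._original_parameters = copy.deepcopy(parameters)
        # Class defaults are overridden by the supplied parameters.
        self.parameters = {**copy.deepcopy(self.defaults), **parameters}


class Growth(BaseProcess):
    defaults = {'rate': 0.1}

    def __init__(self, parameters: Optional[dict] = None) -> None:
        # Call the superclass constructor first ...
        super().__init__(parameters)
        # ... and only then derive or mutate parameters.
        self.parameters['rate_per_minute'] = self.parameters['rate'] * 60


process = Growth({'rate': 0.5})
```

Because the copy is taken before the subclass constructor body runs, the derived rate_per_minute key appears in process.parameters but not in the saved original parameters.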

Parameters

parameters

Override the class defaults. This dictionary may also contain the following special keys:

ATTRIBUTE_READ_COMMANDS = ('schema_override', 'parameters', 'condition_path', 'schema')
ATTRIBUTE_WRITE_COMMANDS = ('set_schema',)
METHOD_COMMANDS = ('initial_state', 'generate_processes', 'generate_steps', 'generate_topology', 'generate_flow', 'merge_overrides', 'calculate_timestep', 'is_step', 'get_private_state', 'ports_schema', 'next_update', 'update_condition')
calculate_timestep(states: Optional[Dict[str, Any]]) → Union[float, int]

Return the next process time step.

A process subclass may override this method to implement adaptive timesteps. By default, it returns self.parameters['timestep'].
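A minimal sketch of an adaptive override follows. BaseProcess is a hypothetical stand-in for Process so that the example runs on its own; its default timestep of 1.0, the internal port, the rate variable, and the step-shortening rule are all assumptions made for illustration.

```python
from typing import Any, Dict, Optional, Union


class BaseProcess:
    """Hypothetical stand-in for Process; not the vivarium implementation."""

    def __init__(self, parameters: Optional[dict] = None) -> None:
        # Assumed default time step for this sketch only.
        self.parameters = {'timestep': 1.0, **(parameters or {})}

    def calculate_timestep(
            self, states: Optional[Dict[str, Any]]) -> Union[float, int]:
        # Default behavior described above: a fixed time step.
        return self.parameters['timestep']


class AdaptiveProcess(BaseProcess):
    def calculate_timestep(
            self, states: Optional[Dict[str, Any]]) -> Union[float, int]:
        # Fall back to the fixed time step when no state is available.
        if not states:
            return self.parameters['timestep']
        # Hypothetical rule: take shorter steps while the rate is large.
        rate = abs(states['internal']['rate'])
        return self.parameters['timestep'] / (1.0 + rate)


process = AdaptiveProcess({'timestep': 2.0})
slow = process.calculate_timestep({'internal': {'rate': 0.0}})
fast = process.calculate_timestep({'internal': {'rate': 3.0}})
```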

property condition_path
default_state() → Dict[str, Any]

Get the default values of the variables in each port.

The default values are computed based on the schema.

Returns

A state dictionary that assigns each variable’s default value to that variable.
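To make the relationship between a schema and its default state concrete, here is a rough sketch. The helper defaults_from_schema is hypothetical and only collects '_default' entries from a nested dictionary; the actual method may treat globs and other schema keys differently.

```python
from typing import Any, Dict


def defaults_from_schema(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Collect '_default' values from a nested schema (illustrative only)."""
    state: Dict[str, Any] = {}
    for key, value in schema.items():
        if not isinstance(value, dict):
            continue  # Skip schema properties such as '_output': True.
        if '_default' in value:
            # A variable: its default becomes its value in the state.
            state[key] = value['_default']
        else:
            # A port or nested store: recurse one level deeper.
            state[key] = defaults_from_schema(value)
    return state


ports_schema = {
    'port1': {
        '_output': True,
        'A': {'_default': 1.0},
        'B': {'_default': 0},
    },
    'port2': {'nested': {'C': {'_default': 'c'}}},
}
state = defaults_from_schema(ports_schema)
```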

defaults: Dict[str, Any] = {}
generate(config: Optional[dict] = None, path: Tuple[str, ...] = ()) → Dict
generate_flow(config: Optional[dict] = None) → Dict[str, Sequence[Tuple[str, ...]]]
generate_processes(config: Optional[dict] = None) → Dict[str, Any]

Do not override this method.

generate_steps(config: Optional[dict] = None) → Dict[str, Any]

Do not override this method.

generate_topology(config: Optional[dict] = None) → Dict[str, Union[Tuple[str, ...], dict, object]]

Do not override this method.

get_command_result() → Any

Retrieve the result from the last-run command.

Returns

The result of the last command run. Note that this method should only be called once immediately after each call to send_command().

Raises

RuntimeError – When there is no command pending. This can happen when this method is called twice without an intervening call to send_command().

get_private_state() → Dict[str, Any]

Get the process’s private state.

Processes can store state in instance variables instead of in the stores that hold the simulation-wide state. These instance variables hold a private state that is not shared with other processes. You can override this function to let experiments emit the process’s private state.

Returns

An empty dictionary. You may override this behavior to return your process’s private state.
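As an illustration, an override might expose a counter kept in an instance variable. CountingProcess and its n_updates attribute are hypothetical; a real process would subclass Process, which is omitted here so that the sketch runs on its own.

```python
from typing import Any, Dict


class CountingProcess:
    """Hypothetical process-like class; a real one would subclass Process."""

    def __init__(self) -> None:
        # Private state: lives on the instance, not in a simulation store.
        self.n_updates = 0

    def next_update(
            self, timestep: float, states: Dict[str, Any]) -> Dict[str, Any]:
        self.n_updates += 1
        return {}

    def get_private_state(self) -> Dict[str, Any]:
        # Expose the private counter so that it could be emitted.
        return {'n_updates': self.n_updates}


process = CountingProcess()
process.next_update(1.0, {})
process.next_update(1.0, {})
private_state = process.get_private_state()
```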

get_schema(override: Optional[Dict[str, Any]] = None) → dict

Get the process’s schema, optionally with a schema override.

Parameters

override – Override schema

Returns

The combined schema.

initial_state(config: Optional[dict] = None) → Dict[str, Any]

Get initial state in embedded path dictionary.

Every subclass may override this method.

Parameters

config (dict) – A dictionary of configuration options. All subclass implementations must accept this parameter, but some may ignore it.

Returns

Subclass implementations must return a dictionary mapping state paths to initial values.

Return type

dict

is_deriver() → bool

Check whether this process is a deriver.

Deprecated since version 0.3.14: Derivers have been deprecated in favor of steps, so please override is_step instead of is_deriver. Support for Derivers may be removed in a future release.

Returns

Whether this process is a deriver. This class always returns False, but subclasses may change this.

is_step() → bool

Check whether this process is a step.

Returns

Whether this process is a step. This class always returns False, but subclasses may change this behavior.

merge_overrides(override: Dict[str, Any]) → None

Add a schema override to the process’s schema overrides.

Parameters

override – The schema override to add.

abstract next_update(timestep: Union[float, int], states: Dict[str, Any]) → Dict[str, Any]

Compute the next update to the simulation state.

Parameters
  • timestep – The duration for which the update should be computed.

  • states – The pre-update simulation state. This will take the same form as the process’s schema, except with a value for each variable.

Returns

An empty dictionary for now. This should be overridden by each subclass to return an update.
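A subclass override might look roughly like the sketch below. The Decay class, its pool port, and the mass variable are hypothetical, and the class does not inherit from Process so that the example is self-contained. It also assumes that the returned value is interpreted as a change to apply to the store, which depends on how the variable's schema is configured.

```python
from typing import Any, Dict


class Decay:
    """Hypothetical process-like class; a real one would subclass Process."""

    def ports_schema(self) -> Dict[str, Any]:
        return {'pool': {'mass': {'_default': 10.0}}}

    def next_update(
            self, timestep: float, states: Dict[str, Any]) -> Dict[str, Any]:
        # states mirrors the schema, with a value in place of each variable.
        mass = states['pool']['mass']
        # Assumption: the returned value is a change to apply to the store.
        return {'pool': {'mass': -0.1 * mass * timestep}}


update = Decay().next_update(2.0, {'pool': {'mass': 10.0}})
```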

property parallel
property parameters
ports() → Dict[str, List[str]]

Get ports and each port’s variables.

Returns

A map from port names to lists of the variables that go into that port.

abstract ports_schema() → Dict[str, Any]

Get the schemas for each port.

This must be overridden by any subclasses.

Use the glob ‘*’ schema to declare expected sub-store structure, and view all child values of the store:

schema = {
    'port1': {
        '*': {
            '_default': 1.0
        }
    }
}

Use the glob ‘**’ schema to connect to an entire sub-branch, including child nodes, grandchild nodes, etc:

schema = {
    'port1': '**'
}

Ports flagged as output-only are not viewed when building the states passed to next_update(), which can save some overhead:

schema = {
  'port1': {
     '_output': True,
     'A': {'_default': 1.0},
  }
}

Returns

A dictionary that declares which states are expected by the processes, and how each state will behave. State keys can be assigned properties through schema_keys declared in vivarium.core.store.Store.

pre_send_command(command: str, args: Optional[tuple], kwargs: Optional[dict]) → None

Run pre-checks before starting a command.

This method should be called at the start of every implementation of send_command().

Parameters
  • command – The name of the command to run.

  • args – A tuple of positional arguments for the command.

  • kwargs – A dictionary of keyword arguments for the command.

Raises

RuntimeError – Raised when a user tries to send a command while a previous command is still pending (i.e. the user hasn’t called get_command_result() yet for the previous command).

run_command(command: str, args: Optional[tuple] = None, kwargs: Optional[dict] = None) → Any

Helper function that sends a command and returns result.
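The two-step protocol that this helper wraps can be sketched as follows. CommandRunner is a hypothetical class written for this illustration, not vivarium code; it only models the rule that each sent command must have its result retrieved exactly once.

```python
from typing import Any, Optional


class CommandRunner:
    """Hypothetical sketch of the command protocol; not vivarium code."""

    def __init__(self) -> None:
        self._pending = False
        self._result: Any = None

    def double(self, x: int) -> int:
        return 2 * x

    def pre_send_command(self, command: str) -> None:
        if self._pending:
            raise RuntimeError(
                f'Cannot send {command!r}: a previous result is unread.')
        self._pending = True

    def send_command(
            self, command: str, args: Optional[tuple] = None,
            kwargs: Optional[dict] = None) -> None:
        self.pre_send_command(command)
        # Look up the method named by the command and store its result.
        self._result = getattr(self, command)(*(args or ()), **(kwargs or {}))

    def get_command_result(self) -> Any:
        if not self._pending:
            raise RuntimeError('No command pending.')
        self._pending = False
        result, self._result = self._result, None
        return result

    def run_command(
            self, command: str, args: Optional[tuple] = None,
            kwargs: Optional[dict] = None) -> Any:
        # Send, then immediately retrieve: the two-step protocol in one call.
        self.send_command(command, args, kwargs)
        return self.get_command_result()


runner = CommandRunner()
result = runner.run_command('double', (21,))

try:
    runner.get_command_result()  # Nothing is pending any more.
    second_read_failed = False
except RuntimeError:
    second_read_failed = True
```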

run_command_method(command: str, args: tuple, kwargs: dict) → Any

Run a command whose name and interface match a method.

Parameters
  • command – The command name, which must be the name of a method of self.

  • args – The positional arguments to pass to the method.

  • kwargs – The keyword arguments to pass to the method.

Returns

The result of calling the method of self named by command, as in self.command(*args, **kwargs).

property schema
property schema_override
send_command(command: str, args: Optional[tuple] = None, kwargs: Optional[dict] = None, run_pre_check: bool = True) → None

Handle process commands.

This method handles the commands listed in METHOD_COMMANDS by passing args and kwargs to the method of self with the name of the command and saving the return value as the result.

This method handles the commands listed in ATTRIBUTE_READ_COMMANDS by returning the attribute of self with the name matching the command, and it handles the commands listed in ATTRIBUTE_WRITE_COMMANDS by setting the attribute in the command to the first argument in args. The command must be named set_attr for attribute attr.

To add support for a custom command, override this function in your subclass. Each command is defined by a name (a string) and accepts both positional and keyword arguments. Any custom commands you add should have associated methods such that:

  • The command name matches the method name.

  • The command and method accept the same positional and keyword arguments.

  • The command and method return the same values.

If all of the above are satisfied, you can use Process.run_command_method() to handle the command.

Your implementation of this function needs to handle all the commands you want to support. When presented with an unknown command, you should call the superclass method, which will either handle the command or call its superclass method. At the top of this recursive chain, this Process.send_command() method handles some built-in commands and will raise an error for unknown commands.

Any overrides of this method must also call pre_send_command() at the start of the method. This call will check that no command is currently pending to avoid confusing behavior when multiple commands are started without intervening retrievals of command results. Since your overriding method will have already performed the pre-check, it should pass run_pre_check=False when calling the superclass method.

Parameters
  • command – The name of the command to run.

  • args – A tuple of positional arguments for the command.

  • kwargs – A dictionary of keyword arguments for the command.

  • run_pre_check – Whether to run the pre-checks implemented in pre_send_command(). This should be left at its default value unless the pre-checks have already been performed (e.g. if this method is being called by a subclass’s overriding method).

Returns

None. This method just starts the command running.

Raises

ValueError – For unknown commands.
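The override rules described above might be applied as in the following sketch. BaseProcess is a simplified, hypothetical stand-in for Process that mimics only the behavior described in this section (its _pending and _result attributes are invented), and the Greeter class and its greet command are made up for illustration.

```python
from typing import Any, Optional


class BaseProcess:
    """Hypothetical stand-in for Process; not the vivarium implementation."""

    METHOD_COMMANDS = ('is_step',)

    def __init__(self) -> None:
        self._pending = False
        self._result: Any = None

    def is_step(self) -> bool:
        return False

    def pre_send_command(
            self, command: str, args: Optional[tuple],
            kwargs: Optional[dict]) -> None:
        if self._pending:
            raise RuntimeError(f'{command!r} sent while a command is pending.')
        self._pending = True

    def run_command_method(
            self, command: str, args: tuple, kwargs: dict) -> Any:
        return getattr(self, command)(*args, **kwargs)

    def send_command(
            self, command: str, args: Optional[tuple] = None,
            kwargs: Optional[dict] = None, run_pre_check: bool = True) -> None:
        if run_pre_check:
            self.pre_send_command(command, args, kwargs)
        if command not in self.METHOD_COMMANDS:
            raise ValueError(f'Unknown command: {command!r}')
        self._result = self.run_command_method(
            command, args or (), kwargs or {})

    def get_command_result(self) -> Any:
        if not self._pending:
            raise RuntimeError('No command pending.')
        self._pending = False
        return self._result


class Greeter(BaseProcess):
    def greet(self, name: str, punctuation: str = '!') -> str:
        return f'Hello, {name}{punctuation}'

    def send_command(
            self, command: str, args: Optional[tuple] = None,
            kwargs: Optional[dict] = None, run_pre_check: bool = True) -> None:
        # Run the pre-check once, at the start of the override.
        if run_pre_check:
            self.pre_send_command(command, args, kwargs)
        if command == 'greet':
            # Name, arguments, and return value match the method.
            self._result = self.run_command_method(
                command, args or (), kwargs or {})
        else:
            # Defer everything else; the pre-check has already run.
            super().send_command(command, args, kwargs, run_pre_check=False)


greeter = Greeter()
greeter.send_command('greet', ('world',), {'punctuation': '?'})
greeting = greeter.get_command_result()
greeter.send_command('is_step')
built_in = greeter.get_command_result()
```

Passing run_pre_check=False to the superclass keeps the pending-command check from firing a second time for the same command.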

update_condition(timestep: Union[float, int], states: Dict[str, Any]) → Any

Determine whether this process runs.

Parameters
  • timestep – The duration for which an update would be computed.

  • states – The pre-update simulation state.

Returns

Boolean for whether this process runs. True by default.

class vivarium.core.process.Step(parameters: Optional[dict] = None)

Bases: vivarium.core.process.Process

Base class for steps.

is_step() → bool

Returns True to signal that this process is a step.

class vivarium.core.process.ToyParallelProcess(parameters: Optional[dict] = None)

Bases: vivarium.core.process.Process

compare_pid(pid: float) → bool
next_update(timestep: float, states: Dict[str, Any]) → Dict[str, Any]
ports_schema() → Dict[str, Any]
send_command(command: str, args: Optional[tuple] = None, kwargs: Optional[dict] = None, run_pre_check: bool = True) → None

class vivarium.core.process.ToySerializedProcess(parameters: Optional[dict] = None)

Bases: vivarium.core.process.Process

defaults: Dict[str, list] = {'list': []}
next_update(timestep: float, states: Dict[str, Any]) → Dict[str, Any]
ports_schema() → Dict[str, Any]

class vivarium.core.process.ToySerializedProcessInheritance(parameters: Optional[dict] = None)

Bases: vivarium.core.process.Process

defaults: Dict[str, Any] = {'1': 1}
next_update(timestep: float, states: Dict[str, Any]) → Dict[str, Any]
ports_schema() → Dict[str, Any]

vivarium.core.process._handle_parallel_process(connection: multiprocessing.connection.Connection, process: vivarium.core.process.Process, profile: bool) → None

Handle a parallel Vivarium process.

This function is designed to be passed as target to Multiprocess(). In a loop, it receives process commands from a pipe, passes those commands to the parallel process, and passes the result back along the pipe.

The special command end is handled directly by this function. This command causes the function to exit and therefore shut down the OS process created by multiprocessing.

Parameters
  • connection – The child end of a multiprocessing pipe. All communications received from the pipe should be a 3-tuple of the form (command, args, kwargs), and the tuple contents will be passed to Process.run_command(). The result, which may be of any type, will be sent back through the pipe.

  • process – The process running in parallel.

  • profile – Whether to profile the process.
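The receive, dispatch, and reply loop described above can be sketched with a multiprocessing pipe. The handle_commands function and the Doubler class are hypothetical and omit profiling. As a further simplification, the loop runs in a thread rather than in a child OS process so that the example behaves the same under every multiprocessing start method.

```python
import threading
from multiprocessing import Pipe
from typing import Any


class Doubler:
    """Hypothetical object exposing a run_command() method."""

    def run_command(self, command: str, args: tuple, kwargs: dict) -> Any:
        return getattr(self, command)(*args, **kwargs)

    def double(self, x: int) -> int:
        return 2 * x


def handle_commands(connection, process: Doubler) -> None:
    """Receive (command, args, kwargs) tuples until 'end' arrives."""
    while True:
        command, args, kwargs = connection.recv()
        if command == 'end':
            # The special command stops the loop instead of being forwarded.
            connection.close()
            break
        connection.send(process.run_command(command, args, kwargs))


parent, child = Pipe()
# Assumption: a thread stands in for the child OS process in this sketch.
worker = threading.Thread(target=handle_commands, args=(child, Doubler()))
worker.start()

parent.send(('double', (21,), {}))
result = parent.recv()

parent.send(('end', (), {}))
worker.join()
```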

vivarium.core.process.assoc_in(d: dict, path: Tuple[str, ...], value: Any) → dict

Insert a value into a dictionary at an arbitrary depth.

Empty dictionaries will be created as needed to insert the value at the specified depth.

>>> d = {'a': {'b': 1}}
>>> assoc_in(d, ('a', 'c', 'd'), 2)
{'a': {'b': 1, 'c': {'d': 2}}}

Parameters
  • d – Dictionary to insert into.

  • path – Path in the dictionary where the value will be inserted. Each element of the path is a dictionary key, which will be added if not already present. Each element after the first is a key in the dictionary stored under the immediately preceding path element.

  • value – The value to insert.

Returns

Dictionary with the value inserted.
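One way such a function could be written is sketched below. The name assoc_in_sketch is hypothetical, and this is not the library's implementation. The sketch returns new dictionaries instead of modifying d and replaces any non-dictionary value found along the path; both are assumptions that the description above does not settle.

```python
from typing import Any, Tuple


def assoc_in_sketch(d: dict, path: Tuple[str, ...], value: Any) -> Any:
    """Return a copy of ``d`` with ``value`` inserted at ``path``."""
    if not path:
        # An empty path means the value replaces whatever was here.
        return value
    head, rest = path[0], path[1:]
    child = d.get(head, {})
    if not isinstance(child, dict):
        # Assumption: a non-dictionary value on the path is overwritten.
        child = {}
    # Copy this level and recurse one level deeper for the remaining path.
    return {**d, head: assoc_in_sketch(child, rest, value)}


d = {'a': {'b': 1}}
result = assoc_in_sketch(d, ('a', 'c', 'd'), 2)
```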