Emitters

Emitters log configuration data and time-series data somewhere.

class vivarium.core.emitter.DatabaseEmitter(config: Dict[str, Any])[source]

Bases: vivarium.core.emitter.Emitter

Emit data to a MongoDB database.

Example:

>>> config = {
...     'host': 'localhost:27017',
...     'database': 'DB_NAME',
... }
>>> # The line below works only if port 27017 is open locally
>>> # emitter = DatabaseEmitter(config)

config may have ‘host’ and ‘database’ items.

client_dict: Dict[int, pymongo.mongo_client.MongoClient] = {}
classmethod create_indexes(table: Any, columns: List[Any]) → None[source]

Create the listed column indexes for the given DB table.

default_host = 'localhost:27017'
emit(data: Dict[str, Any]) → None[source]
get_data(query: Optional[list] = None) → dict[source]
write_emit(table: Any, emit_data: Dict[str, Any]) → None[source]

Check that the data size is less than the emit limit.

Break up large emits into smaller pieces and emit them individually.
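
The splitting idea can be illustrated with a minimal sketch. It is not the library's implementation: the helper name split_emit, the JSON-based size measure, and the limit value are all hypothetical choices made for illustration.

```python
import json
from typing import Any, List, Tuple

Path = Tuple[str, ...]


def split_emit(data: Any, limit: int, path: Path = ()) -> List[Tuple[Path, Any]]:
    """Split nested data into (path, value) pieces of JSON size <= limit.

    Hypothetical helper: size is the length of the JSON encoding. A
    non-dict value larger than the limit is returned whole, because this
    sketch only knows how to split dictionaries.
    """
    size = len(json.dumps(data))
    if size <= limit or not isinstance(data, dict):
        return [(path, data)]
    pieces: List[Tuple[Path, Any]] = []
    for key, value in data.items():
        # Recurse so that each sub-tree is emitted under its own path.
        pieces.extend(split_emit(value, limit, path + (key,)))
    return pieces


emit_data = {
    'cell': {'mass': [1.0] * 20, 'volume': [2.0] * 20},
    'time': 0,
}
pieces = split_emit(emit_data, limit=150)
```

Each piece keeps the path to its value, so a reader of the stored pieces could place every value back into a nested dictionary.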

class vivarium.core.emitter.Emitter(config: Dict[str, str])[source]

Bases: object

Base class for emitters.

This emitter simply emits to STDOUT.

Parameters

config – Emitter configuration.

emit(data: Dict[str, Any]) → None[source]

Emit data.

Parameters

data – The data to emit. This gets called by the Vivarium engine with a snapshot of the simulation state.

get_data(query: Optional[list] = None) → dict[source]

Get the emitted data.

Returns

The data that has been emitted to the database in the raw data format. For this particular class, an empty dictionary is returned.

get_data_deserialized(query: Optional[list] = None) → Any[source]

Get the emitted data with variable values deserialized.

Returns

The data that has been emitted to the database in the raw data format. Before being returned, serialized values in the data are deserialized.

get_data_unitless(query: Optional[list] = None) → Any[source]

Get the emitted data with units stripped from variable values.

Returns

The data that has been emitted to the database in the raw data format. Before being returned, units are stripped from values.

get_path_timeseries(query: Optional[list] = None) → dict[source]

Get the deserialized data as a path timeseries.

Returns

The deserialized emitted data, formatted as a path timeseries.

get_timeseries(query: Optional[list] = None) → dict[source]

Get the deserialized data as an embedded timeseries.

Returns

The deserialized emitted data, formatted as an embedded timeseries.

class vivarium.core.emitter.NullEmitter(config: Dict[str, str])[source]

Bases: vivarium.core.emitter.Emitter

Don’t emit anything.

Parameters

config – Emitter configuration.

emit(data: Dict[str, Any]) → None[source]

class vivarium.core.emitter.RAMEmitter(config: Dict[str, Any])[source]

Bases: vivarium.core.emitter.Emitter

Accumulate the timeseries history portion of the “emitted” data to a table in RAM.

emit(data: Dict[str, Any]) → None[source]

Emit the timeseries history portion of data. If data['table'] == 'history', data['data'] is stored in the history under the key data['data']['time'].

get_data(query: Optional[list] = None) → dict[source]

Return the accumulated timeseries history of “emitted” data.
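
The accumulation described for emit and get_data can be sketched with a stand-in class. HistorySketch is hypothetical and is not the vivarium RAMEmitter; in particular, whether the stored snapshot keeps its 'time' entry is an assumption of this sketch.

```python
from typing import Any, Dict


class HistorySketch:
    """Stand-in for illustration only; not the vivarium RAMEmitter."""

    def __init__(self) -> None:
        self.saved_data: Dict[float, Dict[str, Any]] = {}

    def emit(self, data: Dict[str, Any]) -> None:
        # Only the 'history' table carries timeseries data.
        if data['table'] == 'history':
            emit_data = data['data']
            # Store the snapshot under its simulation time.
            self.saved_data[emit_data['time']] = emit_data

    def get_data(self) -> Dict[float, Dict[str, Any]]:
        return self.saved_data


emitter = HistorySketch()
emitter.emit({'table': 'configuration', 'data': {'name': 'demo'}})
emitter.emit({'table': 'history', 'data': {'time': 0.0, 'mass': 1.0}})
emitter.emit({'table': 'history', 'data': {'time': 1.0, 'mass': 1.1}})
history = emitter.get_data()
```

Here the configuration emit is ignored and the two history emits end up keyed by their times, 0.0 and 1.0.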

class vivarium.core.emitter.SharedRamEmitter(config: Dict[str, Any])[source]

Bases: vivarium.core.emitter.RAMEmitter

Accumulate the timeseries history portion of the “emitted” data to a table in RAM that is shared across all instances of the emitter.

saved_data: Dict[float, Dict[str, Any]] = {}
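
Because saved_data is declared on the class, every instance refers to the same dictionary. The sketch below illustrates that Python mechanism with a hypothetical stand-in, SharedHistorySketch; it is not the vivarium class.

```python
from typing import Any, Dict


class SharedHistorySketch:
    """Stand-in for illustration only; not the vivarium SharedRamEmitter."""

    # Class attribute: one dict object shared by every instance.
    saved_data: Dict[float, Dict[str, Any]] = {}

    def emit(self, data: Dict[str, Any]) -> None:
        if data['table'] == 'history':
            emit_data = data['data']
            # Mutating (not rebinding) the dict keeps it shared.
            self.saved_data[emit_data['time']] = emit_data


first = SharedHistorySketch()
second = SharedHistorySketch()
first.emit({'table': 'history', 'data': {'time': 0.0, 'mass': 1.0}})
second.emit({'table': 'history', 'data': {'time': 1.0, 'mass': 1.1}})
```

After these calls, both instances see both time points, since neither owns a private copy of the table.
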
vivarium.core.emitter.apply_func(document: Any, field: Tuple, f: Optional[Callable[[], Any]] = None) → Any[source]
vivarium.core.emitter.assemble_data(data: list) → dict[source]

Re-assemble data.

vivarium.core.emitter.breakdown_data(limit: float, data: Any, path: Tuple = (), size: Optional[float] = None) → list[source]
vivarium.core.emitter.data_from_database(experiment_id: str, client: Any, query: Optional[list] = None, func_dict: Optional[dict] = None, f: Optional[Callable[[], Any]] = None, filters: Optional[dict] = None, start_time: Union[int, bson.min_key.MinKey] = MinKey(), end_time: Union[int, bson.max_key.MaxKey] = MaxKey(), cpus: int = 1) → Tuple[dict, Any][source]

Fetch experiment data from a MongoDB database.

Parameters
  • experiment_id – the experiment id which is being retrieved

  • client – a MongoClient instance connected to the DB

  • query – a list of tuples pointing to fields within the experiment data. In the format: [(‘path’, ‘to’, ‘field1’), (‘path’, ‘to’, ‘field2’)]

  • func_dict – a dict which maps the given query paths to a function that operates on the retrieved values and returns the results. If None then the raw values are returned. In the format: {(‘path’, ‘to’, ‘field1’): function}

  • f – a function that applies equally to all fields in query. func_dict is the recommended approach and takes priority over f.

  • filters – MongoDB query arguments to further filter results beyond matching the experiment ID.

  • start_time – first simulation time to query

  • end_time – last simulation time to query

  • cpus – splits query into this many chunks to run in parallel

Returns

data (dict)
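
The query and func_dict formats described above can be illustrated without a database. The sketch below applies them to a plain nested dictionary; select_fields and get_in are hypothetical helpers, and the document contents are made up for the example.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Path = Tuple[str, ...]


def get_in(document: Dict[str, Any], path: Path) -> Any:
    """Follow a path of keys into a nested dict."""
    value: Any = document
    for key in path:
        value = value[key]
    return value


def select_fields(
    document: Dict[str, Any],
    query: List[Path],
    func_dict: Optional[Dict[Path, Callable[[Any], Any]]] = None,
) -> Dict[Path, Any]:
    """Hypothetical helper: pick the queried fields, transforming mapped ones."""
    func_dict = func_dict or {}
    result: Dict[Path, Any] = {}
    for path in query:
        value = get_in(document, path)
        func = func_dict.get(path)
        # Paths without a mapped function return their raw value.
        result[path] = func(value) if func is not None else value
    return result


document = {'agents': {'0': {'mass': 1.5, 'counts': [1, 2, 3]}}}
query = [('agents', '0', 'mass'), ('agents', '0', 'counts')]
func_dict = {('agents', '0', 'counts'): sum}
selected = select_fields(document, query, func_dict)
```

In this example the mass is returned unchanged, while the counts list is reduced to its sum by the function mapped to its path.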

vivarium.core.emitter.data_to_database(data: Dict[float, dict], environment_config: Any, client: Any) → Any[source]

Insert data into a MongoDB database.

vivarium.core.emitter.delete_experiment(host: str = 'localhost', port: Any = 27017, query: Optional[dict] = None) → None[source]

Helper function to delete experiment data in parallel.

Parameters
  • host – Host name of database. This can usually be left as the default.

  • port – Port number of database. This can usually be left as the default.

  • query – Filter for documents to delete.

vivarium.core.emitter.delete_experiment_from_database(experiment_id: str, host: str = 'localhost', port: Any = 27017, cpus: int = 1) → None[source]

Delete an experiment’s data from a database.

Parameters
  • experiment_id – Identifier of experiment.

  • host – Host name of database. This can usually be left as the default.

  • port – Port number of database. This can usually be left as the default.

  • cpus – Number of chunks to split delete operation into to be run in parallel. Useful if single-threaded delete does not saturate I/O.

vivarium.core.emitter.get_atlas_client(secrets_path: str) → Any[source]

Open a MongoDB client using the named secrets config JSON file.

vivarium.core.emitter.get_atlas_database_emitter_config(username: str, password: str, cluster_subdomain: Any, database: str) → Dict[str, Any][source]

Construct an Emitter config for a MongoDB on the Atlas service.

vivarium.core.emitter.get_data_chunks(history_collection: Any, experiment_id: str, start_time: Union[int, bson.min_key.MinKey] = MinKey(), end_time: Union[int, bson.max_key.MaxKey] = MaxKey(), cpus: int = 8) → list[source]

Helper function to get chunks for parallel queries.

Parameters
  • history_collection – the MongoDB history collection to query

  • experiment_id – the experiment id which is being retrieved

  • start_time – first simulation time to query

  • end_time – last simulation time to query

  • cpus – number of chunks to create

Returns

List of ObjectId tuples that represent chunk boundaries. For each tuple, include {'_id': {'$gte': tuple[0], '$lt': tuple[1]}} in the query to search its corresponding chunk.
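
As a sketch of how the returned boundaries might be turned into per-chunk filters: chunk_queries is a hypothetical helper, the 'experiment_id' field name is an assumption about the stored documents, and integers stand in for real ObjectId values.

```python
from typing import Any, Dict, List, Tuple


def chunk_queries(
    experiment_id: str,
    chunks: List[Tuple[Any, Any]],
) -> List[Dict[str, Any]]:
    """Build one MongoDB filter per chunk (hypothetical helper)."""
    return [
        {
            # Assumed field name for the experiment identifier.
            'experiment_id': experiment_id,
            # Half-open range: lower bound included, upper bound excluded.
            '_id': {'$gte': lower, '$lt': upper},
        }
        for lower, upper in chunks
    ]


# Integers stand in for the ObjectId boundaries returned by get_data_chunks.
chunks = [(0, 100), (100, 200)]
queries = chunk_queries('my_experiment', chunks)
```

Because each range is half-open, a document on a boundary falls into exactly one chunk, so the chunks can be queried in parallel without duplicates.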

vivarium.core.emitter.get_emitter(config: Optional[Dict[str, str]]) → vivarium.core.emitter.Emitter[source]

Construct an Emitter using the provided config.

The available Emitter type names and their classes are:

Parameters

config – Must contain the type key, which specifies the emitter type name (e.g. database).

Returns

A new Emitter instance.
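
Selecting a class by the type key is a common factory pattern, sketched below with stand-in classes. Everything here is hypothetical: the registry, the type names 'print' and 'null', the fallback for a missing config, and the function name make_emitter are illustrative and are not taken from the library.

```python
from typing import Any, Dict, Optional


class PrintSketch:
    """Stand-in emitter that prints what it receives."""

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config

    def emit(self, data: Dict[str, Any]) -> None:
        print(data)


class NullSketch(PrintSketch):
    """Stand-in emitter that discards what it receives."""

    def emit(self, data: Dict[str, Any]) -> None:
        pass  # Deliberately discard the data.


# Placeholder registry: names and classes are illustrative only.
EMITTER_TYPES = {'print': PrintSketch, 'null': NullSketch}


def make_emitter(config: Optional[Dict[str, Any]]) -> PrintSketch:
    """Look up config['type'] in the registry and instantiate the class."""
    if config is None:
        config = {'type': 'print'}  # Assumed fallback for this sketch.
    emitter_type = config['type']
    if emitter_type not in EMITTER_TYPES:
        raise ValueError(f'Unknown emitter type: {emitter_type!r}')
    return EMITTER_TYPES[emitter_type](config)


emitter = make_emitter({'type': 'null'})
emitter.emit({'table': 'history', 'data': {'time': 0.0}})
```

The whole config is passed on to the chosen class, so type-specific items such as a host or database name can travel in the same dictionary.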

vivarium.core.emitter.get_experiment_database(port: Any = 27017, database_name: str = 'simulations') → Any[source]

Get a database object.

Parameters
  • port – Port number of database. This can usually be left as the default.

  • database_name – Name of the database table. This can usually be left as the default.

Returns

The database object.

vivarium.core.emitter.get_history_data_db(history_collection: Any, experiment_id: Any, query: Optional[list] = None, func_dict: Optional[dict] = None, f: Optional[Callable[[], Any]] = None, filters: Optional[dict] = None, start_time: Union[int, bson.min_key.MinKey] = MinKey(), end_time: Union[int, bson.max_key.MaxKey] = MaxKey(), cpus: int = 1, host: str = 'localhost', port: Any = '27017') → Dict[float, dict][source]

Query MongoDB for history data.

Parameters
  • history_collection – a MongoDB collection

  • experiment_id – the experiment id which is being retrieved

  • query – a list of tuples pointing to fields within the experiment data. In the format: [(‘path’, ‘to’, ‘field1’), (‘path’, ‘to’, ‘field2’)]

  • func_dict – a dict which maps the given query paths to a function that operates on the retrieved values and returns the results. If None then the raw values are returned. In the format: {(‘path’, ‘to’, ‘field1’): function}

  • f – a function that applies equally to all fields in query. func_dict is the recommended approach and takes priority over f.

  • filters – MongoDB query arguments to further filter results beyond matching the experiment ID.

  • start_time – first simulation time to query

  • end_time – last simulation time to query

  • cpus – splits query into this many chunks to run in parallel, useful if single-threaded query does not saturate I/O (e.g. on Google Cloud)

  • host – used if cpus>1 to create MongoClient in parallel processes

  • port – used if cpus>1 to create MongoClient in parallel processes

Returns

data (dict)

vivarium.core.emitter.get_local_client(host: str, port: Any, database_name: str) → Any[source]

Open a MongoDB client onto the given host, port, and DB.

vivarium.core.emitter.get_query(projection: dict, host: str, port: Any, query: dict) → list[source]

Helper function for parallel queries.

Parameters
  • projection – a MongoDB projection in dictionary form

  • host – used to create new MongoClient for each parallel process

  • port – used to create new MongoClient for each parallel process

  • query – a MongoDB query in dictionary form

Returns

List of projected documents for given query

vivarium.core.emitter.path_timeseries_from_data(data: dict) → dict[source]

Convert from raw data to a path timeseries.

vivarium.core.emitter.path_timeseries_from_embedded_timeseries(embedded_timeseries: dict) → dict[source]

Convert an embedded timeseries to a path timeseries.

vivarium.core.emitter.timeseries_from_data(data: dict) → dict[source]

Convert raw data to an embedded timeseries.
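
The three data shapes named by these conversion functions can be illustrated with a small sketch. The formats used here are assumptions, not taken from this page: raw data is taken to map each time to a nested state, an embedded timeseries to be the same nesting with a list of values at each leaf plus a top-level 'time' list, and a path timeseries to map path tuples to those lists. The helpers embed and flatten are hypothetical and are not the library functions.

```python
from typing import Any, Dict, Tuple


def embed(data: Dict[float, Dict[str, Any]]) -> Dict[str, Any]:
    """Turn {time: nested_state} into nested lists plus a 'time' list."""
    timeseries: Dict[str, Any] = {'time': []}

    def append(target: Dict[str, Any], state: Dict[str, Any]) -> None:
        for key, value in state.items():
            if isinstance(value, dict):
                append(target.setdefault(key, {}), value)
            else:
                target.setdefault(key, []).append(value)

    for time in sorted(data):
        timeseries['time'].append(time)
        append(timeseries, data[time])
    return timeseries


def flatten(embedded: Dict[str, Any], path: Tuple[str, ...] = ()) -> Dict[Any, Any]:
    """Turn an embedded timeseries into {path_tuple: list_of_values}."""
    paths: Dict[Any, Any] = {}
    for key, value in embedded.items():
        if isinstance(value, dict):
            paths.update(flatten(value, path + (key,)))
        elif path == () and key == 'time':
            paths['time'] = value  # Keep the time axis under a plain key.
        else:
            paths[path + (key,)] = value
    return paths


raw = {0.0: {'cell': {'mass': 1.0}}, 1.0: {'cell': {'mass': 1.1}}}
embedded = embed(raw)
path_timeseries = flatten(embedded)
```

Under these assumptions, the nested form keeps the shape of the simulation state, while the flat form gives one entry per variable.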