Emitters
Emitters log configuration data and time-series data to a destination such as STDOUT, RAM, or a MongoDB database.
-
class vivarium.core.emitter.DatabaseEmitter(config: Dict[str, Any])
Bases: vivarium.core.emitter.Emitter
Emit data to a MongoDB database.
Example:
>>> config = {
...     'host': 'localhost:27017',
...     'database': 'DB_NAME',
... }
>>> # The line below works only if port 27017 is open locally
>>> # emitter = DatabaseEmitter(config)
config may have ‘host’ and ‘database’ items.
-
client_dict: Dict[int, pymongo.mongo_client.MongoClient] = {}
-
classmethod create_indexes(table: Any, columns: List[Any]) → None
Create the listed column indexes for the given DB table.
-
default_host = 'localhost:27017'
-
-
class vivarium.core.emitter.Emitter(config: Dict[str, str])
Bases: object
Base class for emitters.
This emitter simply emits to STDOUT.
- Parameters
config – Emitter configuration.
-
emit(data: Dict[str, Any]) → None
Emit data.
- Parameters
data – The data to emit. This gets called by the Vivarium engine with a snapshot of the simulation state.
-
get_data(query: Optional[list] = None) → dict
Get the emitted data.
- Returns
The data that has been emitted to the database in the raw data format. For this particular class, an empty dictionary is returned.
-
get_data_deserialized(query: Optional[list] = None) → Any
Get the emitted data with variable values deserialized.
- Returns
The data that has been emitted to the database in the raw data format. Before being returned, serialized values in the data are deserialized.
-
get_data_unitless(query: Optional[list] = None) → Any
Get the emitted data with units stripped from variable values.
- Returns
The data that has been emitted to the database in the raw data format. Before being returned, units are stripped from values.
-
get_path_timeseries(query: Optional[list] = None) → dict
Get the deserialized data as a path timeseries.
- Returns
The deserialized emitted data, formatted as a path timeseries.
-
get_timeseries(query: Optional[list] = None) → dict
Get the deserialized data as an embedded timeseries.
- Returns
The deserialized emitted data, formatted as an embedded timeseries.
-
class vivarium.core.emitter.NullEmitter(config: Dict[str, str])
Bases: vivarium.core.emitter.Emitter
Don’t emit anything
- Parameters
config – Emitter configuration.
-
class vivarium.core.emitter.RAMEmitter(config: Dict[str, Any])
Bases: vivarium.core.emitter.Emitter
Accumulate the timeseries history portion of the “emitted” data to a table in RAM.
Bases:
vivarium.core.emitter.RAMEmitter
Accumulate the timeseries history portion of the “emitted” data to a table in RAM that is shared across all instances of the emitter.
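The difference between accumulating per instance and accumulating in a table shared across all instances can be illustrated with a minimal sketch. The classes below are hypothetical stand-ins, not the vivarium implementation, and the emit payload shape (a dict with a 'time' key) is an assumption made for illustration.

```python
from typing import Any, Dict


class SketchRAMEmitter:
    """Hypothetical stand-in: each instance keeps its own table."""

    def __init__(self) -> None:
        # Instance attribute: a fresh table per emitter.
        self.saved_data: Dict[float, Dict[str, Any]] = {}

    def emit(self, data: Dict[str, Any]) -> None:
        # Assumed payload shape: {'time': t, ...other state...}.
        time = data['time']
        self.saved_data[time] = {
            key: value for key, value in data.items() if key != 'time'
        }

    def get_data(self) -> Dict[float, Dict[str, Any]]:
        return self.saved_data


class SketchSharedRAMEmitter(SketchRAMEmitter):
    """Hypothetical stand-in: all instances write to one table."""

    # Class attribute: one table shared by every instance.
    shared_data: Dict[float, Dict[str, Any]] = {}

    def __init__(self) -> None:
        # Point the instance at the class-level table.
        self.saved_data = SketchSharedRAMEmitter.shared_data


a, b = SketchRAMEmitter(), SketchRAMEmitter()
a.emit({'time': 0.0, 'x': 1})
separate = b.get_data()  # b has not seen a's emit

c, d = SketchSharedRAMEmitter(), SketchSharedRAMEmitter()
c.emit({'time': 0.0, 'x': 1})
shared = d.get_data()  # d sees c's emit
```

In this sketch the only difference between the two classes is whether the table is an instance attribute or a class attribute.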
-
vivarium.core.emitter.apply_func(document: Any, field: Tuple, f: Optional[Callable[[…], Any]] = None) → Any
-
vivarium.core.emitter.breakdown_data(limit: float, data: Any, path: Tuple = (), size: Optional[float] = None) → list
-
vivarium.core.emitter.data_from_database(experiment_id: str, client: Any, query: Optional[list] = None, func_dict: Optional[dict] = None, f: Optional[Callable[[…], Any]] = None, filters: Optional[dict] = None, start_time: Union[int, bson.min_key.MinKey] = MinKey(), end_time: Union[int, bson.max_key.MaxKey] = MaxKey(), cpus: int = 1) → Tuple[dict, Any]
Fetch an experiment's data from a MongoDB database.
- Parameters
experiment_id – the experiment id which is being retrieved
client – a MongoClient instance connected to the DB
query – a list of tuples pointing to fields within the experiment data. In the format: [(‘path’, ‘to’, ‘field1’), (‘path’, ‘to’, ‘field2’)]
func_dict – a dict which maps the given query paths to a function that operates on the retrieved values and returns the results. If None then the raw values are returned. In the format: {(‘path’, ‘to’, ‘field1’): function}
f – a function that applies equally to all fields in query. func_dict is the recommended approach and takes priority over f.
filters – MongoDB query arguments to further filter results beyond matching the experiment ID.
start_time – first simulation time to query
end_time – last simulation time to query
cpus – splits query into this many chunks to run in parallel
- Returns
data (dict)
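The roles of query, func_dict, and f described above can be sketched in plain Python. The helpers get_in and select_fields are hypothetical names introduced here, and the flat output keyed by path is chosen for illustration only; the library's own return structure may differ.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Path = Tuple[str, ...]


def get_in(document: Dict[str, Any], path: Path) -> Any:
    """Follow a path of keys into a nested dict."""
    value: Any = document
    for key in path:
        value = value[key]
    return value


def select_fields(
    document: Dict[str, Any],
    query: List[Path],
    func_dict: Optional[Dict[Path, Callable[[Any], Any]]] = None,
    f: Optional[Callable[[Any], Any]] = None,
) -> Dict[Path, Any]:
    """Hypothetical helper: pick the queried fields, applying functions."""
    func_dict = func_dict or {}
    result: Dict[Path, Any] = {}
    for path in query:
        value = get_in(document, path)
        # A per-path function in func_dict takes priority over f.
        func = func_dict.get(path, f)
        result[path] = func(value) if func is not None else value
    return result


document = {'cell': {'mass': 1.5, 'counts': [1, 2, 3]}}
query = [('cell', 'mass'), ('cell', 'counts')]
raw = select_fields(document, query)
selected = select_fields(
    document,
    query,
    func_dict={('cell', 'counts'): sum},
    f=lambda value: value * 2,
)
```

Here ('cell', 'counts') is reduced with sum because it has an entry in func_dict, while ('cell', 'mass') falls back to f.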
-
vivarium.core.emitter.data_to_database(data: Dict[float, dict], environment_config: Any, client: Any) → Any
Insert data into a MongoDB database.
-
vivarium.core.emitter.delete_experiment(host: str = 'localhost', port: Any = 27017, query: Optional[dict] = None) → None
Helper function to delete experiment data in parallel
- Parameters
host – Host name of database. This can usually be left as the default.
port – Port number of database. This can usually be left as the default.
query – Filter for documents to delete.
-
vivarium.core.emitter.delete_experiment_from_database(experiment_id: str, host: str = 'localhost', port: Any = 27017, cpus: int = 1) → None
Delete an experiment’s data from a database.
- Parameters
experiment_id – Identifier of experiment.
host – Host name of database. This can usually be left as the default.
port – Port number of database. This can usually be left as the default.
cpus – Number of chunks to split delete operation into to be run in parallel. Useful if single-threaded delete does not saturate I/O.
-
vivarium.core.emitter.get_atlas_client(secrets_path: str) → Any
Open a MongoDB client using the named secrets config JSON file.
-
vivarium.core.emitter.get_atlas_database_emitter_config(username: str, password: str, cluster_subdomain: Any, database: str) → Dict[str, Any]
Construct an Emitter config for a MongoDB on the Atlas service.
-
vivarium.core.emitter.get_data_chunks(history_collection: Any, experiment_id: str, start_time: Union[int, bson.min_key.MinKey] = MinKey(), end_time: Union[int, bson.max_key.MaxKey] = MaxKey(), cpus: int = 8) → list
Helper function to get chunks for parallel queries
- Parameters
history_collection – the MongoDB history collection to query
experiment_id – the experiment id which is being retrieved
start_time – first simulation time to query
end_time – last simulation time to query
cpus – number of chunks to create
- Returns
List of ObjectId tuples that represent chunk boundaries. For each tuple, include
{'_id': {$gte: tuple[0], $lt: tuple[1]}}
in the query to search its corresponding chunk.
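As a rough sketch of how the chunk boundaries described above might be turned into per-chunk filter documents: chunk_filters is a hypothetical helper, the 'experiment_id' field name is an assumption, and plain integers stand in for real ObjectId values.

```python
from typing import Any, Dict, List, Tuple


def chunk_filters(
    experiment_id: str,
    chunks: List[Tuple[Any, Any]],
) -> List[Dict[str, Any]]:
    """Build one MongoDB filter document per chunk boundary tuple."""
    return [
        {
            # Assumed field name for the experiment identifier.
            'experiment_id': experiment_id,
            # Half-open range: includes lower, excludes upper.
            '_id': {'$gte': lower, '$lt': upper},
        }
        for lower, upper in chunks
    ]


# Placeholder integers stand in for real ObjectId values.
chunks = [(0, 100), (100, 200)]
filters = chunk_filters('exp_1', chunks)
```

Because each range is half-open, adjacent chunks that share a boundary value do not overlap.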
-
vivarium.core.emitter.get_emitter(config: Optional[Dict[str, str]]) → vivarium.core.emitter.Emitter
Construct an Emitter using the provided config.
The available Emitter type names and their classes are:
database: DatabaseEmitter
null: NullEmitter
print: Emitter, prints to stdout
timeseries: RAMEmitter
- Parameters
config – Must contain the type key, which specifies the emitter type name (e.g. database).
- Returns
A new Emitter instance.
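The lookup from a type name to an emitter class can be sketched as a simple registry. Everything below is a hypothetical stand-in written for illustration (class names, the registry, and the error raised for unknown names are assumptions), not the library's code.

```python
from typing import Dict, Type


class SketchEmitter:
    """Hypothetical stand-in for an emitter class."""

    def __init__(self, config: Dict[str, str]) -> None:
        self.config = config


class SketchNullEmitter(SketchEmitter):
    """Hypothetical stand-in for an emitter that discards data."""


# Registry mapping type names to classes, like the type names listed above.
EMITTER_TYPES: Dict[str, Type[SketchEmitter]] = {
    'print': SketchEmitter,
    'null': SketchNullEmitter,
}


def sketch_get_emitter(config: Dict[str, str]) -> SketchEmitter:
    """Look up the class named by config['type'] and instantiate it."""
    emitter_type = config['type']
    if emitter_type not in EMITTER_TYPES:
        # Assumed behavior for unknown names in this sketch.
        raise ValueError(f'Unknown emitter type: {emitter_type}')
    return EMITTER_TYPES[emitter_type](config)


emitter = sketch_get_emitter({'type': 'null'})
```

The whole config dict is passed on to the selected class, so type-specific keys can travel alongside the type key.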
-
vivarium.core.emitter.get_experiment_database(port: Any = 27017, database_name: str = 'simulations') → Any
Get a database object.
- Parameters
port – Port number of database. This can usually be left as the default.
database_name – Name of the database table. This can usually be left as the default.
- Returns
The database object.
-
vivarium.core.emitter.get_history_data_db(history_collection: Any, experiment_id: Any, query: Optional[list] = None, func_dict: Optional[dict] = None, f: Optional[Callable[[…], Any]] = None, filters: Optional[dict] = None, start_time: Union[int, bson.min_key.MinKey] = MinKey(), end_time: Union[int, bson.max_key.MaxKey] = MaxKey(), cpus: int = 1, host: str = 'localhost', port: Any = '27017') → Dict[float, dict]
Query MongoDB for history data.
- Parameters
history_collection – a MongoDB collection
experiment_id – the experiment id which is being retrieved
query – a list of tuples pointing to fields within the experiment data. In the format: [(‘path’, ‘to’, ‘field1’), (‘path’, ‘to’, ‘field2’)]
func_dict – a dict which maps the given query paths to a function that operates on the retrieved values and returns the results. If None then the raw values are returned. In the format: {(‘path’, ‘to’, ‘field1’): function}
f – a function that applies equally to all fields in query. func_dict is the recommended approach and takes priority over f.
filters – MongoDB query arguments to further filter results beyond matching the experiment ID.
start_time – first simulation time to query
end_time – last simulation time to query
cpus – splits query into this many chunks to run in parallel, useful if single-threaded query does not saturate I/O (e.g. on Google Cloud)
host – used if cpus>1 to create MongoClient in parallel processes
port – used if cpus>1 to create MongoClient in parallel processes
- Returns
data (dict)
-
vivarium.core.emitter.get_local_client(host: str, port: Any, database_name: str) → Any
Open a MongoDB client onto the given host, port, and DB.
-
vivarium.core.emitter.get_query(projection: dict, host: str, port: Any, query: dict) → list
Helper function for parallel queries
- Parameters
projection – a MongoDB projection in dictionary form
host – used to create new MongoClient for each parallel process
port – used to create new MongoClient for each parallel process
query – a MongoDB query in dictionary form
- Returns
List of projected documents for given query
-
vivarium.core.emitter.path_timeseries_from_data(data: dict) → dict
Convert from raw data to a path timeseries.
-
vivarium.core.emitter.path_timeseries_from_embedded_timeseries(embedded_timeseries: dict) → dict
Convert an embedded timeseries to a path timeseries.
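To make the two formats concrete, here is a minimal sketch of the conversion. It assumes an embedded timeseries is a nested dict whose leaves are lists of values over time, and a path timeseries is a flat dict keyed by path tuples. flatten_timeseries is a hypothetical helper, and the library's function may handle details such as the time key differently.

```python
from typing import Any, Dict, Tuple


def flatten_timeseries(
    embedded: Dict[str, Any],
    path: Tuple[str, ...] = (),
) -> Dict[Tuple[str, ...], Any]:
    """Flatten nested dicts so each leaf is keyed by its full path."""
    flat: Dict[Tuple[str, ...], Any] = {}
    for key, value in embedded.items():
        full_path = path + (key,)
        if isinstance(value, dict):
            # Recurse into nested stores.
            flat.update(flatten_timeseries(value, full_path))
        else:
            # Leaves hold the list of values over time.
            flat[full_path] = value
    return flat


embedded = {
    'time': [0.0, 1.0],
    'cell': {'mass': [1.0, 1.1], 'volume': [2.0, 2.2]},
}
path_timeseries = flatten_timeseries(embedded)
```

In this sketch every leaf, including time, ends up keyed by a tuple, so ('cell', 'mass') addresses the mass series directly.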