Emitters
Emitters log configuration data and time-series data to a destination such as STDOUT, a table in RAM, or a MongoDB database.
-
class
vivarium.core.emitter.DatabaseEmitter(config: Dict[str, Any])[source]¶ Bases:
vivarium.core.emitter.Emitter
Emit data to a MongoDB database.
Example:
>>> config = {
...     'host': 'localhost:27017',
...     'database': 'DB_NAME',
... }
>>> # The line below works only if you have port 27017 open locally
>>> # emitter = DatabaseEmitter(config)
config may have ‘host’ and ‘database’ items.
-
client_dict: Dict[int, pymongo.mongo_client.MongoClient] = {}¶
-
classmethod
create_indexes(table: Any, columns: List[Any]) → None[source]¶ Create the listed column indexes for the given DB table.
-
default_host= 'localhost:27017'¶
-
-
class
vivarium.core.emitter.Emitter(config: Dict[str, str])[source]¶ Bases:
object
Base class for emitters.
This emitter simply emits to STDOUT.
- Parameters
config – Emitter configuration.
-
emit(data: Dict[str, Any]) → None[source]¶ Emit data.
- Parameters
data – The data to emit. This gets called by the Vivarium engine with a snapshot of the simulation state.
-
get_data(query: Optional[list] = None) → dict[source]¶ Get the emitted data.
- Returns
The data that has been emitted to the database in the raw data format. For this particular class, an empty dictionary is returned.
-
get_data_deserialized(query: Optional[list] = None) → Any[source]¶ Get the emitted data with variable values deserialized.
- Returns
The data that has been emitted to the database in the raw data format. Before being returned, serialized values in the data are deserialized.
-
get_data_unitless(query: Optional[list] = None) → Any[source]¶ Get the emitted data with units stripped from variable values.
- Returns
The data that has been emitted to the database in the raw data format. Before being returned, units are stripped from values.
-
get_path_timeseries(query: Optional[list] = None) → dict[source]¶ Get the deserialized data as a path timeseries.
- Returns
The deserialized emitted data, formatted as a path timeseries.
-
get_timeseries(query: Optional[list] = None) → dict[source]¶ Get the deserialized data as an embedded timeseries.
- Returns
The deserialized emitted data, formatted as an embedded timeseries.
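The interface above can be illustrated with a small stand-in. This is a sketch, not the library's implementation: `PrintEmitter` is a hypothetical name, and it only mimics the two documented behaviors, namely that `emit` writes to STDOUT and that `get_data` returns an empty dictionary for the base class.

```python
from typing import Any, Dict, Optional


class PrintEmitter:
    """Hypothetical stand-in for the documented base-class behavior."""

    def __init__(self, config: Dict[str, str]) -> None:
        self.config = config

    def emit(self, data: Dict[str, Any]) -> None:
        # The base emitter is documented as emitting to STDOUT.
        print(data)

    def get_data(self, query: Optional[list] = None) -> dict:
        # Nothing is stored, so there is nothing to return.
        return {}


emitter = PrintEmitter({'type': 'print'})
emitter.emit({'time': 0.0, 'cell': {'mass': 1.0}})
```

Subclasses that store data would override `emit` and `get_data`, while the remaining `get_*` methods can be built on top of `get_data`.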
-
class
vivarium.core.emitter.NullEmitter(config: Dict[str, str])[source]¶ Bases:
vivarium.core.emitter.Emitter
Don’t emit anything.
- Parameters
config – Emitter configuration.
-
class
vivarium.core.emitter.RAMEmitter(config: Dict[str, Any])[source]¶ Bases:
vivarium.core.emitter.Emitter
Accumulate the timeseries history portion of the “emitted” data to a table in RAM.
-
Bases:
vivarium.core.emitter.RAMEmitter
Accumulate the timeseries history portion of the “emitted” data to a table in RAM that is shared across all instances of the emitter.
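The difference between a per-instance table and a table shared across instances can be sketched as follows. Both class names are hypothetical, and the shape of the emitted data (a `table` key naming the kind of record and a `data` dict carrying a `time` key) is an assumption made for illustration.

```python
from typing import Any, Dict


class RamTableSketch:
    """Hypothetical: each instance keeps its own time-indexed table."""

    def __init__(self) -> None:
        self.saved_data: Dict[float, dict] = {}

    def emit(self, data: Dict[str, Any]) -> None:
        # Assumed layout: only 'history' records belong to the timeseries,
        # and each one stores its simulation time under 'time'.
        if data['table'] == 'history':
            snapshot = dict(data['data'])
            time = snapshot.pop('time')
            self.saved_data[time] = snapshot


class SharedRamTableSketch(RamTableSketch):
    """Hypothetical: all instances write to one class-level table."""

    shared_data: Dict[float, dict] = {}

    def __init__(self) -> None:
        # Point every instance at the same dictionary object.
        self.saved_data = SharedRamTableSketch.shared_data


a, b = SharedRamTableSketch(), SharedRamTableSketch()
a.emit({'table': 'history', 'data': {'time': 0.0, 'mass': 1.0}})
b.emit({'table': 'configuration', 'data': {'name': 'demo'}})
own = RamTableSketch()
```

Here `b` sees the record emitted through `a`, whereas `own` starts with an empty table of its own.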
-
vivarium.core.emitter.apply_func(document: Any, field: Tuple, f: Optional[Callable[[…], Any]] = None) → Any[source]¶
-
vivarium.core.emitter.breakdown_data(limit: float, data: Any, path: Tuple = (), size: Optional[float] = None) → list[source]¶
-
vivarium.core.emitter.data_from_database(experiment_id: str, client: Any, query: Optional[list] = None, func_dict: Optional[dict] = None, f: Optional[Callable[[…], Any]] = None, filters: Optional[dict] = None, start_time: Union[int, bson.min_key.MinKey] = MinKey(), end_time: Union[int, bson.max_key.MaxKey] = MaxKey(), cpus: int = 1) → Tuple[dict, Any][source]¶ Fetch something from a MongoDB.
- Parameters
experiment_id – the experiment id which is being retrieved
client – a MongoClient instance connected to the DB
query – a list of tuples pointing to fields within the experiment data. In the format: [(‘path’, ‘to’, ‘field1’), (‘path’, ‘to’, ‘field2’)]
func_dict – a dict which maps the given query paths to a function that operates on the retrieved values and returns the results. If None then the raw values are returned. In the format: {(‘path’, ‘to’, ‘field1’): function}
f – a function that applies equally to all fields in query. func_dict is the recommended approach and takes priority over f.
filters – MongoDB query arguments to further filter results beyond matching the experiment ID.
start_time – first simulation time to query
end_time – last simulation time to query
cpus – splits query into this many chunks to run in parallel
- Returns
data (dict)
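The relationship between `query`, `func_dict`, and `f` can be sketched without a database. The helpers `get_in` and `apply_to_query` are hypothetical, and the sketch assumes that "takes priority" means a path listed in `func_dict` uses its own function while all other paths fall back to `f`.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Path = Tuple[str, ...]


def get_in(document: dict, path: Path) -> Any:
    """Follow a path of keys into a nested dictionary."""
    value: Any = document
    for key in path:
        value = value[key]
    return value


def apply_to_query(
    document: dict,
    query: List[Path],
    func_dict: Optional[Dict[Path, Callable[..., Any]]] = None,
    f: Optional[Callable[..., Any]] = None,
) -> Dict[Path, Any]:
    """Return the queried values, transformed where a function applies."""
    func_dict = func_dict or {}
    result: Dict[Path, Any] = {}
    for path in query:
        value = get_in(document, path)
        # Assumption: a per-path function wins over the shared function f.
        func = func_dict.get(path, f)
        result[path] = func(value) if func is not None else value
    return result


document = {'cell': {'mass': [1.0, 2.0, 3.0], 'volume': [4.0, 5.0]}}
query = [('cell', 'mass'), ('cell', 'volume')]
result = apply_to_query(
    document, query, func_dict={('cell', 'mass'): sum}, f=len)
```

With neither `func_dict` nor `f`, the raw values come back unchanged, matching the documented behavior for `None`.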
-
vivarium.core.emitter.data_to_database(data: Dict[float, dict], environment_config: Any, client: Any) → Any[source]¶ Insert something into a MongoDB.
-
vivarium.core.emitter.delete_experiment(host: str = 'localhost', port: Any = 27017, query: Optional[dict] = None) → None[source]¶ Helper function to delete experiment data in parallel
- Parameters
host – Host name of database. This can usually be left as the default.
port – Port number of database. This can usually be left as the default.
query – Filter for documents to delete.
-
vivarium.core.emitter.delete_experiment_from_database(experiment_id: str, host: str = 'localhost', port: Any = 27017, cpus: int = 1) → None[source]¶ Delete an experiment’s data from a database.
- Parameters
experiment_id – Identifier of experiment.
host – Host name of database. This can usually be left as the default.
port – Port number of database. This can usually be left as the default.
cpus – Number of chunks to split delete operation into to be run in parallel. Useful if single-threaded delete does not saturate I/O.
-
vivarium.core.emitter.get_atlas_client(secrets_path: str) → Any[source]¶ Open a MongoDB client using the named secrets config JSON file.
-
vivarium.core.emitter.get_atlas_database_emitter_config(username: str, password: str, cluster_subdomain: Any, database: str) → Dict[str, Any][source]¶ Construct an Emitter config for a MongoDB on the Atlas service.
-
vivarium.core.emitter.get_data_chunks(history_collection: Any, experiment_id: str, start_time: Union[int, bson.min_key.MinKey] = MinKey(), end_time: Union[int, bson.max_key.MaxKey] = MaxKey(), cpus: int = 8) → list[source]¶ Helper function to get chunks for parallel queries
- Parameters
history_collection – the MongoDB history collection to query
experiment_id – the experiment id which is being retrieved
start_time – first simulation time to query
end_time – last simulation time to query
cpus – number of chunks to create
- Returns
List of ObjectId tuples that represent chunk boundaries. For each tuple, include
{'_id': {$gte: tuple[0], $lt: tuple[1]}} in the query to search its corresponding chunk.
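As a sketch of how the returned boundaries might be turned into per-chunk filters, the hypothetical helper below builds one MongoDB-style filter dictionary per tuple. The `experiment_id` field name is an assumption, and plain integers stand in for ObjectId values so the example runs without a database.

```python
from typing import Any, Dict, List, Tuple


def chunk_filters(
    experiment_id: str,
    chunks: List[Tuple[Any, Any]],
) -> List[Dict[str, Any]]:
    """Build one filter per chunk: lower bound inclusive, upper exclusive."""
    return [
        {
            # Assumed field name for matching the experiment.
            'experiment_id': experiment_id,
            '_id': {'$gte': lower, '$lt': upper},
        }
        for lower, upper in chunks
    ]


# Integers stand in for ObjectId chunk boundaries.
filters = chunk_filters('exp_1', [(0, 10), (10, 20)])
```

Because each range is half-open, adjacent chunks that share a boundary value do not return the same document twice.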
-
vivarium.core.emitter.get_emitter(config: Optional[Dict[str, str]]) → vivarium.core.emitter.Emitter[source]¶ Construct an Emitter using the provided config.
The available Emitter type names and their classes are:
database: DatabaseEmitter
null: NullEmitter
print: Emitter, prints to stdout
timeseries: RAMEmitter
- Parameters
config – Must contain the type key, which specifies the emitter type name (e.g. database).
- Returns
A new Emitter instance.
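The mapping from type names to classes can be sketched as a lookup table. Every class and function name below is a hypothetical placeholder, not the library's code. How a missing or unknown `type` is handled is not stated here, so raising `ValueError` is an assumption of this sketch.

```python
from typing import Dict, Optional, Type


class EmitterSketch:
    """Placeholder for the 'print' emitter."""

    def __init__(self, config: Dict[str, str]) -> None:
        self.config = config


class DatabaseSketch(EmitterSketch):
    """Placeholder for the 'database' emitter."""


class NullSketch(EmitterSketch):
    """Placeholder for the 'null' emitter."""


class RamSketch(EmitterSketch):
    """Placeholder for the 'timeseries' emitter."""


REGISTRY: Dict[str, Type[EmitterSketch]] = {
    'database': DatabaseSketch,
    'null': NullSketch,
    'print': EmitterSketch,
    'timeseries': RamSketch,
}


def make_emitter(config: Optional[Dict[str, str]]) -> EmitterSketch:
    """Look up the class named by config['type'] and instantiate it."""
    emitter_type = (config or {}).get('type')
    if emitter_type not in REGISTRY:
        # Assumption: this sketch rejects missing or unknown type names.
        raise ValueError(f'Unknown emitter type: {emitter_type!r}')
    return REGISTRY[emitter_type](config)


emitter = make_emitter({'type': 'timeseries'})
```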
-
vivarium.core.emitter.get_experiment_database(port: Any = 27017, database_name: str = 'simulations') → Any[source]¶ Get a database object.
- Parameters
port – Port number of database. This can usually be left as the default.
database_name – Name of the database table. This can usually be left as the default.
- Returns
The database object.
-
vivarium.core.emitter.get_history_data_db(history_collection: Any, experiment_id: Any, query: Optional[list] = None, func_dict: Optional[dict] = None, f: Optional[Callable[[…], Any]] = None, filters: Optional[dict] = None, start_time: Union[int, bson.min_key.MinKey] = MinKey(), end_time: Union[int, bson.max_key.MaxKey] = MaxKey(), cpus: int = 1, host: str = 'localhost', port: Any = '27017') → Dict[float, dict][source]¶ Query MongoDB for history data.
- Parameters
history_collection – a MongoDB collection
experiment_id – the experiment id which is being retrieved
query – a list of tuples pointing to fields within the experiment data. In the format: [(‘path’, ‘to’, ‘field1’), (‘path’, ‘to’, ‘field2’)]
func_dict – a dict which maps the given query paths to a function that operates on the retrieved values and returns the results. If None then the raw values are returned. In the format: {(‘path’, ‘to’, ‘field1’): function}
f – a function that applies equally to all fields in query. func_dict is the recommended approach and takes priority over f.
filters – MongoDB query arguments to further filter results beyond matching the experiment ID.
start_time – first simulation time to query
end_time – last simulation time to query
cpus – splits query into this many chunks to run in parallel, useful if single-threaded query does not saturate I/O (e.g. on Google Cloud)
host – used if cpus>1 to create MongoClient in parallel processes
port – used if cpus>1 to create MongoClient in parallel processes
- Returns
data (dict)
-
vivarium.core.emitter.get_local_client(host: str, port: Any, database_name: str) → Any[source]¶ Open a MongoDB client onto the given host, port, and DB.
-
vivarium.core.emitter.get_query(projection: dict, host: str, port: Any, query: dict) → list[source]¶ Helper function for parallel queries
- Parameters
projection – a MongoDB projection in dictionary form
host – used to create new MongoClient for each parallel process
port – used to create new MongoClient for each parallel process
query – a MongoDB query in dictionary form
- Returns
List of projected documents for given query
-
vivarium.core.emitter.path_timeseries_from_data(data: dict) → dict[source]¶ Convert from raw data to a path timeseries.
-
vivarium.core.emitter.path_timeseries_from_embedded_timeseries(embedded_timeseries: dict) → dict[source]¶ Convert an embedded timeseries to a path timeseries.
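The conversion between the two timeseries formats can be sketched as a dictionary flattening. The helper names are hypothetical, and the sketch assumes that an embedded timeseries is a nested dictionary whose leaves are lists with a top-level `time` list, while a path timeseries maps path tuples to those same lists and keeps `time` as a plain key.

```python
from typing import Any, Dict, Tuple


def flatten(embedded: dict, prefix: Tuple[str, ...] = ()) -> Dict[Any, list]:
    """Map each leaf list to the tuple of keys that leads to it."""
    flat: Dict[Any, list] = {}
    for key, value in embedded.items():
        path = prefix + (key,)
        if isinstance(value, dict):
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


def to_path_timeseries(embedded: dict) -> Dict[Any, list]:
    """Flatten everything except 'time', which stays a top-level key."""
    rest = {key: value for key, value in embedded.items() if key != 'time'}
    path_timeseries = flatten(rest)
    path_timeseries['time'] = embedded['time']
    return path_timeseries


embedded = {
    'time': [0.0, 1.0],
    'cell': {'mass': [1.0, 1.1], 'volume': [2.0, 2.2]},
}
path_ts = to_path_timeseries(embedded)
```

The flat form is convenient for plotting or tabulating, since every variable is addressed by a single key regardless of nesting depth.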