# Source code for vivarium.library.topology

'''
==================
Topology Utilities
==================
'''

import copy
import re

from vivarium.library.dict_utils import deep_merge, deep_merge_multi_update


def get_in(d, path, default=None):
    '''Look up the value stored at ``path`` inside nested dictionary ``d``.

    Each element of ``path`` is used as a key one level deeper than the
    previous one. An empty path returns ``d`` itself. As soon as a key
    is missing, ``default`` is returned.

    >>> d = {'a': {'b': 'c', 'd': 'e'}}
    >>> get_in(d, ('a', 'b'))
    'c'
    >>> get_in(d, ('a', 'z'))
    >>> get_in(d, ('a', 'z'), 'y')
    'y'
    '''
    node = d
    remaining = path
    while remaining:
        key = remaining[0]
        if key not in node:
            return default
        node = node[key]
        remaining = remaining[1:]
    return node
def delete_in(d, path):
    '''Remove the entry found at ``path`` from nested dictionary ``d``.

    ``d`` is modified in place and nothing is returned. If the path is
    empty, or any key along it is absent, ``d`` is left untouched.

    >>> d = {'a': {'b': 'c', 'd': 'e'}}
    >>> delete_in(d, ('a', 'b'))
    >>> d
    {'a': {'d': 'e'}}
    '''
    if len(path) == 0:
        return

    # Walk down to the dictionary that holds the entry to remove.
    node = d
    for key in path[:-1]:
        if key not in node:
            return
        node = node[key]

    target = path[-1]
    if target in node:
        del node[target]
def assoc_path(d, path, value):
    '''Store ``value`` at ``path`` inside ``d`` and return ``d``.

    ``d`` is modified in place. Missing intermediate dictionaries are
    created on the way down. With an empty path, a dictionary ``value``
    is merged into ``d`` with ``deep_merge``; any other ``value`` is
    ignored.

    >>> d = {'a': {'b': 'c'}}
    >>> assoc_path(d, ('a', 'd'), 'e')
    {'a': {'b': 'c', 'd': 'e'}}
    >>> d
    {'a': {'b': 'c', 'd': 'e'}}
    '''
    if not path:
        if isinstance(value, dict):
            deep_merge(d, value)
        return d

    # Descend to the parent of the final key, creating levels as needed.
    node = d
    for step in path[:-1]:
        if step not in node:
            node[step] = {}
        node = node[step]

    node[path[-1]] = value
    return d
def update_in(d, path, f):
    '''Return a copy of ``d`` with the value at ``path`` replaced by ``f``.

    Args:
        d: The dictionary ``path`` applies to. No keys are added to or
            removed from this object.
        path: Path to the value within ``d`` that should be updated.
            Steps that do not exist yet are treated as empty
            dictionaries. An empty path applies ``f`` to ``d`` itself.
        f: Function called once with the current value at ``path`` (or
            ``{}`` when nothing is stored there). Its return value is
            stored at ``path`` in the result.

    Returns:
        A copy of ``d`` with the value under ``path`` set to the value
        returned by ``f``. Each dictionary along ``path`` is copied
        shallowly, so subtrees off the path are shared with ``d``.

    Note:
        ``f`` receives the original object stored at ``path``. If ``f``
        mutates its argument in place, that change is visible through
        ``d`` as well.
    '''
    if not path:
        return f(d)

    head = path[0]
    # Read with a default instead of ``setdefault`` so that missing
    # steps are not inserted into the caller's dictionary.
    updated = copy.copy(d)
    updated[head] = update_in(d.get(head, {}), path[1:], f)
    return updated
def paths_to_dict(path_list, f=lambda x: x):
    '''Build a nested dictionary from ``(path, value)`` pairs.

    Args:
        path_list: An iterable of tuples ``(path, value)``.
        f: A function applied to each value before it is stored.
            Defaults to the identity function.

    Returns:
        A new dictionary in which every ``f(value)`` has been inserted
        at its associated path with ``assoc_path``.
    '''
    tree = {}
    for location, raw in path_list:
        assoc_path(tree, location, f(raw))
    return tree
def dict_to_paths(root, d):
    """List every leaf of a nested dictionary together with its path.

    Each path is ``root`` extended by the keys leading to the leaf. Any
    value that is not a ``dict`` counts as a leaf; an empty dictionary
    contributes no entries. Leaves are listed in depth-first order,
    following each dictionary's own key order.

    For example:

    >>> root = ('root', 'subroot')
    >>> d = {
    ...     'a': {
    ...         'b': 'c',
    ...     },
    ...     'd': 'e',
    ... }
    >>> dict_to_paths(root, d)
    [(('root', 'subroot', 'a', 'b'), 'c'), (('root', 'subroot', 'd'), 'e')]
    """
    found = []
    pending = [(root, d)]
    while pending:
        prefix, node = pending.pop()
        if not isinstance(node, dict):
            found.append((prefix, node))
            continue
        # Push children in reverse so they are popped in key order.
        children = [(prefix + (key,), child) for key, child in node.items()]
        pending.extend(reversed(children))
    return found
def inverse_topology(outer, update, topology, inverse=None, multi_updates=True):
    '''
    Transform an update from the form its process produced into
    one aligned to the given topology.

    The inverse of this function (using a topology to construct a view for
    the perspective of a Process ports_schema()) lives in `Store`, called
    `topology_state`. This one stands alone as it does not require a store
    to calculate.

    Args:
        outer: Tuple path that every location in ``topology`` is taken
            relative to.
        update: Dictionary whose keys are matched against the keys of
            ``topology``.
        topology: Dictionary mapping each key either to a tuple path or
            to a nested topology dictionary. A nested dictionary may
            carry a ``'_path'`` entry giving its own location. The key
            ``'*'`` applies to every key of ``update``.
        inverse: Dictionary to accumulate results into. A new one is
            started when this is ``None`` (or otherwise falsy).
        multi_updates: Selects ``deep_merge_multi_update`` instead of
            ``deep_merge`` when merging a dictionary value for a key
            that maps to a tuple path.

    Returns:
        The accumulated dictionary, with each piece of ``update`` placed
        at the normalized path its topology entry points to.
    '''
    inverse = inverse or {}

    for key, path in topology.items():
        if key == '*':
            # Wildcard: the entry applies to every child of ``update``.
            if isinstance(path, dict):
                # NOTE(review): ``node`` is assigned but never read below.
                node = inverse
                if '_path' in path:
                    # Copy first so popping '_path' does not alter the
                    # caller's topology dictionary.
                    path = path.copy()
                    inner = normalize_path(outer + path.pop('_path'))
                else:
                    inner = outer

                # Recurse once per child, threading ``inverse`` through.
                for child, child_update in update.items():
                    inverse = inverse_topology(
                        inner + (child,),
                        update[child],
                        path,
                        inverse,
                        multi_updates)

            else:
                # ``path`` is a tuple: each child lands directly under it.
                for child, child_update in update.items():
                    inner = normalize_path(outer + path + (child,))
                    if isinstance(child_update, dict):
                        # The lambda is invoked inside this ``update_in``
                        # call, so it sees the current ``child_update``.
                        inverse = update_in(
                            inverse,
                            inner,
                            lambda current: deep_merge(
                                current, child_update))
                    else:
                        assoc_path(inverse, inner, child_update)

        elif key in update:
            value = update[key]
            if isinstance(path, dict):
                # Nested topology: resolve its location, then recurse.
                if '_path' in path:
                    # Copy first so the caller's topology is not altered
                    # by the pop and the key insertions below.
                    path = path.copy()
                    inner = normalize_path(outer + path.pop('_path'))
                    # Keys of the update that the sub-topology does not
                    # mention (and that no '*' entry covers) are mapped
                    # to a one-step path of the same name.
                    for update_key in update[key].keys():
                        if update_key not in path and '*' not in path:
                            path[update_key] = (update_key,)
                else:
                    inner = outer

                inverse = inverse_topology(
                    inner,
                    value,
                    path,
                    inverse,
                    multi_updates)

            else:
                # ``path`` is a tuple: place ``value`` at outer + path.
                inner = normalize_path(outer + path)
                if isinstance(value, dict):
                    if multi_updates:
                        inverse = update_in(
                            inverse,
                            inner,
                            lambda current: deep_merge_multi_update(
                                current, value))
                    # Do not allow multiupdates when forming initial state
                    else:
                        inverse = update_in(
                            inverse,
                            inner,
                            lambda current: deep_merge(current, value))
                else:
                    assoc_path(inverse, inner, value)

    return inverse
def normalize_path(path):
    """Make a path absolute by resolving ``..`` elements.

    Each ``'..'`` removes the step before it. A ``'..'`` with no real
    step left to remove is kept, so a path that climbs above its start
    still records how many levels it climbs.

    >>> normalize_path(('a', 'b', '..', 'c'))
    ('a', 'c')
    >>> normalize_path(('a', '..', '..', 'b'))
    ('..', 'b')
    >>> normalize_path(('..', '..', 'a'))
    ('..', '..', 'a')

    Args:
        path: Iterable of path steps.

    Returns:
        The resolved path as a tuple.
    """
    resolved = []
    for step in path:
        # Only a real step can be cancelled. A retained '..' must not be
        # removed by the next '..', or upward moves would be lost.
        if step == '..' and resolved and resolved[-1] != '..':
            resolved.pop()
        else:
            resolved.append(step)
    return tuple(resolved)
def convert_path_style(path):
    '''Convert a string path declaration into a tuple path.

    In a string, ``>`` separates successive steps and every ``<`` turns
    into a ``'..'`` step. Anything that is not a string is returned
    unchanged.

    >>> convert_path_style('path>to>store')
    ('path', 'to', 'store')
    >>> convert_path_style('<<store')
    ('..', '..', 'store')
    '''
    if not isinstance(path, str):
        return path
    # Writing each '<' as '..>' leaves '>' as the only separator.
    steps = path.replace('<', '..>').split('>')
    return tuple(steps)
class TestUpdateIn:
    '''Checks ``update_in`` against a small nested fixture dictionary.'''

    # Shared fixture; each test works on its own deep copy.
    d = {
        'foo': {
            1: {
                'a': 'b',
            },
        },
        'bar': {
            'c': 'd',
        },
    }

    def _apply(self, path, f):
        '''Run ``update_in`` on a private deep copy of the fixture.'''
        fresh = copy.deepcopy(self.d)
        return update_in(fresh, path, f)

    def test_simple(self):
        '''An existing leaf is replaced.'''
        result = self._apply(('foo', 1, 'a'), lambda current: 'updated')
        assert result == {
            'foo': {1: {'a': 'updated'}},
            'bar': {'c': 'd'},
        }

    def test_add_leaf(self):
        '''A new leaf is added next to an existing one.'''
        result = self._apply(('foo', 1, 'new'), lambda current: 'updated')
        assert result == {
            'foo': {1: {'a': 'b', 'new': 'updated'}},
            'bar': {'c': 'd'},
        }

    def test_add_dict(self):
        '''A new sub-dictionary is added under an existing branch.'''
        result = self._apply(('foo', 2), lambda current: {'a': 'updated'})
        assert result == {
            'foo': {1: {'a': 'b'}, 2: {'a': 'updated'}},
            'bar': {'c': 'd'},
        }

    def test_complex_merge(self):
        '''A nested dictionary is merged into an existing branch.'''
        addition = {'foo': {'a': 'updated'}, 'b': 2}
        result = self._apply(
            ('foo',),
            lambda current: deep_merge(current, addition),
        )
        assert result == {
            'foo': {
                'foo': {'a': 'updated'},
                'b': 2,
                1: {'a': 'b'},
            },
            'bar': {'c': 'd'},
        }

    def test_add_to_root(self):
        '''With an empty path, a merge applies at the root.'''
        result = self._apply(
            tuple(),
            lambda current: deep_merge(current, {'a': 'updated'}),
        )
        assert result == {
            'foo': {1: {'a': 'b'}},
            'bar': {'c': 'd'},
            'a': 'updated',
        }

    def test_set_root(self):
        '''With an empty path, the root itself can be replaced.'''
        result = self._apply(tuple(), lambda current: {'a': 'updated'})
        assert result == {'a': 'updated'}
def test_inverse_topology():
    '''Ports that share a target path are merged under that path.'''
    update = {
        'port1': {
            'a': 5},
        'port2': {
            'b': 10},
        'port3': {
            'b': 10},
        'global': {
            'c': 20}}

    topology = {
        'port1': ('boundary', 'x'),
        'global': ('boundary',),
        'port2': ('boundary', 'y'),
        'port3': ('boundary', 'x')}

    path = ('agent',)
    inverse = inverse_topology(path, update, topology)
    expected_inverse = {
        'agent': {
            'boundary': {
                'x': {
                    'a': 5,
                    'b': 10},
                'y': {
                    'b': 10},
                'c': 20}}}

    assert inverse == expected_inverse


def test_deletion():
    '''``delete_in`` removes only the addressed entry.'''
    nested = {
        'A': {
            'AA': 5,
            'AB': {
                'ABC': 11}},
        'B': {
            'BA': 6}}

    delete_in(nested, ('A', 'AA'))
    assert 'AA' not in nested['A']
    # Everything else must survive the deletion.
    assert nested == {
        'A': {
            'AB': {
                'ABC': 11}},
        'B': {
            'BA': 6}}


def test_in():
    '''``assoc_path``, ``get_in`` and ``update_in`` agree on one path.'''
    blank = {}
    path = ['where', 'are', 'we']

    assoc_path(blank, path, 5)
    assert blank == {'where': {'are': {'we': 5}}}
    assert get_in(blank, path) == 5

    blank = update_in(blank, path, lambda x: x + 6)
    assert blank == {'where': {'are': {'we': 11}}}
    assert get_in(blank, path) == 11


def test_path_declare():
    '''String path declarations convert to tuple paths.'''
    path_down = 'path>to>store'
    new_path_down = convert_path_style(path_down)
    assert new_path_down == ('path', 'to', 'store')

    path_up = '<<store'
    new_path_up = convert_path_style(path_up)
    assert new_path_up == ('..', '..', 'store')


if __name__ == '__main__':
    test_path_declare()