Module olivia.lib.aggregators

Aggregator base classes for computing metrics on Directed Acyclic Graphs.

Expand source code
"""Aggregator base classes for computing metrics on Directed Acyclic Graphs."""

from abc import abstractmethod, ABC
import numpy as np
import networkx as nx
from intbitset import intbitset

from olivia.lib.transientsequence import TransientSequence


class AscendentAggregator(ABC):

    """
    Base class for the network-wide computation of ascendent aggregation metrics.

    Ascendent aggregation metrics are values computed for each node as a function of the node and the set of its
    transitive descendants. The aggregation function must be implemented in the subclasses.

    The reversed topological order of the network is used to compute descendant sets in a relatively efficient manner,
    using a modified version of the Goralcikova-Koubek algorithm [1].

    Thus input graph must be acyclic and it is expected to be indexed by reversed topological order.

    [1] Goralčíková, Alla, and Václav Koubek. "A reduct-and-closure algorithm for graphs." International Symposium
    on Mathematical Foundations of Computer Science. Springer, Berlin, Heidelberg, 1979.

    Parameters
    ----------
    G: Networkx DiGraph
        Input acyclic graph indexed in reverse topological order.
    save_memory: bool, optional
         Set to True to free descendant sets that are no longer needed. Depending on the network
         topology this can save 10 to 50% (and possibly more in large sparse networks) RAM required by the proccess.
    compression_threshold: int
         Descendant sets with sizes larger that this value will be dynamically compressed and decompressed in
         memory. Use 0 to compress all descendant sets and np.inf for no compression. Depending on the network this
         may reduce drastically the amount of RAM needed at the expense of speed.
    mapping: dict
        If a mapping is provided, computation returns a dictionary values for each key according to value indexes.
        If not, a raw array indexed by node is returned.

    Notes
    -----
    Stores descendant sets in instances of ~olivia.lib.transientsequence.TransientSequence

    """

    def __init__(self, G, save_memory=False, compression_threshold=1000, mapping=None):
        """
        Create and inits an AscendentAggregator.
    
        Parameters
        ----------
        G: Networkx DiGraph
            Input acyclic graph indexed in reverse topological order.
        save_memory: bool, optional
             Set to True to free descendant sets that are no longer needed. Depending on the network
             topology this can save 10 to 50% (and possibly more in large sparse networks) RAM required by the proccess.
        compression_threshold: int
             Descendant sets with sizes larger that this value will be dynamically compressed and decompressed in 
             memory. Use 0 to compress all descendant sets and np.inf for no compression. Depending on the network this
             may reduce drastically the amount of RAM needed at the expense of speed.
        mapping: dict
            If a mapping is provided, computation returns a dictionary with values for each key
            according to value indexes.
            If not, a raw array indexed by node is returned.

        """
        self._G = G
        self._topological_order = range(len(G))
        self._save_memory = save_memory
        self._compression_threshold = compression_threshold
        self._descendants = None
        self._mapping = mapping
        self._dag_result = None

    def _ascendent_aggregation(self):
        """
        Compute the aggregation function over the network.

        Computes descendant sets in reversed topological order and stores the result of the aggregation
        function in _dag_result

        Returns
        -------
        None

        """
        for n in self._topological_order:
            if not n % 1000:
                print('     Processing node: '+str(n // 1000)+'K      ', end='\r', flush=True)
            tempset = intbitset()
            for m in self._G[n]:
                if m not in tempset:
                    tempset.update(self._descendants[m])
                    tempset.add(m)
            self._descendants[n] = tempset
            self._dag_result[n] = self._aggregation(n, tempset)
        print()

    @abstractmethod
    def _aggregation(self, n, descendants):
        """
        Return the value of the aggregation function.

        Parameters
        ----------
        n: int
            index of the current node
        descendants: iterable
            set of transitive descendants of n

        Returns
        -------
        value: int or object
            Value of the aggregation function for n and its descendants.

        """
        pass

    def _setup(self):
        """
        Init internal structures for computation.

        Returns
        -------
        None

        """

        def intbitset_decompressor(v):
            out = intbitset()
            intbitset.fastload(out, v)
            return out

        self._dag_result = np.zeros(len(self._G), dtype='int32')
        if self._save_memory:
            expiry = [self._G.in_degree()[n] for n in self._G]
        else:
            expiry = None
        self._descendants = TransientSequence(len(self._G),
                                              class_type=intbitset,
                                              compressor=intbitset.fastdump,
                                              decompressor=intbitset_decompressor,
                                              compression_threshold=self._compression_threshold,
                                              expiry_array=expiry)

    def compute(self):
        """
        Compute the ascendent aggregation metric defined by the aggregation function.

        Returns
        -------
        result: numpy.array or dict
            If a mapping is provided, computation returns a dictionary with values for each key
            according to value indexes.
            If not, a raw array indexed by node is returned.

        """
        self._setup()
        self._ascendent_aggregation()
        if self._mapping is None:
            return self._dag_result
        else:
            return {n: self._dag_result[self._mapping[n]] for n in self._mapping}


class DescendentAggregator(AscendentAggregator, ABC):

    """
    Abstract subclass of DescendentAggregator for inverting the computing direction.

    Aggregation process is carried in topological order over the reversed input graph.
    """

    def __init__(self, *args, **kwargs):
        """Create and init an DescendentAggregator."""
        super(DescendentAggregator, self).__init__(*args, **kwargs)
        self._topological_order = reversed(self._topological_order)

    def compute(self):
        """
        Compute the descendent aggregation metric defined by the aggregation function.

        Returns
        -------
        result: numpy.array or dict
            If a mapping is provided, computation returns a dictionary with values for each key
            according to value indexes.
            If not, a raw array indexed by node is returned.

        """
        with nx.utils.contextmanagers.reversed(self._G):
            return super(DescendentAggregator, self).compute()

Classes

class AscendentAggregator (G, save_memory=False, compression_threshold=1000, mapping=None)

Base class for the network-wide computation of ascendent aggregation metrics.

Ascendent aggregation metrics are values computed for each node as a function of the node and the set of its transitive descendants. The aggregation function must be implemented in the subclasses.

The reversed topological order of the network is used to compute descendant sets in a relatively efficient manner, using a modified version of the Goralcikova-Koubek algorithm [1].

Thus input graph must be acyclic and it is expected to be indexed by reversed topological order.

[1] Goralčíková, Alla, and Václav Koubek. "A reduct-and-closure algorithm for graphs." International Symposium on Mathematical Foundations of Computer Science. Springer, Berlin, Heidelberg, 1979.

Parameters

G : Networkx DiGraph
Input acyclic graph indexed in reverse topological order.
save_memory : bool, optional
Set to True to free descendant sets that are no longer needed. Depending on the network topology this can save 10 to 50% (and possibly more in large sparse networks) RAM required by the proccess.
compression_threshold : int
Descendant sets with sizes larger that this value will be dynamically compressed and decompressed in memory. Use 0 to compress all descendant sets and np.inf for no compression. Depending on the network this may reduce drastically the amount of RAM needed at the expense of speed.
mapping : dict
If a mapping is provided, computation returns a dictionary values for each key according to value indexes. If not, a raw array indexed by node is returned.

Notes

Stores descendant sets in instances of ~olivia.lib.transientsequence.TransientSequence

Create and inits an AscendentAggregator.

Parameters

G : Networkx DiGraph
Input acyclic graph indexed in reverse topological order.
save_memory : bool, optional
Set to True to free descendant sets that are no longer needed. Depending on the network topology this can save 10 to 50% (and possibly more in large sparse networks) RAM required by the proccess.
compression_threshold : int
Descendant sets with sizes larger that this value will be dynamically compressed and decompressed in memory. Use 0 to compress all descendant sets and np.inf for no compression. Depending on the network this may reduce drastically the amount of RAM needed at the expense of speed.
mapping : dict
If a mapping is provided, computation returns a dictionary with values for each key according to value indexes. If not, a raw array indexed by node is returned.
Expand source code
class AscendentAggregator(ABC):

    """
    Base class for the network-wide computation of ascendent aggregation metrics.

    Ascendent aggregation metrics are values computed for each node as a function of the node and the set of its
    transitive descendants. The aggregation function must be implemented in the subclasses.

    The reversed topological order of the network is used to compute descendant sets in a relatively efficient manner,
    using a modified version of the Goralcikova-Koubek algorithm [1].

    Thus input graph must be acyclic and it is expected to be indexed by reversed topological order.

    [1] Goralčíková, Alla, and Václav Koubek. "A reduct-and-closure algorithm for graphs." International Symposium
    on Mathematical Foundations of Computer Science. Springer, Berlin, Heidelberg, 1979.

    Parameters
    ----------
    G: Networkx DiGraph
        Input acyclic graph indexed in reverse topological order.
    save_memory: bool, optional
         Set to True to free descendant sets that are no longer needed. Depending on the network
         topology this can save 10 to 50% (and possibly more in large sparse networks) RAM required by the proccess.
    compression_threshold: int
         Descendant sets with sizes larger that this value will be dynamically compressed and decompressed in
         memory. Use 0 to compress all descendant sets and np.inf for no compression. Depending on the network this
         may reduce drastically the amount of RAM needed at the expense of speed.
    mapping: dict
        If a mapping is provided, computation returns a dictionary values for each key according to value indexes.
        If not, a raw array indexed by node is returned.

    Notes
    -----
    Stores descendant sets in instances of ~olivia.lib.transientsequence.TransientSequence

    """

    def __init__(self, G, save_memory=False, compression_threshold=1000, mapping=None):
        """
        Create and inits an AscendentAggregator.
    
        Parameters
        ----------
        G: Networkx DiGraph
            Input acyclic graph indexed in reverse topological order.
        save_memory: bool, optional
             Set to True to free descendant sets that are no longer needed. Depending on the network
             topology this can save 10 to 50% (and possibly more in large sparse networks) RAM required by the proccess.
        compression_threshold: int
             Descendant sets with sizes larger that this value will be dynamically compressed and decompressed in 
             memory. Use 0 to compress all descendant sets and np.inf for no compression. Depending on the network this
             may reduce drastically the amount of RAM needed at the expense of speed.
        mapping: dict
            If a mapping is provided, computation returns a dictionary with values for each key
            according to value indexes.
            If not, a raw array indexed by node is returned.

        """
        self._G = G
        self._topological_order = range(len(G))
        self._save_memory = save_memory
        self._compression_threshold = compression_threshold
        self._descendants = None
        self._mapping = mapping
        self._dag_result = None

    def _ascendent_aggregation(self):
        """
        Compute the aggregation function over the network.

        Computes descendant sets in reversed topological order and stores the result of the aggregation
        function in _dag_result

        Returns
        -------
        None

        """
        for n in self._topological_order:
            if not n % 1000:
                print('     Processing node: '+str(n // 1000)+'K      ', end='\r', flush=True)
            tempset = intbitset()
            for m in self._G[n]:
                if m not in tempset:
                    tempset.update(self._descendants[m])
                    tempset.add(m)
            self._descendants[n] = tempset
            self._dag_result[n] = self._aggregation(n, tempset)
        print()

    @abstractmethod
    def _aggregation(self, n, descendants):
        """
        Return the value of the aggregation function.

        Parameters
        ----------
        n: int
            index of the current node
        descendants: iterable
            set of transitive descendants of n

        Returns
        -------
        value: int or object
            Value of the aggregation function for n and its descendants.

        """
        pass

    def _setup(self):
        """
        Init internal structures for computation.

        Returns
        -------
        None

        """

        def intbitset_decompressor(v):
            out = intbitset()
            intbitset.fastload(out, v)
            return out

        self._dag_result = np.zeros(len(self._G), dtype='int32')
        if self._save_memory:
            expiry = [self._G.in_degree()[n] for n in self._G]
        else:
            expiry = None
        self._descendants = TransientSequence(len(self._G),
                                              class_type=intbitset,
                                              compressor=intbitset.fastdump,
                                              decompressor=intbitset_decompressor,
                                              compression_threshold=self._compression_threshold,
                                              expiry_array=expiry)

    def compute(self):
        """
        Compute the ascendent aggregation metric defined by the aggregation function.

        Returns
        -------
        result: numpy.array or dict
            If a mapping is provided, computation returns a dictionary with values for each key
            according to value indexes.
            If not, a raw array indexed by node is returned.

        """
        self._setup()
        self._ascendent_aggregation()
        if self._mapping is None:
            return self._dag_result
        else:
            return {n: self._dag_result[self._mapping[n]] for n in self._mapping}

Ancestors

  • abc.ABC

Subclasses

Methods

def compute(self)

Compute the ascendent aggregation metric defined by the aggregation function.

Returns

result : numpy.array or dict
If a mapping is provided, computation returns a dictionary with values for each key according to value indexes. If not, a raw array indexed by node is returned.
Expand source code
def compute(self):
    """
    Compute the ascendent aggregation metric defined by the aggregation function.

    Returns
    -------
    result: numpy.array or dict
        If a mapping is provided, computation returns a dictionary with values for each key
        according to value indexes.
        If not, a raw array indexed by node is returned.

    """
    self._setup()
    self._ascendent_aggregation()
    if self._mapping is None:
        return self._dag_result
    else:
        return {n: self._dag_result[self._mapping[n]] for n in self._mapping}
class DescendentAggregator (*args, **kwargs)

Abstract subclass of DescendentAggregator for inverting the computing direction.

Aggregation process is carried in topological order over the reversed input graph.

Create and init an DescendentAggregator.

Expand source code
class DescendentAggregator(AscendentAggregator, ABC):

    """
    Abstract subclass of DescendentAggregator for inverting the computing direction.

    Aggregation process is carried in topological order over the reversed input graph.
    """

    def __init__(self, *args, **kwargs):
        """Create and init an DescendentAggregator."""
        super(DescendentAggregator, self).__init__(*args, **kwargs)
        self._topological_order = reversed(self._topological_order)

    def compute(self):
        """
        Compute the descendent aggregation metric defined by the aggregation function.

        Returns
        -------
        result: numpy.array or dict
            If a mapping is provided, computation returns a dictionary with values for each key
            according to value indexes.
            If not, a raw array indexed by node is returned.

        """
        with nx.utils.contextmanagers.reversed(self._G):
            return super(DescendentAggregator, self).compute()

Ancestors

Subclasses

Methods

def compute(self)

Compute the descendent aggregation metric defined by the aggregation function.

Returns

result : numpy.array or dict
If a mapping is provided, computation returns a dictionary with values for each key according to value indexes. If not, a raw array indexed by node is returned.
Expand source code
def compute(self):
    """
    Compute the descendent aggregation metric defined by the aggregation function.

    Returns
    -------
    result: numpy.array or dict
        If a mapping is provided, computation returns a dictionary with values for each key
        according to value indexes.
        If not, a raw array indexed by node is returned.

    """
    with nx.utils.contextmanagers.reversed(self._G):
        return super(DescendentAggregator, self).compute()