Module olivia.model
Olivia Model for studying the vulnerability of package dependency networks.
Olivia stands for 'Open-source Library Indexes Vulnerability Identification and Analysis'. Includes tools for the analysis of package dependency networks vulnerability to failures and attacks.
Expand source code
"""
Olivia Model for studying the vulnerability of package dependency networks.
Olivia stands for 'Open-source Library Indexes Vulnerability Identification and Analysis'.
Includes tools for the analysis of package dependency networks vulnerability to failures and attacks.
"""
import networkx as nx
import gzip
import pickle
import numpy as np
from olivia.coupling import coupling_interface, coupling_profile
class PackageInfoView:
"""
Helper class for retrieving individual package information and metrics.
For network-wide analysis use suitable classes and methods from ~networkmetrics, ~packagemetrics or ~coupling
instead, as individual computation can be orders of magnitude slower.
"""
def __init__(self, network, name):
"""Create and initialize a PackageInfoView object."""
self._model = network
self._name = name
def transitive_dependencies(self):
"""Return a set containing the transitive dependencies of the package."""
return nx.ancestors(self._model.network, self._name)
def transitive_dependants(self):
"""Return a set containing the transitive dependants of the package."""
return nx.descendants(self._model.network, self._name)
def direct_dependencies(self):
"""Return a set containing the direct dependencies of the package."""
return set(self._model.network.predecessors(self._name))
def direct_dependants(self):
"""Return a set containing the direct dependants of the package."""
return set(self._model.network.successors(self._name))
def scc(self):
"""
Return the strongly connected component(SCC) of the package.
The SCC of a package is the set of packages in the network that are strongly connected to it in
the underlying graph -that is to say each of the packages in the scc are transitively dependant on
each other.-
"""
dag_id = self._model.dag.graph['mapping'][self._name]
return self._model.dag.nodes[dag_id]['members']
def reach(self):
"""Return the value of the Olivia reach metric for the package."""
return len(nx.descendants(self._model.network, self._name)) + 1
def surface(self):
"""Return the value of the Olivia surface metric for the package."""
return len(nx.ancestors(self._model.network, self._name)) + 1
def impact(self):
"""Return the value of the Olivia impact metric for the package."""
out_degrees = self._model.network.subgraph({self._name} |
nx.descendants(self._model.network, self._name)).out_degree()
return np.array([n[1] for n in out_degrees]).sum()
def coupling_interface_to(self, n):
"""Return the coupling interface for this package over n."""
return coupling_interface(self._model, self._name, n)
def coupling_interface_from(self, n):
"""Return the coupling interface for n over this package."""
return coupling_interface(self._model, n, self._name)
def coupling_profile(self):
"""Return the coupling profile for this package."""
return coupling_profile(self._model, self._name)
class OliviaNetwork:
"""
Model for studying the vulnerability of package dependency networks.
Uses a directed acyclic graph to represent the fundamental structure of the network. Also acts as a gateway for
querying cached metric values for the packages in the network.
"""
def __init__(self, file=None):
"""
Create and initializes an OliviaNetwork object.
Parameters
----------
file: file or string
File or file name to read an OliviaNetwork model from.
"""
if file is None:
self._dag = {}
self._metrics_cache = dict() # In-model metrics cache
self._network = {}
else:
self.load(file)
def load(self, file):
"""
Load an OliviaNetwork model from file.
Parameters
----------
file: file or string
File or file name to read an OliviaNetwork model from.
Returns
-------
None
"""
with gzip.GzipFile(file, 'rb') as f:
load_dict = pickle.load(f)
self._network = nx.from_dict_of_lists(load_dict['network'], create_using=nx.DiGraph())
self._dag = load_dict['dag']
self._metrics_cache = load_dict['cache']
def save(self, file):
"""
Save an OliviaNetwork model to file.
Parameters
----------
file: file or string
File or file name to write an OliviaNetwork model to.
Returns
-------
None
"""
save_dict = {'network': nx.to_dict_of_lists(self._network),
'dag': self._dag,
'cache': self._metrics_cache}
with gzip.GzipFile(file, 'wb') as f:
pickle.dump(save_dict, f, protocol=pickle.HIGHEST_PROTOCOL)
@property
def network(self):
"""Return the package network."""
return self._network
@property
def dag(self):
"""Return the model's underlying DAG graph."""
return self._dag
def get_metric(self, metric_class, **kwargs):
"""
Compute or get form the internal cache metric values for the packages in the network.
If metric values are not available, the are computed instantiating metric_class and calling compute(),and
are subsequently stored for future use.
Parameters
----------
metric_class: class
Class implementing compute()
kwargs: keyword args
Arguments for the metric class constructor.
Returns
-------
ms: object
An object containing the computed metric values. For Olivia metrics this is always a MetricStats instance.
"""
if metric_class.__name__ in self._metrics_cache:
print(f'{metric_class.__name__} retrieved from metrics cache')
else:
self._metrics_cache[metric_class.__name__] = metric_class(self, **kwargs).compute()
return self._metrics_cache[metric_class.__name__]
def sccs(self):
"""
Return a generator of strongly connected components (SCCs) present in the network.
SCCs are sets of packages in which all the packages are transitively
dependent on each other, so SCCs of more than one package imply the existence of cycles in the network.
This method generates the partition into strongly connected components of the directed graph
underlying the package network.
"""
for p in self.dag:
yield self.dag.nodes[p]['members']
def sorted_clusters(self):
"""Return a list of clusters in reverse size order."""
return sorted(self.sccs(), key=lambda x: len(x), reverse=True)
def __getitem__(self, package):
"""Return a ~DegreeInfoView of the package."""
return PackageInfoView(self, package)
def __len__(self):
"""Return the number of packages in the network."""
return len(self.network)
def __iter__(self):
"""Return an iterator over the packages in the network."""
return iter(self.network)
def build_model(self, source):
"""
Build the model from specified source.
Parameters
----------
source: File or file name or Networkx DiGraph
Source to build the model from. Files should be in adjacency list format.
Filenames ending in .gz or .bz2 will be uncompressed.
Returns
-------
None
"""
if isinstance(source, nx.DiGraph):
self._network = source
else:
print("Reading dependencies file...")
self._network = nx.read_adjlist(source, create_using=nx.DiGraph())
print("Building Olivia Model")
print(' Finding strongly connected components (SCCs)...')
scc = nx.strongly_connected_components(self._network)
print(' Building condensation network...')
GC = nx.condensation(self._network, list(scc))
print(' Adding structural meta-data...')
# Model includes weighted edges to represent
# ingoing/outgoing dependencies to/from SCC
for e in GC.edges:
GC.edges[e]['weight'] = 0
for n in GC.nodes:
GC.nodes[n]['intra_edges'] = 0
for n in self._network:
for e in self._network.in_edges(n):
u, v = e
map_u = GC.graph['mapping'][u]
map_v = GC.graph['mapping'][v]
if map_u == map_v:
# Number of edges inside SCC
GC.nodes[map_u]['intra_edges'] += 1
else:
# Weight for edge to/from SCC
GC.edges[(map_u, map_v)]['weight'] += 1
self._dag = GC
print(" Done")
Classes
class OliviaNetwork (file=None)
-
Model for studying the vulnerability of package dependency networks.
Uses a directed acyclic graph to represent the fundamental structure of the network. Also acts as a gateway for querying cached metric values for the packages in the network.
Create and initializes an OliviaNetwork object.
Parameters
file
:file
orstring
- File or file name to read an OliviaNetwork model from.
Expand source code
class OliviaNetwork: """ Model for studying the vulnerability of package dependency networks. Uses a directed acyclic graph to represent the fundamental structure of the network. Also acts as a gateway for querying cached metric values for the packages in the network. """ def __init__(self, file=None): """ Create and initializes an OliviaNetwork object. Parameters ---------- file: file or string File or file name to read an OliviaNetwork model from. """ if file is None: self._dag = {} self._metrics_cache = dict() # In-model metrics cache self._network = {} else: self.load(file) def load(self, file): """ Load an OliviaNetwork model from file. Parameters ---------- file: file or string File or file name to read an OliviaNetwork model from. Returns ------- None """ with gzip.GzipFile(file, 'rb') as f: load_dict = pickle.load(f) self._network = nx.from_dict_of_lists(load_dict['network'], create_using=nx.DiGraph()) self._dag = load_dict['dag'] self._metrics_cache = load_dict['cache'] def save(self, file): """ Save an OliviaNetwork model to file. Parameters ---------- file: file or string File or file name to write an OliviaNetwork model to. Returns ------- None """ save_dict = {'network': nx.to_dict_of_lists(self._network), 'dag': self._dag, 'cache': self._metrics_cache} with gzip.GzipFile(file, 'wb') as f: pickle.dump(save_dict, f, protocol=pickle.HIGHEST_PROTOCOL) @property def network(self): """Return the package network.""" return self._network @property def dag(self): """Return the model's underlying DAG graph.""" return self._dag def get_metric(self, metric_class, **kwargs): """ Compute or get form the internal cache metric values for the packages in the network. If metric values are not available, the are computed instantiating metric_class and calling compute(),and are subsequently stored for future use. Parameters ---------- metric_class: class Class implementing compute() kwargs: keyword args Arguments for the metric class constructor. Returns ------- ms: object An object containing the computed metric values. For Olivia metrics this is always a MetricStats instance. """ if metric_class.__name__ in self._metrics_cache: print(f'{metric_class.__name__} retrieved from metrics cache') else: self._metrics_cache[metric_class.__name__] = metric_class(self, **kwargs).compute() return self._metrics_cache[metric_class.__name__] def sccs(self): """ Return a generator of strongly connected components (SCCs) present in the network. SCCs are sets of packages in which all the packages are transitively dependent on each other, so SCCs of more than one package imply the existence of cycles in the network. This method generates the partition into strongly connected components of the directed graph underlying the package network. """ for p in self.dag: yield self.dag.nodes[p]['members'] def sorted_clusters(self): """Return a list of clusters in reverse size order.""" return sorted(self.sccs(), key=lambda x: len(x), reverse=True) def __getitem__(self, package): """Return a ~DegreeInfoView of the package.""" return PackageInfoView(self, package) def __len__(self): """Return the number of packages in the network.""" return len(self.network) def __iter__(self): """Return an iterator over the packages in the network.""" return iter(self.network) def build_model(self, source): """ Build the model from specified source. Parameters ---------- source: File or file name or Networkx DiGraph Source to build the model from. Files should be in adjacency list format. Filenames ending in .gz or .bz2 will be uncompressed. Returns ------- None """ if isinstance(source, nx.DiGraph): self._network = source else: print("Reading dependencies file...") self._network = nx.read_adjlist(source, create_using=nx.DiGraph()) print("Building Olivia Model") print(' Finding strongly connected components (SCCs)...') scc = nx.strongly_connected_components(self._network) print(' Building condensation network...') GC = nx.condensation(self._network, list(scc)) print(' Adding structural meta-data...') # Model includes weighted edges to represent # ingoing/outgoing dependencies to/from SCC for e in GC.edges: GC.edges[e]['weight'] = 0 for n in GC.nodes: GC.nodes[n]['intra_edges'] = 0 for n in self._network: for e in self._network.in_edges(n): u, v = e map_u = GC.graph['mapping'][u] map_v = GC.graph['mapping'][v] if map_u == map_v: # Number of edges inside SCC GC.nodes[map_u]['intra_edges'] += 1 else: # Weight for edge to/from SCC GC.edges[(map_u, map_v)]['weight'] += 1 self._dag = GC print(" Done")
Instance variables
var dag
-
Return the model's underlying DAG graph.
Expand source code
@property def dag(self): """Return the model's underlying DAG graph.""" return self._dag
var network
-
Return the package network.
Expand source code
@property def network(self): """Return the package network.""" return self._network
Methods
def build_model(self, source)
-
Build the model from specified source.
Parameters
source
:File
orfile name
orNetworkx DiGraph
- Source to build the model from. Files should be in adjacency list format. Filenames ending in .gz or .bz2 will be uncompressed.
Returns
None
Expand source code
def build_model(self, source): """ Build the model from specified source. Parameters ---------- source: File or file name or Networkx DiGraph Source to build the model from. Files should be in adjacency list format. Filenames ending in .gz or .bz2 will be uncompressed. Returns ------- None """ if isinstance(source, nx.DiGraph): self._network = source else: print("Reading dependencies file...") self._network = nx.read_adjlist(source, create_using=nx.DiGraph()) print("Building Olivia Model") print(' Finding strongly connected components (SCCs)...') scc = nx.strongly_connected_components(self._network) print(' Building condensation network...') GC = nx.condensation(self._network, list(scc)) print(' Adding structural meta-data...') # Model includes weighted edges to represent # ingoing/outgoing dependencies to/from SCC for e in GC.edges: GC.edges[e]['weight'] = 0 for n in GC.nodes: GC.nodes[n]['intra_edges'] = 0 for n in self._network: for e in self._network.in_edges(n): u, v = e map_u = GC.graph['mapping'][u] map_v = GC.graph['mapping'][v] if map_u == map_v: # Number of edges inside SCC GC.nodes[map_u]['intra_edges'] += 1 else: # Weight for edge to/from SCC GC.edges[(map_u, map_v)]['weight'] += 1 self._dag = GC print(" Done")
def get_metric(self, metric_class, **kwargs)
-
Compute or get form the internal cache metric values for the packages in the network.
If metric values are not available, the are computed instantiating metric_class and calling compute(),and are subsequently stored for future use.
Parameters
metric_class
:class
- Class implementing compute()
kwargs
:keyword args
- Arguments for the metric class constructor.
Returns
ms
:object
- An object containing the computed metric values. For Olivia metrics this is always a MetricStats instance.
Expand source code
def get_metric(self, metric_class, **kwargs): """ Compute or get form the internal cache metric values for the packages in the network. If metric values are not available, the are computed instantiating metric_class and calling compute(),and are subsequently stored for future use. Parameters ---------- metric_class: class Class implementing compute() kwargs: keyword args Arguments for the metric class constructor. Returns ------- ms: object An object containing the computed metric values. For Olivia metrics this is always a MetricStats instance. """ if metric_class.__name__ in self._metrics_cache: print(f'{metric_class.__name__} retrieved from metrics cache') else: self._metrics_cache[metric_class.__name__] = metric_class(self, **kwargs).compute() return self._metrics_cache[metric_class.__name__]
def load(self, file)
-
Load an OliviaNetwork model from file.
Parameters
file
:file
orstring
- File or file name to read an OliviaNetwork model from.
Returns
None
Expand source code
def load(self, file): """ Load an OliviaNetwork model from file. Parameters ---------- file: file or string File or file name to read an OliviaNetwork model from. Returns ------- None """ with gzip.GzipFile(file, 'rb') as f: load_dict = pickle.load(f) self._network = nx.from_dict_of_lists(load_dict['network'], create_using=nx.DiGraph()) self._dag = load_dict['dag'] self._metrics_cache = load_dict['cache']
def save(self, file)
-
Save an OliviaNetwork model to file.
Parameters
file
:file
orstring
- File or file name to write an OliviaNetwork model to.
Returns
None
Expand source code
def save(self, file): """ Save an OliviaNetwork model to file. Parameters ---------- file: file or string File or file name to write an OliviaNetwork model to. Returns ------- None """ save_dict = {'network': nx.to_dict_of_lists(self._network), 'dag': self._dag, 'cache': self._metrics_cache} with gzip.GzipFile(file, 'wb') as f: pickle.dump(save_dict, f, protocol=pickle.HIGHEST_PROTOCOL)
def sccs(self)
-
Return a generator of strongly connected components (SCCs) present in the network.
SCCs are sets of packages in which all the packages are transitively dependent on each other, so SCCs of more than one package imply the existence of cycles in the network. This method generates the partition into strongly connected components of the directed graph underlying the package network.
Expand source code
def sccs(self): """ Return a generator of strongly connected components (SCCs) present in the network. SCCs are sets of packages in which all the packages are transitively dependent on each other, so SCCs of more than one package imply the existence of cycles in the network. This method generates the partition into strongly connected components of the directed graph underlying the package network. """ for p in self.dag: yield self.dag.nodes[p]['members']
def sorted_clusters(self)
-
Return a list of clusters in reverse size order.
Expand source code
def sorted_clusters(self): """Return a list of clusters in reverse size order.""" return sorted(self.sccs(), key=lambda x: len(x), reverse=True)
class PackageInfoView (network, name)
-
Helper class for retrieving individual package information and metrics.
For network-wide analysis use suitable classes and methods from ~networkmetrics, ~packagemetrics or ~coupling instead, as individual computation can be orders of magnitude slower.
Create and initialize a PackageInfoView object.
Expand source code
class PackageInfoView: """ Helper class for retrieving individual package information and metrics. For network-wide analysis use suitable classes and methods from ~networkmetrics, ~packagemetrics or ~coupling instead, as individual computation can be orders of magnitude slower. """ def __init__(self, network, name): """Create and initialize a PackageInfoView object.""" self._model = network self._name = name def transitive_dependencies(self): """Return a set containing the transitive dependencies of the package.""" return nx.ancestors(self._model.network, self._name) def transitive_dependants(self): """Return a set containing the transitive dependants of the package.""" return nx.descendants(self._model.network, self._name) def direct_dependencies(self): """Return a set containing the direct dependencies of the package.""" return set(self._model.network.predecessors(self._name)) def direct_dependants(self): """Return a set containing the direct dependants of the package.""" return set(self._model.network.successors(self._name)) def scc(self): """ Return the strongly connected component(SCC) of the package. The SCC of a package is the set of packages in the network that are strongly connected to it in the underlying graph -that is to say each of the packages in the scc are transitively dependant on each other.- """ dag_id = self._model.dag.graph['mapping'][self._name] return self._model.dag.nodes[dag_id]['members'] def reach(self): """Return the value of the Olivia reach metric for the package.""" return len(nx.descendants(self._model.network, self._name)) + 1 def surface(self): """Return the value of the Olivia surface metric for the package.""" return len(nx.ancestors(self._model.network, self._name)) + 1 def impact(self): """Return the value of the Olivia impact metric for the package.""" out_degrees = self._model.network.subgraph({self._name} | nx.descendants(self._model.network, self._name)).out_degree() return np.array([n[1] for n in out_degrees]).sum() def coupling_interface_to(self, n): """Return the coupling interface for this package over n.""" return coupling_interface(self._model, self._name, n) def coupling_interface_from(self, n): """Return the coupling interface for n over this package.""" return coupling_interface(self._model, n, self._name) def coupling_profile(self): """Return the coupling profile for this package.""" return coupling_profile(self._model, self._name)
Methods
def coupling_interface_from(self, n)
-
Return the coupling interface for n over this package.
Expand source code
def coupling_interface_from(self, n): """Return the coupling interface for n over this package.""" return coupling_interface(self._model, n, self._name)
def coupling_interface_to(self, n)
-
Return the coupling interface for this package over n.
Expand source code
def coupling_interface_to(self, n): """Return the coupling interface for this package over n.""" return coupling_interface(self._model, self._name, n)
def coupling_profile(self)
-
Return the coupling profile for this package.
Expand source code
def coupling_profile(self): """Return the coupling profile for this package.""" return coupling_profile(self._model, self._name)
def direct_dependants(self)
-
Return a set containing the direct dependants of the package.
Expand source code
def direct_dependants(self): """Return a set containing the direct dependants of the package.""" return set(self._model.network.successors(self._name))
def direct_dependencies(self)
-
Return a set containing the direct dependencies of the package.
Expand source code
def direct_dependencies(self): """Return a set containing the direct dependencies of the package.""" return set(self._model.network.predecessors(self._name))
def impact(self)
-
Return the value of the Olivia impact metric for the package.
Expand source code
def impact(self): """Return the value of the Olivia impact metric for the package.""" out_degrees = self._model.network.subgraph({self._name} | nx.descendants(self._model.network, self._name)).out_degree() return np.array([n[1] for n in out_degrees]).sum()
def reach(self)
-
Return the value of the Olivia reach metric for the package.
Expand source code
def reach(self): """Return the value of the Olivia reach metric for the package.""" return len(nx.descendants(self._model.network, self._name)) + 1
def scc(self)
-
Return the strongly connected component(SCC) of the package.
The SCC of a package is the set of packages in the network that are strongly connected to it in the underlying graph -that is to say each of the packages in the scc are transitively dependant on each other.-
Expand source code
def scc(self): """ Return the strongly connected component(SCC) of the package. The SCC of a package is the set of packages in the network that are strongly connected to it in the underlying graph -that is to say each of the packages in the scc are transitively dependant on each other.- """ dag_id = self._model.dag.graph['mapping'][self._name] return self._model.dag.nodes[dag_id]['members']
def surface(self)
-
Return the value of the Olivia surface metric for the package.
Expand source code
def surface(self): """Return the value of the Olivia surface metric for the package.""" return len(nx.ancestors(self._model.network, self._name)) + 1
def transitive_dependants(self)
-
Return a set containing the transitive dependants of the package.
Expand source code
def transitive_dependants(self): """Return a set containing the transitive dependants of the package.""" return nx.descendants(self._model.network, self._name)
def transitive_dependencies(self)
-
Return a set containing the transitive dependencies of the package.
Expand source code
def transitive_dependencies(self): """Return a set containing the transitive dependencies of the package.""" return nx.ancestors(self._model.network, self._name)