r"""Module to create constructors of quality matrix and null models.
The generalized Markov Stability is given as
.. math::
Q_{gen}(t,H) = \mathrm{Tr} \left [H^T \left (F(t)-\sum_{k=1}^m v_{2k-1} v_{2k}^T\right)H\right]
where :math:`F(t)` is the quality matrix and :math:`v_k` are null model vectors.
In the following we denote by :math:`A` the adjacency matrix of a graph with :math:`N` nodes and
:math:`M` edges. The out-degree of the graph is given by :math:`d=A\boldsymbol{1}`, where
:math:`\boldsymbol{1}` is the vector of ones, and we denote the diagonal degree matrix by
:math:`D=\mathrm{diag}(d)`.
"""
import logging
import sys
import numpy as np
import numpy.linalg as la
import scipy.sparse as sp
from threadpoolctl import threadpool_limits
L = logging.getLogger(__name__)
THRESHOLD = 1e-8
_DTYPE = np.float64
[docs]def load_constructor(constructor, graph, **kwargs):
"""Load a constructor from its name, or as a custom Constructor class."""
if isinstance(constructor, str):
if graph is None:
raise Exception(f"No graph was provided with a generic constructor {constructor}")
try:
return getattr(sys.modules[__name__], f"constructor_{constructor}")(graph, **kwargs)
except AttributeError as exc:
raise Exception(f"Could not load constructor {constructor}") from exc
if not isinstance(constructor, Constructor):
raise Exception("Only Constructor class object can be used.")
return constructor
def _limit_numpy(f):
"""Wrapper to limit threads used by numpy."""
@threadpool_limits.wrap(limits=1, user_api="blas")
@threadpool_limits.wrap(limits=1, user_api="openmp")
def limit(*args, **kwargs):
return f(*args, **kwargs)
return limit
def _compute_spectral_decomp(matrix):
"""Solve eigenalue problem for symmetric matrix."""
lambdas, v = la.eigh(matrix.toarray())
vinv = la.inv(v.real)
return lambdas.real, v.real, vinv
def _check_total_degree(degrees):
"""Ensures the sum(degree) > 0."""
if degrees.sum() < 1e-10:
raise Exception("The total degree = 0, we cannot proceed further")
def _get_spectral_gap(laplacian):
"""Compute spectral gap."""
spectral_gap = np.round(max(np.real(sp.linalg.eigs(laplacian, which="SM", k=2)[0])), 8)
L.info("Spectral gap = 10^%s", np.around(np.log10(spectral_gap), 2))
return spectral_gap
[docs]class Constructor:
"""Parent class for generalized Markov Stability constructor.
This class encodes generalized Markov Stability through the quality matrix and null models.
Use the method prepare to load and compute scale independent quantities, and the method get_data
to return quality matrix, null model, and possible global shift.
"""
def __init__(self, graph, with_spectral_gap=False, exp_comp_mode="spectral", **kwargs):
"""The constructor calls the prepare method upon initialisation.
Args:
graph (csgraph): graph for which to run clustering
with_spectral_gap (bool): set to True to use spectral gap to rescale
kwargs (dict): any other properties to pass to the constructor.
exp_comp_mode (str): mode to compute matrix exponential, can be expm or spectral
"""
self.graph = sp.csr_matrix(graph)
self.with_spectral_gap = with_spectral_gap
self.spectral_gap = None
self.exp_comp_mode = exp_comp_mode
# these three variable can be used in prepare method
self.partial_quality_matrix = None
self.partial_null_model = None
self.spectral_decomp = None, None, None
self.threshold = THRESHOLD
self.prepare(**kwargs)
def _get_exp(self, scale):
"""Compute matrix exponential at a given scale."""
if self.exp_comp_mode == "expm":
exp = sp.linalg.expm(-scale * self.partial_quality_matrix.toarray().astype(_DTYPE))
if self.exp_comp_mode == "spectral":
lambdas, v, vinv = self.spectral_decomp
exp = v @ np.diag(np.exp(-scale * lambdas)) @ vinv
exp[np.abs(exp) < self.threshold * np.max(exp)] = 0.0
return sp.csc_matrix(exp)
[docs] def prepare(self, **kwargs):
"""Prepare the constructor with non-scale dependent computations."""
[docs] def get_data(self, scale):
"""Return quality and null model at given scale as well as global shift (or None).
User has to define the _get_data so we can enure numpy does not use multiple threads
"""
return self._get_data(scale)
def _get_data(self, scale):
"""Method to be defined in child classes for get_data."""
[docs]class constructor_linearized(Constructor):
r"""Constructor for continuous linearized Markov Stability.
The quality matrix is:
.. math::
F(t) = t\frac{A}{2M},
and the associated null model is :math:`v_1=v_2=\frac{d}{2M}`.
"""
[docs] @_limit_numpy
def prepare(self, **kwargs):
"""Prepare the constructor with non-scale dependent computations."""
degrees = np.array(self.graph.sum(1)).flatten()
_check_total_degree(degrees)
pi = degrees / degrees.sum()
self.partial_null_model = np.array([pi, pi])
if self.with_spectral_gap:
laplacian = sp.csgraph.laplacian(self.graph, normed=False)
self.spectral_gap = _get_spectral_gap(laplacian)
self.partial_quality_matrix = (self.graph / degrees.sum()).astype(_DTYPE)
@_limit_numpy
def _get_data(self, scale):
"""Return quality and null model at given scale."""
if self.with_spectral_gap:
scale /= self.spectral_gap
return {
"quality": scale * self.partial_quality_matrix,
"null_model": self.partial_null_model,
"shift": float(1 - scale),
}
[docs]class constructor_continuous_combinatorial(Constructor):
r"""Constructor for continuous combinatorial Markov Stability.
This implementation follows equation (16) in [1]_. The quality matrix is:
.. math::
F(t) = \Pi\exp(-tL/<d>)
where :math:`<d>=(\boldsymbol{1}^T D \boldsymbol{1})/N` is the average degree,
:math:`L=D-A` is the combinatorial Laplacian and :math:`\Pi=\mathrm{diag}(\pi)`,
with null model :math:`v_1=v_2=\pi=\frac{\boldsymbol{1}}{N}`.
References:
.. [1] Lambiotte, R., Delvenne, J.-C., & Barahona, M. (2019). Random Walks, Markov
Processes and the Multiscale Modular Organization of Complex Networks.
IEEE Trans. Netw. Sci. Eng., 1(2), p. 76-90.
"""
[docs] @_limit_numpy
def prepare(self, **kwargs):
"""Prepare the constructor with non-scale dependent computations."""
laplacian, degrees = sp.csgraph.laplacian(self.graph, return_diag=True, normed=False)
_check_total_degree(degrees)
laplacian /= degrees.mean()
pi = np.ones(self.graph.shape[0]) / self.graph.shape[0]
self.partial_null_model = np.array([pi, pi], dtype=_DTYPE)
if self.with_spectral_gap:
self.spectral_gap = _get_spectral_gap(laplacian)
if self.exp_comp_mode == "spectral":
self.spectral_decomp = _compute_spectral_decomp(laplacian)
if self.exp_comp_mode == "expm":
self.partial_quality_matrix = laplacian
@_limit_numpy
def _get_data(self, scale):
"""Return quality and null model at given scale."""
if self.with_spectral_gap:
scale /= self.spectral_gap
exp = self._get_exp(scale)
quality_matrix = sp.diags(self.partial_null_model[0]).dot(exp)
return {"quality": quality_matrix, "null_model": self.partial_null_model}
[docs]class constructor_continuous_normalized(Constructor):
r"""Constructor for continuous normalized Markov Stability.
This implementation follows equation (10) in [1]_. The quality matrix is:
.. math::
F(t) = \Pi\exp(-tL)
where :math:`L=D^{-1}(D-A)` is the random-walk normalized Laplacian and
:math:`\Pi=\mathrm{diag}(\pi)` with null model :math:`v_1=v_2=\pi=\frac{d}{2M}`.
"""
[docs] @_limit_numpy
def prepare(self, **kwargs):
"""Prepare the constructor with non-scale dependent computations."""
laplacian, degrees = sp.csgraph.laplacian(self.graph, return_diag=True, normed=False)
_check_total_degree(degrees)
normed_laplacian = sp.diags(1.0 / degrees).dot(laplacian)
pi = degrees / degrees.sum()
self.partial_null_model = np.array([pi, pi], dtype=_DTYPE)
if self.with_spectral_gap:
self.spectral_gap = _get_spectral_gap(normed_laplacian)
if self.exp_comp_mode == "spectral":
self.spectral_decomp = _compute_spectral_decomp(normed_laplacian)
if self.exp_comp_mode == "expm":
self.partial_quality_matrix = normed_laplacian
@_limit_numpy
def _get_data(self, scale):
"""Return quality and null model at given scale."""
if self.with_spectral_gap:
scale /= self.spectral_gap
exp = self._get_exp(scale)
quality_matrix = sp.diags(self.partial_null_model[0]).dot(exp)
return {"quality": quality_matrix, "null_model": self.partial_null_model}
[docs]class constructor_signed_modularity(Constructor):
"""Constructor of signed modularity.
This implementation is equation (18) of [2]_, where quality is the adjacency matrix and
the null model is the difference between the standard modularity null models based on
positive and negative degree vectors.
References:
.. [2] Gomez, S., Jensen, P., & Arenas, A. (2009). Analysis of community structure in
networks of correlated data. Physical Review E, 80(1), 016114.
"""
[docs] @_limit_numpy
def prepare(self, **kwargs):
"""Prepare the constructor with non-scale dependent computations."""
adj_pos = self.graph.copy()
adj_pos[self.graph < 0] = 0.0
adj_neg = -self.graph.copy()
adj_neg[self.graph > 0] = 0.0
deg_plus = adj_pos.sum(1).flatten()
deg_neg = adj_neg.sum(1).flatten()
deg_norm = deg_plus.sum() + deg_neg.sum()
self.partial_null_model = np.array(
[
deg_plus / deg_norm,
deg_plus / deg_plus.sum(),
-deg_neg / deg_neg.sum(),
deg_neg / deg_norm,
]
)
self.partial_quality_matrix = self.graph / deg_norm
@_limit_numpy
def _get_data(self, scale):
"""Return quality and null model at given scale."""
return {
"quality": scale * self.partial_quality_matrix,
"null_model": self.partial_null_model,
}
[docs]class constructor_signed_combinatorial(Constructor):
r"""Constructor for continuous signed combinatorial Markov Stability.
This implementation follows equation (19) in [3]_. The quality matrix is:
.. math::
F(t) = \exp(-tL)^T\exp(-tL)
where :math:`L=D_{\mathrm{abs}}-A` is the signed combinatorial Laplacian,
:math:`D_{\mathrm{abs}}=\mathrm{diag}(d_\mathrm{abs})` the diagonal matrix of absolute node
strengths :math:`d_\mathrm{abs}`, and the associated null model is given by
:math:`v_1=v_2=\boldsymbol{0}`, where :math:`\boldsymbol{0}` is the vector of zeros.
References:
.. [3] Schaub, M., Delvenne, J.-C., Lambiotte, R., & Barahona, M. (2019). Multiscale
dynamical embeddings of complex networks. Physical Review E, 99(6), 062308.
"""
[docs] def prepare(self, **kwargs):
"""Prepare the constructor with non-scale dependent computations."""
degrees_abs = np.array(abs(self.graph).sum(1)).flatten()
laplacian = sp.diags(degrees_abs) - self.graph
if self.exp_comp_mode == "spectral": # pragma: no cover
self.spectral_decomp = _compute_spectral_decomp(laplacian)
if self.exp_comp_mode == "expm":
self.partial_quality_matrix = laplacian
zeros = np.zeros(self.graph.shape[0])
self.partial_null_model = np.array([zeros, zeros])
[docs] def get_data(self, scale):
"""Return quality and null model at given scale."""
exp = self._get_exp(scale)
quality_matrix = exp.T.dot(exp)
return {"quality": quality_matrix, "null_model": self.partial_null_model}
[docs]class constructor_directed(Constructor):
r"""Constructor for directed Markov stability.
The quality matrix is:
.. math::
F(t)=\Pi \exp\left(t \left(M(\alpha)-I\right)\right)
where :math:`I` denotes the identity matrix, :math:`M(\alpha)` is the transition matrix of a
random walk with teleportation and damping factor :math:`0\le \alpha < 1`, and
:math:`\Pi=\mathrm{diag}(\pi)` for the associated null model :math:`v_1=v_2=\pi` given by the
eigenvector solving :math:`\pi M(\alpha) = \pi`, which is related to PageRank. See [1]_ for
details.
The transition matrix :math:`M(\alpha)` is given by
.. math::
M(\alpha) = \alpha D^{-1}A+\left((1-\alpha)I+\alpha \mathrm{diag}(a)\right)
\frac{\boldsymbol{1}\boldsymbol{1}^T}{N},
where :math:`D` denotes the diagonal matrix of out-degrees with :math:`D_{ii}=1` if the
out-degree :math:`d_i=0` and :math:`a` denotes the vector of dangling nodes, i.e. :math:`a_i=1`
if the out-degree :math:`d_i=0` and :math:`a_i=0` otherwise.
"""
[docs] @_limit_numpy
def prepare(self, **kwargs):
"""Prepare the constructor with non-scale dependent computations."""
assert (
self.exp_comp_mode == "expm"
), 'exp_comp_mode="expm" is required for "constructor_directed"'
alpha = kwargs.get("alpha", 0.8)
n_nodes = self.graph.shape[0]
ones = np.ones((n_nodes, n_nodes)) / n_nodes
out_degrees = np.array(self.graph.sum(1)).flatten()
_check_total_degree(out_degrees)
dinv = np.divide(1, out_degrees, where=out_degrees != 0)
self.partial_quality_matrix = sp.csr_matrix(
alpha * np.diag(dinv).dot(self.graph.toarray())
+ ((1 - alpha) * np.diag(np.ones(n_nodes)) + np.diag(alpha * (dinv == 0.0))).dot(ones)
- np.eye(n_nodes)
)
pi = abs(sp.linalg.eigs(self.partial_quality_matrix.transpose(), which="SM", k=1)[1][:, 0])
pi /= pi.sum()
self.partial_null_model = np.array([pi, pi])
@_limit_numpy
def _get_data(self, scale):
"""Return quality and null model at given scale."""
exp = self._get_exp(-scale)
quality_matrix = sp.diags(self.partial_null_model[0]).dot(exp)
return {"quality": quality_matrix, "null_model": self.partial_null_model}
[docs]class constructor_linearized_directed(Constructor):
r"""Constructor for linearized directed Markov stability.
The quality matrix is:
.. math::
F(t)=\Pi t M(\alpha)
where :math:`M(\alpha)` is the transition matrix of a random walk with teleportation and
damping factor :math:`0\le \alpha < 1`, and :math:`\Pi=\mathrm{diag}(\pi)` for the associated
null model :math:`v_1=v_2=\pi` given by the eigenvector solving :math:`\pi M(\alpha) = \pi`,
which is related to PageRank.
The transition matrix :math:`M(\alpha)` is given by
.. math::
M(\alpha) = \alpha D^{-1}A+\left((1-\alpha)I+\alpha \mathrm{diag}(a)\right)
\frac{\boldsymbol{1}\boldsymbol{1}^T}{N},
where :math:`I` denotes the identity matrix, :math:`D` denotes the diagonal matrix of
out-degrees with :math:`D_{ii}=1` if the out-degree :math:`d_i=0` and :math:`a` denotes the
vector of dangling nodes, i.e. :math:`a_i=1` if the out-degree :math:`d_i=0` and :math:`a_i=0`
otherwise.
"""
[docs] @_limit_numpy
def prepare(self, **kwargs):
"""Prepare the constructor with non-scale dependent computations."""
alpha = kwargs.get("alpha", 0.8)
n_nodes = self.graph.shape[0]
out_degrees = np.array(self.graph.sum(1)).flatten()
_check_total_degree(out_degrees)
dinv = np.divide(1, out_degrees, where=out_degrees != 0)
if alpha < 1:
ones = np.ones((n_nodes, n_nodes)) / n_nodes
self.partial_quality_matrix = sp.csr_matrix(
alpha * np.diag(dinv).dot(self.graph.toarray())
+ ((1 - alpha) * np.diag(np.ones(n_nodes)) + np.diag(alpha * (dinv == 0.0))).dot(
ones
)
- np.eye(n_nodes)
)
if alpha == 1:
self.partial_quality_matrix = sp.csr_matrix(
sp.diags(dinv).dot(self.graph) - sp.diags(np.ones(n_nodes))
)
pi = abs(sp.linalg.eigs(self.partial_quality_matrix.transpose(), which="SM", k=1)[1][:, 0])
pi /= pi.sum()
self.partial_null_model = np.array([pi, pi])
@_limit_numpy
def _get_data(self, scale):
"""Return quality and null model at given scale."""
quality_matrix = sp.diags(self.partial_null_model[0]).dot(
scale * self.partial_quality_matrix
)
return {"quality": quality_matrix, "null_model": self.partial_null_model}