r"""PyGenStability code to solve generalized Markov Stability including Markov stability.
The generalized Markov Stability is of the form
.. math::
Q_{gen}(t,H) = \mathrm{Tr} \left [H^T \left (F(t)-\sum_{k=1}^m v_{2k-1} v_{2k}^T\right)H\right]
where :math:`F(t)` is the quality matrix and :math:`v_k` are null model vectors.
The choice of the quality matrix and null model vectors are arbitrary in the generalized
Markov Stability setting, and can be parametrised via built-in constructors, or specified by
the user via the constructor module.
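
Example:
    A minimal sketch of a custom constructor for the linearized (modularity-like) case,
    illustrating the data layout the optimisation workers expect (the name
    ``my_constructor`` is hypothetical, not part of the package)::

        import numpy as np
        import scipy.sparse as sp

        def my_constructor(graph, scale):
            # a constructor maps (graph, scale) to a quality matrix and null model
            degrees = np.array(graph.sum(1)).flatten()
            total_weight = degrees.sum()
            pi = degrees / total_weight  # stationary distribution of the random walk
            quality = sp.csr_matrix(scale * graph / total_weight)
            # null model vectors come in pairs (v_{2k-1}, v_{2k}); here a single pair
            return {"quality": quality, "null_model": np.array([pi, pi])}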
"""
import itertools
import logging
import multiprocessing
from collections import defaultdict
from functools import partial
from functools import wraps
from time import time
try:
import igraph as ig
import leidenalg
_NO_LEIDEN = False
except ImportError: # pragma: no cover
_NO_LEIDEN = True
import numpy as np
import scipy.sparse as sp
from sklearn.metrics import mutual_info_score
from sklearn.metrics.cluster import entropy
from tqdm import tqdm
try:
from pygenstability import generalized_louvain
_NO_LOUVAIN = False
except ImportError: # pragma: no cover
_NO_LOUVAIN = True
from pygenstability.constructors import load_constructor
from pygenstability.io import save_results
from pygenstability.optimal_scales import identify_optimal_scales
L = logging.getLogger(__name__)
_DTYPE = np.float64
def _timing(f): # pragma: no cover
"""Use as decorator to time a function excecution if logging is in DEBUG mode."""
@wraps(f)
def wrap(*args, **kw):
if logging.root.level == logging.DEBUG:
t_start = time()
result = f(*args, **kw)
t_end = time()
with open("timing.csv", "a", encoding="utf-8") as file:
print(f"{f.__name__}, {t_end - t_start}", file=file)
else:
result = f(*args, **kw)
return result
return wrap
def _get_chunksize(n_comp, pool):
"""Split jobs accross workers for speedup."""
return max(1, int(n_comp / pool._processes)) # pylint: disable=protected-access
def _graph_checks(graph, dtype=_DTYPE):
"""Do some checks and preprocessing of the graph."""
graph = sp.csr_matrix(graph, dtype=dtype)
    n_comp = sp.csgraph.connected_components(graph)[0]
    if n_comp > 1:
        raise Exception(f"Graph not connected, with {n_comp} components")
if sp.linalg.norm(graph - graph.T) > 0:
L.warning("Your graph is directed!")
if np.min(graph) < 0:
L.warning("You have negative weights, consider using signed constructor.")
graph.eliminate_zeros()
return graph
def _get_scales(min_scale=-2.0, max_scale=0.5, n_scale=20, log_scale=True, scales=None):
"""Get the scale vectors."""
if scales is not None:
return scales
if log_scale:
return np.logspace(min_scale, max_scale, n_scale)
return np.linspace(min_scale, max_scale, n_scale)
def _get_params(all_locals):
"""Get run paramters from the local variables."""
del all_locals["graph"]
if hasattr(all_locals["constructor"], "get_data"):
all_locals["constructor"] = "custom constructor"
return all_locals
@_timing
def _get_constructor_data(constructor, scales, pool, tqdm_disable=False):
    """Evaluate the constructor data (quality matrix and null model) at each scale."""
return list(
tqdm(
pool.imap(constructor.get_data, scales),
total=len(scales),
disable=tqdm_disable,
)
)
def _check_method(method):  # pragma: no cover
    """Check that the requested optimisation method is available, with fallback."""
    if _NO_LEIDEN and _NO_LOUVAIN:
        raise Exception("Without the Louvain or Leiden solver, PyGenStability cannot run")
    if method == "louvain" and _NO_LOUVAIN:
        print("Louvain is not available, falling back to Leiden.")
        return "leiden"
    if method == "leiden" and _NO_LEIDEN:
        print("Leiden is not available, falling back to Louvain.")
        return "louvain"
    return method
@_timing
def run(
graph=None,
constructor="linearized",
min_scale=-2.0,
max_scale=0.5,
n_scale=20,
log_scale=True,
scales=None,
n_tries=100,
with_all_tries=False,
with_NVI=True,
n_NVI=20,
with_postprocessing=True,
with_ttprime=True,
with_spectral_gap=False,
exp_comp_mode="spectral",
result_file="results.pkl",
n_workers=4,
tqdm_disable=False,
with_optimal_scales=True,
optimal_scales_kwargs=None,
method="louvain",
constructor_kwargs=None,
):
"""This is the main function to compute graph clustering across scales with Markov Stability.
This function needs a graph object as an adjacency matrix encoded with scipy.csgraph.
The default settings will provide a fast and generic run with linearized Markov Stability,
which corresponds to modularity with a scale parameter. Other built-in constructors are
available to perform Markov Stability with matrix exponential computations. Custom constructors
can be added via the constructor module.
Additional parameters can be used to set the range and number of scales, number of trials for
generalized Markov Stability optimisation, with Louvain or Leiden algorithm.
Args:
        graph (scipy.sparse): graph to cluster, given as a sparse adjacency matrix; if None,
            the constructor cannot be a str
constructor (str/function): name of the generalized Markov Stability constructor,
or custom constructor function. It must have two arguments, graph and scale.
min_scale (float): minimum Markov scale
max_scale (float): maximum Markov scale
n_scale (int): number of scale steps
        log_scale (bool): use log-spaced scales if True, else linearly spaced scales
scales (array): custom scale vector, if provided, it will override the other scale arguments
n_tries (int): number of generalized Markov Stability optimisation evaluations
        with_all_tries (bool): store all partitions with stability values found in different
            optimisation evaluations
with_NVI (bool): compute NVI(t) between generalized Markov Stability optimisations
at each scale t
n_NVI (int): number of randomly chosen generalized Markov Stability optimisations
to estimate NVI
with_postprocessing (bool): apply the final postprocessing step
with_ttprime (bool): compute the NVI(t,tprime) matrix to compare scales t and tprime
with_spectral_gap (bool): normalise scale by spectral gap
exp_comp_mode (str): mode to compute matrix exponential, can be expm or spectral
result_file (str): path to the result file
n_workers (int): number of workers for multiprocessing
tqdm_disable (bool): disable progress bars
with_optimal_scales (bool): apply optimal scale selection algorithm
optimal_scales_kwargs (dict): kwargs to pass to optimal scale selection, see
optimal_scale module.
        method (str): optimisation method, louvain or leiden
constructor_kwargs (dict): additional kwargs to pass to constructor prepare method
Returns:
Results dict with the following entries
- 'run_params': dict with parameters used to run the code
- 'scales': scales of the scan
- 'number_of_communities': number of communities at each scale
- 'stability': value of stability cost function at each scale
- 'community_id': community node labels at each scale
- 'all_tries': all community node labels with stability values found in different
optimisation evaluations at each scale (included if with_all_tries==True)
- 'NVI': NVI(t) at each scale
- 'ttprime': NVI(t,tprime) matrix
- 'block_nvi': block NVI curve (included if with_optimal_scales==True)
- 'selected_partitions': selected partitions (included if with_optimal_scales==True)
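
    Example:
        A minimal sketch of a run on a small connected graph (the parameter values are
        illustrative only)::

            import scipy.sparse as sp

            from pygenstability import run

            # two triangles joined by a single edge
            edges = [(0, 1), (1, 2), (0, 2), (2, 3), (3, 4), (4, 5), (3, 5)]
            adjacency = sp.lil_matrix((6, 6))
            for i, j in edges:
                adjacency[i, j] = adjacency[j, i] = 1.0

            results = run(sp.csr_matrix(adjacency), n_scale=10, n_tries=20)
            print(results["number_of_communities"])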
"""
method = _check_method(method)
run_params = _get_params(locals())
graph = _graph_checks(graph)
scales = _get_scales(
min_scale=min_scale,
max_scale=max_scale,
n_scale=n_scale,
log_scale=log_scale,
scales=scales,
)
assert exp_comp_mode in ["spectral", "expm"]
if constructor in ("directed", "linearized_directed", "signed"):
L.info("We cannot use spectral exponential computation for directed contructor")
exp_comp_mode = "expm"
if constructor_kwargs is None:
constructor_kwargs = {}
constructor_kwargs.update(
{"with_spectral_gap": with_spectral_gap, "exp_comp_mode": exp_comp_mode}
)
constructor = load_constructor(constructor, graph, **constructor_kwargs)
with multiprocessing.Pool(n_workers) as pool:
L.info("Precompute constructors...")
constructor_data = _get_constructor_data(
constructor, scales, pool, tqdm_disable=tqdm_disable
)
if method == "leiden":
# pragma: no cover
for data in constructor_data:
assert all(data["null_model"][0] == data["null_model"][1])
L.info("Optimise stability...")
all_results = defaultdict(list)
all_results["run_params"] = run_params
        for i, t in tqdm(enumerate(scales), total=len(scales), disable=tqdm_disable):
results = _run_optimisations(constructor_data[i], n_tries, pool, method)
communities = _process_runs(t, results, all_results)
if with_NVI:
_compute_NVI(communities, all_results, pool, n_partitions=min(n_NVI, n_tries))
if with_all_tries:
all_results["all_tries"].append(results)
save_results(all_results, filename=result_file)
if with_postprocessing:
L.info("Apply postprocessing...")
_apply_postprocessing(all_results, pool, constructor_data, tqdm_disable, method=method)
if with_ttprime or with_optimal_scales:
L.info("Compute ttprimes...")
_compute_ttprime(all_results, pool)
if with_optimal_scales:
L.info("Identify optimal scales...")
                if optimal_scales_kwargs is None:
                    optimal_scales_kwargs = {
                        "kernel_size": max(2, int(0.1 * len(scales))),
                        "window_size": max(2, int(0.1 * len(scales))),
                        "basin_radius": max(1, int(0.01 * len(scales))),
                    }
all_results = identify_optimal_scales(all_results, **optimal_scales_kwargs)
save_results(all_results, filename=result_file)
return dict(all_results)
def _process_runs(scale, results, all_results):
"""Convert the optimisation outputs to useful data and save it."""
stabilities = np.array([res[0] for res in results])
communities = np.array([res[1] for res in results])
best_run_id = np.argmax(stabilities)
all_results["scales"].append(scale)
all_results["number_of_communities"].append(np.max(communities[best_run_id]) + 1)
all_results["stability"].append(stabilities[best_run_id])
all_results["community_id"].append(communities[best_run_id])
return communities
@_timing
def _compute_NVI(communities, all_results, pool, n_partitions=10):
"""Compute NVI measure between the first n_partitions."""
selected_partitions = communities[:n_partitions]
worker = partial(evaluate_NVI, partitions=selected_partitions)
index_pairs = [[i, j] for i in range(n_partitions) for j in range(n_partitions)]
chunksize = _get_chunksize(len(index_pairs), pool)
all_results["NVI"].append(np.mean(list(pool.imap(worker, index_pairs, chunksize=chunksize))))
def evaluate_NVI(index_pair, partitions):
r"""Evaluations of Normalized Variation of Information (NVI).
NVI is defined for two partitions :math:`p1` and :math:`p2` as:
.. math::
NVI = \frac{E(p1) + E(p2) - 2MI(p1, p2)}{JE(p1,p2)}
where :math:`E` is the entropy, :math:`JE` the joint entropy
and :math:`MI` the mutual information.
Args:
index_pair (list): list of two indices to select pairs of partitions
partitions (list): list of partitions
Returns:
float, Normalized Variation Information
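
    Example:
        A small illustrative check: a partition compared with itself gives an NVI
        of 0, while two independent partitions give an NVI of 1::

            import numpy as np

            p1 = np.array([0, 0, 1, 1])
            p2 = np.array([0, 1, 0, 1])
            assert np.isclose(evaluate_NVI([0, 0], [p1, p2]), 0.0)  # p1 vs itself
            assert np.isclose(evaluate_NVI([0, 1], [p1, p2]), 1.0)  # p1 vs p2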
"""
MI = mutual_info_score(partitions[index_pair[0]], partitions[index_pair[1]])
Ex = entropy(partitions[index_pair[0]])
Ey = entropy(partitions[index_pair[1]])
JE = Ex + Ey - MI
if abs(JE) < 1e-8:
return 0.0
return (JE - MI) / JE
def _to_indices(matrix, directed=False):
"""Convert a sparse matrix to indices and values.
Args:
matrix (sparse): sparse matrix to convert
        directed (bool): used for Leiden, which needs the full matrix rather than
            only its lower triangle
"""
if not directed:
matrix = sp.tril(matrix)
rows, cols, values = sp.find(matrix)
return (rows, cols), values
@_timing
def _optimise(_, quality_indices, quality_values, null_model, global_shift, method="louvain"):
"""Worker for generalized Markov Stability optimisation runs."""
if method == "louvain":
stability, community_id = generalized_louvain.run_louvain(
quality_indices[0],
quality_indices[1],
quality_values,
len(quality_values),
null_model,
np.shape(null_model)[0],
1.0,
)
if method == "leiden":
# this implementation uses the trick suggested by V. Traag here:
# https://github.com/vtraag/leidenalg/pull/109#issuecomment-1283963065
G = ig.Graph(edges=zip(*quality_indices), directed=True)
partitions = []
n_null = int(len(null_model) / 2)
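        # one CPM layer per null model pair; the multiplex optimiser below
        # maximises the average quality across these layers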
for null in null_model[::2]:
partitions.append(
leidenalg.CPMVertexPartition(
G,
weights=quality_values,
node_sizes=null.tolist(),
correct_self_loops=True,
)
)
optimiser = leidenalg.Optimiser()
optimiser.set_rng_seed(np.random.randint(1e8))
stability = sum(partition.quality() for partition in partitions) / n_null
stability += optimiser.optimise_partition_multiplex(
partitions, layer_weights=n_null * [1.0 / n_null]
)
community_id = partitions[0].membership
return stability + global_shift, community_id
def _evaluate_quality(partition_id, qualities_index, null_model, global_shift, method="louvain"):
"""Worker for generalized Markov Stability optimisation runs."""
if method == "louvain":
quality = generalized_louvain.evaluate_quality(
qualities_index[0][0],
qualities_index[0][1],
qualities_index[1],
len(qualities_index[1]),
null_model,
np.shape(null_model)[0],
1.0,
partition_id,
)
if method == "leiden":
quality = np.mean(
[
leidenalg.CPMVertexPartition(
ig.Graph(edges=zip(*qualities_index[0]), directed=True),
initial_membership=partition_id,
weights=qualities_index[1],
node_sizes=null.tolist(),
correct_self_loops=True,
).quality()
for null in null_model[::2]
]
)
return quality + global_shift
def _run_optimisations(constructor, n_runs, pool, method="louvain"):
"""Run several generalized Markov Stability optimisation on the current quality matrix."""
quality_indices, quality_values = _to_indices(
constructor["quality"], directed=method == "leiden"
)
worker = partial(
_optimise,
quality_indices=quality_indices,
quality_values=quality_values,
null_model=constructor["null_model"],
global_shift=constructor.get("shift", 0.0),
method=method,
)
chunksize = _get_chunksize(n_runs, pool)
return pool.map(worker, range(n_runs), chunksize=chunksize)
@_timing
def _compute_ttprime(all_results, pool):
"""Compute ttprime from the stability results."""
index_pairs = list(itertools.combinations(range(len(all_results["scales"])), 2))
worker = partial(evaluate_NVI, partitions=all_results["community_id"])
chunksize = _get_chunksize(len(index_pairs), pool)
ttprime_list = pool.map(worker, index_pairs, chunksize=chunksize)
all_results["ttprime"] = np.zeros([len(all_results["scales"]), len(all_results["scales"])])
for i, ttp in enumerate(ttprime_list):
all_results["ttprime"][index_pairs[i][0], index_pairs[i][1]] = ttp
all_results["ttprime"] += all_results["ttprime"].T
@_timing
def _apply_postprocessing(all_results, pool, constructors, tqdm_disable=False, method="louvain"):
"""Apply postprocessing."""
all_results_raw = all_results.copy()
for i, constructor in tqdm(
enumerate(constructors), total=len(constructors), disable=tqdm_disable
):
worker = partial(
_evaluate_quality,
qualities_index=_to_indices(constructor["quality"]),
null_model=constructor["null_model"],
global_shift=constructor.get("shift", 0.0),
method=method,
)
best_quality_id = np.argmax(
pool.map(
worker,
all_results_raw["community_id"],
chunksize=_get_chunksize(len(all_results_raw["community_id"]), pool),
)
)
all_results["community_id"][i] = all_results_raw["community_id"][best_quality_id]
all_results["stability"][i] = all_results_raw["stability"][best_quality_id]
all_results["number_of_communities"][i] = all_results_raw["number_of_communities"][
best_quality_id
]