Source code for teneto.communitydetection.spectral


"""Temporal community detection by fitting a Grassmannian geodesic to spectral embeddings."""
from __future__ import annotations

import logging
import time
from abc import ABC, abstractmethod
from typing import Iterable, List, Tuple

import numpy as np
from scipy import sparse
from scipy.linalg import cosm, sinm
from scipy.ndimage import gaussian_filter1d
from scipy.signal import medfilt
from scipy.sparse import csr_matrix, issparse
from scipy.sparse.linalg import eigsh, svds
from sklearn.cluster import KMeans

from teneto.classes import TemporalNetwork
from teneto.utils import process_input


__all__ = ["temporal_spectral"]

logger = logging.getLogger(__name__)


def _as_sparse_snapshots(tnet: TemporalNetwork) -> Tuple[list[csr_matrix], int, int]:
    """Return sparse adjacency snapshots and their dimensions."""
    arr = tnet.df_to_array(start_at="auto")
    if arr.ndim != 3:
        raise ValueError(f"Expected a 3D array (nodes, nodes, time); got {arr.shape}")

    n_nodes, _, n_time = arr.shape
    snapshots: list[csr_matrix] = []
    for t in range(n_time):
        # Copy to avoid mutating the TemporalNetwork backing array.
        mat = np.array(arr[:, :, t], copy=True)
        np.fill_diagonal(mat, 0)
        mat = np.triu(mat, k=1)
        mat = mat + mat.T
        snapshots.append(csr_matrix(mat))
    return snapshots, n_nodes, n_time


def _labels_list_to_matrix(labels_list: Iterable[Iterable[int]]) -> np.ndarray:
    """Convert a list of per-time label vectors into a (node, time) array."""
    label_series = list(labels_list)
    if not label_series:
        raise ValueError("No community assignments were returned.")

    n_nodes = len(label_series[0])
    labels = np.empty((n_nodes, len(label_series)), dtype=int)
    for t_idx, lab in enumerate(label_series):
        lab = np.asarray(lab, dtype=int)
        if lab.shape[0] != n_nodes:
            raise ValueError("Community label lengths must match node count at each timepoint.")
        labels[:, t_idx] = lab
    return labels


[docs] def temporal_spectral( tnet, ke: int, kc: int | list[int] | str = "auto", mode: str = "simple-nsc", stable_communities: bool = False, smoothing_filter: str | None = None, smoothing_parameter: float | None = None, return_embeddings: bool = False, use_intermediate_iterations: bool = False, num_intermediate_iterations: int = 20, which_eig: str = "smallest", **mode_kwargs, ) -> np.ndarray | Tuple[np.ndarray, List[np.ndarray]]: """ Detect temporal communities by geodesically modeling spectral embeddings. Parameters ---------- tnet : array | dict | TemporalNetwork Temporal network input accepted by :func:`teneto.utils.process_input`. ke : int The dimension of the learned spectral embeddings. Should be an upper bound on the expected number of communities at any time. (In classical static spectral clustering, ``ke`` is chosen to be equal to the target number of communities ``kc`` .) kc : int | list[int] | 'auto' Number of communities to detect per snapshot. Either a single int to detect a fixed given number of communities at every snapshot, a list of ints coindexed with the snapshots, or 'auto' to select per snapshot using a benefit curve. mode : str Algorithm mode, e.g. ``'simple-nsc'`` (normalized spectral clustering, default), ``'simple-usc'`` (unnormalized), ``'simple-smm'`` (spectral modularity maximization). stable_communities : bool If ``True`` and ``kc='auto'``, a single community count is enforced across time by maximizing the benefit curve aggregated over all snapshots. smoothing_filter : str | None ``'median'`` or ``'gaussian'`` to optionally smooth the benefit-vs-``k`` curve when ``kc='auto'`` toward (slightly) more stable community counts. smoothing_parameter : float | None Parameter for the selected smoothing filter (kernel size or sigma). return_embeddings : bool If ``True``, also return the per-time embeddings used for clustering. use_intermediate_iterations : bool If ``True``, keep intermediate geodesic-fitting iterations and concatenate their embeddings. num_intermediate_iterations : int Number of intermediate iterations retained if ``use_intermediate_iterations`` is set. The idea is that since MM algorithms monotonically improve their objective, intermediate fits may also be useful as 'softer regularization' than the final geodesic fit. which_eig : str Controls which eigensolver is used inside the base smoother (``'smallest'``, ``'largest'``, ``'svd'``). mode_kwargs : Additional keyword arguments forwarded to the smoother subclass. Returns ------- communities : ndarray (nodes, time) node,time array of community assignment embeddings : list[np.ndarray], optional Returned when ``return_embeddings`` is ``True``. References ---------- Hume, J. and Balzano, L. (2025). A Spectral Framework for Tracking Communities in Evolving Networks. In *Proceedings of the Third Learning on Graphs Conference (LoG 2024)*, Proceedings of Machine Learning Research, vol. 269, pp. 9:1–9:34. Available at https://proceedings.mlr.press/v269/hume25a.html. """ tnet = process_input(tnet, ["C", "G", "TN"], "TN") snapshots, n_nodes, n_time = _as_sparse_snapshots(tnet) assignments, embeddings = _spectral_geodesic_smoothing( sadj_list=snapshots, T=n_time, num_nodes=n_nodes, ke=ke, kc=kc, stable_communities=stable_communities, mode=mode, smoothing_filter=smoothing_filter, smoothing_parameter=smoothing_parameter, return_geo_embeddings_only=False, use_intermediate_iterations=use_intermediate_iterations, num_intermediate_iterations=num_intermediate_iterations, which_eig=which_eig, **mode_kwargs, ) communities = _labels_list_to_matrix(assignments) if return_embeddings: return communities, embeddings return communities
def _is_symmetric(matrix, tol=1e-8): if issparse(matrix): return (matrix != matrix.T).nnz == 0 return np.allclose(matrix, matrix.T, atol=tol) def _theta_majorizer(H, Y, X): T = len(X) k = H.shape[1] r = np.empty((T, k), dtype=np.float64) phi = np.empty((T, k), dtype=np.float64) bias = np.empty((T, k), dtype=np.float64) symmetric_flags = [_is_symmetric(Xi) for Xi in X] if not H.flags['C_CONTIGUOUS']: H = np.ascontiguousarray(H) if not Y.flags['C_CONTIGUOUS']: Y = np.ascontiguousarray(Y) for ii in range(T): Xi = X[ii] symmetric = symmetric_flags[ii] if symmetric: XiT_H = Xi @ H XiT_Y = Xi @ Y else: if issparse(Xi): XiT = Xi.transpose().tocsr() else: XiT = Xi.T XiT_H = XiT @ H XiT_Y = XiT @ Y a = np.sum(np.abs(XiT_H)**2, axis=0) b = np.real(np.sum(XiT_Y.conj() * XiT_H, axis=0)) c = np.sum(np.abs(XiT_Y)**2, axis=0) a_minus_c_over_2 = (a - c) / 2.0 two_b = 2.0 * b np.sqrt(a_minus_c_over_2**2 + b**2, out=r[ii, :]) np.arctan2(two_b, a - c, out=phi[ii, :]) bias[ii, :] = (a + c) / 2.0 return r, phi, bias def _estimate_theta(H, Y, X, t, niter=5, tH=0, Theta_init=None): r, phi, bias = _theta_majorizer(H, Y, X) t = np.asarray(t, dtype=np.float64)[:, np.newaxis] - tH tt = 2.0 * t n2tr = -tt * r L = tt * n2tr if Theta_init is None: Theta = np.ones(H.shape[1], dtype=np.float64) # Shape: (k,) else: Theta = Theta_init.astype(np.float64).copy() # Shape: (k,) for _ in range(niter): arg = tt * Theta arg -= phi gradf = n2tr * np.sin(arg) denom = (arg + np.pi) % (2 * np.pi) - np.pi with np.errstate(divide='ignore', invalid='ignore'): curvf = np.divide(tt * gradf, denom, out=np.zeros_like(tt * gradf), where=denom!=0) curvf += L * (denom == 0) gradf_sum = np.sum(gradf, axis=0) curvf_sum = np.sum(curvf, axis=0) step = np.divide(gradf_sum, curvf_sum, out=np.zeros_like(gradf_sum), where=curvf_sum!=0) # Shape: (k,) Theta -= step return Theta def _estimate_point_tangent(H, Y, Theta, X, t, tH=0): M_AB = 0.0 for ti, Xi in zip(t, X): arg = Theta * (ti - tH) # (k,) cos_arg = np.cos(arg) # (k,) sin_arg = np.sin(arg) # (k,) Ui = H * cos_arg + Y * sin_arg if _is_symmetric(Xi): G = Xi @ Ui # (n_i, k) XG = G # for symmetric matrices Xi @ G.T = G else: G = Ui.T @ Xi # (k, n_i) XG = Xi @ G.T # (d, k) XGcos = XG * cos_arg XGsin = XG * sin_arg M_AB_i = np.concatenate((XGcos, XGsin), axis=1) #(d, 2k) M_AB += M_AB_i return M_AB def _sfit_point_tangent_geodesic(data, k, max_iter, tol=1e-5, rel_tol=1e-2, return_intermediate_iterations=False, num_intermediate_iterations=20): X, t = data # X is list of matrices, t is list of times init_start = time.time() # Extract M1 and MT M1 = X[0] MT = X[-1] # are M1 and MT symmetric? M1_symmetric = _is_symmetric(M1) MT_symmetric = _is_symmetric(MT) logger.debug("M1 symmetric: %s", M1_symmetric) logger.debug("MT symmetric: %s", MT_symmetric) # rank-k truncated SVD of M1 if M1_symmetric: if issparse(M1): S1, U1 = eigsh(M1, k=k, which='LM') else: S1, U1 = np.linalg.eigh(M1) idx = np.argsort(-S1)[:k] U1 = U1[:, idx] S1 = S1[idx] else: U1, S1, _ = svds(M1, k=k) idx = np.argsort(-S1) U1 = U1[:, idx] S1 = S1[idx] H1 = U1 # (d, k) # ..... and for MT also if MT_symmetric: if issparse(MT): ST, UT = eigsh(MT, k=k, which='LM') else: ST, UT = np.linalg.eigh(MT) idx = np.argsort(-ST)[:k] UT = UT[:, idx] ST = ST[idx] else: UT, ST, _ = svds(MT, k=k) idx = np.argsort(-ST) UT = UT[:, idx] ST = ST[idx] H_T = UT # (d, k) H1T_H_T = H1.T @ H_T # (k, k) Z, S, Q_T = np.linalg.svd(H1T_H_T, full_matrices=False) Q = Q_T.T # (k, k) if np.any((S < -1 - 1e-6) | (S > 1 + 1e-6)): # clip S logger.warning("Singular values exceeded [-1,1] by more than 1e-6; clipping.") S = np.clip(S, -1, 1) H_TQ = H_T @ Q # (d, k) # orthogonal component H1_H1T_HTQ = H1 @ (H1.T @ H_TQ) # (d, k) Orth_comp = H_TQ - H1_H1T_HTQ # (d, k) # SVD of orthocomplement F, D, G_T = np.linalg.svd(Orth_comp, full_matrices=False) # direction Y = F G_T Y = F @ G_T # (d, k) # Theta init (see paper) Theta = np.arccos(S) # (k,) #print("init time:", time.time() - init_start) #print("Starting iterations") iter_start = time.time() point_tangent_times = [] theta_times = [] conv_check_times = [] # init H = H1 H = H1.copy() # set the initial values for convergence checking H_old = H.copy() Y_old = Y.copy() Theta_old = Theta.copy() # Storage for intermediate iterations if requested if return_intermediate_iterations: intermediate_H = [] intermediate_Y = [] intermediate_Theta = [] # fix tH=0, at least until there seems to be a reason to not do so tH = 0 for itr in range(max_iter): if itr == max_iter - 1: logger.warning("Maximum point-tangent iterations reached without convergence.") #print(f"Iteration {itr}") # P-Update, P = [H, Y] pt_time = time.time() M_AB = _estimate_point_tangent(H, Y, Theta, X, t, tH) try: U, _, Vh = np.linalg.svd(M_AB, full_matrices=False) except np.linalg.LinAlgError as err: logger.error("SVD failed for M_AB (shape=%s): %s", M_AB.shape, err) logger.debug("Matrix contains NaN? %s | Inf? %s", np.any(np.isnan(M_AB)), np.any(np.isinf(M_AB))) logger.info("Retrying SVD for M_AB after adding jitter.") M_AB += 1e-6 * np.random.randn(*M_AB.shape) U, _, Vh = np.linalg.svd(M_AB, full_matrices=False) C = U @ Vh # (d, 2k) H = C[:, :k] Y = C[:, k:] point_tangent_times.append(time.time() - pt_time) # Theta-Update theta_time = time.time() Theta = _estimate_theta(H, Y, X, t, niter=5, tH=tH, Theta_init=Theta) theta_times.append(time.time() - theta_time) # Store intermediate iterations if requested (only last num_intermediate_iterations) if return_intermediate_iterations: intermediate_H.append(H.copy()) intermediate_Y.append(Y.copy()) intermediate_Theta.append(Theta.copy()) # Keep only the last num_intermediate_iterations if len(intermediate_H) > num_intermediate_iterations: intermediate_H.pop(0) intermediate_Y.pop(0) intermediate_Theta.pop(0) # Check convergence? conv_check_time = time.time() delta_H = np.linalg.norm(H - H_old) delta_Y = np.linalg.norm(Y - Y_old) delta_Theta = np.linalg.norm(Theta - Theta_old) rel_delta_H = delta_H / (np.linalg.norm(H) + 1e-15) rel_delta_Y = delta_Y / (np.linalg.norm(Y) + 1e-15) rel_delta_Theta = delta_Theta / (np.linalg.norm(Theta) + 1e-15) if (delta_H < tol and delta_Y < tol and delta_Theta < tol) or (rel_delta_H < rel_tol and rel_delta_Y < rel_tol and rel_delta_Theta < rel_tol): logger.info("Point-tangent geodesic converged in %s iterations.", itr) break # update the old values H_old = H.copy() Y_old = Y.copy() Theta_old = Theta.copy() conv_check_times.append(time.time() - conv_check_time) #print("point tangent time", np.sum(np.array(point_tangent_times))) #print("theta time", np.sum(np.array(theta_times))) #print("conv check time", np.sum(np.array(conv_check_times))) if return_intermediate_iterations: return H, Y, Theta, intermediate_H, intermediate_Y, intermediate_Theta return H, Y, Theta class _SpectralGeodesicSmoother(ABC): def __init__(self, *args, d, T, sadj_list=None, precomputed_embeddings=None, ke='auto', kc_list='auto', stable_communities=False, which_eig='smallest', t=None, benefit_fn=None, smoothing_filter=None, smoothing_parameter=None, max_iter=1000, use_intermediate_iterations=False, num_intermediate_iterations=20): if len(args) > 0: raise ValueError("This class does not accept positional arguments") if precomputed_embeddings is not None and sadj_list is not None: logger.warning("Both precomputed_embeddings and sadj_list are provided. The sadj_list will be ignored.") self.d = d self.max_iter = max_iter self.sadj_list = sadj_list self.precomputed_embeddings = precomputed_embeddings self.ke = ke if ke != 'auto' else self.choose_ke() if isinstance(kc_list, int): kc_list = [kc_list]*T self.kc_list = kc_list self.stable_communities = stable_communities self.which_eig = which_eig if t is not None: self.t = t elif sadj_list is not None: self.t = np.linspace(0.0, 1.0, len(sadj_list)) elif precomputed_embeddings is not None: self.t = np.linspace(0.0, 1.0, len(precomputed_embeddings)) else: raise ValueError("Either sadj_list or precomputed_embeddings must be provided") self.t = t if t is not None else np.linspace(0.0, 1.0, len(sadj_list)) if sadj_list is not None else np.linspace(0.0, 1.0, len(precomputed_embeddings)) self.benefit_fn = benefit_fn self.smoothing_filter = smoothing_filter self.smoothing_parameter = smoothing_parameter self.T = T self.use_intermediate_iterations = use_intermediate_iterations self.num_intermediate_iterations = num_intermediate_iterations if self.stable_communities and kc_list != 'auto' and len(set(kc_list)) > 1: raise ValueError("stable_communities=True requires a single kc value or 'auto'.") def benefit_fn_broadcast(self, sadj_list, labels_list): return [self.benefit_fn(sadj, labels) for sadj, labels in zip(sadj_list, labels_list)] @abstractmethod def make_clustering_matrix(self, sadj): pass def choose_ke(self): raise NotImplementedError("must implement choose_ke") def make_clustering_matrices(self): self.clustering_matrices = [self.make_clustering_matrix(sadj) for sadj in self.sadj_list] def make_modeled_clustering_matrices(self): # this is permitted to be overwritten by subclasses, but usually no need; the following default implementation usually works fine assert self.clustering_matrices is not None, "You must first run make_clustering_matrices" self.modeled_clustering_matrices = [] for R in self.clustering_matrices: frobenius_norm = sparse.linalg.norm(R, 'fro') n,m= R.shape I = sparse.eye(n, m, format=R.format) if self.which_eig == 'smallest': self.modeled_clustering_matrices.append(frobenius_norm*I - R) elif self.which_eig == 'largest': self.modeled_clustering_matrices.append(frobenius_norm*I + R) elif self.which_eig == 'svd': self.modeled_clustering_matrices.append(R) else: raise ValueError("Invalid value for which_eig") # this never overwritten by subclasses def get_geodesic_embeddings(self): # This method now assumes self.Xs is already prepared. # Get geodesic parameters, optionally with intermediate iterations if self.use_intermediate_iterations: H, Y, Theta, intermediate_H, intermediate_Y, intermediate_Theta = _sfit_point_tangent_geodesic( (self.Xs, self.t), k=self.ke, max_iter=self.max_iter, return_intermediate_iterations=True, num_intermediate_iterations=self.num_intermediate_iterations ) self.intermediate_H = intermediate_H self.intermediate_Y = intermediate_Y self.intermediate_Theta = intermediate_Theta else: H, Y, Theta = _sfit_point_tangent_geodesic((self.Xs, self.t), k=self.ke, max_iter=self.max_iter) # Compute main geodesic embeddings self.Us = [H @ cosm(np.diag(Theta)*self.t[i]) + Y @ sinm(np.diag(Theta)*self.t[i]) for i in range(self.T)] # Optionally compute intermediate geodesic embeddings for positional encodings if self.use_intermediate_iterations: self.intermediate_Us = [] for H_inter, Y_inter, Theta_inter in zip(intermediate_H, intermediate_Y, intermediate_Theta): Us_inter = [H_inter @ cosm(np.diag(Theta_inter)*self.t[i]) + Y_inter @ sinm(np.diag(Theta_inter)*self.t[i]) for i in range(self.T)] self.intermediate_Us.append(Us_inter) # Concatenate intermediate embeddings to main embeddings for enriched positional encodings self.enriched_Us = [] for i in range(self.T): # Start with main embedding enriched_embedding = [self.Us[i]] # Add intermediate embeddings for this timestep for inter_Us in self.intermediate_Us: enriched_embedding.append(inter_Us[i]) # Concatenate along feature dimension self.enriched_Us.append(np.concatenate(enriched_embedding, axis=1)) else: self.enriched_Us = self.Us # this overwritten sometimes by subclasses def clustering_Euclidean(self): # Use enriched embeddings if available, otherwise use standard embeddings embeddings_to_use = self.enriched_Us if hasattr(self, 'enriched_Us') else self.Us T = len(embeddings_to_use) if all(k == self.kc_list[0] for k in self.kc_list): # Constant kc_list k_val = self.kc_list[0] kmeans = KMeans(n_clusters=k_val, n_init=10) labels = [kmeans.fit_predict(U_i) for U_i in embeddings_to_use] return labels elif all(isinstance(k, int) for k in self.kc_list): # Non-constant, provided kc_list labels = [] for U_i, k_val in zip(embeddings_to_use, self.kc_list): kmeans = KMeans(n_clusters=k_val, n_init=10) labels.append(kmeans.fit_predict(U_i)) return labels else: # Auto kc_list logger.info("Auto community-count selection uses placeholder kmin=2. Might later consider implementing a heuristic to guess higher when appropriate.") kmin, kmax = 2, self.ke k_vals = range(kmin, kmax + 1) benefit_vs_time_and_k = np.zeros((len(k_vals), T)) - np.inf labels_by_k = {k: [] for k in k_vals} start = time.time() for i, U_i in enumerate(embeddings_to_use): for j, k_val in enumerate(k_vals): kmeans = KMeans(n_clusters=k_val, n_init=10) labels_i = kmeans.fit_predict(U_i) labels_by_k[k_val].append(labels_i) benefit = self.benefit_fn_broadcast([self.sadj_list[i]], [labels_i])[0] benefit_vs_time_and_k[j, i] = benefit if self.smoothing_filter == 'median': kernel_size = self.smoothing_parameter smoothed_benefit = np.array([medfilt(row, kernel_size) for row in benefit_vs_time_and_k]) elif self.smoothing_filter == 'gaussian': sigma = self.smoothing_parameter smoothed_benefit = np.array([gaussian_filter1d(row, sigma) for row in benefit_vs_time_and_k]) else: smoothed_benefit = benefit_vs_time_and_k if self.stable_communities: aggregated_benefit = smoothed_benefit.mean(axis=1) best_k = k_vals[int(np.argmax(aggregated_benefit))] best_labels_all = labels_by_k[best_k] else: k_maximizing_benefit = [k_vals[l] for l in np.argmax(smoothed_benefit, axis=0)] best_labels_all = [labels_by_k[k][i] for i, k in enumerate(k_maximizing_benefit)] return best_labels_all # this is never overwritten by subclasses def run_dcd(self): self.run_geo_embeddings() time_start = time.time() assignments = self.clustering_Euclidean() return assignments def run_geo_embeddings(self): time_start = time.time() # If custom embeddings are provided, use them directly for fitting. if self.precomputed_embeddings is not None: logger.info("Using pre-computed embeddings for geodesic fitting.") self.Xs = self.precomputed_embeddings # Otherwise, run the standard matrix processing pipeline. else: if self.sadj_list is None: raise ValueError("sadj_list must be provided if precomputed_embeddings is not used.") self.make_clustering_matrices() self.make_modeled_clustering_matrices() self.Xs = self.modeled_clustering_matrices # Now, call the fitting method, which assumes self.Xs is ready self.get_geodesic_embeddings() logger.debug("Time to get geodesic embeddings: %.3fs", time.time() - time_start) def _spectral_geodesic_smoothing(sadj_list=None, T=None, num_nodes=None, ke=None, precomputed_embeddings=None, kc='auto', stable_communities=False, mode='simple-nsc', smoothing_filter=None, smoothing_parameter=None, return_geo_embeddings_only=False, use_intermediate_iterations=False, num_intermediate_iterations=20, **mode_kwargs): if T is None: if sadj_list is not None: T = len(sadj_list) elif precomputed_embeddings is not None: T = len(precomputed_embeddings) else: raise ValueError("sadj_list or precomputed_embeddings must be provided") if not isinstance(kc, list) and kc != 'auto': try: kc = [kc] * T except TypeError: raise ValueError("kc must be a list, a natural number, or the default 'auto'") algorithms = { 'simple': ['nsc', 'smm', 'bhc', 'usc'], 'signed': ['srsc', 'gmsc', 'spmsc'], 'overlapping': ['osc', 'csc'], 'directed': ['ddsc', 'bsc', 'rwsc'], 'multiview': ['gmsc', 'pmlc'], 'cocommunity': ['scc'], 'hierarchical': ['hsc'], } simple_class_lookup = { 'nsc': _NSC, 'smm': _SMM, 'bhc': _BHC, 'usc': _USC, } simple_modes = { f"simple-{alg}": simple_class_lookup[alg] for alg in algorithms['simple'] } disabled_modes = { f"{category}-{alg}" for category, algs in algorithms.items() if category != 'simple' for alg in algs } if mode in simple_modes: smoother_class = simple_modes[mode] elif mode in disabled_modes: raise NotImplementedError("Non-'simple-*' temporal spectral modes are currently disabled.") else: valid_modes = ', '.join(sorted(simple_modes.keys())) raise ValueError(f"Invalid simple mode '{mode}'. Valid simple modes are: {valid_modes}") smoother = smoother_class(T=T, d=num_nodes, sadj_list=sadj_list, precomputed_embeddings=precomputed_embeddings, ke=ke, kc_list=kc, stable_communities=stable_communities, smoothing_filter=smoothing_filter, smoothing_parameter=smoothing_parameter, use_intermediate_iterations=use_intermediate_iterations, num_intermediate_iterations=num_intermediate_iterations, **mode_kwargs) if return_geo_embeddings_only: smoother.run_geo_embeddings() # Return enriched embeddings if available, otherwise standard embeddings return smoother.enriched_Us if hasattr(smoother, 'enriched_Us') else smoother.Us assignments = smoother.run_dcd() # Return enriched embeddings if available, otherwise standard embeddings embeddings_to_return = smoother.enriched_Us if hasattr(smoother, 'enriched_Us') else smoother.Us return assignments, embeddings_to_return ## Begin subclass implementations class _Simple(_SpectralGeodesicSmoother): @staticmethod def calculate_modularity(adj_matrix: csr_matrix, communities: list) -> float: """ Calculate the modularity of a network given its adjacency matrix and community assignments. :param adj_matrix: Sparse adjacency matrix of the network (scipy.sparse.csr_matrix) :param communities: List of community assignments for each node :return: Modularity value """ if not isinstance(adj_matrix, csr_matrix): raise ValueError("adj_matrix must be a scipy.sparse.csr_matrix") if len(communities) != adj_matrix.shape[0]: raise ValueError("Number of community assignments must match number of nodes") n_edges = adj_matrix.sum() / 2 n_nodes = adj_matrix.shape[0] modularity = 0 for i in range(n_nodes): for j in range(n_nodes): if communities[i] == communities[j]: a_ij = adj_matrix[i, j] k_i = adj_matrix.getrow(i).sum() k_j = adj_matrix.getrow(j).sum() expected = (k_i * k_j) / (2 * n_edges) modularity += a_ij - expected modularity /= (2 * n_edges) return modularity @staticmethod def calculate_modularity_vectorized(adj_matrix: csr_matrix, communities: list) -> float: """ Calculate the modularity of a network given its adjacency matrix and community assignments using a fully vectorized approach. :param adj_matrix: Sparse adjacency matrix of the network (scipy.sparse.csr_matrix) :param communities: List of community assignments for each node :return: Modularity value """ if not isinstance(adj_matrix, csr_matrix): raise ValueError("adj_matrix must be a scipy.sparse.csr_matrix") n_nodes = adj_matrix.shape[0] if len(communities) != n_nodes: raise ValueError("Number of community assignments must match number of nodes") m = adj_matrix.sum() / 2 if m == 0: raise ValueError("The network has no edges.") communities = np.array(communities) unique_communities, inverse_indices = np.unique(communities, return_inverse=True) n_communities = unique_communities.size degrees = np.array(adj_matrix.sum(axis=1)).flatten() degree_sum_per_community = np.bincount(inverse_indices, weights=degrees) community_matrix = csr_matrix( (np.ones(n_nodes), (inverse_indices, np.arange(n_nodes))), shape=(n_communities, n_nodes) ) connections_within = community_matrix.dot(adj_matrix).dot(community_matrix.transpose()) internal_edges_per_community = connections_within.diagonal() / 2 modularity = (internal_edges_per_community.sum() / m) - np.sum((degree_sum_per_community / (2 * m)) ** 2) return modularity def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.benefit_fn = self.calculate_modularity_vectorized class _Signed(_SpectralGeodesicSmoother): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def power_mean_laplacian(self, A_pos, A_neg, p, epsilon=1e-6): pass def create_clustering_matrix(self, A_pos, A_neg, p, epsilon=1e-6): pass class _Directed(_SpectralGeodesicSmoother): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) class _Overlapping(_SpectralGeodesicSmoother): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) class _Multiview(_SpectralGeodesicSmoother): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) class _Cocommunity(_SpectralGeodesicSmoother): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) class _Hierarchical(_SpectralGeodesicSmoother): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) ## end subclass implementations ## Begin subsubclass implementations class _NSC(_Simple): def make_clustering_matrix(self, sadj): pass # we override the full make_modeled_clustering_matrices method using normalized_signless_laplacian (recall that normalized signless laplacian SVD is equivalent to spectral clustering with normalized graph laplacian) @staticmethod def normalized_signless_laplacian(A): degrees = A.sum(axis=1).A1 with np.errstate(divide='ignore', invalid='ignore'): D_inv_sqrt = 1.0 / np.sqrt(degrees) D_inv_sqrt[np.isinf(D_inv_sqrt) | np.isnan(D_inv_sqrt)] = 0 D_inv_sqrt_matrix = sparse.diags(D_inv_sqrt) D_plus_A = sparse.diags(degrees) + A L_signless = D_inv_sqrt_matrix @ D_plus_A @ D_inv_sqrt_matrix return L_signless def make_modeled_clustering_matrices(self): self.modeled_clustering_matrices = [_NSC.normalized_signless_laplacian(sadj) for sadj in self.sadj_list] class _SMM(_Simple): def __init__(self, *args, **kwargs): kwargs['which_eig'] = 'largest' super().__init__(*args, **kwargs) def make_clustering_matrix(self, A): degrees = A.sum(axis=1).A1 m = A.sum() / 2 expected = sparse.csr_matrix((np.outer(degrees, degrees) / (2 * m)).astype(A.dtype)) B = A - expected return B class _BHC(_Simple): def make_clustering_matrix(self, sadj): r=None if r is None: r = np.sqrt(sadj.mean() * sadj.shape[0]) n = sadj.shape[0] d = sadj.sum(axis=1).A1 H = (r**2 - 1) * sparse.eye(n) - r * sadj + sparse.diags(d) return H class _USC(_Simple): """Unnormalized Spectral Clustering using the unnormalized graph Laplacian L = D - A.""" def make_clustering_matrix(self, sadj): pass # we override the full make_modeled_clustering_matrices method using unnormalized_laplacian @staticmethod def unnormalized_laplacian(A): """ Compute the unnormalized graph Laplacian L = D - A. Args: A: Adjacency matrix (sparse) Returns: L: Unnormalized Laplacian matrix (sparse) """ degrees = A.sum(axis=1).A1 D = sparse.diags(degrees) L = D - A return L def make_modeled_clustering_matrices(self): self.modeled_clustering_matrices = [_USC.unnormalized_laplacian(sadj) for sadj in self.sadj_list] def _signed_power_mean_laplacian(A_pos, A_neg, p, epsilon=1e-6): pass class _SRSC(_Signed): def make_clustering_matrix(self, sadj_plus, sadj_minus): pass class _GMSC(_Signed): def make_clustering_matrix(self, sadj_plus, sadj_minus): pass class _SPMSC(_Signed): def make_clustering_matrix(self, sadj_plus, sadj_minus): pass class _OSC(_Overlapping): def make_clustering_matrix(self, sadj): pass class _CSC(_Overlapping): def make_clustering_matrix(self, sadj): pass class _DDSC(_Directed): def make_clustering_matrix(self, sadj): pass class _BSC(_Directed): def make_clustering_matrix(self, sadj): pass class _RWSC(_Directed): def make_clustering_matrix(self, sadj): pass class _PMLC(_Multiview): def make_clustering_matrix(self, sadj): pass class _SCC(_Cocommunity): def make_clustering_matrix(self, sadj): pass class _HSC(_Hierarchical): def make_clustering_matrix(self, sadj): pass ## End subsubclass implementations