# Module: graphconstructor/operators/high_similarity.py
#
# To export this operator from the package, graphconstructor/operators/__init__.py
# additionally needs ``from .high_similarity import HighSimilarityFilter`` and a
# ``"HighSimilarityFilter"`` entry in ``__all__``.
from dataclasses import dataclass
from typing import Literal
import networkx as nx
from ..graph import Graph
from .base import GraphOperator


Method = Literal["PA", "LP"]


@dataclass(slots=True)
class HighSimilarityFilter(GraphOperator):
    """
    Extract a weighted undirected backbone using the HighSimilarityFilter (HS).

    Every existing edge is scored with a link-prediction function and the
    highest-scoring fraction ``k`` of the edges is kept.

    Parameters
    ----------
    k : float, default=0.5
        Fraction of edges to retain, in ``[0, 1]``. The number of kept edges
        is ``int(n_edges * k)``, so smaller values produce sparser backbones.
    method : {"PA", "LP"}, default="PA"
        Link-prediction function. PA=Preferential Attachment, LP=Local Path Index.

    References
    ----------
    Paper: https://link.springer.com/article/10.1007/s41109-025-00705-y
    """

    k: float = 0.5
    method: Method = "PA"
    supported_modes = ["similarity"]

    @staticmethod
    def _calculate_edge_similarities(G, method):
        """Score every edge of ``G`` with the chosen link-prediction function.

        Parameters
        ----------
        G
            Graph object exposing ``to_networkx()``.
        method : {"PA", "LP"}
            Link-prediction function used to score each edge.

        Returns
        -------
        networkx graph
            The NetworkX view of ``G`` in which every edge carries a
            ``"similarity"`` attribute (other edge attributes are kept).

        Raises
        ------
        ValueError
            If ``method`` is neither ``"PA"`` nor ``"LP"``.
        """
        # Validate up front: otherwise an unknown method would leave the score
        # variable unbound (or silently reuse the previous edge's score).
        if method not in ("PA", "LP"):
            raise ValueError(f"Unknown method {method!r}; expected 'PA' or 'LP'.")

        epsilon = 0.01
        graph = G.to_networkx()

        if method == "LP":
            # Local Path index: S = A^2 + epsilon * A^3
            # https://www.sciencedirect.com/science/article/pii/S0378437120300856?via%3Dihub
            # The dense matrix powers are only needed (and only built) for LP.
            A = nx.to_numpy_array(graph)
            A2 = A @ A
            A3 = A2 @ A
            # Rows/columns of A follow graph.nodes() order; map labels to
            # matrix positions instead of assuming label == position.
            position = {node: i for i, node in enumerate(graph.nodes())}

        # Iterate over a snapshot: edges are removed and re-added inside the loop.
        for u, v in list(graph.edges()):
            if method == "PA":
                # Temporarily remove the edge so it does not contribute to the
                # degrees of its own endpoints, then restore it afterwards
                # together with its original attributes.
                attrs = dict(graph[u][v])
                graph.remove_edge(u, v)
                score = next(iter(nx.preferential_attachment(graph, [(u, v)])))[2]
                graph.add_edge(u, v, **attrs)
            else:
                # NOTE(review): A2/A3 are computed once on the full graph, so
                # the scored edge itself is still included in the path counts
                # (unlike the PA branch). Confirm this matches the intended
                # definition before relying on LP scores.
                i, j = position[u], position[v]
                score = A2[i, j] + epsilon * A3[i, j]

            graph[u][v]["similarity"] = score

        return graph

    def _directed_filter(self, G: Graph) -> Graph:
        """Directed graphs are not supported by this filter yet."""
        # Fail loudly instead of silently returning None from ``apply``.
        raise NotImplementedError("HighSimilarityFilter does not support directed graphs yet.")

    def _undirected_filter(self, G):
        """Keep the top ``k`` fraction of edges of ``G`` ranked by similarity.

        Returns
        -------
        Graph
            New graph in ``"similarity"`` mode over the same ``n_nodes`` nodes
            containing only the selected edges. Edge weights are the
            link-prediction scores.

        Raises
        ------
        ValueError
            If ``k`` lies outside ``[0, 1]``.
        """
        if not 0 <= self.k <= 1:
            raise ValueError(f"k must be in [0, 1], got {self.k!r}.")

        n_nodes = G.n_nodes
        scored = self._calculate_edge_similarities(G, self.method)

        # Highest similarity first, so that the slice below keeps the *top* edges.
        ranked = sorted(
            scored.edges(data="similarity"),
            key=lambda edge: edge[2],
            reverse=True,
        )
        select_count = int(len(ranked) * self.k)

        backbone = nx.Graph()
        # Add the nodes before the edges (and pin ``nodelist`` below) so that
        # row/column i of the result is node i; otherwise the matrix would
        # follow edge-insertion order. Assumes nodes are labelled 0..n_nodes-1.
        backbone.add_nodes_from(range(n_nodes))
        # NOTE(review): the stored weight is the similarity score, not the
        # original edge weight - confirm this is the intended backbone weight.
        backbone.add_weighted_edges_from(ranked[:select_count])

        adjacency = nx.to_scipy_sparse_array(backbone, nodelist=list(range(n_nodes)))
        # NOTE(review): a SciPy sparse array is passed to ``Graph.from_dense``;
        # verify that it accepts sparse input.
        return Graph.from_dense(adjacency, mode="similarity")

    def apply(self, G):
        """Apply the filter to ``G`` after checking that its mode is supported."""
        self._check_mode_supported(G)
        if G.directed:
            return self._directed_filter(G)
        return self._undirected_filter(G)