Module hdlib.model.graph
Directed and undirected, weighted and unweighted graphs with hdlib.
It implements the hdlib.model.graph.GraphModel class, which represents directed and undirected weighted graphs built according to the Hyperdimensional Computing (HDC) paradigm as described in Poduval et al. 2022 (https://doi.org/10.3389/fnins.2022.757125).
Classes
class GraphModel (size: int = 10000, directed: bool = False, seed: Optional[int] = None)
- Hyperdimensional GraphModel representation.
Initialize a GraphModel object.
Parameters
size : int, default 10000
- The size of vectors used to create a Space and define Vector objects.
directed : bool, default False
- Whether the graph is directed (True) or undirected (False).
seed : int, optional
- Random seed for reproducibility of results.
Raises
TypeError
- If the vector size is not an integer number, or if a seed is provided and it is not an integer number.
ValueError
- If the vector size is lower than 1,000.
Examples
>>> from hdlib.model import GraphModel
>>> graph = GraphModel(size=10000, directed=False)
>>> type(graph)
<class 'hdlib.model.GraphModel'>
This creates a new undirected GraphModel object around a Space that can host random bipolar Vector objects with size 10,000.
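The conditions listed under Raises can be pictured with a minimal sketch. `check_graph_arguments` and `outcome` are hypothetical helpers, not part of hdlib; they only mirror the documented checks on `size` and `seed`.

```python
from typing import Optional


def check_graph_arguments(size: int, seed: Optional[int] = None) -> None:
    """Hypothetical helper mirroring the documented constructor checks."""
    if not isinstance(size, int):
        raise TypeError("Vectors size must be an integer number")
    if size < 1000:
        raise ValueError("Vectors size must be greater than or equal to 1000")
    if seed is not None and not isinstance(seed, int):
        raise TypeError("Seed must be an integer number")


def outcome(**kwargs) -> str:
    """Return 'ok' or the name of the exception raised by the checks."""
    try:
        check_graph_arguments(**kwargs)
    except (TypeError, ValueError) as error:
        return type(error).__name__
    return "ok"


results = [
    outcome(size=10000),             # valid size
    outcome(size=999),               # too small
    outcome(size=1e4),               # float, not int
    outcome(size=10000, seed="42"),  # seed is not an integer
]
```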
Source code
class GraphModel(object):
    """Hyperdimensional GraphModel representation."""

    def __init__(
        self,
        size: int=10000,
        directed: bool=False,
        seed: Optional[int]=None
    ) -> "GraphModel":
        """Initialize a GraphModel object.

        Parameters
        ----------
        size : int, default 10000
            The size of vectors used to create a Space and define Vector objects.
        directed : bool, default False
            Whether the graph is directed (True) or undirected (False).
        seed : int, optional
            Random seed for reproducibility of results.

        Raises
        ------
        TypeError
            If the vector size is not an integer number, or if a seed is provided
            and it is not an integer number.
        ValueError
            If the vector size is lower than 1,000.

        Examples
        --------
        >>> from hdlib.model import GraphModel
        >>> graph = GraphModel(size=10000, directed=False)
        >>> type(graph)
        <class 'hdlib.model.GraphModel'>

        This creates a new undirected GraphModel object around a Space that can host
        random bipolar Vector objects with size 10,000.
        """

        if not isinstance(size, int):
            raise TypeError("Vectors size must be an integer number")

        if size < 1000:
            raise ValueError("Vectors size must be greater than or equal to 1000")

        # Register vectors dimensionality
        self.size = size

        # Register a set with edge weights (classes)
        self.weights = set()

        # Register vectors type
        self.vtype = "bipolar"

        # Register whether the graph is directed or undirected
        self.directed = directed

        # Keep track of the number of nodes
        self.nodes_counter = 0

        # Keep track of the number of edges
        self.edges_counter = 0

        # Hyperdimensional space
        self.space = Space(size=self.size, vtype=self.vtype)

        self.seed = seed

        if self.seed is None:
            self.rand = np.random.default_rng()

        else:
            # Conditions on random seed for reproducibility
            # numpy allows integers as random seeds
            if not isinstance(seed, int):
                raise TypeError("Seed must be an integer number")

            self.rand = np.random.default_rng(seed=self.seed)

        # We estimate a proper threshold to establish whether an edge exists between two nodes
        # Keep track of these thresholds to avoid recomputing them
        # This is also weight-dependent
        self.weight_to_node_specific_thresholds = dict()

        # Also keep track of the edges for error rate estimation and error mitigation
        self.edges = set()

        # Keep track of hdlib version
        self.version = __version__

    def __str__(self) -> str:
        """Print the GraphModel object properties.

        Returns
        -------
        str
            A description of the GraphModel object. It reports the vectors size, the vector type,
            the number of nodes, the number of edges, and whether it is directed or undirected.

        Examples
        --------
        >>> from hdlib.model import GraphModel
        >>> graph = GraphModel()
        >>> print(graph)
        Class: hdlib.model.graph.GraphModel
        Version: 0.1.17
        Size: 10000
        Type: bipolar
        Directed: False
        Weights: 0
        Nodes: 0
        Edges: 0
        Seed: None

        Print the GraphModel object properties. By default, the size of vectors in space is 10,000,
        their type is bipolar, and the graph is undirected. The numbers of weights, nodes, and edges
        are 0 here since no edges have been added yet.
        """

        return """
            Class: hdlib.model.graph.GraphModel
            Version: {}
            Size: {}
            Type: {}
            Directed: {}
            Weights: {}
            Nodes: {}
            Edges: {}
            Seed: {}
        """.format(
            self.version,
            self.size,
            self.vtype,
            self.directed,
            len(self.weights),
            self.nodes_counter,
            self.edges_counter,
            self.seed
        )

    def _add_edge(
        self,
        node1: str,
        node2: str,
        weight: Any,
    ) -> None:
        """Add an edge to the graph and automatically build nodes if they do not exist in the space.

        Parameters
        ----------
        node1 : str
            Node name.
        node2 : str
            Node name.
        weight : Any
            The edge weight. This can be numeric or a string used as a class label.

        Raises
        ------
        ValueError
            If `node1` or `node2` is equal to `GRAPH_ID`.
        """

        if node1 == GRAPH_ID or node2 == GRAPH_ID:
            raise ValueError(f"Node names cannot match with the private graph ID `{GRAPH_ID}`")

        edge_exists = False

        if node1 in self.space.space and node2 in self.space.space:
            # Check whether an edge between node1 and node2 already exists
            if self.directed and node2 in self.space.space[node1].children:
                edge_exists = True

            elif not self.directed and node2 in self.space.space[node1].children and node1 in self.space.space[node2].children:
                edge_exists = True

        if not edge_exists:
            for node in [node1, node2]:
                # Build node if it is not in the space
                if node not in self.space.space:
                    # Build a random bipolar vector
                    vector = Vector(
                        name=node,
                        size=self.size,
                        vtype=self.vtype
                    )

                    # Define a new property called memory to store information
                    # about current node neighbors
                    setattr(vector, "memory", None)

                    # Define a new property called weights to store
                    # edge weights in case of a weighted graph
                    setattr(vector, "weights", dict())

                    # Register the node into the space
                    self.space.insert(vector)

                    # Increment the nodes counter
                    self.nodes_counter += 1

            # Keep track of the edge as a link between the two nodes
            self.space.link(node1, node2)

            # Increment the edges counter
            self.edges_counter += 1

            if not self.directed:
                # Keep track of the same edge again in case of an undirected graph
                self.space.link(node2, node1)

                # Increment the edges counter
                self.edges_counter += 1

        # Keep track of the edge weight
        if node2 not in self.space.space[node1].weights:
            self.space.space[node1].weights[node2] = set()

        if not self.directed:
            if node1 not in self.space.space[node2].weights:
                self.space.space[node2].weights[node1] = set()

        self.space.space[node1].weights[node2].add(weight)

        if not self.directed:
            self.space.space[node2].weights[node1].add(weight)

        # Keep track of the edges here
        self.edges.add((node1, node2, weight))

    def _node_memory(self, node: str) -> None:
        """Build the node memory as a vector containing information about its neighbors.

        Parameters
        ----------
        node : str
            The node for which we want to build the memory.

        Raises
        ------
        Exception
            - if the input `node` is not in the graph space;
            - if the input `node` does not have any neighbors.
        """

        if node not in self.space.space:
            raise Exception(f"Node `{node}` is not in the graph space")

        neighbors = self.space.space[node].children

        node_memory = None

        for neighbor in neighbors:
            # Get the real weight from vector tags
            for weight in self.space.space[node].weights[neighbor]:
                # Retrieve the weight vector from the space
                weight_vector = self.space.space[f"{WEIGHT_ID}{weight}"]

                if node_memory is None:
                    # Initialize the node memory with the first neighbor
                    # multiplied by its weight vector
                    node_memory = weight_vector * self.space.space[neighbor]

                else:
                    # Multiply each neighbor with its weight vector and
                    # bundle all the resulting vectors together to build the node memory
                    node_memory = node_memory + (weight_vector * self.space.space[neighbor])

        # Store the node memory into the memory property of the node vector object
        self.space.space[node].memory = node_memory

    def _weight_memory(self) -> None:
        """Build the weights memory.
        """

        # Recover edge weights from the space
        for node in self.space.space:
            # Check whether the current node is not the actual graph memory
            # Also, check whether the current node is not a weight vector
            if node != GRAPH_ID and not node.startswith(WEIGHT_ID):
                for neighbor in self.space.space[node].weights:
                    # Retrieve the weights on these edges
                    self.weights.update(self.space.space[node].weights[neighbor])

        for weight in self.weights:
            # Build a random vector
            weight_vector = Vector(name=f"{WEIGHT_ID}{weight}", size=self.size, vtype=self.vtype)

            self.space.insert(weight_vector)

    @staticmethod
    def _error_rate(instance: "GraphModel") -> Tuple[float, Set[Tuple[str, str, float]]]:
        """Just a wrapper around the `error_rate` function to make it callable in multiprocessing.
        It is safe to run in multiprocessing because it does not modify any instance attributes.

        Parameters
        ----------
        instance : GraphModel
            A GraphModel instance.

        Returns
        -------
        tuple
            A tuple with the error rate and the set of false negative edges.
        """

        return instance.error_rate()

    def error_rate(self) -> Tuple[float, Set[Tuple[str, str, float]]]:
        """Compute the error rate defined as the number of mispredicted edges over the total number of edges.
        The rate is computed over the edges registered in the model (`self.edges`).

        Returns
        -------
        tuple
            A tuple with the error rate and the set of false negative edges.
        """

        # Compute the error rate as the number of mispredicted edges over the total number of edges
        false_negatives = set()

        for edge in self.edges:
            node1, node2, weight_true = edge

            # Compute node specific threshold
            node1_threshold = self.weight_to_node_specific_thresholds[weight_true].get(node1) if weight_true in self.weight_to_node_specific_thresholds else None

            # Search for the current edge
            edge_exists, _, dist_threshold = self.edge_exists(node1, node2, weight_true, threshold=node1_threshold)

            if weight_true not in self.weight_to_node_specific_thresholds:
                self.weight_to_node_specific_thresholds[weight_true] = dict()

            if node1 not in self.weight_to_node_specific_thresholds[weight_true]:
                self.weight_to_node_specific_thresholds[weight_true][node1] = dist_threshold

            if not edge_exists:
                # This is a false negative
                false_negatives.add(edge)

        return len(false_negatives) / len(self.edges), false_negatives

    def error_mitigation(
        self,
        max_iter: int=10,
        nproc: int=1
    ) -> None:
        """Mitigate the error rate of the graph model.

        Parameters
        ----------
        max_iter : int, default 10
            This is an iterative process that is repeated for up to `max_iter` iterations.
        nproc : int, default 1
            Maximum number of jobs for multiprocessing.
        """

        if max_iter > 0:
            # Keep track of misclassified edges
            false_negatives = set()

            # Split the set of edges into equally sized chunks
            # This is used for multiprocessing only
            chunk_size = len(self.edges) // nproc

            edges_list = None

            edges_subsets = None

            # Redefine the number of CPUs for multiprocessing
            if nproc <= 1:
                nproc = 1

                # Compute the model error rate
                prev_error_rate, false_negatives = self.error_rate()

            else:
                if nproc > mp.cpu_count():
                    nproc = mp.cpu_count()

                # Very inefficient but required
                edges_list = list(self.edges)

                edges_subsets = [(self, set(edges_list[i:i+chunk_size])) for i in range(0, len(edges_list), chunk_size)]

                with mp.Pool(processes=nproc) as pool:
                    # Compute the model error rate in multiprocessing
                    error_estimation = pool.starmap(self.__class__._error_rate, edges_subsets)

                    # Retrieve the set of misclassified edges
                    for _, false_negatives_partial in error_estimation:
                        false_negatives = false_negatives.union(false_negatives_partial)

                    prev_error_rate = len(false_negatives) / len(self.edges)

            print(f"(base)\tError rate: {prev_error_rate}\tFalse negatives: {len(false_negatives)}")

            # Error mitigate the model if the error rate is > 0
            if prev_error_rate > 0:
                for i in range(max_iter):
                    # Work on a copy of self here
                    graph = copy.deepcopy(self)

                    # Rebuild the mispredicted node memories
                    for edge in false_negatives:
                        #node1, node2, weight_true, weights_pred = edge
                        node1, node2, weight_true = edge

                        if graph.space.space[node1].memory:
                            # Retrieve the weight vector from the space
                            weight_true_vector = graph.space.space[f"{WEIGHT_ID}{weight_true}"]

                            # Define the correct connection vector
                            # Reinforcement signal
                            correct_connection = weight_true_vector * graph.space.space[node2]

                            if self.directed:
                                # Increase the signal of node1, node2 connection on weight_true
                                graph.space.space[GRAPH_ID] += graph.space.space[node1] * permute(correct_connection, rotate_by=1)

                            else:
                                # Do not permute the reinforced node1 memory if the global graph is not directed
                                graph.space.space[GRAPH_ID] += graph.space.space[node1] * correct_connection

                    # Reset `false_negatives`
                    false_negatives = set()

                    # Redefine the number of CPUs for multiprocessing
                    # There is no need to downscale nproc here
                    if nproc == 1:
                        # Compute the model error rate
                        error_rate, false_negatives = graph.error_rate()

                    else:
                        with mp.Pool(processes=nproc) as pool:
                            # Compute the model error rate in multiprocessing
                            error_estimation = pool.starmap(graph.__class__._error_rate, edges_subsets)

                            # Retrieve the set of misclassified edges
                            for _, false_negatives_partial in error_estimation:
                                false_negatives = false_negatives.union(false_negatives_partial)

                            error_rate = len(false_negatives) / len(graph.edges)

                    print(f"(iter {i+1})\tError rate: {error_rate}\tFalse negatives: {len(false_negatives)}")

                    if error_rate > prev_error_rate:
                        # Stop mitigating the error of the graph model here
                        break

                    # Update the error rate
                    prev_error_rate = error_rate

                    # Make the error mitigated graph persistent
                    self.__dict__.update(graph.__dict__)

                    if error_rate == 0.0:
                        # There is nothing else to do here
                        break

    def fit(
        self,
        edges: Set[Tuple[str, str, Any]],
        build_nodes_memory: bool=True
    ) -> None:
        """Build the graph memory and store it into the space.

        Parameters
        ----------
        edges : set
            The set of edges defined as tuples `<source, target, weight>`.
        build_nodes_memory : bool, default True
            Build nodes and weight memories by default.
            This must be set to False in case this is invoked to mitigate the error rate.

        Raises
        ------
        ValueError
            If no edges are provided in input.
        """

        if not edges:
            raise ValueError("Must provide at least one edge")

        for edge in edges:
            node1, node2, weight = edge

            self._add_edge(node1, node2, weight)

        if build_nodes_memory:
            # Build the vector representation of the edges weight
            self._weight_memory()

        graph = None

        for node in self.space.space:
            # Check whether the current node is not the actual graph memory
            # Also, check whether the current node is not a weight vector
            # This is required because this function can be run multiple times
            if node != GRAPH_ID and not node.startswith(WEIGHT_ID):
                if build_nodes_memory:
                    # Build the node memory
                    self._node_memory(node)

                if self.directed:
                    if graph is None:
                        if self.space.space[node].memory:
                            graph = self.space.space[node] * permute(self.space.space[node].memory, rotate_by=1)

                        else:
                            graph = self.space.space[node]

                    else:
                        if self.space.space[node].memory:
                            # Build the graph memory as the bundle of all the node memories rotated by 1 position
                            graph = graph + (self.space.space[node] * permute(self.space.space[node].memory, rotate_by=1))

                        else:
                            graph = graph + self.space.space[node]

                else:
                    if graph is None:
                        if self.space.space[node].memory:
                            graph = self.space.space[node] * self.space.space[node].memory

                        else:
                            graph = self.space.space[node]

                    else:
                        if self.space.space[node].memory:
                            # Build the graph memory as the bundle of all the node memories
                            graph = graph + (self.space.space[node] * self.space.space[node].memory)

                        else:
                            graph = graph + self.space.space[node]

        if not self.directed:
            # Introduce a factor 1/2 because if we expand the node memory, then
            # H(i)*H(j) and H(j)*H(i) will be counted twice
            graph.vector = graph.vector / 2

        # Check whether a graph is already present in the space
        if GRAPH_ID in self.space.space:
            self.space.remove(GRAPH_ID)

        # Rename the graph vector
        graph.name = GRAPH_ID

        # Store the graph vector into the space
        self.space.insert(graph)

        # Also keep track of the edges
        # This is used in case of the error_mitigation()
        self.edges = edges

    def edge_exists(
        self,
        node1: str,
        node2: str,
        weight: Any,
        threshold: Optional[float]=None
    ) -> Tuple[bool, float, float]:
        """Check whether an edge exists between `node1` and `node2` according to a specified distance `threshold`.

        Parameters
        ----------
        node1 : str
            The source node name or ID.
        node2 : str
            The target node name or ID.
        weight : Any
            The edge weight.
        threshold : float, default None
            The distance threshold on vectors to establish the presence of the edge.
            It is automatically estimated if None.

        Returns
        -------
        Tuple
            True in case an edge between `node1` and `node2` exists in the graph space, otherwise False.
            It also returns the actual distance between the two vectors and the threshold used to establish
            whether the edge exists.

        Raises
        ------
        Exception
            - if there is no graph vector in the space;
            - if `node1` or `node2` is not in the graph space;
            - if there is no vector for `weight` in the space.
        """

        if GRAPH_ID not in self.space.space:
            raise Exception("There is no graph in space")

        for node in [node1, node2]:
            if node not in self.space.space:
                raise Exception(f"Node '{node}' not in space")

        if f"{WEIGHT_ID}{weight}" not in self.space.space:
            raise Exception(f"Weight vector '{weight}' not in space")

        # Retrieve the vector representation of the graph
        graph = self.space.space[GRAPH_ID]

        # Also retrieve the vector representations of the two input nodes
        node1_vector = self.space.space[node1]
        node2_vector = self.space.space[node2]

        # Retrieve the node1 memory by binding the vector representation of
        # node1 with the vector representation of the graph
        # The resulting vector equals the actual node1 memory plus noise
        node1_memory = bind(node1_vector, graph)

        if self.directed:
            # In case of directed graphs, node memories are rotated by 1 position
            # Thus, it must be rotated back in order to preserve the similarity
            node1_memory = permute(node1_memory, rotate_by=-1)

        # Check whether there is an edge between node1 and node2 by computing the distance between node2 and node1 memory
        # A distance close to 0 means the edge exists
        # A distance close to 1 means the edge does not exist

        # Retrieve the weight vector from the space
        weight_vector = self.space.space[f"{WEIGHT_ID}{weight}"]

        if threshold is None:
            if not self.space.space[node1].children:
                # We cannot do much if node1 has no children
                threshold = 1.0

            else:
                # Retrieve node1 children
                # There could be hundreds of thousands of nodes here, so we focus on the first 10
                neighbors = set(list(self.space.space[node1].children)[:10])

                # Pick the same number of random non-neighbors
                # This is very inefficient!
                non_neighbors = set(self.rand.choice(list(self.space.space.keys()), size=len(neighbors), replace=False))

                # Keep track of cosine distances between node1 and its neighbors plus random non-neighbors
                distances = list()

                for node in neighbors.union(non_neighbors):
                    if node == GRAPH_ID or node.startswith(WEIGHT_ID):
                        continue

                    # Retrieve the node vector from the space
                    # node is numpy.str_
                    node_vector = self.space.space[str(node)]

                    with np.errstate(invalid="ignore", divide="ignore"):
                        distances.append((weight_vector * node_vector).dist(node1_memory, method="cosine"))

                # Use the 5th percentile as threshold
                # This is a node-specific threshold for node1
                threshold = float(np.percentile(distances, 5))

        with np.errstate(invalid="ignore", divide="ignore"):
            distance = (weight_vector * node2_vector).dist(node1_memory, method="cosine")

        if distance < threshold:
            return True, distance, threshold

        return False, distance, threshold

    @staticmethod
    def _predict(
        instance: "GraphModel",
        name: str,
        y_true: str,
        edges: Set[Tuple[str, str, Any]],
    ) -> Tuple[str, str, Tuple[Any, float]]:
        """Just a wrapper around the `predict` function to make it callable in multiprocessing.
        It is safe to run in multiprocessing because it does not modify any instance attributes.

        Parameters
        ----------
        instance : GraphModel
            A GraphModel instance.
        name : str
            The name associated to the set of edges.
        y_true : str
            The true class.
        edges : set
            The set of edges whose class must be predicted.
            Note that the edges in this set do not necessarily have to be present in the graph.

        Returns
        -------
        tuple
            A tuple with the name of the test set, the true class, and a tuple with the predicted class and
            its accuracy in terms of percentage of edges that matched the predicted class.
        """

        return name, y_true, instance.predict(edges)

    def predict(
        self,
        edges: Set[Tuple[str, str, Any]]
    ) -> Tuple[Any, float]:
        """Predict the weight (class) of a specific set of edges.

        Parameters
        ----------
        edges : set
            Set of edges.

        Returns
        -------
        tuple
            A tuple with the predicted class and its accuracy in terms of percentage
            of edges that matched the predicted class.
        """

        hits = {weight: 0 for weight in self.weights}

        for node1, node2, edge_weight in edges:
            if node1 in self.space.space and node2 in self.space.space:
                weight_pred = list()

                for weight in self.weights:
                    node1_threshold = self.weight_to_node_specific_thresholds[weight].get(node1) if weight in self.weight_to_node_specific_thresholds else None

                    _, weight_dist, dist_threshold = self.edge_exists(node1, node2, weight, threshold=node1_threshold)

                    weight_pred.append((weight, weight_dist))

                    if weight not in self.weight_to_node_specific_thresholds:
                        self.weight_to_node_specific_thresholds[weight] = dict()

                    if node1 not in self.weight_to_node_specific_thresholds[weight]:
                        self.weight_to_node_specific_thresholds[weight][node1] = dist_threshold

                if len(weight_pred) > 1:
                    # Sort weights based on their distances
                    weight_pred = sorted(weight_pred, key=lambda w: w[1])

                    # Get the difference between the closest and the farthest distances
                    dist_diff = weight_pred[-1][1] - weight_pred[0][1]

                    # Use the difference in distances to select the top closest weights
                    weight_pred = [w[0] for w in weight_pred if w[1] - weight_pred[0][1] < dist_diff]

                    for weight in weight_pred:
                        hits[weight] += 1

                else:
                    weight_pred = weight_pred[0][0]

                    hits[weight_pred] += 1

        # Get the best hit
        prediction = max(hits, key=hits.get)

        accuracy = (hits[prediction] / len(edges)) * 100.0

        return (prediction, accuracy)
Methods
def edge_exists(self, node1: str, node2: str, weight: Any, threshold: Optional[float] = None) ‑> Tuple[bool, float, float]
- Check whether an edge exists between node1 and node2 according to a specified distance threshold.
Parameters
node1 : str
- The source node name or ID.
node2 : str
- The target node name or ID.
weight : Any
- The edge weight.
threshold : float, default None
- The distance threshold on vectors to establish the presence of the edge. It is automatically estimated if None.
Returns
Tuple
- True if an edge between node1 and node2 exists in the graph space, otherwise False. It also returns the actual distance between the two vectors and the threshold used to establish whether the edge exists.
Raises
Exception
- if there is no graph vector in the space;
- if node1 or node2 is not in the graph space;
- if there is no vector for weight in the space.
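The query can be illustrated with a small pure-Python sketch for a directed toy graph. It does not use hdlib: the helpers `random_bipolar`, `bind`, `bundle`, `permute`, `cosine_distance`, and `edge_distance` are hypothetical stand-ins for the library's vector operations, `permute` is assumed to behave like a cyclic rotation, and the vector size is reduced to 2,000 only to keep the example fast.

```python
import math
import random

rng = random.Random(0)  # fixed seed so the sketch is repeatable
SIZE = 2000             # smaller than the library default, for speed only


def random_bipolar():
    """Random vector with entries in {-1, +1}."""
    return [rng.choice((-1, 1)) for _ in range(SIZE)]


def bind(a, b):
    """Element-wise product."""
    return [x * y for x, y in zip(a, b)]


def bundle(*vectors):
    """Element-wise sum."""
    return [sum(values) for values in zip(*vectors)]


def permute(v, rotate_by):
    """Cyclic rotation; rotating by -k undoes rotating by k."""
    k = rotate_by % len(v)
    return v[-k:] + v[:-k]


def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


a, b, c, w = (random_bipolar() for _ in range(4))

# Toy directed graph a -> b -> c with a single weight vector w.
# c has no outgoing edge, so it enters the graph memory unbound.
graph = bundle(
    bind(a, permute(bind(w, b), 1)),
    bind(b, permute(bind(w, c), 1)),
    c,
)


def edge_distance(source, target):
    """Unbind the source from the graph, rotate back, compare with w * target."""
    memory = permute(bind(source, graph), -1)  # noisy memory of `source`
    return cosine_distance(bind(w, target), memory)


d_ab = edge_distance(a, b)  # stored edge: distance well below 1
d_ba = edge_distance(b, a)  # reverse direction, not stored: distance near 1
```

A threshold placed between the two distances separates the stored edge from the missing one, which is the role played by the threshold argument.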
def error_mitigation(self, max_iter: int = 10, nproc: int = 1) ‑> None
- Mitigate the error rate of the graph model.
Parameters
max_iter : int, default 10
- This is an iterative process that is repeated for up to max_iter iterations. It stops earlier if the error rate reaches 0 or increases with respect to the previous iteration.
nproc : int, default 1
- Maximum number of jobs for multiprocessing.
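A single reinforcement step can be sketched in pure Python, without hdlib. The sketch assumes an undirected star graph whose memory is the sum of `node1 * weight * node2` over its edges; all helper names are hypothetical. Adding the correct connection of a weakly recovered edge to the graph memory lowers the distance measured for that edge.

```python
import math
import random

rng = random.Random(1)  # fixed seed so the sketch is repeatable
SIZE = 2000             # smaller than the library default, for speed only


def random_bipolar():
    return [rng.choice((-1, 1)) for _ in range(SIZE)]


def bind(a, b):
    """Element-wise product."""
    return [x * y for x, y in zip(a, b)]


def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


nodes = {name: random_bipolar() for name in "abcde"}
w = random_bipolar()
edges = [("a", "b"), ("a", "c"), ("a", "d"), ("a", "e")]  # star centred on a

# Undirected graph memory: sum of node1 * w * node2 over all edges
graph = [0] * SIZE
for x, y in edges:
    term = bind(nodes[x], bind(w, nodes[y]))
    graph = [g + t for g, t in zip(graph, term)]


def edge_distance(graph_memory, x, y):
    memory = bind(nodes[x], graph_memory)  # noisy memory of x
    return cosine_distance(bind(w, nodes[y]), memory)


before = edge_distance(graph, "a", "b")

# Reinforcement signal for the edge (a, b): node1 * (weight * node2)
correct_connection = bind(w, nodes["b"])
reinforcement = bind(nodes["a"], correct_connection)
reinforced = [g + r for g, r in zip(graph, reinforcement)]

after = edge_distance(reinforced, "a", "b")
```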
def error_rate(self) ‑> Tuple[float, Set[Tuple[str, str, float]]]
- Compute the error rate, defined as the number of mispredicted edges over the total number of edges. The rate is computed over the edges registered in the model, i.e. those passed to fit.
Returns
tuple
- A tuple with the error rate and the set of false negative edges, i.e. the registered edges that edge_exists does not recover.
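The definition can be sketched independently of hdlib. Here `error_rate` is a hypothetical free function and `edge_exists` is any callable that answers whether an edge is recovered; in the library this role is played by the edge_exists method.

```python
from typing import Callable, Set, Tuple

Edge = Tuple[str, str, str]


def error_rate(
    edges: Set[Edge],
    edge_exists: Callable[[str, str, str], bool],
) -> Tuple[float, Set[Edge]]:
    """Fraction of known edges that the model fails to recover."""
    false_negatives = {edge for edge in edges if not edge_exists(*edge)}
    return len(false_negatives) / len(edges), false_negatives


known = {("a", "b", "w1"), ("b", "c", "w1"), ("c", "d", "w2"), ("d", "e", "w2")}

# Stand-in for a model that recovers three of the four edges
recovered = {("a", "b", "w1"), ("b", "c", "w1"), ("c", "d", "w2")}

rate, missed = error_rate(known, lambda s, t, w: (s, t, w) in recovered)
```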
def fit(self, edges: Set[Tuple[str, str, Any]], build_nodes_memory: bool = True) ‑> None
- Build the graph memory and store it into the space.
Parameters
edges : set
- The set of edges defined as tuples <source, target, weight>.
build_nodes_memory : bool, default True
- Build nodes and weight memories by default. This must be set to False in case this is invoked to mitigate the error rate.
Raises
ValueError
- If no edges are provided in input.
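For an undirected graph, the construction can be sketched in pure Python without hdlib. The helper names are hypothetical. Each node memory bundles its neighbors bound to the weight vector, the graph memory bundles every node bound to its memory, and the result is halved because each edge is counted once from each endpoint.

```python
import random

rng = random.Random(2)  # fixed seed so the sketch is repeatable
SIZE = 1000


def random_bipolar():
    return [rng.choice((-1, 1)) for _ in range(SIZE)]


def bind(a, b):
    """Element-wise product."""
    return [x * y for x, y in zip(a, b)]


def bundle(*vectors):
    """Element-wise sum."""
    return [sum(values) for values in zip(*vectors)]


nodes = {name: random_bipolar() for name in "abc"}
w = random_bipolar()
neighbors = {"a": ["b"], "b": ["a", "c"], "c": ["b"]}  # undirected path a - b - c


def node_memory(name):
    """Bundle of the neighbors of `name`, each bound to the weight vector."""
    return bundle(*(bind(w, nodes[n]) for n in neighbors[name]))


# Bundle every node bound to its own memory, then apply the factor 1/2
total = bundle(*(bind(nodes[name], node_memory(name)) for name in nodes))
graph = [value / 2 for value in total]

# Same memory written as one term per edge: a*w*b + b*w*c
edge_sum = bundle(
    bind(nodes["a"], bind(w, nodes["b"])),
    bind(nodes["b"], bind(w, nodes["c"])),
)
```

The halved bundle equals the per-edge sum exactly, which is why the factor 1/2 appears for undirected graphs only.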
def predict(self, edges: Set[Tuple[str, str, Any]]) ‑> Tuple[Any, float]
- Predict the weight (class) of a specific set of edges.
Parameters
edges : set
- Set of edges.
Returns
tuple
- A tuple with the predicted class and its accuracy, expressed as the percentage of edges that matched the predicted class.
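The voting step can be sketched without hdlib. `predict_from_distances` is a hypothetical function that takes, for each edge, the distance measured for every candidate weight; in the library these distances come from edge_exists. For each edge, every weight except those at the farthest distance receives a hit, and the weight with most hits is the prediction.

```python
from typing import Dict, List, Tuple


def predict_from_distances(
    edge_distances: List[Dict[str, float]],
    weights: List[str],
) -> Tuple[str, float]:
    """Vote for the weight (class) that best explains a set of edges."""
    hits = {weight: 0 for weight in weights}

    for distances in edge_distances:
        # Sort candidate weights from the closest to the farthest
        ranked = sorted(distances.items(), key=lambda item: item[1])

        if len(ranked) > 1:
            spread = ranked[-1][1] - ranked[0][1]
            # Keep every weight that is strictly closer than the farthest one
            shortlisted = [w for w, d in ranked if d - ranked[0][1] < spread]
        else:
            shortlisted = [ranked[0][0]]

        for weight in shortlisted:
            hits[weight] += 1

    prediction = max(hits, key=hits.get)
    accuracy = hits[prediction] / len(edge_distances) * 100.0
    return prediction, accuracy


# Made-up distances for three edges and two candidate classes
measured = [
    {"x": 0.2, "y": 0.9},
    {"x": 0.3, "y": 0.8},
    {"x": 0.9, "y": 0.1},
]

prediction, accuracy = predict_from_distances(measured, ["x", "y"])
```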