Module hdlib.graph
Directed and undirected, weighted and unweighted graphs with hdlib.
It implements the hdlib.graph.Graph class object which allows to represent weighted directed and undirected graphs built according to the Hyperdimensional Computing (HDC) paradigm as described in Poduval et al. 2022 https://doi.org/10.3389/fnins.2022.757125.
Classes
class Graph (size: int = 10000, directed: bool = False, seed: Optional[int] = None)
-
Hyperdimensional Graph representation.
Initialize a Graph object.
Parameters
size
:int
, default10000
- The size of vectors used to create a Space and define Vector objects.
directed
:bool
, defaultFalse
- Directed or undirected.
seed
:int
, optional- Random seed for reproducibility of results.
Raises
TypeError
- If the vector size is not an integer number.
ValueError
- If the vector size is lower than 10,000.
Examples
>>> from hdlib.graph import Graph >>> graph = Graph(size=10000, vtype='bipolar', directed=False) >>> type(graph) <class 'hdlib.graph.Graph'>
This creates a new undirected Graph object around a Space that can host random bipolar Vector objects with size 10,000.
Expand source code
class Graph(object): """Hyperdimensional Graph representation.""" def __init__( self, size: int=10000, directed: bool=False, seed: Optional[int]=None ) -> "Graph": """Initialize a Graph object. Parameters ---------- size : int, default 10000 The size of vectors used to create a Space and define Vector objects. directed : bool, default False Directed or undirected. seed : int, optional Random seed for reproducibility of results. Raises ------ TypeError If the vector size is not an integer number. ValueError If the vector size is lower than 10,000. Examples -------- >>> from hdlib.graph import Graph >>> graph = Graph(size=10000, vtype='bipolar', directed=False) >>> type(graph) <class 'hdlib.graph.Graph'> This creates a new undirected Graph object around a Space that can host random bipolar Vector objects with size 10,000. """ if not isinstance(size, int): raise TypeError("Vectors size must be an integer number") if size < 10000: raise ValueError("Vectors size must be greater than or equal to 10000") # Register vectors dimensionality self.size = size # Register a set with edge weights (classes) self.weights = set() # Register vectors type self.vtype = "bipolar" # Register whether the graph is directed or undirected self.directed = directed # Keep track of the number of nodes self.nodes_counter = 0 # Keep track of the number of edges self.edges_counter = 0 # Hyperdimensional space self.space = Space(size=self.size, vtype=self.vtype) self.seed = seed if self.seed is None: self.rand = np.random.default_rng() else: # Conditions on random seed for reproducibility # numpy allows integers as random seeds if not isinstance(seed, int): raise TypeError("Seed must be an integer number") self.rand = np.random.default_rng(seed=self.seed) # Keep track of hdlib version self.version = __version__ def __str__(self) -> None: """Print the Graph object properties. Returns ------- str A description of the Graph object. It reports the vectors size, the vector type, the number of nodes, the number of edges, and whether it is directed or undirected. Examples -------- >>> from hdlib.graph import Graph >>> graph = Graph() >>> print(graph) Class: hdlib.graph.Graph Version: 0.1.17 Size: 10000 Type: bipolar Directed: False Weights: 100 Nodes: 0 Edges: 0 Seed: None Print the MLModel object properties. By default, the size of vectors in space is 10,000, their types is bipolar, and the number of level vectors is 2. The number of data points and the number of class labels are empty here since no dataset has been processed yet. """ return """ Class: hdlib.graph.Graph Version: {} Size: {} Type: {} Directed: {} Weights: {} Nodes: {} Edges: {} Seed: {} """.format( self.version, self.size, self.vtype, self.directed, len(self.weights), self.nodes_counter, self.edges_counter, self.seed ) def _add_edge( self, node1: str, node2: str, weight: Any, ) -> None: """Add an edge to the graph and automatically build nodes if they do not exist in the space. Parameters ---------- node1 : str Node name. node2 : str Node name. weight : Any The edge weight. This can be numeric or a string used as a class label. Raises ------ ValueError If `node1` or `node2` is equals to `GRAPH_ID`. """ if node1 == GRAPH_ID or node2 == GRAPH_ID: raise ValueError("Node names cannot match with the private graph ID `{}`".format(GRAPH_ID)) edge_exists = False if node1 in self.space.space and node2 in self.space.space: # Check whether an edge between node1 and node2 already exists if self.directed and node2 in self.space.space[node1].children: edge_exists = True elif not self.directed and node2 in self.space.space[node1].children and node1 in self.space.space[node2].children: edge_exists = True if not edge_exists: for node in [node1, node2]: # Build node if it is not in the space if node not in self.space.space: # Build a random binary vector vector = Vector( name=node, size=self.size, vtype=self.vtype ) # Define a new property called memory to store information # about current node neighbors setattr(vector, "memory", None) # Define a new property called weights to store # edge weights in case of a weighted graph setattr(vector, "weights", dict()) # Register the node into the space self.space.insert(vector) # Increment the nodes counter self.nodes_counter += 1 # Take track of the edge as a link between the two nodes self.space.link(node1, node2) # Increment the edges counter self.edges_counter += 1 if not self.directed: # Take track of the same edge again in case of an undirected graph self.space.link(node2, node1) # Increment the edges counter self.edges_counter += 1 # Keep track of the edge weight if node2 not in self.space.space[node1].weights: self.space.space[node1].weights[node2] = set() if not self.directed: if node1 not in self.space.space[node2].weights: self.space.space[node2].weights[node1] = set() self.space.space[node1].weights[node2].add(weight) if not self.directed: self.space.space[node2].weights[node1].add(weight) def _node_memory(self, node: str) -> None: """Build the node memory as a vector containing information about its neighbors. Parameters ---------- node : str The node for which we want to build the memory. Raises ------ Exception - if the input `node` is not in the graph space; - if the input `node` does not have any neighbors. """ if node not in self.space.space: raise Exception("Node `{}` is not in the graph space".format(node)) neighbors = self.space.space[node].children node_memory = None for neighbor in neighbors: # Get the real weight from vector tags for weight in self.space.space[node].weights[neighbor]: # Retrieve the weight vector from the space weight_vector = self.space.space["{}{}".format(WEIGHT_ID, weight)] if node_memory is None: # Initialize the node memory with the first neighbor # multiplied by its weight vector node_memory = weight_vector * self.space.space[neighbor] else: # Multiply each neighbor with its weight vector and # bundle all the resulting vectors together to build the node memory node_memory = node_memory + (weight_vector * self.space.space[neighbor]) # Store the node memory into the memory property of the node vector object self.space.space[node].memory = node_memory def _weight_memory(self) -> None: """Build the weights memory. """ # Recover edge weights from the space for node in self.space.space: # Check whether the current node is not the actual graph memory # Also, check whether the current node is not a weight vector if node != GRAPH_ID and not node.startswith(WEIGHT_ID): for neighbor in self.space.space[node].weights: # Retrieve the weights on these edges self.weights.update(self.space.space[node].weights[neighbor]) for weight in self.weights: # Build a random vector weight_vector = Vector( name="{}{}".format(WEIGHT_ID, weight), size=self.size, vtype=self.vtype ) self.space.insert(weight_vector) def error_rate( self, edges: Set[Tuple[str, str, Any]], ) -> Tuple[float, Set[Tuple[str, str, float]], Set[Tuple[str, str, float]]]: """Compute the error rate defined as the number of mispredicted edges on the total number of edges. Note that the error rate depends on the set of edges in input to this function which could be different from the actual set of edges used to build the graph model. Parameters ---------- edges : set The set of edges used to mitigate the graph model error rate. Note that the edges in this set do not necessarily have to be present in the graph. Returns ------- tuple A tuple with the error rate, and the sets of flase positive and false negative edges among those in the input `edges`. """ # Compute the error rate as the number of mispredicted edges over the total number of edges false_positives = set() false_negatives = set() for edge in edges: node1, node2, weight_true = edge weight_pred = list() for weight in self.weights: _, weight_dist = self.edge_exists(node1, node2, weight) weight_pred.append((weight, weight_dist)) if len(weight_pred) > 1: # Sort weights based on their distances weight_pred = sorted(weight_pred, key=lambda w: w[1]) # Get the difference between the closest and the farthest distances dist_diff = weight_pred[-1][1] - weight_pred[0][1] # Use the difference in distances to select the top closest weights weight_pred = [w[0] for w in weight_pred if w[1] - weight_pred[0][1] < dist_diff] if weight_true in weight_pred and node2 not in self.space.space[node1].children: false_positives.add(edge) elif weight_true not in weight_pred and node2 in self.space.space[node1].children: false_negatives.add(edge) else: weight_pred = weight_pred[0] if weight_true == weight_pred and node2 not in self.space.space[node1].children: false_positives.add(edge) elif weight_true != weight_pred and node2 in self.space.space[node1].children: false_negatives.add(edge) return (len(false_positives) + len(false_negatives)) / len(edges), false_positives, false_negatives def error_mitigation( self, edges: Set[Tuple[str, str, Any]], max_iter: int=10, prev_error_rate: Optional[float]=None ) -> None: """Mitigate the error rate of the graph model. Parameters ---------- edges : set The set of edges used to mitigate the graph model error rate. Note that the edges in this set do not necessarily have to be present in the graph. max_iter : int, deafult 10 This is an iterative process that is repeated for up to `max_iter` iterations. prev_error_rate : float, optional Used to compare the error rate of the graph model with the error rate computed at the previous iteration. This must be initially set to `1.0`. """ # Compute the graph model error rate error_rate, false_positives, false_negatives = self.error_rate(edges) if (prev_error_rate is None or error_rate < prev_error_rate) and max_iter > 0: # Rebuild the mispredicted node memories for edge in false_positives.union(false_negatives): node1, node2, weight = edge # Retrieve the weight vector from the space weight_vector = self.space.space["{}{}".format(WEIGHT_ID, weight)] if self.space.space[node1].memory: if edge in false_positives: # Reduce the signal of node2 in the memory of node1 self.space.space[node1].memory -= (weight_vector * self.space.space[node2]) elif edge in false_negatives: self.space.space[node1].memory += (weight_vector * self.space.space[node2]) # Rebuild the graph model # Do not rebuild the nodes memory since they have been overwritten earlier self.fit(self.edges, build_nodes_memory=False) # Recursively mitigate the error rate self.error_mitigation(edges, max_iter=max_iter-1, prev_error_rate=error_rate) def fit( self, edges: Set[Tuple[str, str, Any]], build_nodes_memory: bool=True ) -> None: """Build the graph memory and store it into the space. Parameters ---------- edges : set The set of edges defined as tuples `<source, target, weight>`. build_nodes_memory : bool, default True Build nodes and weight memories by default. This must be set to False in case this is invoked to mitigate the error rate. Raises ------ ValueError If no edges are provided in input. """ if not edges: raise ValueError("Must provide at least one edge") for edge in edges: node1, node2, weight = edge self._add_edge(node1, node2, weight) if build_nodes_memory: # Build the vector representation of the edges weight self._weight_memory() graph = None for node in self.space.space: # Check whether the current node is not the actual graph memory # Also, check whether the current node is not a weight vector # This is required because this function can be run multiple times if node != GRAPH_ID and not node.startswith(WEIGHT_ID): if build_nodes_memory: # Build the node memory self._node_memory(node) if self.directed: if graph is None: if self.space.space[node].memory: graph = self.space.space[node] * permute(self.space.space[node].memory, rotate_by=1) else: graph = self.space.space[node] else: if self.space.space[node].memory: # Build the graph memory as the bundle of all the node memories rotated by 1 position graph = graph + (self.space.space[node] * permute(self.space.space[node].memory, rotate_by=1)) else: graph = graph + self.space.space[node] else: if graph is None: if self.space.space[node].memory: graph = self.space.space[node] * self.space.space[node].memory else: graph = self.space.space[node] else: if self.space.space[node].memory: # Build the graph memory as the bundle of all the node memories graph = graph + (self.space.space[node] * self.space.space[node].memory) else: graph = graph + self.space.space[node] if not self.directed: # Introduce a factor 1/2 because if we expand the node memory, then # H(i)*H(j) and H(j)*H(i) will be counted twice graph.vector = graph.vector / 2 # Check whether a graph is already present in the space if GRAPH_ID in self.space.space: self.space.remove(GRAPH_ID) # Rename the graph vector graph.name = GRAPH_ID # Store the graph vector into the space self.space.insert(graph) # Also keep track of the edges # This is used in case of the error_mitigation() self.edges = edges def edge_exists( self, node1: str, node2: str, weight: Any, threshold: float=0.7 ) -> Tuple[bool, float]: """Check whether an edge exists between `node1` and `node2` according to a specified distance `threshold`. node1 : str The source node name or ID. node2 : str The target node name or ID. weight : Any The edge weight. threshold : float, default 0.7 The distance threshold on vectors to establish the presence of the edge. Returns ------- Tuple True in case an edge between `node1` and `node2` exists in the graph space, otherwise False. It also returns the actual distance between the two vectors. Raises ------ Exception - if there is no graph vector in the space; - if `node1` and `node2` are not in the graph space. """ if GRAPH_ID not in self.space.space: raise Exception("There is no graph in space") for node in [node1, node2]: if node not in self.space.space: raise Exception("Node '{}' not in space".format(node)) if "{}{}".format(WEIGHT_ID, weight) not in self.space.space: raise Exception("Weight vector '{}' not in space".format(weight)) # Retrieve the vector representation of the graph graph = self.space.space[GRAPH_ID] # Also retrieve the vector representations of the two input nodes node1_vector = self.space.space[node1] node2_vector = self.space.space[node2] # Retrieve the node1 memory by binding the vector representation of # node1 with the vector representation of the graph # The resulting vector equals to the actual node1 memory plus noise node1_memory = bind(node1_vector, graph) if self.directed: # In case of directed graphs, nodes memory are rotated by 1 position # Thus, it must be rotated back in order to preserve the similarity node1_memory = permute(node1_memory, rotate_by=-1) # Check whether there is a edge between node1 and node2 by computing the distance between node2 and node1 memory # A distance close to 0 means the edge exists # A distance close to 1 means the edge does not exist # Retrieve the weight vector from the space weight_vector = self.space.space["{}{}".format(WEIGHT_ID, weight)] with np.errstate(invalid="ignore", divide="ignore"): distance = (weight_vector * node2_vector).dist(node1_memory, method="cosine") if distance < threshold: return True, distance return False, distance def predict( self, edges: Set[Tuple[str, str, Any]], retrain: int=10 ) -> Tuple[Any, float]: """Predict the weight (class) of a specific set of edges. Parameters ---------- edges : set Set of edges. retrain : int, default 10 Maximum number of retraining iterations. Returns ------- tuple A tuple with the predicted class and it's accuracy in terms of percentage of edges that matched the predicted class. """ if retrain < 1: # Use a minimum of 1 iteration retrain = 1 prediction = None accuracy = 0.0 # Keep track of edges where nodes exist in the graph # If a node does not exist, discart it # This is just for retraining purposes # We cannot retrain using nodes that are not in the graph space retraining_edges = set() for retrain_iter in range(retrain): hits = {weight: 0 for weight in self.weights} for node1, node2, edge_weight in edges: if node1 in self.space.space and node2 in self.space.space: weight_pred = list() for weight in self.weights: _, weight_dist = self.edge_exists(node1, node2, weight) weight_pred.append((weight, weight_dist)) if len(weight_pred) > 1: # Sort weights based on their distances weight_pred = sorted(weight_pred, key=lambda w: w[1]) # Get the difference between the closest and the farthest distances dist_diff = weight_pred[-1][1] - weight_pred[0][1] # Use the difference in distances to select the top closest weights weight_pred = [w[0] for w in weight_pred if w[1] - weight_pred[0][1] < dist_diff] for weight in weight_pred: hits[weight] += 1 else: weight_pred = weight_pred[0] hits[weight_pred] += 1 # Both node1 and node2 are in the graph space # Keep track of this edge for retraining purposes retraining_edges.add((node1, node2, edge_weight)) # Get the best hit iter_prediction = max(hits, key=hits.get) iter_accuracy = (hits[iter_prediction] / len(edges)) * 100.0 if iter_accuracy < accuracy: break # Compare with the previous retraining iteration # Update prediction and accuracy prediction = iter_prediction accuracy = iter_accuracy # Skip the retraining if this is the last iteration if retrain_iter < retrain-1: # Apply `error_mitigation` with the input set of edges (where nodes are in the graph space) # This is supposed to improve the model ability to correctly predict edges' weights self.error_mitigation(retraining_edges, max_iter=retrain) return (prediction, accuracy)
Methods
def edge_exists(self, node1: str, node2: str, weight: Any, threshold: float = 0.7) ‑> Tuple[bool, float]
-
Check whether an edge exists between
node1
andnode2
according to a specified distancethreshold
.node1 : str The source node name or ID. node2 : str The target node name or ID. weight : Any The edge weight. threshold : float, default 0.7 The distance threshold on vectors to establish the presence of the edge.
Returns
Tuple
- True in case an edge between
node1
andnode2
exists in the graph space, otherwise False. It also returns the actual distance between the two vectors.
Raises
Exception
-
- if there is no graph vector in the space;
- if
node1
andnode2
are not in the graph space.
def error_mitigation(self, edges: Set[Tuple[str, str, Any]], max_iter: int = 10, prev_error_rate: Optional[float] = None) ‑> None
-
Mitigate the error rate of the graph model.
Parameters
edges
:set
- The set of edges used to mitigate the graph model error rate. Note that the edges in this set do not necessarily have to be present in the graph.
max_iter
:int, deafult 10
- This is an iterative process that is repeated for up to
max_iter
iterations. prev_error_rate
:float
, optional- Used to compare the error rate of the graph model with the error rate computed
at the previous iteration. This must be initially set to
1.0
.
def error_rate(self, edges: Set[Tuple[str, str, Any]]) ‑> Tuple[float, Set[Tuple[str, str, float]], Set[Tuple[str, str, float]]]
-
Compute the error rate defined as the number of mispredicted edges on the total number of edges. Note that the error rate depends on the set of edges in input to this function which could be different from the actual set of edges used to build the graph model.
Parameters
edges
:set
- The set of edges used to mitigate the graph model error rate. Note that the edges in this set do not necessarily have to be present in the graph.
Returns
tuple
- A tuple with the error rate, and the sets of flase positive and false negative edges
among those in the input
edges
.
def fit(self, edges: Set[Tuple[str, str, Any]], build_nodes_memory: bool = True) ‑> None
-
Build the graph memory and store it into the space.
Parameters
edges
:set
- The set of edges defined as tuples
<source, target, weight>
. build_nodes_memory
:bool
, defaultTrue
- Build nodes and weight memories by default. This must be set to False in case this is invoked to mitigate the error rate.
Raises
ValueError
- If no edges are provided in input.
def predict(self, edges: Set[Tuple[str, str, Any]], retrain: int = 10) ‑> Tuple[Any, float]
-
Predict the weight (class) of a specific set of edges.
Parameters
edges
:set
- Set of edges.
retrain
:int
, default10
- Maximum number of retraining iterations.
Returns
tuple
- A tuple with the predicted class and it's accuracy in terms of percentage of edges that matched the predicted class.