From ae6f2a370db48d9e5638d5ef302ffdb722427a34 Mon Sep 17 00:00:00 2001 From: Arunima Chaudhuri Date: Sat, 8 Jun 2024 07:32:55 +0000 Subject: [PATCH] debug gossip Signed-off-by: Arunima Chaudhuri --- DAS/node.py | 83 +++++++++++++++++++++++----------------------------- smallConf.py | 2 +- 2 files changed, 38 insertions(+), 47 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index e89575c..19f317b 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -507,14 +507,14 @@ class Node: if self.statsTxInSlot >= self.bwUplink: return - def sendGossip(self, neigh): + def sendGossip(self, peer, segments_to_send): """Simulate sending row and column IDs to a peer.""" - have_info = {'source': self.ID, 'rowIDs': self.rowIDs, 'columnIDs': self.columnIDs} - neigh.node.received_gossip[self.ID].append(have_info) - neigh.node.msgRecvCount += 1 - self.logger.debug(f"Gossip sent to {neigh.node.ID}: {neigh.node.received_gossip}", extra=self.format) + have_info = {'source': self.ID, 'segments': segments_to_send} + peer.received_gossip[self.ID].append(have_info) + peer.msgRecvCount += 1 + self.logger.debug(f"Gossip sent to {peer.ID}: {peer.received_gossip}", extra=self.format) - def process_received_gossip(self, simulator): + def processReceivedGossip(self, simulator): """ Processes received gossip messages to request and receive data segments. For each segment not already received, it simulates requesting the segment, @@ -522,54 +522,45 @@ class Node: """ for sender, have_infos in self.received_gossip.items(): for have_info in have_infos: - for rowID in have_info['rowIDs']: - for columnID in have_info['columnIDs']: - if not self.receivedBlock.getSegment(rowID, columnID): - # request for the segment - self.logger.debug(f"Requesting segment ({rowID}, {columnID}) from {have_info['source']}", extra=self.format) - self.msgSentCount += 1 - # source sends the segment - self.logger.debug(f"Sending segment ({rowID}, {columnID}) to {self.ID} from {have_info['source']}", extra=self.format) - simulator.validators[have_info['source']].sampleSentCount += 1 - simulator.validators[have_info['source']].statsTxInSlot += 1 - # receive the segment - self.receivedBlock.setSegment(rowID, columnID) - self.sampleRecvCount += 1 - self.logger.debug(f"Received segment ({rowID}, {columnID}) via gossip from {have_info['source']}", extra=self.format) + for rowID, columnID in have_info['segments']: + if not self.receivedBlock.getSegment(rowID, columnID): + # request for the segment + self.logger.debug(f"Requesting segment ({rowID}, {columnID}) from {have_info['source']}", extra=self.format) + self.msgSentCount += 1 + # source sends the segment + self.logger.debug(f"Sending segment ({rowID}, {columnID}) to {self.ID} from {have_info['source']}", extra=self.format) + simulator.validators[have_info['source']].sampleSentCount += 1 + simulator.validators[have_info['source']].statsTxInSlot += 1 + # receive the segment + self.receivedBlock.setSegment(rowID, columnID) + self.sampleRecvCount += 1 + self.logger.debug(f"Received segment ({rowID}, {columnID}) via gossip from {have_info['source']}", extra=self.format) self.received_gossip.clear() def gossip(self, simulator): """ - Periodically sends gossip messages to a random subset of neighbors to share information - about data segments (row and column IDs). The process involves: - 1. Selecting a random subset of row and column neighbors. - 2. Sending the node's current state (row and column IDs) to these neighbors. - 3. Neighbors process the received gossip and update their state accordingly. + Periodically sends gossip messages to a random subset of nodes to share information + about data segments. The process involves: + 1. Selecting a random subset of nodes. + 2. Sending the node's current state (row and column IDs) to these nodes. + 3. Process the received gossip and update their state accordingly. - This ensures data dissemination across the network with minimal delay, + This ensures data dissemination across the network, occurring at intervals defined by the HEARTBEAT timer. """ - if self.rowIDs: - rID = random.choice(list(self.rowIDs)) - rowNeighs = list(self.rowNeighbors[rID].values()) - num_row_peers = random.randint(1, len(rowNeighs)) - selected_row_neighs = random.sample(rowNeighs, num_row_peers) - for rowNeigh in selected_row_neighs: - self.sendGossip(rowNeigh) + total_nodes = simulator.shape.numberNodes + num_peers = random.randint(1, total_nodes - 1) + peers = random.sample(range(1, total_nodes), num_peers) + segments_to_send = [] + for rID in range(0, self.shape.nbRows): + for cID in range(0, self.shape.nbCols): + if self.block.getSegment(rID, cID): + segments_to_send.append((rID, cID)) + if segments_to_send: + for peer in peers: + self.sendGossip(simulator.validators[peer], segments_to_send) self.msgSentCount += 1 - rowNeigh.node.process_received_gossip(simulator) - if self.statsTxInSlot >= self.bwUplink: - return - - if self.columnIDs: - cID = random.choice(list(self.columnIDs)) - columnNeighs = list(self.columnNeighbors[cID].values()) - num_column_peers = random.randint(1, len(columnNeighs)) - selected_column_neighs = random.sample(columnNeighs, num_column_peers) - for columnNeigh in selected_column_neighs: - self.sendGossip(columnNeigh) - self.msgSentCount += 1 - columnNeigh.node.process_received_gossip(simulator) + simulator.validators[peer].processReceivedGossip(simulator) if self.statsTxInSlot >= self.bwUplink: return diff --git a/smallConf.py b/smallConf.py index 1c60cc0..74d5dd0 100644 --- a/smallConf.py +++ b/smallConf.py @@ -59,7 +59,7 @@ maliciousNodes = range(40,41,20) # If True, the malicious nodes will be assigned randomly; if False, a predefined pattern may be used randomizeMaliciousNodes = True -# When set to True, nodes will use the Gossipsub protocol for communication +# When set to True, nodes will use the Gossip for communication gossip = True # Heartbeat interval for gossip messages in simulation steps