From 5718fe14b7954476e18d5df9d4cb557daecd4bab Mon Sep 17 00:00:00 2001 From: Arunima Chaudhuri Date: Mon, 20 May 2024 14:02:39 +0000 Subject: [PATCH 1/6] Gossipsub implementation Signed-off-by: Arunima Chaudhuri --- DAS/node.py | 34 +++++++++++++++++++++++++++++++++- DAS/simulator.py | 14 +++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index 85aaf8a..5774242 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -246,6 +246,14 @@ class Node: self.statsRxDupInSlot += 1 self.statsRxInSlot += 1 + def receiveSegmentViaGossip(self, rID, cID): + """Receive a segment via gossipsub protocol.""" + if not self.amImalicious: + self.logger.trace("Recv via gossipsub: %d: %d,%d", self.ID, rID, cID, extra=self.format) + self.receivedBlock.setSegment(rID, cID) + self.sampleRecvCount += 1 + self.statsRxInSlot += 1 + def addToSendQueue(self, rID, cID): """Queue a segment for forwarding.""" if self.perNodeQueue and not self.amImalicious: @@ -504,7 +512,26 @@ class Node: if self.statsTxInSlot >= self.bwUplink: return - def send(self): + def gossipSub(self, rows, cols): + """ This function facilitates the Gossipsub protocol for segment distribution among nodes. + It ensures that each node receives any missing segments by checking other nodes in the network. + + Args: + rows (dict): A hash table where the keys are row IDs and the values are lists of nodes that contain these rows. + cols (dict): A hash table where the keys are column IDs and the values are lists of nodes that contain these columns. + + Description: + - The function iterates through all row IDs and column IDs. + - For each segment identified by a row ID (rID) and a column ID (cID): + - It checks if the current node (self) already has the segment. + - If the segment is missing, it attempts to receive the segment from other nodes using the Gossipsub protocol via the receiveSegmentViaGossip method. + """ + for rID in rows: + for cID in cols: + if not self.receivedBlock.getSegment(rID, cID): + self.receiveSegmentViaGossip(rID, cID) + + def send(self, rows, cols): """ Send as much as we can in the timestep, limited by bwUplink.""" # process node level send queue @@ -529,6 +556,11 @@ class Node: self.runDumbRandomScheduler() if self.statsTxInSlot >= self.bwUplink: return + + if not self.amImalicious: + self.gossipSub(rows, cols) + if self.statsTxInSlot >= self.bwUplink: + return def logRows(self): """It logs the rows assigned to the validator.""" diff --git a/DAS/simulator.py b/DAS/simulator.py index 7d4b341..9aed26d 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -278,10 +278,22 @@ class Simulator: self.logger.debug("Expected Samples: %d" % expected, extra=self.format) self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format) oldMissingSamples = missingSamples + rows = {} + cols = {} + for i in range(0, self.shape.numberNodes): + if not self.validators[i].amIproposer and not self.validators[i].amImalicious: + for id in self.validators[i].columnIDs: + if id not in cols: + cols[id] = [] + cols[id].append(self.validators[i]) + for id in self.validators[i].rowIDs: + if id not in rows: + rows[id] = [] + rows[id].append(self.validators[i]) self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberNodes): if not self.validators[i].amImalicious: - self.validators[i].send() + self.validators[i].send(rows, cols) self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberNodes): self.validators[i].receiveRowsColumns() From 45f773b184a6ce7122d2b431c108081e64429731 Mon Sep 17 00:00:00 2001 From: Arunima Chaudhuri Date: Tue, 21 May 2024 08:34:58 +0000 Subject: [PATCH 2/6] add gossipsub as a parameter in the config file Signed-off-by: Arunima Chaudhuri --- DAS/node.py | 69 ++++++++++++++++++++++++++---------------------- DAS/simulator.py | 2 +- smallConf.py | 3 +++ 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index 5774242..ef36930 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -246,10 +246,10 @@ class Node: self.statsRxDupInSlot += 1 self.statsRxInSlot += 1 - def receiveSegmentViaGossip(self, rID, cID): - """Receive a segment via gossipsub protocol.""" + def receiveSegmentViaGossip(self, rID, cID, source): + """Receive a segment via gossipsub protocol from a specific source.""" if not self.amImalicious: - self.logger.trace("Recv via gossipsub: %d: %d,%d", self.ID, rID, cID, extra=self.format) + self.logger.trace("Recv via gossipsub %d-> %d: %d,%d", source, self.ID, rID, cID, extra=self.format) self.receivedBlock.setSegment(rID, cID) self.sampleRecvCount += 1 self.statsRxInSlot += 1 @@ -526,41 +526,48 @@ class Node: - It checks if the current node (self) already has the segment. - If the segment is missing, it attempts to receive the segment from other nodes using the Gossipsub protocol via the receiveSegmentViaGossip method. """ - for rID in rows: - for cID in cols: + for rID, rSources in rows.items(): + for cID, cSources in cols.items(): if not self.receivedBlock.getSegment(rID, cID): - self.receiveSegmentViaGossip(rID, cID) + sources = list(set(rSources).intersection(cSources)) + if sources: + source = sources[0] # Pick the first source from the intersection + self.receiveSegmentViaGossip(rID, cID, source) + self.statsTxInSlot += 1 # request sent to receive segment via gossip + - def send(self, rows, cols): + def send(self, gossipsub, rows, cols): """ Send as much as we can in the timestep, limited by bwUplink.""" - # process node level send queue - if not self.amImalicious: - self.processSendQueue() - if self.statsTxInSlot >= self.bwUplink: - return + if gossipsub: + if not self.amImalicious: + self.gossipSub(rows, cols) + if self.statsTxInSlot >= self.bwUplink: + return - # process neighbor level send queues in shuffled breadth-first order - if not self.amImalicious: - self.processPerNeighborSendQueue() - if self.statsTxInSlot >= self.bwUplink: - return + else: + # process node level send queue + if not self.amImalicious: + self.processSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return - # process possible segments to send in shuffled breadth-first order - if self.segmentShuffleScheduler and not self.amImalicious: - self.runSegmentShuffleScheduler() - if self.statsTxInSlot >= self.bwUplink: - return + # process neighbor level send queues in shuffled breadth-first order + if not self.amImalicious: + self.processPerNeighborSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return - if self.dumbRandomScheduler and not self.amImalicious: - self.runDumbRandomScheduler() - if self.statsTxInSlot >= self.bwUplink: - return - - if not self.amImalicious: - self.gossipSub(rows, cols) - if self.statsTxInSlot >= self.bwUplink: - return + # process possible segments to send in shuffled breadth-first order + if self.segmentShuffleScheduler and not self.amImalicious: + self.runSegmentShuffleScheduler() + if self.statsTxInSlot >= self.bwUplink: + return + + if self.dumbRandomScheduler and not self.amImalicious: + self.runDumbRandomScheduler() + if self.statsTxInSlot >= self.bwUplink: + return def logRows(self): """It logs the rows assigned to the validator.""" diff --git a/DAS/simulator.py b/DAS/simulator.py index 9aed26d..a1b3cf4 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -293,7 +293,7 @@ class Simulator: self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberNodes): if not self.validators[i].amImalicious: - self.validators[i].send(rows, cols) + self.validators[i].send(self.config.gossipsub, rows, cols) self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberNodes): self.validators[i].receiveRowsColumns() diff --git a/smallConf.py b/smallConf.py index 18a9c17..c2f2d9a 100644 --- a/smallConf.py +++ b/smallConf.py @@ -59,6 +59,9 @@ maliciousNodes = range(40,41,20) # If True, the malicious nodes will be assigned randomly; if False, a predefined pattern may be used randomizeMaliciousNodes = True +# When set to True, nodes will use the Gossipsub protocol for communication +gossipsub = False + # Per-topic mesh neighborhood size netDegrees = range(8, 9, 2) From d9f29dc5f29eb26c55ff54fe014a45103e506e63 Mon Sep 17 00:00:00 2001 From: Arunima Chaudhuri Date: Wed, 29 May 2024 11:48:09 +0000 Subject: [PATCH 3/6] --- DAS/node.py | 142 ++++++++++++++++++++++++++++------------------- DAS/simulator.py | 20 +++---- smallConf.py | 5 +- 3 files changed, 97 insertions(+), 70 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index ef36930..e89575c 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -3,6 +3,8 @@ import random import collections import logging +from collections import defaultdict +import threading from DAS.block import * from DAS.tools import shuffled, shuffledDict, unionOfSamples from bitarray.util import zeros @@ -76,6 +78,7 @@ class Node: self.repairedSampleCount = 0 self.logger = logger self.validators = validators + self.received_gossip = defaultdict(list) if amIproposer: self.nodeClass = 0 @@ -246,14 +249,6 @@ class Node: self.statsRxDupInSlot += 1 self.statsRxInSlot += 1 - def receiveSegmentViaGossip(self, rID, cID, source): - """Receive a segment via gossipsub protocol from a specific source.""" - if not self.amImalicious: - self.logger.trace("Recv via gossipsub %d-> %d: %d,%d", source, self.ID, rID, cID, extra=self.format) - self.receivedBlock.setSegment(rID, cID) - self.sampleRecvCount += 1 - self.statsRxInSlot += 1 - def addToSendQueue(self, rID, cID): """Queue a segment for forwarding.""" if self.perNodeQueue and not self.amImalicious: @@ -512,62 +507,97 @@ class Node: if self.statsTxInSlot >= self.bwUplink: return - def gossipSub(self, rows, cols): - """ This function facilitates the Gossipsub protocol for segment distribution among nodes. - It ensures that each node receives any missing segments by checking other nodes in the network. + def sendGossip(self, neigh): + """Simulate sending row and column IDs to a peer.""" + have_info = {'source': self.ID, 'rowIDs': self.rowIDs, 'columnIDs': self.columnIDs} + neigh.node.received_gossip[self.ID].append(have_info) + neigh.node.msgRecvCount += 1 + self.logger.debug(f"Gossip sent to {neigh.node.ID}: {neigh.node.received_gossip}", extra=self.format) - Args: - rows (dict): A hash table where the keys are row IDs and the values are lists of nodes that contain these rows. - cols (dict): A hash table where the keys are column IDs and the values are lists of nodes that contain these columns. - - Description: - - The function iterates through all row IDs and column IDs. - - For each segment identified by a row ID (rID) and a column ID (cID): - - It checks if the current node (self) already has the segment. - - If the segment is missing, it attempts to receive the segment from other nodes using the Gossipsub protocol via the receiveSegmentViaGossip method. + def process_received_gossip(self, simulator): """ - for rID, rSources in rows.items(): - for cID, cSources in cols.items(): - if not self.receivedBlock.getSegment(rID, cID): - sources = list(set(rSources).intersection(cSources)) - if sources: - source = sources[0] # Pick the first source from the intersection - self.receiveSegmentViaGossip(rID, cID, source) - self.statsTxInSlot += 1 # request sent to receive segment via gossip + Processes received gossip messages to request and receive data segments. + For each segment not already received, it simulates requesting the segment, + logs the request and receipt, and updates the segment status and relevant counters. + """ + for sender, have_infos in self.received_gossip.items(): + for have_info in have_infos: + for rowID in have_info['rowIDs']: + for columnID in have_info['columnIDs']: + if not self.receivedBlock.getSegment(rowID, columnID): + # request for the segment + self.logger.debug(f"Requesting segment ({rowID}, {columnID}) from {have_info['source']}", extra=self.format) + self.msgSentCount += 1 + # source sends the segment + self.logger.debug(f"Sending segment ({rowID}, {columnID}) to {self.ID} from {have_info['source']}", extra=self.format) + simulator.validators[have_info['source']].sampleSentCount += 1 + simulator.validators[have_info['source']].statsTxInSlot += 1 + # receive the segment + self.receivedBlock.setSegment(rowID, columnID) + self.sampleRecvCount += 1 + self.logger.debug(f"Received segment ({rowID}, {columnID}) via gossip from {have_info['source']}", extra=self.format) + self.received_gossip.clear() - - def send(self, gossipsub, rows, cols): + def gossip(self, simulator): + """ + Periodically sends gossip messages to a random subset of neighbors to share information + about data segments (row and column IDs). The process involves: + 1. Selecting a random subset of row and column neighbors. + 2. Sending the node's current state (row and column IDs) to these neighbors. + 3. Neighbors process the received gossip and update their state accordingly. + + This ensures data dissemination across the network with minimal delay, + occurring at intervals defined by the HEARTBEAT timer. + """ + if self.rowIDs: + rID = random.choice(list(self.rowIDs)) + rowNeighs = list(self.rowNeighbors[rID].values()) + num_row_peers = random.randint(1, len(rowNeighs)) + selected_row_neighs = random.sample(rowNeighs, num_row_peers) + for rowNeigh in selected_row_neighs: + self.sendGossip(rowNeigh) + self.msgSentCount += 1 + rowNeigh.node.process_received_gossip(simulator) + if self.statsTxInSlot >= self.bwUplink: + return + + if self.columnIDs: + cID = random.choice(list(self.columnIDs)) + columnNeighs = list(self.columnNeighbors[cID].values()) + num_column_peers = random.randint(1, len(columnNeighs)) + selected_column_neighs = random.sample(columnNeighs, num_column_peers) + for columnNeigh in selected_column_neighs: + self.sendGossip(columnNeigh) + self.msgSentCount += 1 + columnNeigh.node.process_received_gossip(simulator) + if self.statsTxInSlot >= self.bwUplink: + return + + def send(self): """ Send as much as we can in the timestep, limited by bwUplink.""" - if gossipsub: - if not self.amImalicious: - self.gossipSub(rows, cols) - if self.statsTxInSlot >= self.bwUplink: - return + # process node level send queue + if not self.amImalicious: + self.processSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return - else: - # process node level send queue - if not self.amImalicious: - self.processSendQueue() - if self.statsTxInSlot >= self.bwUplink: - return + # process neighbor level send queues in shuffled breadth-first order + if not self.amImalicious: + self.processPerNeighborSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return - # process neighbor level send queues in shuffled breadth-first order - if not self.amImalicious: - self.processPerNeighborSendQueue() - if self.statsTxInSlot >= self.bwUplink: - return + # process possible segments to send in shuffled breadth-first order + if self.segmentShuffleScheduler and not self.amImalicious: + self.runSegmentShuffleScheduler() + if self.statsTxInSlot >= self.bwUplink: + return - # process possible segments to send in shuffled breadth-first order - if self.segmentShuffleScheduler and not self.amImalicious: - self.runSegmentShuffleScheduler() - if self.statsTxInSlot >= self.bwUplink: - return - - if self.dumbRandomScheduler and not self.amImalicious: - self.runDumbRandomScheduler() - if self.statsTxInSlot >= self.bwUplink: - return + if self.dumbRandomScheduler and not self.amImalicious: + self.runDumbRandomScheduler() + if self.statsTxInSlot >= self.bwUplink: + return def logRows(self): """It logs the rows assigned to the validator.""" diff --git a/DAS/simulator.py b/DAS/simulator.py index a1b3cf4..06c5510 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -278,22 +278,16 @@ class Simulator: self.logger.debug("Expected Samples: %d" % expected, extra=self.format) self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format) oldMissingSamples = missingSamples - rows = {} - cols = {} - for i in range(0, self.shape.numberNodes): - if not self.validators[i].amIproposer and not self.validators[i].amImalicious: - for id in self.validators[i].columnIDs: - if id not in cols: - cols[id] = [] - cols[id].append(self.validators[i]) - for id in self.validators[i].rowIDs: - if id not in rows: - rows[id] = [] - rows[id].append(self.validators[i]) + self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberNodes): if not self.validators[i].amImalicious: - self.validators[i].send(self.config.gossipsub, rows, cols) + self.validators[i].send() + if steps % self.config.heartbeat == 0 and self.config.gossip: + self.logger.debug("PHASE GOSSIP %d" % steps, extra=self.format) + for i in range(1,self.shape.numberNodes): + if not self.validators[i].amImalicious: + self.validators[i].gossip(self) self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberNodes): self.validators[i].receiveRowsColumns() diff --git a/smallConf.py b/smallConf.py index c2f2d9a..7172013 100644 --- a/smallConf.py +++ b/smallConf.py @@ -60,7 +60,10 @@ maliciousNodes = range(40,41,20) randomizeMaliciousNodes = True # When set to True, nodes will use the Gossipsub protocol for communication -gossipsub = False +gossip = True + +# Heartbeat interval for gossip messages in simulation steps +heartbeat = 10 # Per-topic mesh neighborhood size netDegrees = range(8, 9, 2) From e5dd13bd97cd7cccff81bc6272ed6df9ff242980 Mon Sep 17 00:00:00 2001 From: Arunima Chaudhuri Date: Wed, 29 May 2024 11:51:02 +0000 Subject: [PATCH 4/6] set heartbeat to 20 steps Signed-off-by: Arunima Chaudhuri --- smallConf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smallConf.py b/smallConf.py index 7172013..1c60cc0 100644 --- a/smallConf.py +++ b/smallConf.py @@ -63,7 +63,7 @@ randomizeMaliciousNodes = True gossip = True # Heartbeat interval for gossip messages in simulation steps -heartbeat = 10 +heartbeat = 20 # Per-topic mesh neighborhood size netDegrees = range(8, 9, 2) From ae6f2a370db48d9e5638d5ef302ffdb722427a34 Mon Sep 17 00:00:00 2001 From: Arunima Chaudhuri Date: Sat, 8 Jun 2024 07:32:55 +0000 Subject: [PATCH 5/6] debug gossip Signed-off-by: Arunima Chaudhuri --- DAS/node.py | 83 +++++++++++++++++++++++----------------------------- smallConf.py | 2 +- 2 files changed, 38 insertions(+), 47 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index e89575c..19f317b 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -507,14 +507,14 @@ class Node: if self.statsTxInSlot >= self.bwUplink: return - def sendGossip(self, neigh): + def sendGossip(self, peer, segments_to_send): """Simulate sending row and column IDs to a peer.""" - have_info = {'source': self.ID, 'rowIDs': self.rowIDs, 'columnIDs': self.columnIDs} - neigh.node.received_gossip[self.ID].append(have_info) - neigh.node.msgRecvCount += 1 - self.logger.debug(f"Gossip sent to {neigh.node.ID}: {neigh.node.received_gossip}", extra=self.format) + have_info = {'source': self.ID, 'segments': segments_to_send} + peer.received_gossip[self.ID].append(have_info) + peer.msgRecvCount += 1 + self.logger.debug(f"Gossip sent to {peer.ID}: {peer.received_gossip}", extra=self.format) - def process_received_gossip(self, simulator): + def processReceivedGossip(self, simulator): """ Processes received gossip messages to request and receive data segments. For each segment not already received, it simulates requesting the segment, @@ -522,54 +522,45 @@ class Node: """ for sender, have_infos in self.received_gossip.items(): for have_info in have_infos: - for rowID in have_info['rowIDs']: - for columnID in have_info['columnIDs']: - if not self.receivedBlock.getSegment(rowID, columnID): - # request for the segment - self.logger.debug(f"Requesting segment ({rowID}, {columnID}) from {have_info['source']}", extra=self.format) - self.msgSentCount += 1 - # source sends the segment - self.logger.debug(f"Sending segment ({rowID}, {columnID}) to {self.ID} from {have_info['source']}", extra=self.format) - simulator.validators[have_info['source']].sampleSentCount += 1 - simulator.validators[have_info['source']].statsTxInSlot += 1 - # receive the segment - self.receivedBlock.setSegment(rowID, columnID) - self.sampleRecvCount += 1 - self.logger.debug(f"Received segment ({rowID}, {columnID}) via gossip from {have_info['source']}", extra=self.format) + for rowID, columnID in have_info['segments']: + if not self.receivedBlock.getSegment(rowID, columnID): + # request for the segment + self.logger.debug(f"Requesting segment ({rowID}, {columnID}) from {have_info['source']}", extra=self.format) + self.msgSentCount += 1 + # source sends the segment + self.logger.debug(f"Sending segment ({rowID}, {columnID}) to {self.ID} from {have_info['source']}", extra=self.format) + simulator.validators[have_info['source']].sampleSentCount += 1 + simulator.validators[have_info['source']].statsTxInSlot += 1 + # receive the segment + self.receivedBlock.setSegment(rowID, columnID) + self.sampleRecvCount += 1 + self.logger.debug(f"Received segment ({rowID}, {columnID}) via gossip from {have_info['source']}", extra=self.format) self.received_gossip.clear() def gossip(self, simulator): """ - Periodically sends gossip messages to a random subset of neighbors to share information - about data segments (row and column IDs). The process involves: - 1. Selecting a random subset of row and column neighbors. - 2. Sending the node's current state (row and column IDs) to these neighbors. - 3. Neighbors process the received gossip and update their state accordingly. + Periodically sends gossip messages to a random subset of nodes to share information + about data segments. The process involves: + 1. Selecting a random subset of nodes. + 2. Sending the node's current state (row and column IDs) to these nodes. + 3. Process the received gossip and update their state accordingly. - This ensures data dissemination across the network with minimal delay, + This ensures data dissemination across the network, occurring at intervals defined by the HEARTBEAT timer. """ - if self.rowIDs: - rID = random.choice(list(self.rowIDs)) - rowNeighs = list(self.rowNeighbors[rID].values()) - num_row_peers = random.randint(1, len(rowNeighs)) - selected_row_neighs = random.sample(rowNeighs, num_row_peers) - for rowNeigh in selected_row_neighs: - self.sendGossip(rowNeigh) + total_nodes = simulator.shape.numberNodes + num_peers = random.randint(1, total_nodes - 1) + peers = random.sample(range(1, total_nodes), num_peers) + segments_to_send = [] + for rID in range(0, self.shape.nbRows): + for cID in range(0, self.shape.nbCols): + if self.block.getSegment(rID, cID): + segments_to_send.append((rID, cID)) + if segments_to_send: + for peer in peers: + self.sendGossip(simulator.validators[peer], segments_to_send) self.msgSentCount += 1 - rowNeigh.node.process_received_gossip(simulator) - if self.statsTxInSlot >= self.bwUplink: - return - - if self.columnIDs: - cID = random.choice(list(self.columnIDs)) - columnNeighs = list(self.columnNeighbors[cID].values()) - num_column_peers = random.randint(1, len(columnNeighs)) - selected_column_neighs = random.sample(columnNeighs, num_column_peers) - for columnNeigh in selected_column_neighs: - self.sendGossip(columnNeigh) - self.msgSentCount += 1 - columnNeigh.node.process_received_gossip(simulator) + simulator.validators[peer].processReceivedGossip(simulator) if self.statsTxInSlot >= self.bwUplink: return diff --git a/smallConf.py b/smallConf.py index 1c60cc0..74d5dd0 100644 --- a/smallConf.py +++ b/smallConf.py @@ -59,7 +59,7 @@ maliciousNodes = range(40,41,20) # If True, the malicious nodes will be assigned randomly; if False, a predefined pattern may be used randomizeMaliciousNodes = True -# When set to True, nodes will use the Gossipsub protocol for communication +# When set to True, nodes will use the Gossip for communication gossip = True # Heartbeat interval for gossip messages in simulation steps From f37190e666458427096f3e764f0aaf1d981dfbc2 Mon Sep 17 00:00:00 2001 From: Arunima Chaudhuri Date: Fri, 5 Jul 2024 21:58:21 +0000 Subject: [PATCH 6/6] add check for rowID and columnID Signed-off-by: Arunima Chaudhuri --- DAS/node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DAS/node.py b/DAS/node.py index 19f317b..007033b 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -523,7 +523,7 @@ class Node: for sender, have_infos in self.received_gossip.items(): for have_info in have_infos: for rowID, columnID in have_info['segments']: - if not self.receivedBlock.getSegment(rowID, columnID): + if not self.receivedBlock.getSegment(rowID, columnID) and (rowID in self.rowIDs or columnID in self.columnIDs): # request for the segment self.logger.debug(f"Requesting segment ({rowID}, {columnID}) from {have_info['source']}", extra=self.format) self.msgSentCount += 1