From 5718fe14b7954476e18d5df9d4cb557daecd4bab Mon Sep 17 00:00:00 2001 From: Arunima Chaudhuri Date: Mon, 20 May 2024 14:02:39 +0000 Subject: [PATCH] Gossipsub implementation Signed-off-by: Arunima Chaudhuri --- DAS/node.py | 34 +++++++++++++++++++++++++++++++++- DAS/simulator.py | 14 +++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index 85aaf8a..5774242 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -246,6 +246,14 @@ class Node: self.statsRxDupInSlot += 1 self.statsRxInSlot += 1 + def receiveSegmentViaGossip(self, rID, cID): + """Receive a segment via gossipsub protocol.""" + if not self.amImalicious: + self.logger.trace("Recv via gossipsub: %d: %d,%d", self.ID, rID, cID, extra=self.format) + self.receivedBlock.setSegment(rID, cID) + self.sampleRecvCount += 1 + self.statsRxInSlot += 1 + def addToSendQueue(self, rID, cID): """Queue a segment for forwarding.""" if self.perNodeQueue and not self.amImalicious: @@ -504,7 +512,26 @@ class Node: if self.statsTxInSlot >= self.bwUplink: return - def send(self): + def gossipSub(self, rows, cols): + """ This function facilitates the Gossipsub protocol for segment distribution among nodes. + It ensures that each node receives any missing segments by checking other nodes in the network. + + Args: + rows (dict): A hash table where the keys are row IDs and the values are lists of nodes that contain these rows. + cols (dict): A hash table where the keys are column IDs and the values are lists of nodes that contain these columns. + + Description: + - The function iterates through all row IDs and column IDs. + - For each segment identified by a row ID (rID) and a column ID (cID): + - It checks if the current node (self) already has the segment. + - If the segment is missing, it attempts to receive the segment from other nodes using the Gossipsub protocol via the receiveSegmentViaGossip method. + """ + for rID in rows: + for cID in cols: + if not self.receivedBlock.getSegment(rID, cID): + self.receiveSegmentViaGossip(rID, cID) + + def send(self, rows, cols): """ Send as much as we can in the timestep, limited by bwUplink.""" # process node level send queue @@ -529,6 +556,11 @@ class Node: self.runDumbRandomScheduler() if self.statsTxInSlot >= self.bwUplink: return + + if not self.amImalicious: + self.gossipSub(rows, cols) + if self.statsTxInSlot >= self.bwUplink: + return def logRows(self): """It logs the rows assigned to the validator.""" diff --git a/DAS/simulator.py b/DAS/simulator.py index 7d4b341..9aed26d 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -278,10 +278,22 @@ class Simulator: self.logger.debug("Expected Samples: %d" % expected, extra=self.format) self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format) oldMissingSamples = missingSamples + rows = {} + cols = {} + for i in range(0, self.shape.numberNodes): + if not self.validators[i].amIproposer and not self.validators[i].amImalicious: + for id in self.validators[i].columnIDs: + if id not in cols: + cols[id] = [] + cols[id].append(self.validators[i]) + for id in self.validators[i].rowIDs: + if id not in rows: + rows[id] = [] + rows[id].append(self.validators[i]) self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberNodes): if not self.validators[i].amImalicious: - self.validators[i].send() + self.validators[i].send(rows, cols) self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberNodes): self.validators[i].receiveRowsColumns()