diff --git a/DAS/node.py b/DAS/node.py index 5774242..ef36930 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -246,10 +246,10 @@ class Node: self.statsRxDupInSlot += 1 self.statsRxInSlot += 1 - def receiveSegmentViaGossip(self, rID, cID): - """Receive a segment via gossipsub protocol.""" + def receiveSegmentViaGossip(self, rID, cID, source): + """Receive a segment via gossipsub protocol from a specific source.""" if not self.amImalicious: - self.logger.trace("Recv via gossipsub: %d: %d,%d", self.ID, rID, cID, extra=self.format) + self.logger.trace("Recv via gossipsub %d-> %d: %d,%d", source, self.ID, rID, cID, extra=self.format) self.receivedBlock.setSegment(rID, cID) self.sampleRecvCount += 1 self.statsRxInSlot += 1 @@ -526,41 +526,48 @@ class Node: - It checks if the current node (self) already has the segment. - If the segment is missing, it attempts to receive the segment from other nodes using the Gossipsub protocol via the receiveSegmentViaGossip method. """ - for rID in rows: - for cID in cols: + for rID, rSources in rows.items(): + for cID, cSources in cols.items(): if not self.receivedBlock.getSegment(rID, cID): - self.receiveSegmentViaGossip(rID, cID) + sources = list(set(rSources).intersection(cSources)) + if sources: + source = sources[0] # Pick the first source from the intersection + self.receiveSegmentViaGossip(rID, cID, source) + self.statsTxInSlot += 1 # request sent to receive segment via gossip + - def send(self, rows, cols): + def send(self, gossipsub, rows, cols): """ Send as much as we can in the timestep, limited by bwUplink.""" - # process node level send queue - if not self.amImalicious: - self.processSendQueue() - if self.statsTxInSlot >= self.bwUplink: - return + if gossipsub: + if not self.amImalicious: + self.gossipSub(rows, cols) + if self.statsTxInSlot >= self.bwUplink: + return - # process neighbor level send queues in shuffled breadth-first order - if not self.amImalicious: - self.processPerNeighborSendQueue() - if self.statsTxInSlot >= self.bwUplink: - return + else: + # process node level send queue + if not self.amImalicious: + self.processSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return - # process possible segments to send in shuffled breadth-first order - if self.segmentShuffleScheduler and not self.amImalicious: - self.runSegmentShuffleScheduler() - if self.statsTxInSlot >= self.bwUplink: - return + # process neighbor level send queues in shuffled breadth-first order + if not self.amImalicious: + self.processPerNeighborSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return - if self.dumbRandomScheduler and not self.amImalicious: - self.runDumbRandomScheduler() - if self.statsTxInSlot >= self.bwUplink: - return - - if not self.amImalicious: - self.gossipSub(rows, cols) - if self.statsTxInSlot >= self.bwUplink: - return + # process possible segments to send in shuffled breadth-first order + if self.segmentShuffleScheduler and not self.amImalicious: + self.runSegmentShuffleScheduler() + if self.statsTxInSlot >= self.bwUplink: + return + + if self.dumbRandomScheduler and not self.amImalicious: + self.runDumbRandomScheduler() + if self.statsTxInSlot >= self.bwUplink: + return def logRows(self): """It logs the rows assigned to the validator.""" diff --git a/DAS/simulator.py b/DAS/simulator.py index 9aed26d..a1b3cf4 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -293,7 +293,7 @@ class Simulator: self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberNodes): if not self.validators[i].amImalicious: - self.validators[i].send(rows, cols) + self.validators[i].send(self.config.gossipsub, rows, cols) self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberNodes): self.validators[i].receiveRowsColumns() diff --git a/smallConf.py b/smallConf.py index 18a9c17..c2f2d9a 100644 --- a/smallConf.py +++ b/smallConf.py @@ -59,6 +59,9 @@ maliciousNodes = range(40,41,20) # If True, the malicious nodes will be assigned randomly; if False, a predefined pattern may be used randomizeMaliciousNodes = True +# When set to True, nodes will use the Gossipsub protocol for communication +gossipsub = False + # Per-topic mesh neighborhood size netDegrees = range(8, 9, 2)