diff --git a/DAS/node.py b/DAS/node.py index ef36930..e89575c 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -3,6 +3,8 @@ import random import collections import logging +from collections import defaultdict +import threading from DAS.block import * from DAS.tools import shuffled, shuffledDict, unionOfSamples from bitarray.util import zeros @@ -76,6 +78,7 @@ class Node: self.repairedSampleCount = 0 self.logger = logger self.validators = validators + self.received_gossip = defaultdict(list) if amIproposer: self.nodeClass = 0 @@ -246,14 +249,6 @@ class Node: self.statsRxDupInSlot += 1 self.statsRxInSlot += 1 - def receiveSegmentViaGossip(self, rID, cID, source): - """Receive a segment via gossipsub protocol from a specific source.""" - if not self.amImalicious: - self.logger.trace("Recv via gossipsub %d-> %d: %d,%d", source, self.ID, rID, cID, extra=self.format) - self.receivedBlock.setSegment(rID, cID) - self.sampleRecvCount += 1 - self.statsRxInSlot += 1 - def addToSendQueue(self, rID, cID): """Queue a segment for forwarding.""" if self.perNodeQueue and not self.amImalicious: @@ -512,62 +507,97 @@ class Node: if self.statsTxInSlot >= self.bwUplink: return - def gossipSub(self, rows, cols): - """ This function facilitates the Gossipsub protocol for segment distribution among nodes. - It ensures that each node receives any missing segments by checking other nodes in the network. + def sendGossip(self, neigh): + """Simulate sending row and column IDs to a peer.""" + have_info = {'source': self.ID, 'rowIDs': self.rowIDs, 'columnIDs': self.columnIDs} + neigh.node.received_gossip[self.ID].append(have_info) + neigh.node.msgRecvCount += 1 + self.logger.debug(f"Gossip sent to {neigh.node.ID}: {neigh.node.received_gossip}", extra=self.format) - Args: - rows (dict): A hash table where the keys are row IDs and the values are lists of nodes that contain these rows. - cols (dict): A hash table where the keys are column IDs and the values are lists of nodes that contain these columns. - - Description: - - The function iterates through all row IDs and column IDs. - - For each segment identified by a row ID (rID) and a column ID (cID): - - It checks if the current node (self) already has the segment. - - If the segment is missing, it attempts to receive the segment from other nodes using the Gossipsub protocol via the receiveSegmentViaGossip method. + def process_received_gossip(self, simulator): """ - for rID, rSources in rows.items(): - for cID, cSources in cols.items(): - if not self.receivedBlock.getSegment(rID, cID): - sources = list(set(rSources).intersection(cSources)) - if sources: - source = sources[0] # Pick the first source from the intersection - self.receiveSegmentViaGossip(rID, cID, source) - self.statsTxInSlot += 1 # request sent to receive segment via gossip + Processes received gossip messages to request and receive data segments. + For each segment not already received, it simulates requesting the segment, + logs the request and receipt, and updates the segment status and relevant counters. + """ + for sender, have_infos in self.received_gossip.items(): + for have_info in have_infos: + for rowID in have_info['rowIDs']: + for columnID in have_info['columnIDs']: + if not self.receivedBlock.getSegment(rowID, columnID): + # request for the segment + self.logger.debug(f"Requesting segment ({rowID}, {columnID}) from {have_info['source']}", extra=self.format) + self.msgSentCount += 1 + # source sends the segment + self.logger.debug(f"Sending segment ({rowID}, {columnID}) to {self.ID} from {have_info['source']}", extra=self.format) + simulator.validators[have_info['source']].sampleSentCount += 1 + simulator.validators[have_info['source']].statsTxInSlot += 1 + # receive the segment + self.receivedBlock.setSegment(rowID, columnID) + self.sampleRecvCount += 1 + self.logger.debug(f"Received segment ({rowID}, {columnID}) via gossip from {have_info['source']}", extra=self.format) + self.received_gossip.clear() - - def send(self, gossipsub, rows, cols): + def gossip(self, simulator): + """ + Periodically sends gossip messages to a random subset of neighbors to share information + about data segments (row and column IDs). The process involves: + 1. Selecting a random subset of row and column neighbors. + 2. Sending the node's current state (row and column IDs) to these neighbors. + 3. Neighbors process the received gossip and update their state accordingly. + + This ensures data dissemination across the network with minimal delay, + occurring at intervals defined by the HEARTBEAT timer. + """ + if self.rowIDs: + rID = random.choice(list(self.rowIDs)) + rowNeighs = list(self.rowNeighbors[rID].values()) + num_row_peers = random.randint(1, len(rowNeighs)) + selected_row_neighs = random.sample(rowNeighs, num_row_peers) + for rowNeigh in selected_row_neighs: + self.sendGossip(rowNeigh) + self.msgSentCount += 1 + rowNeigh.node.process_received_gossip(simulator) + if self.statsTxInSlot >= self.bwUplink: + return + + if self.columnIDs: + cID = random.choice(list(self.columnIDs)) + columnNeighs = list(self.columnNeighbors[cID].values()) + num_column_peers = random.randint(1, len(columnNeighs)) + selected_column_neighs = random.sample(columnNeighs, num_column_peers) + for columnNeigh in selected_column_neighs: + self.sendGossip(columnNeigh) + self.msgSentCount += 1 + columnNeigh.node.process_received_gossip(simulator) + if self.statsTxInSlot >= self.bwUplink: + return + + def send(self): """ Send as much as we can in the timestep, limited by bwUplink.""" - if gossipsub: - if not self.amImalicious: - self.gossipSub(rows, cols) - if self.statsTxInSlot >= self.bwUplink: - return + # process node level send queue + if not self.amImalicious: + self.processSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return - else: - # process node level send queue - if not self.amImalicious: - self.processSendQueue() - if self.statsTxInSlot >= self.bwUplink: - return + # process neighbor level send queues in shuffled breadth-first order + if not self.amImalicious: + self.processPerNeighborSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return - # process neighbor level send queues in shuffled breadth-first order - if not self.amImalicious: - self.processPerNeighborSendQueue() - if self.statsTxInSlot >= self.bwUplink: - return + # process possible segments to send in shuffled breadth-first order + if self.segmentShuffleScheduler and not self.amImalicious: + self.runSegmentShuffleScheduler() + if self.statsTxInSlot >= self.bwUplink: + return - # process possible segments to send in shuffled breadth-first order - if self.segmentShuffleScheduler and not self.amImalicious: - self.runSegmentShuffleScheduler() - if self.statsTxInSlot >= self.bwUplink: - return - - if self.dumbRandomScheduler and not self.amImalicious: - self.runDumbRandomScheduler() - if self.statsTxInSlot >= self.bwUplink: - return + if self.dumbRandomScheduler and not self.amImalicious: + self.runDumbRandomScheduler() + if self.statsTxInSlot >= self.bwUplink: + return def logRows(self): """It logs the rows assigned to the validator.""" diff --git a/DAS/simulator.py b/DAS/simulator.py index a1b3cf4..06c5510 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -278,22 +278,16 @@ class Simulator: self.logger.debug("Expected Samples: %d" % expected, extra=self.format) self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format) oldMissingSamples = missingSamples - rows = {} - cols = {} - for i in range(0, self.shape.numberNodes): - if not self.validators[i].amIproposer and not self.validators[i].amImalicious: - for id in self.validators[i].columnIDs: - if id not in cols: - cols[id] = [] - cols[id].append(self.validators[i]) - for id in self.validators[i].rowIDs: - if id not in rows: - rows[id] = [] - rows[id].append(self.validators[i]) + self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberNodes): if not self.validators[i].amImalicious: - self.validators[i].send(self.config.gossipsub, rows, cols) + self.validators[i].send() + if steps % self.config.heartbeat == 0 and self.config.gossip: + self.logger.debug("PHASE GOSSIP %d" % steps, extra=self.format) + for i in range(1,self.shape.numberNodes): + if not self.validators[i].amImalicious: + self.validators[i].gossip(self) self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberNodes): self.validators[i].receiveRowsColumns() diff --git a/smallConf.py b/smallConf.py index c2f2d9a..7172013 100644 --- a/smallConf.py +++ b/smallConf.py @@ -60,7 +60,10 @@ maliciousNodes = range(40,41,20) randomizeMaliciousNodes = True # When set to True, nodes will use the Gossipsub protocol for communication -gossipsub = False +gossip = True + +# Heartbeat interval for gossip messages in simulation steps +heartbeat = 10 # Per-topic mesh neighborhood size netDegrees = range(8, 9, 2)