diff --git a/DAS/node.py b/DAS/node.py index 85aaf8a..007033b 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -3,6 +3,8 @@ import random import collections import logging +from collections import defaultdict +import threading from DAS.block import * from DAS.tools import shuffled, shuffledDict, unionOfSamples from bitarray.util import zeros @@ -76,6 +78,7 @@ class Node: self.repairedSampleCount = 0 self.logger = logger self.validators = validators + self.received_gossip = defaultdict(list) if amIproposer: self.nodeClass = 0 @@ -504,6 +507,63 @@ class Node: if self.statsTxInSlot >= self.bwUplink: return + def sendGossip(self, peer, segments_to_send): + """Simulate sending row and column IDs to a peer.""" + have_info = {'source': self.ID, 'segments': segments_to_send} + peer.received_gossip[self.ID].append(have_info) + peer.msgRecvCount += 1 + self.logger.debug(f"Gossip sent to {peer.ID}: {peer.received_gossip}", extra=self.format) + + def processReceivedGossip(self, simulator): + """ + Processes received gossip messages to request and receive data segments. + For each segment not already received, it simulates requesting the segment, + logs the request and receipt, and updates the segment status and relevant counters. + """ + for sender, have_infos in self.received_gossip.items(): + for have_info in have_infos: + for rowID, columnID in have_info['segments']: + if not self.receivedBlock.getSegment(rowID, columnID) and (rowID in self.rowIDs or columnID in self.columnIDs): + # request for the segment + self.logger.debug(f"Requesting segment ({rowID}, {columnID}) from {have_info['source']}", extra=self.format) + self.msgSentCount += 1 + # source sends the segment + self.logger.debug(f"Sending segment ({rowID}, {columnID}) to {self.ID} from {have_info['source']}", extra=self.format) + simulator.validators[have_info['source']].sampleSentCount += 1 + simulator.validators[have_info['source']].statsTxInSlot += 1 + # receive the segment + self.receivedBlock.setSegment(rowID, columnID) + self.sampleRecvCount += 1 + self.logger.debug(f"Received segment ({rowID}, {columnID}) via gossip from {have_info['source']}", extra=self.format) + self.received_gossip.clear() + + def gossip(self, simulator): + """ + Periodically sends gossip messages to a random subset of nodes to share information + about data segments. The process involves: + 1. Selecting a random subset of nodes. + 2. Sending the node's current state (row and column IDs) to these nodes. + 3. Process the received gossip and update their state accordingly. + + This ensures data dissemination across the network, + occurring at intervals defined by the HEARTBEAT timer. + """ + total_nodes = simulator.shape.numberNodes + num_peers = random.randint(1, total_nodes - 1) + peers = random.sample(range(1, total_nodes), num_peers) + segments_to_send = [] + for rID in range(0, self.shape.nbRows): + for cID in range(0, self.shape.nbCols): + if self.block.getSegment(rID, cID): + segments_to_send.append((rID, cID)) + if segments_to_send: + for peer in peers: + self.sendGossip(simulator.validators[peer], segments_to_send) + self.msgSentCount += 1 + simulator.validators[peer].processReceivedGossip(simulator) + if self.statsTxInSlot >= self.bwUplink: + return + def send(self): """ Send as much as we can in the timestep, limited by bwUplink.""" diff --git a/DAS/simulator.py b/DAS/simulator.py index 3657b03..ad37959 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -279,10 +279,16 @@ class Simulator: self.logger.debug("Expected Samples: %d" % expected, extra=self.format) self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format) oldMissingSamples = missingSamples + self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberNodes): if not self.validators[i].amImalicious: self.validators[i].send() + if steps % self.config.heartbeat == 0 and self.config.gossip: + self.logger.debug("PHASE GOSSIP %d" % steps, extra=self.format) + for i in range(1,self.shape.numberNodes): + if not self.validators[i].amImalicious: + self.validators[i].gossip(self) self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberNodes): self.validators[i].receiveRowsColumns() diff --git a/smallConf.py b/smallConf.py index 18a9c17..74d5dd0 100644 --- a/smallConf.py +++ b/smallConf.py @@ -59,6 +59,12 @@ maliciousNodes = range(40,41,20) # If True, the malicious nodes will be assigned randomly; if False, a predefined pattern may be used randomizeMaliciousNodes = True +# When set to True, nodes will use the Gossip for communication +gossip = True + +# Heartbeat interval for gossip messages in simulation steps +heartbeat = 20 + # Per-topic mesh neighborhood size netDegrees = range(8, 9, 2)