Merge f37190e666458427096f3e764f0aaf1d981dfbc2 into f36c3c85ba31ed0fd27c3650794a511ad994a661

This commit is contained in:
Arunima Chaudhuri 2024-07-06 03:28:28 +05:30 committed by GitHub
commit ab69304187
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 72 additions and 0 deletions

View File

@ -3,6 +3,8 @@
import random
import collections
import logging
from collections import defaultdict
import threading
from DAS.block import *
from DAS.tools import shuffled, shuffledDict, unionOfSamples
from bitarray.util import zeros
@ -76,6 +78,7 @@ class Node:
self.repairedSampleCount = 0
self.logger = logger
self.validators = validators
self.received_gossip = defaultdict(list)
if amIproposer:
self.nodeClass = 0
@ -504,6 +507,63 @@ class Node:
if self.statsTxInSlot >= self.bwUplink:
return
def sendGossip(self, peer, segments_to_send):
"""Simulate sending row and column IDs to a peer."""
have_info = {'source': self.ID, 'segments': segments_to_send}
peer.received_gossip[self.ID].append(have_info)
peer.msgRecvCount += 1
self.logger.debug(f"Gossip sent to {peer.ID}: {peer.received_gossip}", extra=self.format)
def processReceivedGossip(self, simulator):
"""
Processes received gossip messages to request and receive data segments.
For each segment not already received, it simulates requesting the segment,
logs the request and receipt, and updates the segment status and relevant counters.
"""
for sender, have_infos in self.received_gossip.items():
for have_info in have_infos:
for rowID, columnID in have_info['segments']:
if not self.receivedBlock.getSegment(rowID, columnID) and (rowID in self.rowIDs or columnID in self.columnIDs):
# request for the segment
self.logger.debug(f"Requesting segment ({rowID}, {columnID}) from {have_info['source']}", extra=self.format)
self.msgSentCount += 1
# source sends the segment
self.logger.debug(f"Sending segment ({rowID}, {columnID}) to {self.ID} from {have_info['source']}", extra=self.format)
simulator.validators[have_info['source']].sampleSentCount += 1
simulator.validators[have_info['source']].statsTxInSlot += 1
# receive the segment
self.receivedBlock.setSegment(rowID, columnID)
self.sampleRecvCount += 1
self.logger.debug(f"Received segment ({rowID}, {columnID}) via gossip from {have_info['source']}", extra=self.format)
self.received_gossip.clear()
def gossip(self, simulator):
"""
Periodically sends gossip messages to a random subset of nodes to share information
about data segments. The process involves:
1. Selecting a random subset of nodes.
2. Sending the node's current state (row and column IDs) to these nodes.
3. Process the received gossip and update their state accordingly.
This ensures data dissemination across the network,
occurring at intervals defined by the HEARTBEAT timer.
"""
total_nodes = simulator.shape.numberNodes
num_peers = random.randint(1, total_nodes - 1)
peers = random.sample(range(1, total_nodes), num_peers)
segments_to_send = []
for rID in range(0, self.shape.nbRows):
for cID in range(0, self.shape.nbCols):
if self.block.getSegment(rID, cID):
segments_to_send.append((rID, cID))
if segments_to_send:
for peer in peers:
self.sendGossip(simulator.validators[peer], segments_to_send)
self.msgSentCount += 1
simulator.validators[peer].processReceivedGossip(simulator)
if self.statsTxInSlot >= self.bwUplink:
return
def send(self):
""" Send as much as we can in the timestep, limited by bwUplink."""

View File

@ -279,10 +279,16 @@ class Simulator:
self.logger.debug("Expected Samples: %d" % expected, extra=self.format)
self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format)
oldMissingSamples = missingSamples
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
for i in range(0,self.shape.numberNodes):
if not self.validators[i].amImalicious:
self.validators[i].send()
if steps % self.config.heartbeat == 0 and self.config.gossip:
self.logger.debug("PHASE GOSSIP %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
if not self.validators[i].amImalicious:
self.validators[i].gossip(self)
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
self.validators[i].receiveRowsColumns()

View File

@ -59,6 +59,12 @@ maliciousNodes = range(40,41,20)
# If True, the malicious nodes will be assigned randomly; if False, a predefined pattern may be used
randomizeMaliciousNodes = True
# When set to True, nodes will use the Gossip for communication
gossip = True
# Heartbeat interval for gossip messages in simulation steps
heartbeat = 20
# Per-topic mesh neighborhood size
netDegrees = range(8, 9, 2)