Merge remote-tracking branch 'origin/gossipsub' into gossip-node-type

This commit is contained in:
Sudipta Basak 2024-07-03 14:04:18 +00:00
commit a656f362d9
No known key found for this signature in database
3 changed files with 72 additions and 0 deletions

View File

@ -3,6 +3,8 @@
import random
import collections
import logging
from collections import defaultdict
import threading
from DAS.block import *
from DAS.tools import shuffled, shuffledDict, unionOfSamples
from bitarray.util import zeros
@ -78,6 +80,7 @@ class Node:
self.repairedSampleCount = 0
self.logger = logger
self.validators = validators
self.received_gossip = defaultdict(list)
if amIproposer:
self.nodeClass = 0
@ -504,6 +507,63 @@ class Node:
if self.statsTxInSlot >= self.bwUplink:
return
def sendGossip(self, peer, segments_to_send):
"""Simulate sending row and column IDs to a peer."""
have_info = {'source': self.ID, 'segments': segments_to_send}
peer.received_gossip[self.ID].append(have_info)
peer.msgRecvCount += 1
self.logger.debug(f"Gossip sent to {peer.ID}: {peer.received_gossip}", extra=self.format)
def processReceivedGossip(self, simulator):
"""
Processes received gossip messages to request and receive data segments.
For each segment not already received, it simulates requesting the segment,
logs the request and receipt, and updates the segment status and relevant counters.
"""
for sender, have_infos in self.received_gossip.items():
for have_info in have_infos:
for rowID, columnID in have_info['segments']:
if not self.receivedBlock.getSegment(rowID, columnID):
# request for the segment
self.logger.debug(f"Requesting segment ({rowID}, {columnID}) from {have_info['source']}", extra=self.format)
self.msgSentCount += 1
# source sends the segment
self.logger.debug(f"Sending segment ({rowID}, {columnID}) to {self.ID} from {have_info['source']}", extra=self.format)
simulator.validators[have_info['source']].sampleSentCount += 1
simulator.validators[have_info['source']].statsTxInSlot += 1
# receive the segment
self.receivedBlock.setSegment(rowID, columnID)
self.sampleRecvCount += 1
self.logger.debug(f"Received segment ({rowID}, {columnID}) via gossip from {have_info['source']}", extra=self.format)
self.received_gossip.clear()
def gossip(self, simulator):
"""
Periodically sends gossip messages to a random subset of nodes to share information
about data segments. The process involves:
1. Selecting a random subset of nodes.
2. Sending the node's current state (row and column IDs) to these nodes.
3. Process the received gossip and update their state accordingly.
This ensures data dissemination across the network,
occurring at intervals defined by the HEARTBEAT timer.
"""
total_nodes = simulator.shape.numberNodes
num_peers = random.randint(1, total_nodes - 1)
peers = random.sample(range(1, total_nodes), num_peers)
segments_to_send = []
for rID in range(0, self.shape.nbRows):
for cID in range(0, self.shape.nbCols):
if self.block.getSegment(rID, cID):
segments_to_send.append((rID, cID))
if segments_to_send:
for peer in peers:
self.sendGossip(simulator.validators[peer], segments_to_send)
self.msgSentCount += 1
simulator.validators[peer].processReceivedGossip(simulator)
if self.statsTxInSlot >= self.bwUplink:
return
def send(self):
""" Send as much as we can in the timestep, limited by bwUplink."""

View File

@ -288,10 +288,16 @@ class Simulator:
self.logger.debug("Expected Samples: %d" % expected, extra=self.format)
self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format)
oldMissingSamples = missingSamples
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
for i in range(0,self.shape.numberNodes):
if not self.validators[i].amImalicious:
self.validators[i].send()
if steps % self.config.heartbeat == 0 and self.config.gossip:
self.logger.debug("PHASE GOSSIP %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
if not self.validators[i].amImalicious:
self.validators[i].gossip(self)
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
self.validators[i].receiveRowsColumns()

View File

@ -59,6 +59,12 @@ maliciousNodes = range(40,41,20)
# If True, the malicious nodes will be assigned randomly; if False, a predefined pattern may be used
randomizeMaliciousNodes = True
# When set to True, nodes will use the Gossip for communication
gossip = True
# Heartbeat interval for gossip messages in simulation steps
heartbeat = 20
# Per-topic mesh neighborhood size
netDegrees = range(8, 9, 2)