This commit is contained in:
Arunima Chaudhuri 2024-05-29 11:48:09 +00:00
parent 45f773b184
commit d9f29dc5f2
3 changed files with 97 additions and 70 deletions

View File

@ -3,6 +3,8 @@
import random import random
import collections import collections
import logging import logging
from collections import defaultdict
import threading
from DAS.block import * from DAS.block import *
from DAS.tools import shuffled, shuffledDict, unionOfSamples from DAS.tools import shuffled, shuffledDict, unionOfSamples
from bitarray.util import zeros from bitarray.util import zeros
@ -76,6 +78,7 @@ class Node:
self.repairedSampleCount = 0 self.repairedSampleCount = 0
self.logger = logger self.logger = logger
self.validators = validators self.validators = validators
self.received_gossip = defaultdict(list)
if amIproposer: if amIproposer:
self.nodeClass = 0 self.nodeClass = 0
@ -246,14 +249,6 @@ class Node:
self.statsRxDupInSlot += 1 self.statsRxDupInSlot += 1
self.statsRxInSlot += 1 self.statsRxInSlot += 1
def receiveSegmentViaGossip(self, rID, cID, source):
"""Receive a segment via gossipsub protocol from a specific source."""
if not self.amImalicious:
self.logger.trace("Recv via gossipsub %d-> %d: %d,%d", source, self.ID, rID, cID, extra=self.format)
self.receivedBlock.setSegment(rID, cID)
self.sampleRecvCount += 1
self.statsRxInSlot += 1
def addToSendQueue(self, rID, cID): def addToSendQueue(self, rID, cID):
"""Queue a segment for forwarding.""" """Queue a segment for forwarding."""
if self.perNodeQueue and not self.amImalicious: if self.perNodeQueue and not self.amImalicious:
@ -512,62 +507,97 @@ class Node:
if self.statsTxInSlot >= self.bwUplink: if self.statsTxInSlot >= self.bwUplink:
return return
def gossipSub(self, rows, cols): def sendGossip(self, neigh):
""" This function facilitates the Gossipsub protocol for segment distribution among nodes. """Simulate sending row and column IDs to a peer."""
It ensures that each node receives any missing segments by checking other nodes in the network. have_info = {'source': self.ID, 'rowIDs': self.rowIDs, 'columnIDs': self.columnIDs}
neigh.node.received_gossip[self.ID].append(have_info)
neigh.node.msgRecvCount += 1
self.logger.debug(f"Gossip sent to {neigh.node.ID}: {neigh.node.received_gossip}", extra=self.format)
Args: def process_received_gossip(self, simulator):
rows (dict): A hash table where the keys are row IDs and the values are lists of nodes that contain these rows.
cols (dict): A hash table where the keys are column IDs and the values are lists of nodes that contain these columns.
Description:
- The function iterates through all row IDs and column IDs.
- For each segment identified by a row ID (rID) and a column ID (cID):
- It checks if the current node (self) already has the segment.
- If the segment is missing, it attempts to receive the segment from other nodes using the Gossipsub protocol via the receiveSegmentViaGossip method.
""" """
for rID, rSources in rows.items(): Processes received gossip messages to request and receive data segments.
for cID, cSources in cols.items(): For each segment not already received, it simulates requesting the segment,
if not self.receivedBlock.getSegment(rID, cID): logs the request and receipt, and updates the segment status and relevant counters.
sources = list(set(rSources).intersection(cSources)) """
if sources: for sender, have_infos in self.received_gossip.items():
source = sources[0] # Pick the first source from the intersection for have_info in have_infos:
self.receiveSegmentViaGossip(rID, cID, source) for rowID in have_info['rowIDs']:
self.statsTxInSlot += 1 # request sent to receive segment via gossip for columnID in have_info['columnIDs']:
if not self.receivedBlock.getSegment(rowID, columnID):
# request for the segment
self.logger.debug(f"Requesting segment ({rowID}, {columnID}) from {have_info['source']}", extra=self.format)
self.msgSentCount += 1
# source sends the segment
self.logger.debug(f"Sending segment ({rowID}, {columnID}) to {self.ID} from {have_info['source']}", extra=self.format)
simulator.validators[have_info['source']].sampleSentCount += 1
simulator.validators[have_info['source']].statsTxInSlot += 1
# receive the segment
self.receivedBlock.setSegment(rowID, columnID)
self.sampleRecvCount += 1
self.logger.debug(f"Received segment ({rowID}, {columnID}) via gossip from {have_info['source']}", extra=self.format)
self.received_gossip.clear()
def gossip(self, simulator):
def send(self, gossipsub, rows, cols): """
Periodically sends gossip messages to a random subset of neighbors to share information
about data segments (row and column IDs). The process involves:
1. Selecting a random subset of row and column neighbors.
2. Sending the node's current state (row and column IDs) to these neighbors.
3. Neighbors process the received gossip and update their state accordingly.
This ensures data dissemination across the network with minimal delay,
occurring at intervals defined by the HEARTBEAT timer.
"""
if self.rowIDs:
rID = random.choice(list(self.rowIDs))
rowNeighs = list(self.rowNeighbors[rID].values())
num_row_peers = random.randint(1, len(rowNeighs))
selected_row_neighs = random.sample(rowNeighs, num_row_peers)
for rowNeigh in selected_row_neighs:
self.sendGossip(rowNeigh)
self.msgSentCount += 1
rowNeigh.node.process_received_gossip(simulator)
if self.statsTxInSlot >= self.bwUplink:
return
if self.columnIDs:
cID = random.choice(list(self.columnIDs))
columnNeighs = list(self.columnNeighbors[cID].values())
num_column_peers = random.randint(1, len(columnNeighs))
selected_column_neighs = random.sample(columnNeighs, num_column_peers)
for columnNeigh in selected_column_neighs:
self.sendGossip(columnNeigh)
self.msgSentCount += 1
columnNeigh.node.process_received_gossip(simulator)
if self.statsTxInSlot >= self.bwUplink:
return
def send(self):
""" Send as much as we can in the timestep, limited by bwUplink.""" """ Send as much as we can in the timestep, limited by bwUplink."""
if gossipsub: # process node level send queue
if not self.amImalicious: if not self.amImalicious:
self.gossipSub(rows, cols) self.processSendQueue()
if self.statsTxInSlot >= self.bwUplink: if self.statsTxInSlot >= self.bwUplink:
return return
else: # process neighbor level send queues in shuffled breadth-first order
# process node level send queue if not self.amImalicious:
if not self.amImalicious: self.processPerNeighborSendQueue()
self.processSendQueue() if self.statsTxInSlot >= self.bwUplink:
if self.statsTxInSlot >= self.bwUplink: return
return
# process neighbor level send queues in shuffled breadth-first order # process possible segments to send in shuffled breadth-first order
if not self.amImalicious: if self.segmentShuffleScheduler and not self.amImalicious:
self.processPerNeighborSendQueue() self.runSegmentShuffleScheduler()
if self.statsTxInSlot >= self.bwUplink: if self.statsTxInSlot >= self.bwUplink:
return return
# process possible segments to send in shuffled breadth-first order if self.dumbRandomScheduler and not self.amImalicious:
if self.segmentShuffleScheduler and not self.amImalicious: self.runDumbRandomScheduler()
self.runSegmentShuffleScheduler() if self.statsTxInSlot >= self.bwUplink:
if self.statsTxInSlot >= self.bwUplink: return
return
if self.dumbRandomScheduler and not self.amImalicious:
self.runDumbRandomScheduler()
if self.statsTxInSlot >= self.bwUplink:
return
def logRows(self): def logRows(self):
"""It logs the rows assigned to the validator.""" """It logs the rows assigned to the validator."""

View File

@ -278,22 +278,16 @@ class Simulator:
self.logger.debug("Expected Samples: %d" % expected, extra=self.format) self.logger.debug("Expected Samples: %d" % expected, extra=self.format)
self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format) self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format)
oldMissingSamples = missingSamples oldMissingSamples = missingSamples
rows = {}
cols = {}
for i in range(0, self.shape.numberNodes):
if not self.validators[i].amIproposer and not self.validators[i].amImalicious:
for id in self.validators[i].columnIDs:
if id not in cols:
cols[id] = []
cols[id].append(self.validators[i])
for id in self.validators[i].rowIDs:
if id not in rows:
rows[id] = []
rows[id].append(self.validators[i])
self.logger.debug("PHASE SEND %d" % steps, extra=self.format) self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
for i in range(0,self.shape.numberNodes): for i in range(0,self.shape.numberNodes):
if not self.validators[i].amImalicious: if not self.validators[i].amImalicious:
self.validators[i].send(self.config.gossipsub, rows, cols) self.validators[i].send()
if steps % self.config.heartbeat == 0 and self.config.gossip:
self.logger.debug("PHASE GOSSIP %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
if not self.validators[i].amImalicious:
self.validators[i].gossip(self)
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes): for i in range(1,self.shape.numberNodes):
self.validators[i].receiveRowsColumns() self.validators[i].receiveRowsColumns()

View File

@ -60,7 +60,10 @@ maliciousNodes = range(40,41,20)
randomizeMaliciousNodes = True randomizeMaliciousNodes = True
# When set to True, nodes will use the Gossipsub protocol for communication # When set to True, nodes will use the Gossipsub protocol for communication
gossipsub = False gossip = True
# Heartbeat interval for gossip messages in simulation steps
heartbeat = 10
# Per-topic mesh neighborhood size # Per-topic mesh neighborhood size
netDegrees = range(8, 9, 2) netDegrees = range(8, 9, 2)