mirror of
https://github.com/logos-storage/das-research.git
synced 2026-01-04 06:03:10 +00:00
Gossipsub implementation
Signed-off-by: Arunima Chaudhuri <arunimachaudhuri2020@gmail.com>
This commit is contained in:
parent
0b999dc7f8
commit
5718fe14b7
34
DAS/node.py
34
DAS/node.py
@ -246,6 +246,14 @@ class Node:
|
|||||||
self.statsRxDupInSlot += 1
|
self.statsRxDupInSlot += 1
|
||||||
self.statsRxInSlot += 1
|
self.statsRxInSlot += 1
|
||||||
|
|
||||||
|
def receiveSegmentViaGossip(self, rID, cID):
|
||||||
|
"""Receive a segment via gossipsub protocol."""
|
||||||
|
if not self.amImalicious:
|
||||||
|
self.logger.trace("Recv via gossipsub: %d: %d,%d", self.ID, rID, cID, extra=self.format)
|
||||||
|
self.receivedBlock.setSegment(rID, cID)
|
||||||
|
self.sampleRecvCount += 1
|
||||||
|
self.statsRxInSlot += 1
|
||||||
|
|
||||||
def addToSendQueue(self, rID, cID):
|
def addToSendQueue(self, rID, cID):
|
||||||
"""Queue a segment for forwarding."""
|
"""Queue a segment for forwarding."""
|
||||||
if self.perNodeQueue and not self.amImalicious:
|
if self.perNodeQueue and not self.amImalicious:
|
||||||
@ -504,7 +512,26 @@ class Node:
|
|||||||
if self.statsTxInSlot >= self.bwUplink:
|
if self.statsTxInSlot >= self.bwUplink:
|
||||||
return
|
return
|
||||||
|
|
||||||
def send(self):
|
def gossipSub(self, rows, cols):
|
||||||
|
""" This function facilitates the Gossipsub protocol for segment distribution among nodes.
|
||||||
|
It ensures that each node receives any missing segments by checking other nodes in the network.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rows (dict): A hash table where the keys are row IDs and the values are lists of nodes that contain these rows.
|
||||||
|
cols (dict): A hash table where the keys are column IDs and the values are lists of nodes that contain these columns.
|
||||||
|
|
||||||
|
Description:
|
||||||
|
- The function iterates through all row IDs and column IDs.
|
||||||
|
- For each segment identified by a row ID (rID) and a column ID (cID):
|
||||||
|
- It checks if the current node (self) already has the segment.
|
||||||
|
- If the segment is missing, it attempts to receive the segment from other nodes using the Gossipsub protocol via the receiveSegmentViaGossip method.
|
||||||
|
"""
|
||||||
|
for rID in rows:
|
||||||
|
for cID in cols:
|
||||||
|
if not self.receivedBlock.getSegment(rID, cID):
|
||||||
|
self.receiveSegmentViaGossip(rID, cID)
|
||||||
|
|
||||||
|
def send(self, rows, cols):
|
||||||
""" Send as much as we can in the timestep, limited by bwUplink."""
|
""" Send as much as we can in the timestep, limited by bwUplink."""
|
||||||
|
|
||||||
# process node level send queue
|
# process node level send queue
|
||||||
@ -529,6 +556,11 @@ class Node:
|
|||||||
self.runDumbRandomScheduler()
|
self.runDumbRandomScheduler()
|
||||||
if self.statsTxInSlot >= self.bwUplink:
|
if self.statsTxInSlot >= self.bwUplink:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if not self.amImalicious:
|
||||||
|
self.gossipSub(rows, cols)
|
||||||
|
if self.statsTxInSlot >= self.bwUplink:
|
||||||
|
return
|
||||||
|
|
||||||
def logRows(self):
|
def logRows(self):
|
||||||
"""It logs the rows assigned to the validator."""
|
"""It logs the rows assigned to the validator."""
|
||||||
|
|||||||
@ -278,10 +278,22 @@ class Simulator:
|
|||||||
self.logger.debug("Expected Samples: %d" % expected, extra=self.format)
|
self.logger.debug("Expected Samples: %d" % expected, extra=self.format)
|
||||||
self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format)
|
self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format)
|
||||||
oldMissingSamples = missingSamples
|
oldMissingSamples = missingSamples
|
||||||
|
rows = {}
|
||||||
|
cols = {}
|
||||||
|
for i in range(0, self.shape.numberNodes):
|
||||||
|
if not self.validators[i].amIproposer and not self.validators[i].amImalicious:
|
||||||
|
for id in self.validators[i].columnIDs:
|
||||||
|
if id not in cols:
|
||||||
|
cols[id] = []
|
||||||
|
cols[id].append(self.validators[i])
|
||||||
|
for id in self.validators[i].rowIDs:
|
||||||
|
if id not in rows:
|
||||||
|
rows[id] = []
|
||||||
|
rows[id].append(self.validators[i])
|
||||||
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
|
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
|
||||||
for i in range(0,self.shape.numberNodes):
|
for i in range(0,self.shape.numberNodes):
|
||||||
if not self.validators[i].amImalicious:
|
if not self.validators[i].amImalicious:
|
||||||
self.validators[i].send()
|
self.validators[i].send(rows, cols)
|
||||||
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
|
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
|
||||||
for i in range(1,self.shape.numberNodes):
|
for i in range(1,self.shape.numberNodes):
|
||||||
self.validators[i].receiveRowsColumns()
|
self.validators[i].receiveRowsColumns()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user