mirror of
https://github.com/logos-storage/das-research.git
synced 2026-01-03 13:43:11 +00:00
add more function docustrings
Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
parent
66a9d66dc6
commit
daee84b9ea
@ -10,7 +10,12 @@ from collections import deque
|
||||
from itertools import chain
|
||||
|
||||
class Neighbor:
|
||||
"""This class implements a node neighbor to monitor sent and received data."""
|
||||
"""This class implements a node neighbor to monitor sent and received data.
|
||||
|
||||
It represents one side of a P2P link in the overlay. Sent and received
|
||||
segments are monitored to avoid sending twice or sending back what was
|
||||
received from a link.
|
||||
"""
|
||||
|
||||
def __repr__(self):
|
||||
"""It returns the amount of sent and received data."""
|
||||
@ -132,6 +137,7 @@ class Validator:
|
||||
return self.block.getRow(index)
|
||||
|
||||
def receiveSegment(self, rID, cID, src):
|
||||
"""Receive a segment, register it, and queue for forwarding as needed"""
|
||||
# register receive so that we are not sending back
|
||||
if rID in self.rowIDs:
|
||||
if src in self.rowNeighbors[rID]:
|
||||
@ -150,6 +156,7 @@ class Validator:
|
||||
self.statsRxInSlot += 1
|
||||
|
||||
def addToSendQueue(self, rID, cID):
|
||||
"""Queue a segment for forwarding"""
|
||||
if self.perNodeQueue:
|
||||
self.sendQueue.append((rID, cID))
|
||||
|
||||
@ -163,7 +170,7 @@ class Validator:
|
||||
neigh.sendQueue.append(rID)
|
||||
|
||||
def receiveRowsColumns(self):
|
||||
"""It receives rows and columns."""
|
||||
"""Finalize time step by merging newly received segments in state"""
|
||||
if self.amIproposer == 1:
|
||||
self.logger.error("I am a block proposer", extra=self.format)
|
||||
else:
|
||||
@ -192,6 +199,7 @@ class Validator:
|
||||
self.statsTxInSlot = 0
|
||||
|
||||
def checkSegmentToNeigh(self, rID, cID, neigh):
|
||||
"""Check if a segment should be sent to a neighbor"""
|
||||
if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil:
|
||||
return False # sent enough, other side can restore
|
||||
i = rID if neigh.dim else cID
|
||||
@ -201,6 +209,7 @@ class Validator:
|
||||
return False # received or already sent
|
||||
|
||||
def sendSegmentToNeigh(self, rID, cID, neigh):
|
||||
"""Send segment to a neighbor (without checks)"""
|
||||
self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format)
|
||||
i = rID if neigh.dim else cID
|
||||
neigh.sent[i] = 1
|
||||
@ -208,6 +217,7 @@ class Validator:
|
||||
self.statsTxInSlot += 1
|
||||
|
||||
def checkSendSegmentToNeigh(self, rID, cID, neigh):
|
||||
"""Check and send a segment to a neighbor if needed"""
|
||||
if self.checkSegmentToNeigh(rID, cID, neigh):
|
||||
self.sendSegmentToNeigh(rID, cID, neigh)
|
||||
return True
|
||||
@ -215,6 +225,11 @@ class Validator:
|
||||
return False
|
||||
|
||||
def processSendQueue(self):
|
||||
"""Send out segments from queue until bandwidth limit reached
|
||||
|
||||
SendQueue is a centralized queue from which segments are sent out
|
||||
in FIFO order to all interested neighbors.
|
||||
"""
|
||||
while self.sendQueue:
|
||||
(rID, cID) = self.sendQueue[0]
|
||||
|
||||
@ -235,6 +250,15 @@ class Validator:
|
||||
self.sendQueue.popleft()
|
||||
|
||||
def processPerNeighborSendQueue(self):
|
||||
"""Send out segments from per-neighbor queues until bandwidth limit reached
|
||||
|
||||
Segments are dispatched from per-neighbor transmission queues in a shuffled
|
||||
round-robin order, emulating a type of fair queuing. Since neighborhood is
|
||||
handled at the topic (column or row) level, fair queuing is also at the level
|
||||
of flows per topic and per peer. A per-peer model might be closer to the
|
||||
reality of libp2p implementations where topics between two nodes are
|
||||
multiplexed over the same transport.
|
||||
"""
|
||||
progress = True
|
||||
while (progress):
|
||||
progress = False
|
||||
@ -261,9 +285,12 @@ class Validator:
|
||||
return
|
||||
|
||||
def runSegmentShuffleScheduler(self):
|
||||
# This scheduler check which owned segments needs sending (at least
|
||||
# one neighbor needing it). Then it sends each segment that's worth sending
|
||||
# once, in shuffled order. This is repeated until bw limit.
|
||||
""" Schedule chunks for sending
|
||||
|
||||
This scheduler check which owned segments needs sending (at least
|
||||
one neighbor needing it). Then it sends each segment that's worth sending
|
||||
once, in shuffled order. This is repeated until bw limit.
|
||||
"""
|
||||
|
||||
def collectSegmentsToSend():
|
||||
# yields list of segments to send as (dim, lineID, id)
|
||||
@ -332,7 +359,13 @@ class Validator:
|
||||
return
|
||||
|
||||
def runDumbRandomScheduler(self, tries = 100):
|
||||
# dumb random scheduler picking segments at random and trying to send it
|
||||
"""Random scheduler picking segments at random
|
||||
|
||||
This scheduler implements a simple random scheduling order picking
|
||||
segments at random and peers potentially interested in that segment
|
||||
also at random.
|
||||
It server more as a performance baseline than as a realistic model.
|
||||
"""
|
||||
|
||||
def nextSegment():
|
||||
t = tries
|
||||
@ -363,7 +396,7 @@ class Validator:
|
||||
return
|
||||
|
||||
def send(self):
|
||||
""" Send as much as we can in the timeslot, limited by bwUplink
|
||||
""" Send as much as we can in the timestep, limited by bwUplink
|
||||
"""
|
||||
|
||||
# process node level send queue
|
||||
@ -406,6 +439,7 @@ class Validator:
|
||||
self.restoreRow(id)
|
||||
|
||||
def restoreRow(self, id):
|
||||
"""Restore a given row if repairable."""
|
||||
rep = self.block.repairRow(id)
|
||||
if (rep.any()):
|
||||
# If operation is based on send queues, segments should
|
||||
@ -423,6 +457,7 @@ class Validator:
|
||||
self.restoreColumn(id)
|
||||
|
||||
def restoreColumn(self, id):
|
||||
"""Restore a given column if repairable."""
|
||||
rep = self.block.repairColumn(id)
|
||||
if (rep.any()):
|
||||
# If operation is based on send queues, segments should
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user