From daee84b9ea00c41d5c9e33097afd50b850028f3b Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 1 Mar 2023 23:59:35 +0100 Subject: [PATCH] add more function docustrings Signed-off-by: Csaba Kiraly --- DAS/validator.py | 49 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 5fa6aee..72f2610 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -10,7 +10,12 @@ from collections import deque from itertools import chain class Neighbor: - """This class implements a node neighbor to monitor sent and received data.""" + """This class implements a node neighbor to monitor sent and received data. + + It represents one side of a P2P link in the overlay. Sent and received + segments are monitored to avoid sending twice or sending back what was + received from a link. + """ def __repr__(self): """It returns the amount of sent and received data.""" @@ -132,6 +137,7 @@ class Validator: return self.block.getRow(index) def receiveSegment(self, rID, cID, src): + """Receive a segment, register it, and queue for forwarding as needed""" # register receive so that we are not sending back if rID in self.rowIDs: if src in self.rowNeighbors[rID]: @@ -150,6 +156,7 @@ class Validator: self.statsRxInSlot += 1 def addToSendQueue(self, rID, cID): + """Queue a segment for forwarding""" if self.perNodeQueue: self.sendQueue.append((rID, cID)) @@ -163,7 +170,7 @@ class Validator: neigh.sendQueue.append(rID) def receiveRowsColumns(self): - """It receives rows and columns.""" + """Finalize time step by merging newly received segments in state""" if self.amIproposer == 1: self.logger.error("I am a block proposer", extra=self.format) else: @@ -192,6 +199,7 @@ class Validator: self.statsTxInSlot = 0 def checkSegmentToNeigh(self, rID, cID, neigh): + """Check if a segment should be sent to a neighbor""" if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil: return False # sent enough, other side can restore i = rID if neigh.dim else cID @@ -201,6 +209,7 @@ class Validator: return False # received or already sent def sendSegmentToNeigh(self, rID, cID, neigh): + """Send segment to a neighbor (without checks)""" self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) i = rID if neigh.dim else cID neigh.sent[i] = 1 @@ -208,6 +217,7 @@ class Validator: self.statsTxInSlot += 1 def checkSendSegmentToNeigh(self, rID, cID, neigh): + """Check and send a segment to a neighbor if needed""" if self.checkSegmentToNeigh(rID, cID, neigh): self.sendSegmentToNeigh(rID, cID, neigh) return True @@ -215,6 +225,11 @@ class Validator: return False def processSendQueue(self): + """Send out segments from queue until bandwidth limit reached + + SendQueue is a centralized queue from which segments are sent out + in FIFO order to all interested neighbors. + """ while self.sendQueue: (rID, cID) = self.sendQueue[0] @@ -235,6 +250,15 @@ class Validator: self.sendQueue.popleft() def processPerNeighborSendQueue(self): + """Send out segments from per-neighbor queues until bandwidth limit reached + + Segments are dispatched from per-neighbor transmission queues in a shuffled + round-robin order, emulating a type of fair queuing. Since neighborhood is + handled at the topic (column or row) level, fair queuing is also at the level + of flows per topic and per peer. A per-peer model might be closer to the + reality of libp2p implementations where topics between two nodes are + multiplexed over the same transport. + """ progress = True while (progress): progress = False @@ -261,9 +285,12 @@ class Validator: return def runSegmentShuffleScheduler(self): - # This scheduler check which owned segments needs sending (at least - # one neighbor needing it). Then it sends each segment that's worth sending - # once, in shuffled order. This is repeated until bw limit. + """ Schedule chunks for sending + + This scheduler check which owned segments needs sending (at least + one neighbor needing it). Then it sends each segment that's worth sending + once, in shuffled order. This is repeated until bw limit. + """ def collectSegmentsToSend(): # yields list of segments to send as (dim, lineID, id) @@ -332,7 +359,13 @@ class Validator: return def runDumbRandomScheduler(self, tries = 100): - # dumb random scheduler picking segments at random and trying to send it + """Random scheduler picking segments at random + + This scheduler implements a simple random scheduling order picking + segments at random and peers potentially interested in that segment + also at random. + It server more as a performance baseline than as a realistic model. + """ def nextSegment(): t = tries @@ -363,7 +396,7 @@ class Validator: return def send(self): - """ Send as much as we can in the timeslot, limited by bwUplink + """ Send as much as we can in the timestep, limited by bwUplink """ # process node level send queue @@ -406,6 +439,7 @@ class Validator: self.restoreRow(id) def restoreRow(self, id): + """Restore a given row if repairable.""" rep = self.block.repairRow(id) if (rep.any()): # If operation is based on send queues, segments should @@ -423,6 +457,7 @@ class Validator: self.restoreColumn(id) def restoreColumn(self, id): + """Restore a given column if repairable.""" rep = self.block.repairColumn(id) if (rep.any()): # If operation is based on send queues, segments should