diff --git a/DAS/block.py b/DAS/block.py index 10ff30d..f76a944 100644 --- a/DAS/block.py +++ b/DAS/block.py @@ -59,7 +59,7 @@ class Block: def repairRow(self, id): """It repairs the entire row if it has at least blockSize/2 ones. - Returns: list of repaired segments + Returns: list of repaired segments. """ line = self.data[id*self.blockSize:(id+1)*self.blockSize] success = line.count(1) diff --git a/DAS/tools.py b/DAS/tools.py index 6852a9d..cd26850 100644 --- a/DAS/tools.py +++ b/DAS/tools.py @@ -31,8 +31,7 @@ class CustomFormatter(): return formatter.format(record) def shuffled(lis, shuffle=True): - ''' Generator yielding list in shuffled order - ''' + """Generator yielding list in shuffled order.""" # based on https://stackoverflow.com/a/60342323 if shuffle: for index in random.sample(range(len(lis)), len(lis)): @@ -41,10 +40,10 @@ def shuffled(lis, shuffle=True): for v in lis: yield v def shuffledDict(d, shuffle=True): - ''' Generator yielding dictionary in shuffled order + """Generator yielding dictionary in shuffled order. - Shuffle, except if not (optional parameter useful for experiment setup) - ''' + Shuffle, except if not (optional parameter useful for experiment setup). + """ if shuffle: lis = list(d.items()) for index in random.sample(range(len(d)), len(d)): @@ -54,9 +53,9 @@ def shuffledDict(d, shuffle=True): yield kv def sampleLine(line, limit): - """ sample up to 'limit' bits from a bitarray + """Sample up to 'limit' bits from a bitarray. - Since this is quite expensive, we use a number of heuristics to get it fast. + Since this is quite expensive, we use a number of heuristics to get it fast. """ if limit == sys.maxsize : return line diff --git a/DAS/validator.py b/DAS/validator.py index 72f2610..7b52e58 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -14,7 +14,7 @@ class Neighbor: It represents one side of a P2P link in the overlay. Sent and received segments are monitored to avoid sending twice or sending back what was - received from a link. + received from a link. """ def __repr__(self): @@ -137,7 +137,7 @@ class Validator: return self.block.getRow(index) def receiveSegment(self, rID, cID, src): - """Receive a segment, register it, and queue for forwarding as needed""" + """Receive a segment, register it, and queue for forwarding as needed.""" # register receive so that we are not sending back if rID in self.rowIDs: if src in self.rowNeighbors[rID]: @@ -156,7 +156,7 @@ class Validator: self.statsRxInSlot += 1 def addToSendQueue(self, rID, cID): - """Queue a segment for forwarding""" + """Queue a segment for forwarding.""" if self.perNodeQueue: self.sendQueue.append((rID, cID)) @@ -170,7 +170,7 @@ class Validator: neigh.sendQueue.append(rID) def receiveRowsColumns(self): - """Finalize time step by merging newly received segments in state""" + """Finalize time step by merging newly received segments in state.""" if self.amIproposer == 1: self.logger.error("I am a block proposer", extra=self.format) else: @@ -199,7 +199,7 @@ class Validator: self.statsTxInSlot = 0 def checkSegmentToNeigh(self, rID, cID, neigh): - """Check if a segment should be sent to a neighbor""" + """Check if a segment should be sent to a neighbor.""" if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil: return False # sent enough, other side can restore i = rID if neigh.dim else cID @@ -209,7 +209,7 @@ class Validator: return False # received or already sent def sendSegmentToNeigh(self, rID, cID, neigh): - """Send segment to a neighbor (without checks)""" + """Send segment to a neighbor (without checks).""" self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) i = rID if neigh.dim else cID neigh.sent[i] = 1 @@ -217,7 +217,7 @@ class Validator: self.statsTxInSlot += 1 def checkSendSegmentToNeigh(self, rID, cID, neigh): - """Check and send a segment to a neighbor if needed""" + """Check and send a segment to a neighbor if needed.""" if self.checkSegmentToNeigh(rID, cID, neigh): self.sendSegmentToNeigh(rID, cID, neigh) return True @@ -225,10 +225,10 @@ class Validator: return False def processSendQueue(self): - """Send out segments from queue until bandwidth limit reached - + """Send out segments from queue until bandwidth limit reached. + SendQueue is a centralized queue from which segments are sent out - in FIFO order to all interested neighbors. + in FIFO order to all interested neighbors. """ while self.sendQueue: (rID, cID) = self.sendQueue[0] @@ -250,13 +250,13 @@ class Validator: self.sendQueue.popleft() def processPerNeighborSendQueue(self): - """Send out segments from per-neighbor queues until bandwidth limit reached - + """Send out segments from per-neighbor queues until bandwidth limit reached. + Segments are dispatched from per-neighbor transmission queues in a shuffled round-robin order, emulating a type of fair queuing. Since neighborhood is handled at the topic (column or row) level, fair queuing is also at the level of flows per topic and per peer. A per-peer model might be closer to the - reality of libp2p implementations where topics between two nodes are + reality of libp2p implementations where topics between two nodes are multiplexed over the same transport. """ progress = True @@ -285,8 +285,8 @@ class Validator: return def runSegmentShuffleScheduler(self): - """ Schedule chunks for sending - + """ Schedule chunks for sending. + This scheduler check which owned segments needs sending (at least one neighbor needing it). Then it sends each segment that's worth sending once, in shuffled order. This is repeated until bw limit. @@ -359,12 +359,12 @@ class Validator: return def runDumbRandomScheduler(self, tries = 100): - """Random scheduler picking segments at random - - This scheduler implements a simple random scheduling order picking + """Random scheduler picking segments at random. + + This scheduler implements a simple random scheduling order picking segments at random and peers potentially interested in that segment also at random. - It server more as a performance baseline than as a realistic model. + It serves more as a performance baseline than as a realistic model. """ def nextSegment(): @@ -396,8 +396,7 @@ class Validator: return def send(self): - """ Send as much as we can in the timestep, limited by bwUplink - """ + """ Send as much as we can in the timestep, limited by bwUplink.""" # process node level send queue self.processSendQueue()