factorize segmentShuffleScheduler code

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-03-01 10:41:47 +01:00
parent 2bf85c41a2
commit 3095e440c6
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
1 changed files with 42 additions and 38 deletions

View File

@ -61,12 +61,6 @@ def sampleLine(line, limit):
limit -= 1
return r
class SegmentToSend:
def __init__(self, dim, id, i):
self.dim = dim
self.id = id
self.i = i
class Neighbor:
"""This class implements a node neighbor to monitor sent and received data."""
@ -319,35 +313,13 @@ class Validator:
return
def runSegmentShuffleScheduler(self):
# This scheduler check which owned segments needs sending (at least
# one neighbor needing it). Then it sends each segment that's worth sending
# once, in shuffled order. This is repeated until bw limit.
while True:
if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None:
#self.logger.debug("TX:%d queue:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format)
for s in self.segmentShuffleGen:
self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format)
if s.dim == 0:
for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors):
self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format)
if self.checkSendSegmentToNeigh(s.id, s.i, neigh):
self.logger.debug("sending to %d", neigh.node.ID, extra=self.format)
break
else:
for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors):
self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format)
if self.checkSendSegmentToNeigh(s.i, s.id, neigh):
self.logger.debug("sending to %d", neigh.node.ID, extra=self.format)
break
# This scheduler check which owned segments needs sending (at least
# one neighbor needing it). Then it sends each segment that's worth sending
# once, in shuffled order. This is repeated until bw limit.
if self.statsTxInSlot >= self.bwUplink:
if not self.segmentShuffleSchedulerPersist:
# remove scheduler state before leaving
self.segmentsToSend = []
self.segmentShuffleGen = None
return
self.segmentsToSend = []
def collectSegmentsToSend():
# yields list of segments to send as (dim, lineID, id)
segmentsToSend = []
for rID, neighs in self.rowNeighbors.items():
line = self.getRow(rID)
needed = zeros(self.shape.blockSize)
@ -359,7 +331,7 @@ class Validator:
if (needed).any():
for i in range(len(needed)):
if needed[i]:
self.segmentsToSend.append(SegmentToSend(0, rID, i))
segmentsToSend.append((0, rID, i))
for cID, neighs in self.columnNeighbors.items():
line = self.getColumn(cID)
@ -372,12 +344,44 @@ class Validator:
if (needed).any():
for i in range(len(needed)):
if needed[i]:
self.segmentsToSend.append(SegmentToSend(1, cID, i))
segmentsToSend.append((1, cID, i))
if not self.segmentsToSend:
return segmentsToSend
def nextSegment():
while True:
# send each collected segment once
if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None:
for dim, lineID, id in self.segmentShuffleGen:
if dim == 0:
for _, neigh in shuffledDict(self.rowNeighbors[lineID], self.shuffleNeighbors):
if self.checkSegmentToNeigh(lineID, id, neigh):
yield((lineID, id, neigh))
break
else:
for _, neigh in shuffledDict(self.columnNeighbors[lineID], self.shuffleNeighbors):
if self.checkSegmentToNeigh(id, lineID, neigh):
yield((id, lineID, neigh))
break
# collect segments for next round
segmentsToSend = collectSegmentsToSend()
# finish if empty or set up shuffled generator based on collected segments
if not segmentsToSend:
break
else:
self.segmentShuffleGen = shuffled(self.segmentsToSend, self.shuffleLines)
self.segmentShuffleGen = shuffled(segmentsToSend, self.shuffleLines)
for rid, cid, neigh in nextSegment():
# segments are checked just before yield, so we can send directly
self.sendSegmentToNeigh(rid, cid, neigh)
if self.statsTxInSlot >= self.bwUplink:
if not self.segmentShuffleSchedulerPersist:
# remove scheduler state before leaving
self.segmentShuffleGen = None
return
def runDumbRandomScheduler(self, tries = 100):
# dumb random scheduler picking segments at random and trying to send it