diff --git a/DAS/validator.py b/DAS/validator.py index 560d0d1..33b9cf9 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -61,12 +61,6 @@ def sampleLine(line, limit): limit -= 1 return r -class SegmentToSend: - def __init__(self, dim, id, i): - self.dim = dim - self.id = id - self.i = i - class Neighbor: """This class implements a node neighbor to monitor sent and received data.""" @@ -319,35 +313,13 @@ class Validator: return def runSegmentShuffleScheduler(self): - # This scheduler check which owned segments needs sending (at least - # one neighbor needing it). Then it sends each segment that's worth sending - # once, in shuffled order. This is repeated until bw limit. - while True: - if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None: - #self.logger.debug("TX:%d queue:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format) - for s in self.segmentShuffleGen: - self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format) - if s.dim == 0: - for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): - self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) - if self.checkSendSegmentToNeigh(s.id, s.i, neigh): - self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) - break - else: - for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors): - self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) - if self.checkSendSegmentToNeigh(s.i, s.id, neigh): - self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) - break + # This scheduler check which owned segments needs sending (at least + # one neighbor needing it). Then it sends each segment that's worth sending + # once, in shuffled order. This is repeated until bw limit. - if self.statsTxInSlot >= self.bwUplink: - if not self.segmentShuffleSchedulerPersist: - # remove scheduler state before leaving - self.segmentsToSend = [] - self.segmentShuffleGen = None - return - - self.segmentsToSend = [] + def collectSegmentsToSend(): + # yields list of segments to send as (dim, lineID, id) + segmentsToSend = [] for rID, neighs in self.rowNeighbors.items(): line = self.getRow(rID) needed = zeros(self.shape.blockSize) @@ -359,7 +331,7 @@ class Validator: if (needed).any(): for i in range(len(needed)): if needed[i]: - self.segmentsToSend.append(SegmentToSend(0, rID, i)) + segmentsToSend.append((0, rID, i)) for cID, neighs in self.columnNeighbors.items(): line = self.getColumn(cID) @@ -372,12 +344,44 @@ class Validator: if (needed).any(): for i in range(len(needed)): if needed[i]: - self.segmentsToSend.append(SegmentToSend(1, cID, i)) + segmentsToSend.append((1, cID, i)) - if not self.segmentsToSend: + return segmentsToSend + + def nextSegment(): + while True: + # send each collected segment once + if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None: + for dim, lineID, id in self.segmentShuffleGen: + if dim == 0: + for _, neigh in shuffledDict(self.rowNeighbors[lineID], self.shuffleNeighbors): + if self.checkSegmentToNeigh(lineID, id, neigh): + yield((lineID, id, neigh)) + break + else: + for _, neigh in shuffledDict(self.columnNeighbors[lineID], self.shuffleNeighbors): + if self.checkSegmentToNeigh(id, lineID, neigh): + yield((id, lineID, neigh)) + break + + # collect segments for next round + segmentsToSend = collectSegmentsToSend() + + # finish if empty or set up shuffled generator based on collected segments + if not segmentsToSend: break else: - self.segmentShuffleGen = shuffled(self.segmentsToSend, self.shuffleLines) + self.segmentShuffleGen = shuffled(segmentsToSend, self.shuffleLines) + + for rid, cid, neigh in nextSegment(): + # segments are checked just before yield, so we can send directly + self.sendSegmentToNeigh(rid, cid, neigh) + + if self.statsTxInSlot >= self.bwUplink: + if not self.segmentShuffleSchedulerPersist: + # remove scheduler state before leaving + self.segmentShuffleGen = None + return def runDumbRandomScheduler(self, tries = 100): # dumb random scheduler picking segments at random and trying to send it