diff --git a/DAS/validator.py b/DAS/validator.py index 8923a9a..f9571da 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -65,6 +65,12 @@ class NextToSend: self.id = id self.dim = dim +class SegmentToSend: + def __init__(self, dim, id, i): + self.dim = dim + self.id = id + self.i = i + class Neighbor: """This class implements a node neighbor to monitor sent and received data.""" @@ -136,6 +142,7 @@ class Validator: self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor self.dumbRandomScheduler = False # dumb random scheduler + self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.sched = self.nextToSend() def logIDs(self): @@ -394,6 +401,56 @@ class Validator: if self.statsTxInSlot >= self.bwUplink: return + # process possible segments to send in shuffled breadth-first order + if self.segmentShuffleScheduler: + while True: + self.segmentsToSend = [] + for rID, neighs in self.rowNeighbors.items(): + line = self.getRow(rID) + needed = zeros(self.shape.blockSize) + for neigh in neighs.values(): + needed |= ~(neigh.received | neigh.sent) + needed &= line + if (needed).any(): + for i in range(len(needed)): + if needed[i]: + self.segmentsToSend.append(SegmentToSend(0, rID, i)) + + for cID, neighs in self.columnNeighbors.items(): + line = self.getColumn(cID) + needed = zeros(self.shape.blockSize) + for neigh in neighs.values(): + needed |= ~(neigh.received | neigh.sent) + needed &= line + if (needed).any(): + for i in range(len(needed)): + if needed[i]: + self.segmentsToSend.append(SegmentToSend(1, cID, i)) + + if self.segmentsToSend: + self.logger.debug("TX:%d q:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format) + for s in shuffled(self.segmentsToSend): + self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format) + if s.dim == 0: + for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): + self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) + if not neigh.sent[s.i] and not neigh.received[s.i]: + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + self.sendSegmentToNeigh(s.id, s.i, neigh) + break + else: + for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors): + self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) + if not neigh.sent[s.i] and not neigh.received[s.i]: + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + self.sendSegmentToNeigh(s.i, s.id, neigh) + break + + if self.statsTxInSlot >= self.bwUplink: + return + else: + break + if self.dumbRandomScheduler: # dumb random scheduler picking segments at random and trying to send it tries = 100