From cd2b69149bda81692ee37c056749b0ec6bb7af29 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 14:22:55 +0100 Subject: [PATCH] add segmentShuffleScheduler This scheduler check which owned segments needs sending (at least one neighbor needing it). Then it sends each segment that's worth sending once, in shuffled order. This is repeated until bw limit. Signed-off-by: Csaba Kiraly --- DAS/validator.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/DAS/validator.py b/DAS/validator.py index 8923a9a..f9571da 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -65,6 +65,12 @@ class NextToSend: self.id = id self.dim = dim +class SegmentToSend: + def __init__(self, dim, id, i): + self.dim = dim + self.id = id + self.i = i + class Neighbor: """This class implements a node neighbor to monitor sent and received data.""" @@ -136,6 +142,7 @@ class Validator: self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor self.dumbRandomScheduler = False # dumb random scheduler + self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.sched = self.nextToSend() def logIDs(self): @@ -394,6 +401,56 @@ class Validator: if self.statsTxInSlot >= self.bwUplink: return + # process possible segments to send in shuffled breadth-first order + if self.segmentShuffleScheduler: + while True: + self.segmentsToSend = [] + for rID, neighs in self.rowNeighbors.items(): + line = self.getRow(rID) + needed = zeros(self.shape.blockSize) + for neigh in neighs.values(): + needed |= ~(neigh.received | neigh.sent) + needed &= line + if (needed).any(): + for i in range(len(needed)): + if needed[i]: + self.segmentsToSend.append(SegmentToSend(0, rID, i)) + + for cID, neighs in self.columnNeighbors.items(): + line = self.getColumn(cID) + needed = zeros(self.shape.blockSize) + for neigh in neighs.values(): + needed |= ~(neigh.received | neigh.sent) + needed &= line + if (needed).any(): + for i in range(len(needed)): + if needed[i]: + self.segmentsToSend.append(SegmentToSend(1, cID, i)) + + if self.segmentsToSend: + self.logger.debug("TX:%d q:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format) + for s in shuffled(self.segmentsToSend): + self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format) + if s.dim == 0: + for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): + self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) + if not neigh.sent[s.i] and not neigh.received[s.i]: + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + self.sendSegmentToNeigh(s.id, s.i, neigh) + break + else: + for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors): + self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) + if not neigh.sent[s.i] and not neigh.received[s.i]: + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + self.sendSegmentToNeigh(s.i, s.id, neigh) + break + + if self.statsTxInSlot >= self.bwUplink: + return + else: + break + if self.dumbRandomScheduler: # dumb random scheduler picking segments at random and trying to send it tries = 100