From 54d101e5b875e8337fd996b88cd1bb4369d4f7b0 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 14:28:38 +0100 Subject: [PATCH] segmentShuffleScheduler: persist scheduler state between timesteps Signed-off-by: Csaba Kiraly --- DAS/validator.py | 52 ++++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index f9571da..725bc98 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -143,6 +143,7 @@ class Validator: self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor self.dumbRandomScheduler = False # dumb random scheduler self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat + self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps self.sched = self.nextToSend() def logIDs(self): @@ -403,7 +404,35 @@ class Validator: # process possible segments to send in shuffled breadth-first order if self.segmentShuffleScheduler: + # This scheduler check which owned segments needs sending (at least + # one neighbor needing it). Then it sends each segment that's worth sending + # once, in shuffled order. This is repeated until bw limit. while True: + if hasattr(self, 'segmentsToSend') and self.segmentsToSend: + self.logger.debug("TX:%d q:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format) + for s in shuffled(self.segmentsToSend): + self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format) + if s.dim == 0: + for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): + self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) + if not neigh.sent[s.i] and not neigh.received[s.i]: + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + self.sendSegmentToNeigh(s.id, s.i, neigh) + break + else: + for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors): + self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) + if not neigh.sent[s.i] and not neigh.received[s.i]: + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + self.sendSegmentToNeigh(s.i, s.id, neigh) + break + + if self.statsTxInSlot >= self.bwUplink: + if not self.segmentShuffleSchedulerPersist: + # remove scheduler state before leaving + self.segmentsToSend = [] + return + self.segmentsToSend = [] for rID, neighs in self.rowNeighbors.items(): line = self.getRow(rID) @@ -427,28 +456,7 @@ class Validator: if needed[i]: self.segmentsToSend.append(SegmentToSend(1, cID, i)) - if self.segmentsToSend: - self.logger.debug("TX:%d q:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format) - for s in shuffled(self.segmentsToSend): - self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format) - if s.dim == 0: - for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): - self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) - if not neigh.sent[s.i] and not neigh.received[s.i]: - self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) - self.sendSegmentToNeigh(s.id, s.i, neigh) - break - else: - for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors): - self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) - if not neigh.sent[s.i] and not neigh.received[s.i]: - self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) - self.sendSegmentToNeigh(s.i, s.id, neigh) - break - - if self.statsTxInSlot >= self.bwUplink: - return - else: + if not self.segmentsToSend: break if self.dumbRandomScheduler: