From f95a393068f03fd27b117d830267091b01167c48 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 24 Feb 2023 12:10:34 +0100 Subject: [PATCH] improve perNeighborSendQueue - improve shuffling between rows and columns - speed up code execution Signed-off-by: Csaba Kiraly --- DAS/validator.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 4d5aa27..6410422 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -142,6 +142,7 @@ class Validator: self.repairOnTheFly = True self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) + self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor @@ -294,21 +295,27 @@ class Validator: progress = True while (progress): progress = False - for rID, neighs in shuffledDict(self.rowNeighbors, self.shuffleLines): - for _, neigh in shuffledDict(neighs, self.shuffleNeighbors): - if (neigh.sendQueue): - self.sendSegmentToNeigh(rID, neigh.sendQueue.popleft(), neigh) - progress = True - if self.statsTxInSlot >= self.bwUplink: - return - for cID, neighs in shuffledDict(self.columnNeighbors, self.shuffleLines): - for _, neigh in shuffledDict(neighs, self.shuffleNeighbors): + queues = [] + # collect and shuffle + for rID, neighs in self.rowNeighbors.items(): + for neigh in neighs.values(): if (neigh.sendQueue): - self.sendSegmentToNeigh(neigh.sendQueue.popleft(), cID, neigh) - progress = True - if self.statsTxInSlot >= self.bwUplink: - return + queues.append((0, rID, neigh)) + + for cID, neighs in self.columnNeighbors.items(): + for neigh in neighs.values(): + if (neigh.sendQueue): + queues.append((1, cID, neigh)) + + for dim, lineID, neigh in shuffled(queues, self.shuffleQueues): + if dim == 0: + self.sendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh) + else: + self.sendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh) + progress = True + if self.statsTxInSlot >= self.bwUplink: + return def runSegmentShuffleScheduler(self): # This scheduler check which owned segments needs sending (at least