diff --git a/DAS/validator.py b/DAS/validator.py index b16701e..b15d275 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -277,10 +277,8 @@ class Validator: reality of libp2p implementations where topics between two nodes are multiplexed over the same transport. """ - progress = True - while (progress): - progress = False + def activeSendQueues(): queues = [] # collect and shuffle for rID, neighs in self.rowNeighbors.items(): @@ -293,14 +291,27 @@ class Validator: if (neigh.sendQueue): queues.append((1, cID, neigh)) - for dim, lineID, neigh in shuffled(queues, self.shuffleQueues): - if dim == 0: - self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh) - else: - self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh) - progress = True - if self.statsTxInSlot >= self.bwUplink: - return + return queues + + progress = True + while (progress): + + if hasattr(self, 'activeSendQueues'): + progress = False + for dim, lineID, neigh in self.activeSendQueues: + if dim == 0: + self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh) + else: + self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh) + progress = True + if self.statsTxInSlot >= self.bwUplink: + return + + self.activeSendQueues = activeSendQueues() + if self.activeSendQueues: + self.activeSendQueues = shuffled(activeSendQueues(), self.shuffleQueues) + else: + return def runSegmentShuffleScheduler(self): """ Schedule chunks for sending.