diff --git a/DAS/validator.py b/DAS/validator.py index 5cec9e3..a887bb5 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -7,6 +7,8 @@ import sys from DAS.block import * from bitarray import bitarray from bitarray.util import zeros +from collections import deque + def shuffled(lis): # based on https://stackoverflow.com/a/60342323 @@ -78,6 +80,8 @@ class Validator: self.format = {"entity": "Val "+str(self.ID)} self.block = Block(self.shape.blockSize) self.receivedBlock = Block(self.shape.blockSize) + self.receivedQueue = [] + self.sendQueue = deque() self.amIproposer = amIproposer self.logger = logger if self.shape.chi < 1: @@ -187,6 +191,7 @@ class Validator: self.columnNeighbors[cID][src].receiving[rID] = 1 if not self.receivedBlock.getSegment(rID, cID): self.receivedBlock.setSegment(rID, cID) + self.receivedQueue.append((rID, cID)) # else: # self.statsRxDuplicateInSlot += 1 self.statsRxInSlot += 1 @@ -212,6 +217,10 @@ class Validator: neigh.received |= neigh.receiving neigh.receiving.setall(0) + # add newly received segments to the send queue + self.sendQueue.extend(self.receivedQueue) + self.receivedQueue.clear() + def updateStats(self): """It updates the stats related to sent and received data.""" self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format) @@ -293,6 +302,25 @@ class Validator: """ Send as much as we can in the timeslot, limited by bwUplink """ + while self.sendQueue: + (rID, cID) = self.sendQueue[0] + + if rID in self.rowIDs: + for neigh in self.rowNeighbors[rID].values(): + self.sendSegmentToNeigh(rID, cID, neigh) + + if self.statsTxInSlot >= self.bwUplink: + return + + if cID in self.columnIDs: + for neigh in self.columnNeighbors[cID].values(): + self.sendSegmentToNeigh(rID, cID, neigh) + + if self.statsTxInSlot >= self.bwUplink: + return + + self.sendQueue.popleft() + for n in self.sched: neigh = n.neigh toSend = n.toSend