diff --git a/DAS/validator.py b/DAS/validator.py index 88e32d6..9c43967 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -69,6 +69,7 @@ class Neighbor: self.receiving = zeros(blockSize) self.received = zeros(blockSize) self.sent = zeros(blockSize) + self.sendQueue = deque() class Validator: @@ -86,7 +87,7 @@ class Validator: self.format = {"entity": "Val "+str(self.ID)} self.block = Block(self.shape.blockSize) self.receivedBlock = Block(self.shape.blockSize) - self.receivedQueue = [] + self.receivedQueue = deque() self.sendQueue = deque() self.amIproposer = amIproposer self.logger = logger @@ -119,6 +120,7 @@ class Validator: # TODO: this should be a parameter self.bwUplink = 110 if not self.amIproposer else 2200 # approx. 10Mbps and 200Mbps + self.perNeighborQueue = False # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch self.sched = self.nextToSend() @@ -233,6 +235,19 @@ class Validator: # add newly received segments to the send queue if self.perNeighborQueue: self.sendQueue.extend(self.receivedQueue) + + if self.perNodeQueue: + while self.receivedQueue: + (rID, cID) = self.receivedQueue.popleft() + + if rID in self.rowIDs: + for neigh in self.rowNeighbors[rID].values(): + neigh.sendQueue.append(cID) + + if cID in self.columnIDs: + for neigh in self.columnNeighbors[cID].values(): + neigh.sendQueue.append(rID) + self.receivedQueue.clear() def updateStats(self): @@ -335,6 +350,25 @@ class Validator: self.sendQueue.popleft() + progress = True + while (progress): + progress = False + for rID, neighs in self.rowNeighbors.items(): + for neigh in neighs.values(): + if (neigh.sendQueue): + self.sendSegmentToNeigh(rID, neigh.sendQueue.popleft(), neigh) + progress = True + if self.statsTxInSlot >= self.bwUplink: + return + + for cID, neighs in self.columnNeighbors.items(): + for neigh in neighs.values(): + if (neigh.sendQueue): + self.sendSegmentToNeigh(neigh.sendQueue.popleft(), cID, neigh) + progress = True + if self.statsTxInSlot >= self.bwUplink: + return + tries = 100 while tries: if self.rowIDs: