mirror of
https://github.com/logos-storage/das-research.git
synced 2026-01-07 15:43:08 +00:00
add node level send queue
Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
parent
382954de02
commit
0f4883bf26
@ -7,6 +7,8 @@ import sys
|
|||||||
from DAS.block import *
|
from DAS.block import *
|
||||||
from bitarray import bitarray
|
from bitarray import bitarray
|
||||||
from bitarray.util import zeros
|
from bitarray.util import zeros
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
|
|
||||||
def shuffled(lis):
|
def shuffled(lis):
|
||||||
# based on https://stackoverflow.com/a/60342323
|
# based on https://stackoverflow.com/a/60342323
|
||||||
@ -78,6 +80,8 @@ class Validator:
|
|||||||
self.format = {"entity": "Val "+str(self.ID)}
|
self.format = {"entity": "Val "+str(self.ID)}
|
||||||
self.block = Block(self.shape.blockSize)
|
self.block = Block(self.shape.blockSize)
|
||||||
self.receivedBlock = Block(self.shape.blockSize)
|
self.receivedBlock = Block(self.shape.blockSize)
|
||||||
|
self.receivedQueue = []
|
||||||
|
self.sendQueue = deque()
|
||||||
self.amIproposer = amIproposer
|
self.amIproposer = amIproposer
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
if self.shape.chi < 1:
|
if self.shape.chi < 1:
|
||||||
@ -187,6 +191,7 @@ class Validator:
|
|||||||
self.columnNeighbors[cID][src].receiving[rID] = 1
|
self.columnNeighbors[cID][src].receiving[rID] = 1
|
||||||
if not self.receivedBlock.getSegment(rID, cID):
|
if not self.receivedBlock.getSegment(rID, cID):
|
||||||
self.receivedBlock.setSegment(rID, cID)
|
self.receivedBlock.setSegment(rID, cID)
|
||||||
|
self.receivedQueue.append((rID, cID))
|
||||||
# else:
|
# else:
|
||||||
# self.statsRxDuplicateInSlot += 1
|
# self.statsRxDuplicateInSlot += 1
|
||||||
self.statsRxInSlot += 1
|
self.statsRxInSlot += 1
|
||||||
@ -212,6 +217,10 @@ class Validator:
|
|||||||
neigh.received |= neigh.receiving
|
neigh.received |= neigh.receiving
|
||||||
neigh.receiving.setall(0)
|
neigh.receiving.setall(0)
|
||||||
|
|
||||||
|
# add newly received segments to the send queue
|
||||||
|
self.sendQueue.extend(self.receivedQueue)
|
||||||
|
self.receivedQueue.clear()
|
||||||
|
|
||||||
def updateStats(self):
|
def updateStats(self):
|
||||||
"""It updates the stats related to sent and received data."""
|
"""It updates the stats related to sent and received data."""
|
||||||
self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format)
|
self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format)
|
||||||
@ -293,6 +302,25 @@ class Validator:
|
|||||||
""" Send as much as we can in the timeslot, limited by bwUplink
|
""" Send as much as we can in the timeslot, limited by bwUplink
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
while self.sendQueue:
|
||||||
|
(rID, cID) = self.sendQueue[0]
|
||||||
|
|
||||||
|
if rID in self.rowIDs:
|
||||||
|
for neigh in self.rowNeighbors[rID].values():
|
||||||
|
self.sendSegmentToNeigh(rID, cID, neigh)
|
||||||
|
|
||||||
|
if self.statsTxInSlot >= self.bwUplink:
|
||||||
|
return
|
||||||
|
|
||||||
|
if cID in self.columnIDs:
|
||||||
|
for neigh in self.columnNeighbors[cID].values():
|
||||||
|
self.sendSegmentToNeigh(rID, cID, neigh)
|
||||||
|
|
||||||
|
if self.statsTxInSlot >= self.bwUplink:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.sendQueue.popleft()
|
||||||
|
|
||||||
for n in self.sched:
|
for n in self.sched:
|
||||||
neigh = n.neigh
|
neigh = n.neigh
|
||||||
toSend = n.toSend
|
toSend = n.toSend
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user