mirror of
https://github.com/logos-storage/das-research.git
synced 2026-01-08 16:13:11 +00:00
add perNeighborQueue option
If enabled, queue incoming messages to outgoing connections on arrival, as typical in some GossipSub implementations. Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
parent
7c0fcaba78
commit
23e40693f1
@ -69,6 +69,7 @@ class Neighbor:
|
|||||||
self.receiving = zeros(blockSize)
|
self.receiving = zeros(blockSize)
|
||||||
self.received = zeros(blockSize)
|
self.received = zeros(blockSize)
|
||||||
self.sent = zeros(blockSize)
|
self.sent = zeros(blockSize)
|
||||||
|
self.sendQueue = deque()
|
||||||
|
|
||||||
|
|
||||||
class Validator:
|
class Validator:
|
||||||
@ -86,7 +87,7 @@ class Validator:
|
|||||||
self.format = {"entity": "Val "+str(self.ID)}
|
self.format = {"entity": "Val "+str(self.ID)}
|
||||||
self.block = Block(self.shape.blockSize)
|
self.block = Block(self.shape.blockSize)
|
||||||
self.receivedBlock = Block(self.shape.blockSize)
|
self.receivedBlock = Block(self.shape.blockSize)
|
||||||
self.receivedQueue = []
|
self.receivedQueue = deque()
|
||||||
self.sendQueue = deque()
|
self.sendQueue = deque()
|
||||||
self.amIproposer = amIproposer
|
self.amIproposer = amIproposer
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
@ -119,6 +120,7 @@ class Validator:
|
|||||||
# TODO: this should be a parameter
|
# TODO: this should be a parameter
|
||||||
self.bwUplink = 110 if not self.amIproposer else 2200 # approx. 10Mbps and 200Mbps
|
self.bwUplink = 110 if not self.amIproposer else 2200 # approx. 10Mbps and 200Mbps
|
||||||
|
|
||||||
|
self.perNeighborQueue = False # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
|
||||||
self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
|
self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
|
||||||
self.sched = self.nextToSend()
|
self.sched = self.nextToSend()
|
||||||
|
|
||||||
@ -233,6 +235,19 @@ class Validator:
|
|||||||
# add newly received segments to the send queue
|
# add newly received segments to the send queue
|
||||||
if self.perNeighborQueue:
|
if self.perNeighborQueue:
|
||||||
self.sendQueue.extend(self.receivedQueue)
|
self.sendQueue.extend(self.receivedQueue)
|
||||||
|
|
||||||
|
if self.perNodeQueue:
|
||||||
|
while self.receivedQueue:
|
||||||
|
(rID, cID) = self.receivedQueue.popleft()
|
||||||
|
|
||||||
|
if rID in self.rowIDs:
|
||||||
|
for neigh in self.rowNeighbors[rID].values():
|
||||||
|
neigh.sendQueue.append(cID)
|
||||||
|
|
||||||
|
if cID in self.columnIDs:
|
||||||
|
for neigh in self.columnNeighbors[cID].values():
|
||||||
|
neigh.sendQueue.append(rID)
|
||||||
|
|
||||||
self.receivedQueue.clear()
|
self.receivedQueue.clear()
|
||||||
|
|
||||||
def updateStats(self):
|
def updateStats(self):
|
||||||
@ -335,6 +350,25 @@ class Validator:
|
|||||||
|
|
||||||
self.sendQueue.popleft()
|
self.sendQueue.popleft()
|
||||||
|
|
||||||
|
progress = True
|
||||||
|
while (progress):
|
||||||
|
progress = False
|
||||||
|
for rID, neighs in self.rowNeighbors.items():
|
||||||
|
for neigh in neighs.values():
|
||||||
|
if (neigh.sendQueue):
|
||||||
|
self.sendSegmentToNeigh(rID, neigh.sendQueue.popleft(), neigh)
|
||||||
|
progress = True
|
||||||
|
if self.statsTxInSlot >= self.bwUplink:
|
||||||
|
return
|
||||||
|
|
||||||
|
for cID, neighs in self.columnNeighbors.items():
|
||||||
|
for neigh in neighs.values():
|
||||||
|
if (neigh.sendQueue):
|
||||||
|
self.sendSegmentToNeigh(neigh.sendQueue.popleft(), cID, neigh)
|
||||||
|
progress = True
|
||||||
|
if self.statsTxInSlot >= self.bwUplink:
|
||||||
|
return
|
||||||
|
|
||||||
tries = 100
|
tries = 100
|
||||||
while tries:
|
while tries:
|
||||||
if self.rowIDs:
|
if self.rowIDs:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user