improve perNeighborSendQueue

- improve shuffling between rows and columns
- speed up code execution

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-02-24 12:10:34 +01:00
parent d9a2d5d606
commit f95a393068
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
1 changed files with 20 additions and 13 deletions

View File

@ -142,6 +142,7 @@ class Validator:
self.repairOnTheFly = True
self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed
self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node
self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send
self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor
@ -294,21 +295,27 @@ class Validator:
progress = True
while (progress):
progress = False
for rID, neighs in shuffledDict(self.rowNeighbors, self.shuffleLines):
for _, neigh in shuffledDict(neighs, self.shuffleNeighbors):
if (neigh.sendQueue):
self.sendSegmentToNeigh(rID, neigh.sendQueue.popleft(), neigh)
progress = True
if self.statsTxInSlot >= self.bwUplink:
return
for cID, neighs in shuffledDict(self.columnNeighbors, self.shuffleLines):
for _, neigh in shuffledDict(neighs, self.shuffleNeighbors):
queues = []
# collect and shuffle
for rID, neighs in self.rowNeighbors.items():
for neigh in neighs.values():
if (neigh.sendQueue):
self.sendSegmentToNeigh(neigh.sendQueue.popleft(), cID, neigh)
progress = True
if self.statsTxInSlot >= self.bwUplink:
return
queues.append((0, rID, neigh))
for cID, neighs in self.columnNeighbors.items():
for neigh in neighs.values():
if (neigh.sendQueue):
queues.append((1, cID, neigh))
for dim, lineID, neigh in shuffled(queues, self.shuffleQueues):
if dim == 0:
self.sendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh)
else:
self.sendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh)
progress = True
if self.statsTxInSlot >= self.bwUplink:
return
def runSegmentShuffleScheduler(self):
# This scheduler check which owned segments needs sending (at least