add shuffleLines and shuffleNeighbors params

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-02-15 03:25:52 +01:00
parent d0641e4568
commit 5383c59f6f
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
1 changed files with 24 additions and 13 deletions

View File

@ -11,15 +11,24 @@ from collections import deque
def shuffled(lis):
''' Generator yielding list in shuffled order
'''
# based on https://stackoverflow.com/a/60342323
for index in random.sample(range(len(lis)), len(lis)):
yield lis[index]
def shuffledDict(d):
lis = list(d.values())
# based on https://stackoverflow.com/a/60342323
for index in random.sample(range(len(d)), len(d)):
yield lis[index]
def shuffledDict(d, shuffle=True):
''' Generator yielding dictionary in shuffled order
Shuffle, except if not (optional parameter useful for experiment setup)
'''
if shuffle:
lis = list(d.items())
for index in random.sample(range(len(d)), len(d)):
yield lis[index]
else:
for kv in d.items():
yield kv
def sampleLine(line, limit):
""" sample up to 'limit' bits from a bitarray
@ -124,6 +133,8 @@ class Validator:
self.repairOnTheFly = True
self.perNeighborQueue = False # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send
self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor
self.dumbRandomScheduler = False # dumb random scheduler
self.sched = self.nextToSend()
@ -264,7 +275,7 @@ class Validator:
line = self.getColumn(columnID)
if line.any():
self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format)
for n in shuffledDict(self.columnNeighbors[columnID]):
for _, n in shuffledDict(self.columnNeighbors[columnID]):
# if there is anything new to send, send it
toSend = line & ~n.sent & ~n.received
@ -276,7 +287,7 @@ class Validator:
line = self.getRow(rowID)
if line.any():
self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format)
for n in shuffledDict(self.rowNeighbors[rowID]):
for _, n in shuffledDict(self.rowNeighbors[rowID]):
# if there is anything new to send, send it
toSend = line & ~n.sent & ~n.received
@ -341,14 +352,14 @@ class Validator:
(rID, cID) = self.sendQueue[0]
if rID in self.rowIDs:
for neigh in self.rowNeighbors[rID].values():
for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors):
self.sendSegmentToNeigh(rID, cID, neigh)
if self.statsTxInSlot >= self.bwUplink:
return
if cID in self.columnIDs:
for neigh in self.columnNeighbors[cID].values():
for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors):
self.sendSegmentToNeigh(rID, cID, neigh)
if self.statsTxInSlot >= self.bwUplink:
@ -359,16 +370,16 @@ class Validator:
progress = True
while (progress):
progress = False
for rID, neighs in self.rowNeighbors.items():
for neigh in neighs.values():
for rID, neighs in shuffledDict(self.rowNeighbors, self.shuffleLines):
for _, neigh in shuffledDict(neighs, self.shuffleNeighbors):
if (neigh.sendQueue):
self.sendSegmentToNeigh(rID, neigh.sendQueue.popleft(), neigh)
progress = True
if self.statsTxInSlot >= self.bwUplink:
return
for cID, neighs in self.columnNeighbors.items():
for neigh in neighs.values():
for cID, neighs in shuffledDict(self.columnNeighbors, self.shuffleLines):
for _, neigh in shuffledDict(neighs, self.shuffleNeighbors):
if (neigh.sendQueue):
self.sendSegmentToNeigh(neigh.sendQueue.popleft(), cID, neigh)
progress = True