From 5383c59f6f8e55b172e23c477ce1a9cd7fdc7f63 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 03:25:52 +0100 Subject: [PATCH] add shuffleLines and shuffleNeighbors params Signed-off-by: Csaba Kiraly --- DAS/validator.py | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index f447848..dcd6779 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -11,15 +11,24 @@ from collections import deque def shuffled(lis): + ''' Generator yielding list in shuffled order + ''' # based on https://stackoverflow.com/a/60342323 for index in random.sample(range(len(lis)), len(lis)): yield lis[index] -def shuffledDict(d): - lis = list(d.values()) - # based on https://stackoverflow.com/a/60342323 - for index in random.sample(range(len(d)), len(d)): - yield lis[index] +def shuffledDict(d, shuffle=True): + ''' Generator yielding dictionary in shuffled order + + Shuffle, except if not (optional parameter useful for experiment setup) + ''' + if shuffle: + lis = list(d.items()) + for index in random.sample(range(len(d)), len(d)): + yield lis[index] + else: + for kv in d.items(): + yield kv def sampleLine(line, limit): """ sample up to 'limit' bits from a bitarray @@ -124,6 +133,8 @@ class Validator: self.repairOnTheFly = True self.perNeighborQueue = False # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch + self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send + self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor self.dumbRandomScheduler = False # dumb random scheduler self.sched = self.nextToSend() @@ -264,7 +275,7 @@ class Validator: line = self.getColumn(columnID) if line.any(): self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) - for n in shuffledDict(self.columnNeighbors[columnID]): + for _, n in shuffledDict(self.columnNeighbors[columnID]): # if there is anything new to send, send it toSend = line & ~n.sent & ~n.received @@ -276,7 +287,7 @@ class Validator: line = self.getRow(rowID) if line.any(): self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) - for n in shuffledDict(self.rowNeighbors[rowID]): + for _, n in shuffledDict(self.rowNeighbors[rowID]): # if there is anything new to send, send it toSend = line & ~n.sent & ~n.received @@ -341,14 +352,14 @@ class Validator: (rID, cID) = self.sendQueue[0] if rID in self.rowIDs: - for neigh in self.rowNeighbors[rID].values(): + for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors): self.sendSegmentToNeigh(rID, cID, neigh) if self.statsTxInSlot >= self.bwUplink: return if cID in self.columnIDs: - for neigh in self.columnNeighbors[cID].values(): + for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors): self.sendSegmentToNeigh(rID, cID, neigh) if self.statsTxInSlot >= self.bwUplink: @@ -359,16 +370,16 @@ class Validator: progress = True while (progress): progress = False - for rID, neighs in self.rowNeighbors.items(): - for neigh in neighs.values(): + for rID, neighs in shuffledDict(self.rowNeighbors, self.shuffleLines): + for _, neigh in shuffledDict(neighs, self.shuffleNeighbors): if (neigh.sendQueue): self.sendSegmentToNeigh(rID, neigh.sendQueue.popleft(), neigh) progress = True if self.statsTxInSlot >= self.bwUplink: return - for cID, neighs in self.columnNeighbors.items(): - for neigh in neighs.values(): + for cID, neighs in shuffledDict(self.columnNeighbors, self.shuffleLines): + for _, neigh in shuffledDict(neighs, self.shuffleNeighbors): if (neigh.sendQueue): self.sendSegmentToNeigh(neigh.sendQueue.popleft(), cID, neigh) progress = True