From dc2710f012830b855b784b66158c5e1d4e154d3e Mon Sep 17 00:00:00 2001
From: Csaba Kiraly
Date: Thu, 13 Apr 2023 21:07:43 +0200
Subject: [PATCH] make messages a half-line large

- enable the per-node queue (NodeQueue) in the proposer
- fill the queue at the beginning
- do not forward individual samples
- forward only when repair is possible, i.e. when a half-line was received

Signed-off-by: Csaba Kiraly
---
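Notes: the proposer now seeds its per-node queue with every segment of the
block, row by row and column by column, while validators stop forwarding
segments one by one and only pass a line on once it can be repaired, which
per the commit message happens when a half-line was received (consistent
with a rate-1/2 erasure code). Below is a minimal Python sketch of that
forwarding rule; can_repair, forward_line, received and block_size are
illustrative placeholders, not the simulator's API.

    # Sketch of the forwarding rule, assuming a rate-1/2 erasure code:
    # any block_size // 2 received segments let a node repair, and then
    # forward, the full line.

    def can_repair(received, block_size):
        # A line is repairable once at least half of its segments arrived.
        return sum(received) >= block_size // 2

    def forward_line(received, block_size):
        # Forward nothing while the line is incomplete and unrepairable;
        # after repair, queue every segment index of the line at once.
        if not can_repair(received, block_size):
            return []
        return list(range(block_size))

    half_line = [True] * 4 + [False] * 4  # 4 of 8 segments received
    assert forward_line(half_line, 8) == list(range(8))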
 DAS/validator.py | 32 ++++++++++++++++++++++++++------
 1 file changed, 26 insertions(+), 6 deletions(-)

diff --git a/DAS/validator.py b/DAS/validator.py
index 09b93fa..d79f419 100644
--- a/DAS/validator.py
+++ b/DAS/validator.py
@@ -102,11 +102,11 @@ class Validator:
         self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed
         self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
         self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node
-        self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
+        self.perNodeQueue = self.amIproposer # keep a global queue of incoming messages for later sequential dispatch
         self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send
         self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor
         self.dumbRandomScheduler = False # dumb random scheduler
-        self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
+        self.segmentShuffleScheduler = False # send each segment that's worth sending once in shuffled order, then repeat
         self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
 
     def logIDs(self):
@@ -177,6 +177,26 @@ class Validator:
             measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
             self.logger.info("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
 
+        # for r in range(self.shape.blockSize):
+        #     for c in range(self.shape.blockSize):
+        #         if self.block.getSegment(r,c):
+        #             #self.sendQueue.append((r,c))
+        #             list(self.rowNeighbors[r].values())[0].sendQueue.append(c)
+
+        # for i in range(self.shape.blockSize):
+        #     for line in range(self.shape.blockSize):
+        #         if self.block.getSegment(line,i):
+        #             list(self.rowNeighbors[line].values())[0].sendQueue.append(i)
+        #         if self.block.getSegment(i,line):
+        #             list(self.columnNeighbors[line].values())[0].sendQueue.append(i)
+
+        for line in range(self.shape.blockSize):
+            for i in range(self.shape.blockSize):
+                if self.block.getSegment(line,i):
+                    list(self.rowNeighbors[line].values())[0].sendQueue.append(i)
+                if self.block.getSegment(i,line):
+                    list(self.columnNeighbors[line].values())[0].sendQueue.append(i)
+
     def getColumn(self, index):
         """It returns a given column."""
         return self.block.getColumn(index)
@@ -197,8 +217,8 @@ class Validator:
             if not self.receivedBlock.getSegment(rID, cID):
                 self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
                 self.receivedBlock.setSegment(rID, cID)
-                if self.perNodeQueue or self.perNeighborQueue:
-                    self.receivedQueue.append((rID, cID))
+                # if self.perNodeQueue or self.perNeighborQueue:
+                #     self.receivedQueue.append((rID, cID))
             else:
                 self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
                 self.statsRxDupInSlot += 1
@@ -495,7 +515,7 @@ class Validator:
             # If operation is based on send queues, segments should
             # be queued after successful repair.
             for i in range(len(rep)):
-                if rep[i]:
+                # if rep[i]:
                     self.logger.trace("Rep: %d,%d", id, i, extra=self.format)
                     self.addToSendQueue(id, i)
             # self.statsRepairInSlot += rep.count(1)
@@ -513,7 +533,7 @@ class Validator:
             # If operation is based on send queues, segments should
             # be queued after successful repair.
             for i in range(len(rep)):
-                if rep[i]:
+                # if rep[i]:
                     self.logger.trace("Rep: %d,%d", i, id, extra=self.format)
                     self.addToSendQueue(i, id)
             # self.statsRepairInSlot += rep.count(1)
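A note on the two repair hunks: commenting out "if rep[i]:" while leaving
its body over-indented is still valid Python, since comment lines play no
part in indentation; the loop body is simply established one level deeper,
and every segment of the repaired line is queued rather than only the
repaired ones. A standalone illustration of the pattern:

    for i in range(3):
        # if i % 2:
            print(i)  # runs for every i; the guard is commented out

Together with sendLineUntil = (self.shape.blockSize + 1) // 2 from the
first hunk, a link still carries at most about half a line, which is
presumably what gives the commit its subject.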