diff --git a/DAS/node.py b/DAS/node.py index aecbf72..85aaf8a 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -126,17 +126,20 @@ class Node: self.bwUplink = shape.bwUplink2 self.bwUplink *= 1e3 / 8 * config.stepDuration / config.segmentSize - self.repairOnTheFly = True - self.sendLineUntilR = self.shape.nbColsK # stop sending on a p2p link if at least this amount of samples passed - self.sendLineUntilC = self.shape.nbRowsK # stop sending on a p2p link if at least this amount of samples passed - self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) - self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node - self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch - self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send - self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor - self.dumbRandomScheduler = False # dumb random scheduler - self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat - self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps + self.repairOnTheFly = config.evalConf(self, config.repairOnTheFly, shape) + self.sendLineUntilR = config.evalConf(self, config.sendLineUntilR, shape) # stop sending on a p2p link if at least this amount of samples passed + self.sendLineUntilC = config.evalConf(self, config.sendLineUntilC, shape) # stop sending on a p2p link if at least this amount of samples passed + self.perNeighborQueue = config.evalConf(self, config.perNeighborQueue, shape) # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) + self.shuffleQueues = config.evalConf(self, config.shuffleQueues, shape) # shuffle the order of picking from active queues of a sender node + self.perNodeQueue = config.evalConf(self, config.perNodeQueue, shape) # keep a global queue of incoming messages for later sequential dispatch + self.shuffleLines = config.evalConf(self, config.shuffleLines, shape) # shuffle the order of rows/columns in each iteration while trying to send + self.shuffleNeighbors = config.evalConf(self, config.shuffleNeighbors, shape) # shuffle the order of neighbors when sending the same segment to each neighbor + self.dumbRandomScheduler = config.evalConf(self, config.dumbRandomScheduler, shape) # dumb random scheduler + self.segmentShuffleScheduler = config.evalConf(self, config.segmentShuffleScheduler, shape) # send each segment that's worth sending once in shuffled order, then repeat + self.segmentShuffleSchedulerPersist = config.evalConf(self, config.segmentShuffleSchedulerPersist, shape) # Persist scheduler state between timesteps + self.queueAllOnInit = config.evalConf(self, config.queueAllOnInit, shape) # queue up everything in the block producer, without shuffling, at the very beginning + self.forwardOnReceive = config.evalConf(self, config.forwardOnReceive, shape) # forward segments as soon as received + self.forwardWhenLineReceived = config.evalConf(self, config.forwardWhenLineReceived, shape) # forward all segments when full line available (repaired segments are always forwarded) def logIDs(self): """It logs the assigned rows and columns.""" @@ -202,6 +205,17 @@ class Node: measuredFailureRate = nbFailures * 100 / (self.shape.nbCols * self.shape.nbRows) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) + if self.queueAllOnInit: + for r in range(self.shape.nbRows): + for c in range(self.shape.nbCols): + if self.block.getSegment(r,c): + if r in self.rowNeighbors: + for n in self.rowNeighbors[r].values(): + n.sendQueue.append(c) + if c in self.columnNeighbors: + for n in self.columnNeighbors[c].values(): + n.sendQueue.append(r) + def getColumn(self, index): """It returns a given column.""" return self.block.getColumn(index) @@ -223,9 +237,10 @@ class Node: self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.receivedBlock.setSegment(rID, cID) self.sampleRecvCount += 1 - if self.perNodeQueue or self.perNeighborQueue: - self.receivedQueue.append((rID, cID)) - self.msgRecvCount += 1 + if self.forwardOnReceive: + if self.perNodeQueue or self.perNeighborQueue: + self.receivedQueue.append((rID, cID)) + self.msgRecvCount += 1 else: self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.statsRxDupInSlot += 1 @@ -534,7 +549,19 @@ class Node: self.restoreRow(id) def restoreRow(self, id): - """Restore a given row if repairable.""" + """Restore a given row if repairable. + + The functions checks if the row can be repaired based on the number of segments. + If at least K segments are available, it repairs all remaining segments. + It also forwards repaired segments as follows: + - if forwardWhenLineReceived=False, it is assumed that received segments were + already forwarded, so it forwards only the new (repaired) segments. + - if forwardWhenLineReceived=True, none of the received segments were forwarded + yet. When the line is received (i.e. when repair is possible), it forwards all + segments of the line. + Forwarding here also means cross-posting to the respective column topic, if + subscribed. + """ rep, repairedSamples = self.block.repairRow(id) self.repairedSampleCount += repairedSamples if (rep.any()): @@ -542,7 +569,7 @@ class Node: # be queued after successful repair. self.restoreRowCount += 1 for i in range(len(rep)): - if rep[i]: + if rep[i] or self.forwardWhenLineReceived: self.logger.trace("Rep: %d,%d", id, i, extra=self.format) if not self.amImalicious: self.addToSendQueue(id, i) @@ -563,7 +590,7 @@ class Node: # be queued after successful repair. self.restoreColumnCount += 1 for i in range(len(rep)): - if rep[i]: + if rep[i] or self.forwardWhenLineReceived: self.logger.trace("Rep: %d,%d", i, id, extra=self.format) if not self.amImalicious: self.addToSendQueue(i, id) diff --git a/smallConf.py b/smallConf.py index 685c2b1..18a9c17 100644 --- a/smallConf.py +++ b/smallConf.py @@ -113,6 +113,22 @@ diagnostics = False # True to save git diff and git commit saveGit = False +# configure Node options +repairOnTheFly = True +sendLineUntilR = "shape.nbColsK" # stop sending on a p2p link if at least this amount of samples passed +sendLineUntilC = lambda shape : shape.nbRowsK # stop sending on a p2p link if at least this amount of samples passed +perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) +shuffleQueues = True # shuffle the order of picking from active queues of a sender node +perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch +shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send +shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor +dumbRandomScheduler = False # dumb random scheduler +segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat +segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps +queueAllOnInit = False # queue up everything in the block producer, without shuffling, at the very beginning +forwardOnReceive = True # forward segments as soon as received +forwardWhenLineReceived = False # forward all segments when full line available (repaired segments are always forwarded) + cols = range(64, 113, 128) rows = range(32, 113, 128) colsK = range(32, 65, 128)