From 23f22eb4d5121c85da17816469407ec463cb18b8 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 4 Mar 2024 09:52:15 +0100 Subject: [PATCH] expose scheduler related configs in config.py Signed-off-by: Csaba Kiraly --- DAS/validator.py | 28 ++++++++++++++-------------- smallConf.py | 16 ++++++++++++++++ 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index c9accde..d57988a 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -100,20 +100,20 @@ class Validator: self.bwUplink = shape.bwUplink2 self.bwUplink *= 1e3 / 8 * config.stepDuration / config.segmentSize - self.repairOnTheFly = True - self.sendLineUntilR = self.shape.blockSizeRK # stop sending on a p2p link if at least this amount of samples passed - self.sendLineUntilC = self.shape.blockSizeCK # stop sending on a p2p link if at least this amount of samples passed - self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) - self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node - self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch - self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send - self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor - self.dumbRandomScheduler = False # dumb random scheduler - self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat - self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps - self.queueAllOnInit = False # queue up everything in the block producer, without shuffling, at the very beginning - self.forwardOnReceive = True # forward segments as soon as received - self.forwardOnRepair = False # forward all segments when full line available (repaired segments are always forwarded) + self.repairOnTheFly = config.evalConf(self, config.repairOnTheFly, shape) + self.sendLineUntilR = config.evalConf(self, config.sendLineUntilR, shape) # stop sending on a p2p link if at least this amount of samples passed + self.sendLineUntilC = config.evalConf(self, config.sendLineUntilC, shape) # stop sending on a p2p link if at least this amount of samples passed + self.perNeighborQueue = config.evalConf(self, config.perNeighborQueue, shape) # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) + self.shuffleQueues = config.evalConf(self, config.shuffleQueues, shape) # shuffle the order of picking from active queues of a sender node + self.perNodeQueue = config.evalConf(self, config.perNodeQueue, shape) # keep a global queue of incoming messages for later sequential dispatch + self.shuffleLines = config.evalConf(self, config.shuffleLines, shape) # shuffle the order of rows/columns in each iteration while trying to send + self.shuffleNeighbors = config.evalConf(self, config.shuffleNeighbors, shape) # shuffle the order of neighbors when sending the same segment to each neighbor + self.dumbRandomScheduler = config.evalConf(self, config.dumbRandomScheduler, shape) # dumb random scheduler + self.segmentShuffleScheduler = config.evalConf(self, config.segmentShuffleScheduler, shape) # send each segment that's worth sending once in shuffled order, then repeat + self.segmentShuffleSchedulerPersist = config.evalConf(self, config.segmentShuffleSchedulerPersist, shape) # Persist scheduler state between timesteps + self.queueAllOnInit = config.evalConf(self, config.queueAllOnInit, shape) # queue up everything in the block producer, without shuffling, at the very beginning + self.forwardOnReceive = config.evalConf(self, config.forwardOnReceive, shape) # forward segments as soon as received + self.forwardOnRepair = config.evalConf(self, config.forwardOnRepair, shape) # forward all segments when full line available (repaired segments are always forwarded) def logIDs(self): """It logs the assigned rows and columns.""" diff --git a/smallConf.py b/smallConf.py index 530fb7e..f05b7d6 100644 --- a/smallConf.py +++ b/smallConf.py @@ -106,6 +106,22 @@ diagnostics = False # True to save git diff and git commit saveGit = False +# configure Node options +repairOnTheFly = True +sendLineUntilR = "shape.blockSizeRK" # stop sending on a p2p link if at least this amount of samples passed +sendLineUntilC = lambda shape : shape.blockSizeCK # stop sending on a p2p link if at least this amount of samples passed +perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) +shuffleQueues = True # shuffle the order of picking from active queues of a sender node +perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch +shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send +shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor +dumbRandomScheduler = False # dumb random scheduler +segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat +segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps +queueAllOnInit = False # queue up everything in the block producer, without shuffling, at the very beginning +forwardOnReceive = True # forward segments as soon as received +forwardOnRepair = False # forward all segments when full line available (repaired segments are always forwarded) + def nextShape(): for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2):