From 2ff4bf4825906429917f6ddeee72228898cc955a Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 1 Mar 2024 15:29:16 +0100 Subject: [PATCH 1/5] add forwardOnReceive Normally, nodes would queue messages for forwarding to mesh neighbors right on receive Signed-off-by: Csaba Kiraly --- DAS/validator.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 2b489b8..f7cb014 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -111,6 +111,7 @@ class Validator: self.dumbRandomScheduler = False # dumb random scheduler self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps + self.forwardOnReceive = True # forward segments as soon as received def logIDs(self): """It logs the assigned rows and columns.""" @@ -196,8 +197,9 @@ class Validator: if not self.receivedBlock.getSegment(rID, cID): self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.receivedBlock.setSegment(rID, cID) - if self.perNodeQueue or self.perNeighborQueue: - self.receivedQueue.append((rID, cID)) + if self.forwardOnReceive: + if self.perNodeQueue or self.perNeighborQueue: + self.receivedQueue.append((rID, cID)) else: self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.statsRxDupInSlot += 1 From 71394768ccf66184e63d9fddb461408140fd4f13 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 1 Mar 2024 15:30:38 +0100 Subject: [PATCH 2/5] add forwardOnRepair If not forwarding on receive, nodes can forward when a whole line is available. 
Signed-off-by: Csaba Kiraly --- DAS/validator.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index f7cb014..f61f8d4 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -112,6 +112,7 @@ class Validator: self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps self.forwardOnReceive = True # forward segments as soon as received + self.forwardOnRepair = False # forward all segments when full line available (repaired segments are always forwarded) def logIDs(self): """It logs the assigned rows and columns.""" @@ -496,7 +497,7 @@ class Validator: # If operation is based on send queues, segments should # be queued after successful repair. for i in range(len(rep)): - if rep[i]: + if rep[i] or self.forwardOnRepair: self.logger.trace("Rep: %d,%d", id, i, extra=self.format) self.addToSendQueue(id, i) # self.statsRepairInSlot += rep.count(1) @@ -514,7 +515,7 @@ class Validator: # If operation is based on send queues, segments should # be queued after successful repair. for i in range(len(rep)): - if rep[i]: + if rep[i] or self.forwardOnRepair: self.logger.trace("Rep: %d,%d", i, id, extra=self.format) self.addToSendQueue(i, id) # self.statsRepairInSlot += rep.count(1) From fab1dff617bf288efab420357eec0a21998409a8 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 1 Mar 2024 15:32:13 +0100 Subject: [PATCH 3/5] add queueAllOnInit instead of using a dynamic scheduler, the block builder can queue up everything for sending at the beginning. 
Signed-off-by: Csaba Kiraly --- DAS/validator.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/DAS/validator.py b/DAS/validator.py index f61f8d4..c9accde 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -111,6 +111,7 @@ class Validator: self.dumbRandomScheduler = False # dumb random scheduler self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps + self.queueAllOnInit = False # queue up everything in the block producer, without shuffling, at the very beginning self.forwardOnReceive = True # forward segments as soon as received self.forwardOnRepair = False # forward all segments when full line available (repaired segments are always forwarded) @@ -178,6 +179,17 @@ class Validator: measuredFailureRate = nbFailures * 100 / (self.shape.blockSizeR * self.shape.blockSizeC) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) + if self.queueAllOnInit: + for r in range(self.shape.blockSizeC): + for c in range(self.shape.blockSizeR): + if self.block.getSegment(r,c): + if r in self.rowNeighbors: + for n in self.rowNeighbors[r].values(): + n.sendQueue.append(c) + if c in self.columnNeighbors: + for n in self.columnNeighbors[c].values(): + n.sendQueue.append(r) + def getColumn(self, index): """It returns a given column.""" return self.block.getColumn(index) From 23f22eb4d5121c85da17816469407ec463cb18b8 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 4 Mar 2024 09:52:15 +0100 Subject: [PATCH 4/5] expose scheduler related configs in config.py Signed-off-by: Csaba Kiraly --- DAS/validator.py | 28 ++++++++++++++-------------- smallConf.py | 16 ++++++++++++++++ 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index c9accde..d57988a 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -100,20 +100,20 
@@ class Validator: self.bwUplink = shape.bwUplink2 self.bwUplink *= 1e3 / 8 * config.stepDuration / config.segmentSize - self.repairOnTheFly = True - self.sendLineUntilR = self.shape.blockSizeRK # stop sending on a p2p link if at least this amount of samples passed - self.sendLineUntilC = self.shape.blockSizeCK # stop sending on a p2p link if at least this amount of samples passed - self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) - self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node - self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch - self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send - self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor - self.dumbRandomScheduler = False # dumb random scheduler - self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat - self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps - self.queueAllOnInit = False # queue up everything in the block producer, without shuffling, at the very beginning - self.forwardOnReceive = True # forward segments as soon as received - self.forwardOnRepair = False # forward all segments when full line available (repaired segments are always forwarded) + self.repairOnTheFly = config.evalConf(self, config.repairOnTheFly, shape) + self.sendLineUntilR = config.evalConf(self, config.sendLineUntilR, shape) # stop sending on a p2p link if at least this amount of samples passed + self.sendLineUntilC = config.evalConf(self, config.sendLineUntilC, shape) # stop sending on a p2p link if at least this amount of samples passed + self.perNeighborQueue = config.evalConf(self, config.perNeighborQueue, shape) # queue incoming messages to outgoing connections on arrival (as 
typical GossipSub impl) + self.shuffleQueues = config.evalConf(self, config.shuffleQueues, shape) # shuffle the order of picking from active queues of a sender node + self.perNodeQueue = config.evalConf(self, config.perNodeQueue, shape) # keep a global queue of incoming messages for later sequential dispatch + self.shuffleLines = config.evalConf(self, config.shuffleLines, shape) # shuffle the order of rows/columns in each iteration while trying to send + self.shuffleNeighbors = config.evalConf(self, config.shuffleNeighbors, shape) # shuffle the order of neighbors when sending the same segment to each neighbor + self.dumbRandomScheduler = config.evalConf(self, config.dumbRandomScheduler, shape) # dumb random scheduler + self.segmentShuffleScheduler = config.evalConf(self, config.segmentShuffleScheduler, shape) # send each segment that's worth sending once in shuffled order, then repeat + self.segmentShuffleSchedulerPersist = config.evalConf(self, config.segmentShuffleSchedulerPersist, shape) # Persist scheduler state between timesteps + self.queueAllOnInit = config.evalConf(self, config.queueAllOnInit, shape) # queue up everything in the block producer, without shuffling, at the very beginning + self.forwardOnReceive = config.evalConf(self, config.forwardOnReceive, shape) # forward segments as soon as received + self.forwardOnRepair = config.evalConf(self, config.forwardOnRepair, shape) # forward all segments when full line available (repaired segments are always forwarded) def logIDs(self): """It logs the assigned rows and columns.""" diff --git a/smallConf.py b/smallConf.py index 530fb7e..f05b7d6 100644 --- a/smallConf.py +++ b/smallConf.py @@ -106,6 +106,22 @@ diagnostics = False # True to save git diff and git commit saveGit = False +# configure Node options +repairOnTheFly = True +sendLineUntilR = "shape.blockSizeRK" # stop sending on a p2p link if at least this amount of samples passed +sendLineUntilC = lambda shape : shape.blockSizeCK # stop sending on a p2p 
link if at least this amount of samples passed +perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) +shuffleQueues = True # shuffle the order of picking from active queues of a sender node +perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch +shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send +shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor +dumbRandomScheduler = False # dumb random scheduler +segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat +segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps +queueAllOnInit = False # queue up everything in the block producer, without shuffling, at the very beginning +forwardOnReceive = True # forward segments as soon as received +forwardOnRepair = False # forward all segments when full line available (repaired segments are always forwarded) + def nextShape(): for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): From ea8996f1f2c005b6e88aa198940765584066ed77 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 12 Mar 2024 23:16:52 +0100 Subject: [PATCH 5/5] rename forwardOnRepair to forwardWhenLineReceived Signed-off-by: Csaba Kiraly --- DAS/node.py | 20 ++++++++++++++++---- smallConf.py | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index c50504c..85aaf8a 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -139,7 +139,7 @@ class Node: self.segmentShuffleSchedulerPersist = config.evalConf(self, config.segmentShuffleSchedulerPersist, shape) # Persist 
scheduler state between timesteps self.queueAllOnInit = config.evalConf(self, config.queueAllOnInit, shape) # queue up everything in the block producer, without shuffling, at the very beginning self.forwardOnReceive = config.evalConf(self, config.forwardOnReceive, shape) # forward segments as soon as received - self.forwardOnRepair = config.evalConf(self, config.forwardOnRepair, shape) # forward all segments when full line available (repaired segments are always forwarded) + self.forwardWhenLineReceived = config.evalConf(self, config.forwardWhenLineReceived, shape) # forward all segments when full line available (repaired segments are always forwarded) def logIDs(self): """It logs the assigned rows and columns.""" @@ -549,7 +549,19 @@ class Node: self.restoreRow(id) def restoreRow(self, id): - """Restore a given row if repairable.""" + """Restore a given row if repairable. + + The function checks if the row can be repaired based on the number of segments. + If at least K segments are available, it repairs all remaining segments. + It also forwards repaired segments as follows: + - if forwardWhenLineReceived=False, it is assumed that received segments were + already forwarded, so it forwards only the new (repaired) segments. + - if forwardWhenLineReceived=True, none of the received segments were forwarded + yet. When the line is received (i.e. when repair is possible), it forwards all + segments of the line. + Forwarding here also means cross-posting to the respective column topic, if + subscribed. + """ rep, repairedSamples = self.block.repairRow(id) self.repairedSampleCount += repairedSamples if (rep.any()): @@ -557,7 +569,7 @@ class Node: # If operation is based on send queues, segments should # be queued after successful repair. 
self.restoreRowCount += 1 for i in range(len(rep)): - if rep[i] or self.forwardOnRepair: + if rep[i] or self.forwardWhenLineReceived: self.logger.trace("Rep: %d,%d", id, i, extra=self.format) if not self.amImalicious: self.addToSendQueue(id, i) @@ -578,7 +590,7 @@ class Node: # be queued after successful repair. self.restoreColumnCount += 1 for i in range(len(rep)): - if rep[i] or self.forwardOnRepair: + if rep[i] or self.forwardWhenLineReceived: self.logger.trace("Rep: %d,%d", i, id, extra=self.format) if not self.amImalicious: self.addToSendQueue(i, id) diff --git a/smallConf.py b/smallConf.py index b687329..18a9c17 100644 --- a/smallConf.py +++ b/smallConf.py @@ -127,7 +127,7 @@ segmentShuffleScheduler = True # send each segment that's worth sending once in segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps queueAllOnInit = False # queue up everything in the block producer, without shuffling, at the very beginning forwardOnReceive = True # forward segments as soon as received -forwardOnRepair = False # forward all segments when full line available (repaired segments are always forwarded) +forwardWhenLineReceived = False # forward all segments when full line available (repaired segments are always forwarded) cols = range(64, 113, 128) rows = range(32, 113, 128)