From ea8996f1f2c005b6e88aa198940765584066ed77 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 12 Mar 2024 23:16:52 +0100 Subject: [PATCH] rename forwardOnRepair to forwardWhenLineReceived Signed-off-by: Csaba Kiraly --- DAS/node.py | 20 ++++++++++++++++---- smallConf.py | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index c50504c..85aaf8a 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -139,7 +139,7 @@ class Node: self.segmentShuffleSchedulerPersist = config.evalConf(self, config.segmentShuffleSchedulerPersist, shape) # Persist scheduler state between timesteps self.queueAllOnInit = config.evalConf(self, config.queueAllOnInit, shape) # queue up everything in the block producer, without shuffling, at the very beginning self.forwardOnReceive = config.evalConf(self, config.forwardOnReceive, shape) # forward segments as soon as received - self.forwardOnRepair = config.evalConf(self, config.forwardOnRepair, shape) # forward all segments when full line available (repaired segments are always forwarded) + self.forwardWhenLineReceived = config.evalConf(self, config.forwardWhenLineReceived, shape) # forward all segments when full line available (repaired segments are always forwarded) def logIDs(self): """It logs the assigned rows and columns.""" @@ -549,7 +549,19 @@ class Node: self.restoreRow(id) def restoreRow(self, id): - """Restore a given row if repairable.""" + """Restore a given row if repairable. + + The functions checks if the row can be repaired based on the number of segments. + If at least K segments are available, it repairs all remaining segments. + It also forwards repaired segments as follows: + - if forwardWhenLineReceived=False, it is assumed that received segments were + already forwarded, so it forwards only the new (repaired) segments. + - if forwardWhenLineReceived=True, none of the received segments were forwarded + yet. When the line is received (i.e. when repair is possible), it forwards all + segments of the line. + Forwarding here also means cross-posting to the respective column topic, if + subscribed. + """ rep, repairedSamples = self.block.repairRow(id) self.repairedSampleCount += repairedSamples if (rep.any()): @@ -557,7 +569,7 @@ class Node: # be queued after successful repair. self.restoreRowCount += 1 for i in range(len(rep)): - if rep[i] or self.forwardOnRepair: + if rep[i] or self.forwardWhenLineReceived: self.logger.trace("Rep: %d,%d", id, i, extra=self.format) if not self.amImalicious: self.addToSendQueue(id, i) @@ -578,7 +590,7 @@ class Node: # be queued after successful repair. self.restoreColumnCount += 1 for i in range(len(rep)): - if rep[i] or self.forwardOnRepair: + if rep[i] or self.forwardWhenLineReceived: self.logger.trace("Rep: %d,%d", i, id, extra=self.format) if not self.amImalicious: self.addToSendQueue(i, id) diff --git a/smallConf.py b/smallConf.py index b687329..18a9c17 100644 --- a/smallConf.py +++ b/smallConf.py @@ -127,7 +127,7 @@ segmentShuffleScheduler = True # send each segment that's worth sending once in segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps queueAllOnInit = False # queue up everything in the block producer, without shuffling, at the very beginning forwardOnReceive = True # forward segments as soon as received -forwardOnRepair = False # forward all segments when full line available (repaired segments are always forwarded) +forwardWhenLineReceived = False # forward all segments when full line available (repaired segments are always forwarded) cols = range(64, 113, 128) rows = range(32, 113, 128)