From e285890fa734fcc2f9db7e06a4155a88de34071a Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Wed, 29 Mar 2023 16:42:09 +0200 Subject: [PATCH] Fixes allocation bug, remove duplicates in rowIDs and columnIDs, add diagnostics when the block is not available. Add number of steps without progress to stop condition. --- DAS/simulator.py | 78 ++++++++++++++++++++++++++++++++++++++--------- config_example.py | 6 ++++ 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 04d1eca..6ec3bfa 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -48,11 +48,19 @@ class Simulator: lightVal = int(self.shape.numberNodes * self.shape.class1ratio * self.shape.vpn1) heavyVal = int(self.shape.numberNodes * (1-self.shape.class1ratio) * self.shape.vpn2) totalValidators = lightVal + heavyVal - rows = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1) - columns = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1) + totalRows = totalValidators * self.shape.chi + rows = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1) + columns = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1) offset = heavyVal*self.shape.chi random.shuffle(rows) random.shuffle(columns) + self.logger.debug("There is a total of %d validators" % totalValidators, extra=self.format) + self.logger.debug("Shuffling a total of %d rows/columns" % len(rows), extra=self.format) + self.logger.debug("Shuffled rows: %s" % str(rows), extra=self.format) + self.logger.debug("Shuffled columns: %s" % str(columns), extra=self.format) + + assignatedRows = [] + assignatedCols = [] for i in range(self.shape.numberNodes): if self.config.evenLineDistribution: if i < int(heavyVal/self.shape.vpn2): # First start with the heavy nodes @@ -62,9 +70,17 @@ class Simulator: j = i - int(heavyVal/self.shape.vpn2) start = offset+( j *self.shape.chi) end = offset+((j+1)*self.shape.chi) - r = rows[start:end] - c = columns[start:end] + # Remove duplicates + r = list(dict.fromkeys(rows[start:end])) + c = list(dict.fromkeys(columns[start:end])) + r.sort() + c.sort() val = Validator(i, int(not i!=0), self.logger, self.shape, r, c) + self.logger.debug("Validators %d row IDs: %s" % (val.ID, val.rowIDs), extra=self.format) + self.logger.debug("Validators %d column IDs: %s" % (val.ID, val.columnIDs), extra=self.format) + assignatedRows = assignatedRows + r + assignatedCols = assignatedCols + c + else: val = Validator(i, int(not i!=0), self.logger, self.shape) if i == self.proposerID: @@ -72,6 +88,11 @@ class Simulator: else: val.logIDs() self.validators.append(val) + + assignatedRows.sort() + assignatedCols.sort() + self.logger.debug("Rows assignated: %s" % str(assignatedRows), extra=self.format) + self.logger.debug("Columns assignated: %s" % str(assignatedCols), extra=self.format) self.logger.debug("Validators initialized.", extra=self.format) def initNetwork(self): @@ -86,12 +107,14 @@ class Simulator: columnChannels[id].append(v) # Check rows/columns distribution - #totalR = 0 - #totalC = 0 - #for r in rowChannels: - # totalR += len(r) - #for c in columnChannels: - # totalC += len(c) + distR = [] + distC = [] + for r in rowChannels: + distR.append(len(r)) + for c in columnChannels: + distC.append(len(c)) + self.logger.debug("Number of validators per row; Min: %d, Max: %d" % (min(distR), max(distR)), extra=self.format) + self.logger.debug("Number of validators per column; Min: %d, Max: %d" % (min(distC), max(distC)), extra=self.format) for id in range(self.shape.blockSize): @@ -164,6 +187,29 @@ class Simulator: logger.addHandler(ch) self.logger = logger + def printDiagnostics(self): + """Print all required diagnostics to check when a block does not become available""" + for val in self.validators: + (a, e) = val.checkStatus() + if e-a > 0 and val.ID != 0: + self.logger.warning("Node %d is missing %d samples" % (val.ID, e-a), extra=self.format) + for r in val.rowIDs: + row = val.getRow(r) + if row.count() < len(row): + self.logger.debug("Row %d: %s" % (r, str(row)), extra=self.format) + neiR = val.rowNeighbors[r] + for nr in neiR: + self.logger.debug("Row %d, Neighbor %d sent: %s" % (r, val.rowNeighbors[r][nr].node.ID, val.rowNeighbors[r][nr].received), extra=self.format) + self.logger.debug("Row %d, Neighbor %d has: %s" % (r, val.rowNeighbors[r][nr].node.ID, self.validators[val.rowNeighbors[r][nr].node.ID].getRow(r)), extra=self.format) + for c in val.columnIDs: + col = val.getColumn(c) + if col.count() < len(col): + self.logger.debug("Column %d: %s" % (c, str(col)), extra=self.format) + neiC = val.columnNeighbors[c] + for nc in neiC: + self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format) + self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format) + def run(self): """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) @@ -194,14 +240,14 @@ class Simulator: # log TX and RX statistics trafficStats = self.glob.getTrafficStats(self.validators) - self.logger.debug("step %d: %s" % + self.logger.debug("step %d: %s" % (steps, trafficStats), extra=self.format) for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() trafficStatsVector.append(trafficStats) missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators) - self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" + self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" % (steps, sampleProgress*100, nodeProgress*100, validatorProgress*100), extra=self.format) cnS = "samples received" @@ -229,9 +275,13 @@ class Simulator: }) if missingSamples == oldMissingSamples: - self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) + if len(missingVector) > self.config.steps4StopCondition: + if missingSamples == missingVector[len(missingVector)-1-self.config.steps4StopCondition]: + self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) + if self.config.diagnostics: + self.printDiagnostics() + break missingVector.append(missingSamples) - break elif missingSamples == 0: #self.logger.info("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) missingVector.append(missingSamples) diff --git a/config_example.py b/config_example.py index 461ee36..d07a899 100644 --- a/config_example.py +++ b/config_example.py @@ -74,6 +74,12 @@ deterministic = False # If your run is deterministic you can decide the random seed. This is ignore otherwise. randomSeed = "DAS" +# If True, print diagnostics when the block is not available +diagnostics = False + +# Number of steps without progress to stop simulation +steps4StopCondition = 7 + def nextShape(): for run, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( runs, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2):