From d38b8074f49087130b62dc4107c4e3bf11512fb8 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Mar 2023 10:09:04 +0100 Subject: [PATCH 01/43] use main Signed-off-by: Csaba Kiraly --- study.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/study.py b/study.py index 2b2aab6..2532d13 100644 --- a/study.py +++ b/study.py @@ -63,6 +63,5 @@ def study(): vis = Visualizer(execID) vis.plotHeatmaps() - -study() - +if __name__ == "__main__": + study() From 954d40e75876c0d822b2909c4efbdf5e07ccd276 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Mar 2023 10:10:13 +0100 Subject: [PATCH 02/43] use new simulator object instead of reset Signed-off-by: Csaba Kiraly # Conflicts: # study.py --- study.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/study.py b/study.py index 2532d13..3b956e8 100644 --- a/study.py +++ b/study.py @@ -12,7 +12,20 @@ from DAS import * # https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls # and https://github.com/joblib/joblib/issues/1017 -def runOnce(sim, config, shape): +def initLogger(config): + """It initializes the logger.""" + logger = logging.getLogger("Study") + logger.setLevel(config.logLevel) + ch = logging.StreamHandler() + ch.setLevel(config.logLevel) + ch.setFormatter(CustomFormatter()) + logger.addHandler(ch) + return logger + +def runOnce(config, shape): + + sim = Simulator(shape, config) + if config.deterministic: shape.setSeed(config.randomSeed+"-"+str(shape)) random.seed(shape.randomSeed) @@ -40,24 +53,24 @@ def study(): print("You need to pass a configuration file in parameter") exit(1) - shape = Shape(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - sim = Simulator(shape, config) - sim.initLogger() + logger = initLogger(config) + format = {"entity": "Study"} + results = [] now = datetime.now() execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999)) - sim.logger.info("Starting simulations:", extra=sim.format) + logger.info("Starting simulations:", extra=format) start = time.time() - results = Parallel(config.numJobs)(delayed(runOnce)(sim, config, shape) for shape in config.nextShape()) + results = Parallel(config.numJobs)(delayed(runOnce)(config, shape) for shape in config.nextShape()) end = time.time() - sim.logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=sim.format) + logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format) if config.dumpXML: for res in results: res.dump(execID) - sim.logger.info("Results dumped into results/%s/" % (execID), extra=sim.format) + logger.info("Results dumped into results/%s/" % (execID), extra=format) if config.visualization: vis = Visualizer(execID) From 952d191ccd018276ddc09b04c380306d45addee5 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Mar 2023 10:12:08 +0100 Subject: [PATCH 03/43] remove unused golden data Signed-off-by: Csaba Kiraly --- DAS/observer.py | 8 +------- DAS/simulator.py | 1 - 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index af5866a..ee0604b 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -13,18 +13,17 @@ class Observer: self.block = [] self.rows = [] self.columns = [] - self.goldenData = [] self.broadcasted = [] def reset(self): """It resets all the gathered data to zeros.""" self.block = [0] * self.config.blockSize * self.config.blockSize - self.goldenData = [0] * self.config.blockSize * self.config.blockSize self.rows = [0] * self.config.blockSize self.columns = [0] * self.config.blockSize self.broadcasted = Block(self.config.blockSize) + def checkRowsColumns(self, validators): """It checks how many validators have been assigned to each row and column.""" for val in validators: @@ -39,11 +38,6 @@ class Observer: if self.rows[i] == 0 or self.columns[i] == 0: self.logger.warning("There is a row/column that has not been assigned", extra=self.format) - def setGoldenData(self, block): - """Stores the original real data to compare it with future situations.""" - for i in range(self.config.blockSize*self.config.blockSize): - self.goldenData[i] = block.data[i] - def checkBroadcasted(self): """It checks how many broadcasted samples are still missing in the network.""" zeros = 0 diff --git a/DAS/simulator.py b/DAS/simulator.py index 30c0d86..900609c 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -55,7 +55,6 @@ class Simulator: val = Validator(i, int(not i!=0), self.logger, self.shape) if i == self.proposerID: val.initBlock() - self.glob.setGoldenData(val.block) else: val.logIDs() self.validators.append(val) From af124c07558ecb53beef94e2707d2e7cb6792c00 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Mar 2023 10:12:54 +0100 Subject: [PATCH 04/43] simplify code Signed-off-by: Csaba Kiraly --- DAS/observer.py | 8 -------- DAS/simulator.py | 1 - 2 files changed, 9 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index ee0604b..88fc8c4 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -10,14 +10,6 @@ class Observer: self.config = config self.format = {"entity": "Observer"} self.logger = logger - self.block = [] - self.rows = [] - self.columns = [] - self.broadcasted = [] - - - def reset(self): - """It resets all the gathered data to zeros.""" self.block = [0] * self.config.blockSize * self.config.blockSize self.rows = [0] * self.config.blockSize self.columns = [0] * self.config.blockSize diff --git a/DAS/simulator.py b/DAS/simulator.py index 900609c..bb27a8a 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -27,7 +27,6 @@ class Simulator: def initValidators(self): """It initializes all the validators in the network.""" self.glob = Observer(self.logger, self.shape) - self.glob.reset() self.validators = [] if self.config.evenLineDistribution: From 6e4b37a3d2e41c0da1e827071a1b6e40dbc9bfb4 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Mar 2023 11:16:43 +0100 Subject: [PATCH 05/43] adding log level TRACE Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 7 ++++++- DAS/validator.py | 12 ++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index bb27a8a..0bdbfd6 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -2,6 +2,7 @@ import networkx as nx import logging, random +from functools import partial, partialmethod from datetime import datetime from statistics import mean from DAS.tools import * @@ -135,6 +136,11 @@ class Simulator: def initLogger(self): """It initializes the logger.""" + logging.TRACE = 5 + logging.addLevelName(logging.TRACE, 'TRACE') + logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) + logging.trace = partial(logging.log, logging.TRACE) + logger = logging.getLogger("DAS") if len(logger.handlers) == 0: logger.setLevel(self.logLevel) @@ -144,7 +150,6 @@ class Simulator: logger.addHandler(ch) self.logger = logger - def resetShape(self, shape): """It resets the parameters of the simulation.""" self.shape = shape diff --git a/DAS/validator.py b/DAS/validator.py index f869171..9a83d42 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -155,12 +155,12 @@ class Validator: if src in self.columnNeighbors[cID]: self.columnNeighbors[cID][src].receiving[rID] = 1 if not self.receivedBlock.getSegment(rID, cID): - self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) + self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.receivedBlock.setSegment(rID, cID) if self.perNodeQueue or self.perNeighborQueue: self.receivedQueue.append((rID, cID)) else: - self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) + self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) # self.statsRxDuplicateInSlot += 1 self.statsRxInSlot += 1 @@ -183,7 +183,7 @@ class Validator: if self.amIproposer == 1: self.logger.error("I am a block proposer", extra=self.format) else: - self.logger.debug("Receiving the data...", extra=self.format) + self.logger.trace("Receiving the data...", extra=self.format) #self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format) self.block.merge(self.receivedBlock) @@ -219,7 +219,7 @@ class Validator: def sendSegmentToNeigh(self, rID, cID, neigh): """Send segment to a neighbor (without checks).""" - self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) + self.logger.trace("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) i = rID if neigh.dim else cID neigh.sent[i] = 1 neigh.node.receiveSegment(rID, cID, self.ID) @@ -454,7 +454,7 @@ class Validator: # be queued after successful repair. for i in range(len(rep)): if rep[i]: - self.logger.debug("Rep: %d,%d", id, i, extra=self.format) + self.logger.trace("Rep: %d,%d", id, i, extra=self.format) self.addToSendQueue(id, i) # self.statsRepairInSlot += rep.count(1) @@ -472,7 +472,7 @@ class Validator: # be queued after successful repair. for i in range(len(rep)): if rep[i]: - self.logger.debug("Rep: %d,%d", i, id, extra=self.format) + self.logger.trace("Rep: %d,%d", i, id, extra=self.format) self.addToSendQueue(i, id) # self.statsRepairInSlot += rep.count(1) From 122f6a8348c5cb80559ab6c62a8441e15dc3a9af Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 20 Mar 2023 11:29:06 +0100 Subject: [PATCH 06/43] remove resetShape Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 35 ++++++++++++----------------------- study.py | 4 +--- 2 files changed, 13 insertions(+), 26 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 0bdbfd6..a6788b8 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -25,6 +25,18 @@ class Simulator: self.proposerID = 0 self.glob = [] + # In GossipSub the initiator might push messages without participating in the mesh. + # proposerPublishOnly regulates this behavior. If set to true, the proposer is not + # part of the p2p distribution graph, only pushes segments to it. If false, the proposer + # might get back segments from other peers since links are symmetric. + self.proposerPublishOnly = True + + # If proposerPublishOnly == True, this regulates how many copies of each segment are + # pushed out by the proposer. + # 1: the data is sent out exactly once on rows and once on columns (2 copies in total) + # self.shape.netDegree: default behavior similar (but not same) to previous code + self.proposerPublishTo = self.shape.netDegree + def initValidators(self): """It initializes all the validators in the network.""" self.glob = Observer(self.logger, self.shape) @@ -150,29 +162,6 @@ class Simulator: logger.addHandler(ch) self.logger = logger - def resetShape(self, shape): - """It resets the parameters of the simulation.""" - self.shape = shape - self.result = Result(self.shape) - for val in self.validators: - val.shape.failureRate = shape.failureRate - val.shape.chi = shape.chi - val.shape.vpn1 = shape.vpn1 - val.shape.vpn2 = shape.vpn2 - - # In GossipSub the initiator might push messages without participating in the mesh. - # proposerPublishOnly regulates this behavior. If set to true, the proposer is not - # part of the p2p distribution graph, only pushes segments to it. If false, the proposer - # might get back segments from other peers since links are symmetric. - self.proposerPublishOnly = True - - # If proposerPublishOnly == True, this regulates how many copies of each segment are - # pushed out by the proposer. - # 1: the data is sent out exactly once on rows and once on columns (2 copies in total) - # self.shape.netDegree: default behavior similar (but not same) to previous code - self.proposerPublishTo = self.shape.netDegree - - def run(self): """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) diff --git a/study.py b/study.py index 3b956e8..e7e6ce6 100644 --- a/study.py +++ b/study.py @@ -24,14 +24,12 @@ def initLogger(config): def runOnce(config, shape): - sim = Simulator(shape, config) - if config.deterministic: shape.setSeed(config.randomSeed+"-"+str(shape)) random.seed(shape.randomSeed) + sim = Simulator(shape, config) sim.initLogger() - sim.resetShape(shape) sim.initValidators() sim.initNetwork() result = sim.run() From ff93161b8df53336e9852904805a68c8f2bd6ada Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 20 Mar 2023 12:04:00 +0100 Subject: [PATCH 07/43] set vpn in validator Signed-off-by: Csaba Kiraly --- DAS/validator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 9a83d42..5ae1cd5 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -66,9 +66,9 @@ class Validator: else: #if shape.deterministic: # random.seed(self.ID) - vpn = self.shape.vpn1 if (self.ID <= shape.numberNodes * shape.class1ratio) else self.shape.vpn2 - self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn) - self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn) + self.vpn = self.shape.vpn1 if (self.ID <= shape.numberNodes * shape.class1ratio) else self.shape.vpn2 + self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) + self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) From 894009b4145aa8ec0aee23402acc0937e398ac4d Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 21 Mar 2023 08:34:02 +0100 Subject: [PATCH 08/43] Validator node: add nodeClass property Signed-off-by: Csaba Kiraly --- DAS/validator.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 5ae1cd5..49165a0 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -61,12 +61,14 @@ class Validator: self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format) else: if amIproposer: + self.nodeClass = 0 self.rowIDs = range(shape.blockSize) self.columnIDs = range(shape.blockSize) else: #if shape.deterministic: # random.seed(self.ID) - self.vpn = self.shape.vpn1 if (self.ID <= shape.numberNodes * shape.class1ratio) else self.shape.vpn2 + self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2 + self.vpn = self.shape.vpn1 if (self.nodeClass == 1) else self.shape.vpn2 self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) self.rowNeighbors = collections.defaultdict(dict) @@ -83,7 +85,7 @@ class Validator: # TODO: this should be a parameter if self.amIproposer: self.bwUplink = shape.bwUplinkProd - elif self.ID <= shape.numberNodes * shape.class1ratio: + elif self.nodeClass == 1: self.bwUplink = shape.bwUplink1 else: self.bwUplink = shape.bwUplink2 From ec6ed7c1e8eda838f73072cc9f98d337f31c16e4 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 21 Mar 2023 14:53:48 +0100 Subject: [PATCH 09/43] dump results to XML after each run Signed-off-by: Csaba Kiraly --- study.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/study.py b/study.py index e7e6ce6..fde8099 100644 --- a/study.py +++ b/study.py @@ -22,7 +22,7 @@ def initLogger(config): logger.addHandler(ch) return logger -def runOnce(config, shape): +def runOnce(config, shape, execID): if config.deterministic: shape.setSeed(config.randomSeed+"-"+str(shape)) @@ -34,6 +34,10 @@ def runOnce(config, shape): sim.initNetwork() result = sim.run() sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) + + if config.dumpXML: + result.dump(execID) + return result def study(): @@ -61,15 +65,10 @@ def study(): logger.info("Starting simulations:", extra=format) start = time.time() - results = Parallel(config.numJobs)(delayed(runOnce)(config, shape) for shape in config.nextShape()) + results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape()) end = time.time() logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format) - if config.dumpXML: - for res in results: - res.dump(execID) - logger.info("Results dumped into results/%s/" % (execID), extra=format) - if config.visualization: vis = Visualizer(execID) vis.plotHeatmaps() From 3795948564e96c78fe95499528239e5f22e01aaf Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Sat, 25 Mar 2023 10:27:23 +0100 Subject: [PATCH 10/43] fix requirements.txt --- DAS/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DAS/requirements.txt b/DAS/requirements.txt index da7dcc7..f8c9fe4 100644 --- a/DAS/requirements.txt +++ b/DAS/requirements.txt @@ -1,7 +1,7 @@ bitarray==2.6.0 -DAS==0.29.0 dicttoxml==1.7.16 matplotlib==3.6.2 networkx==3.0 numpy==1.23.5 seaborn==0.12.2 +joblib==1.2.0 From 037c4cd67a516646e7636d961d179d978eec8385 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Mar 2023 12:32:39 +0100 Subject: [PATCH 11/43] count number of validators having all rows/columns Signed-off-by: Csaba Kiraly --- DAS/observer.py | 5 ++++- DAS/simulator.py | 8 +++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index 88fc8c4..ce435dd 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -44,9 +44,12 @@ class Observer: """It checks the status of how many expected and arrived samples globally.""" arrived = 0 expected = 0 + validated = 0 for val in validators: if val.amIproposer == 0: (a, e) = val.checkStatus() arrived += a expected += e - return (arrived, expected) + if a == e: + validated += 1 + return (arrived, expected, validated) diff --git a/DAS/simulator.py b/DAS/simulator.py index a6788b8..9582492 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -166,7 +166,7 @@ class Simulator: """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) self.validators[self.proposerID].broadcastBlock() - arrived, expected = self.glob.checkStatus(self.validators) + arrived, expected, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingVector = [] steps = 0 @@ -198,10 +198,12 @@ class Simulator: for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() - arrived, expected = self.glob.checkStatus(self.validators) + arrived, expected, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingRate = missingSamples*100/expected - self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format) + self.logger.debug("step %d, missing %d of %d (%0.02f %%), validated (%0.02f %%)" + % (steps, missingSamples, expected, missingRate, + validated/(len(self.validators)-1)*100), extra=self.format) if missingSamples == oldMissingSamples: self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) missingVector.append(missingSamples) From 119777787eb6c3c381ef69c6ef0f8a2d41ebd5d8 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 20 Mar 2023 12:04:11 +0100 Subject: [PATCH 12/43] add progress meters to observer Signed-off-by: Csaba Kiraly --- DAS/observer.py | 16 ++++++++++++++-- DAS/simulator.py | 11 ++++------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index ce435dd..0b80638 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -44,6 +44,7 @@ class Observer: """It checks the status of how many expected and arrived samples globally.""" arrived = 0 expected = 0 + ready = 0 validated = 0 for val in validators: if val.amIproposer == 0: @@ -51,5 +52,16 @@ class Observer: arrived += a expected += e if a == e: - validated += 1 - return (arrived, expected, validated) + ready += 1 + validated += val.vpn + return (arrived, expected, ready, validated) + + def getProgress(self, validators): + arrived, expected, ready, validated = self.checkStatus(validators) + missingSamples = expected - arrived + sampleProgress = arrived / expected + nodeProgress = ready / (len(validators)-1) + validatorCnt = sum([v.vpn for v in validators[1:]]) + validatorProgress = validated / validatorCnt + + return missingSamples, sampleProgress, nodeProgress, validatorProgress diff --git a/DAS/simulator.py b/DAS/simulator.py index 9582492..1744bf8 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -166,7 +166,7 @@ class Simulator: """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) self.validators[self.proposerID].broadcastBlock() - arrived, expected, validated = self.glob.checkStatus(self.validators) + arrived, expected, ready, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingVector = [] steps = 0 @@ -198,12 +198,9 @@ class Simulator: for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() - arrived, expected, validated = self.glob.checkStatus(self.validators) - missingSamples = expected - arrived - missingRate = missingSamples*100/expected - self.logger.debug("step %d, missing %d of %d (%0.02f %%), validated (%0.02f %%)" - % (steps, missingSamples, expected, missingRate, - validated/(len(self.validators)-1)*100), extra=self.format) + missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators) + self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" + % (steps, sampleProgress*100, nodeProgress*100, validatorProgress*100), extra=self.format) if missingSamples == oldMissingSamples: self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) missingVector.append(missingSamples) From 6616cc799f016a0ead7d1a3b703a414d45938193 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 20 Mar 2023 14:26:42 +0100 Subject: [PATCH 13/43] move traffic stats calculation to observer Signed-off-by: Csaba Kiraly --- DAS/observer.py | 13 +++++++++++++ DAS/simulator.py | 10 +++------- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index 0b80638..26b0632 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -1,5 +1,6 @@ #!/bin/python3 +from statistics import mean from DAS.block import * class Observer: @@ -65,3 +66,15 @@ class Observer: validatorProgress = validated / validatorCnt return missingSamples, sampleProgress, nodeProgress, validatorProgress + + def getTrafficStats(self, validators): + statsTxInSlot = [v.statsTxInSlot for v in validators] + statsRxInSlot = [v.statsRxInSlot for v in validators] + TX_prod = statsTxInSlot[0] + RX_prod = statsRxInSlot[0] + TX_avg = mean(statsTxInSlot[1:]) + TX_max = max(statsTxInSlot[1:]) + Rx_avg = mean(statsRxInSlot[1:]) + Rx_max = max(statsRxInSlot[1:]) + + return (TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max) diff --git a/DAS/simulator.py b/DAS/simulator.py index 1744bf8..82d9594 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -4,7 +4,6 @@ import networkx as nx import logging, random from functools import partial, partialmethod from datetime import datetime -from statistics import mean from DAS.tools import * from DAS.results import * from DAS.observer import * @@ -189,12 +188,9 @@ class Simulator: self.validators[i].logColumns() # log TX and RX statistics - statsTxInSlot = [v.statsTxInSlot for v in self.validators] - statsRxInSlot = [v.statsRxInSlot for v in self.validators] - self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % - (steps, statsTxInSlot[0], statsRxInSlot[0], - mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]), - mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format) + TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max = self.glob.getTrafficStats(self.validators) + self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % + (steps, TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max), extra=self.format) for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() From 7e4074938a48e5181efcabf27e5f5a6704521e90 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 20 Mar 2023 14:33:35 +0100 Subject: [PATCH 14/43] add duplicate statistics Signed-off-by: Csaba Kiraly --- DAS/observer.py | 5 ++++- DAS/simulator.py | 6 +++--- DAS/validator.py | 6 +++++- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index 26b0632..6af4507 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -70,11 +70,14 @@ class Observer: def getTrafficStats(self, validators): statsTxInSlot = [v.statsTxInSlot for v in validators] statsRxInSlot = [v.statsRxInSlot for v in validators] + statsRxDupInSlot = [v.statsRxDupInSlot for v in validators] TX_prod = statsTxInSlot[0] RX_prod = statsRxInSlot[0] TX_avg = mean(statsTxInSlot[1:]) TX_max = max(statsTxInSlot[1:]) Rx_avg = mean(statsRxInSlot[1:]) Rx_max = max(statsRxInSlot[1:]) + RxDup_avg = mean(statsRxDupInSlot[1:]) + RxDup_max = max(statsRxDupInSlot[1:]) - return (TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max) + return (TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max, RxDup_avg, RxDup_max) diff --git a/DAS/simulator.py b/DAS/simulator.py index 82d9594..e860d17 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -188,9 +188,9 @@ class Simulator: self.validators[i].logColumns() # log TX and RX statistics - TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max = self.glob.getTrafficStats(self.validators) - self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % - (steps, TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max), extra=self.format) + TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max, RxDup_avg, RxDup_max = self.glob.getTrafficStats(self.validators) + self.logger.info("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f, RxDup_avg=%.1f, RxDup_max=%.1f" % + (steps, TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max ,RxDup_avg, RxDup_max), extra=self.format) for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() diff --git a/DAS/validator.py b/DAS/validator.py index 49165a0..cfa7fac 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -79,6 +79,8 @@ class Validator: self.statsTxPerSlot = [] self.statsRxInSlot = 0 self.statsRxPerSlot = [] + self.statsRxDupInSlot = 0 + self.statsRxDupPerSlot = [] # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 @@ -163,7 +165,7 @@ class Validator: self.receivedQueue.append((rID, cID)) else: self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) - # self.statsRxDuplicateInSlot += 1 + self.statsRxDupInSlot += 1 self.statsRxInSlot += 1 def addToSendQueue(self, rID, cID): @@ -205,8 +207,10 @@ class Validator: """It updates the stats related to sent and received data.""" self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format) self.statsRxPerSlot.append(self.statsRxInSlot) + self.statsRxDupPerSlot.append(self.statsRxDupInSlot) self.statsTxPerSlot.append(self.statsTxInSlot) self.statsRxInSlot = 0 + self.statsRxDupInSlot = 0 self.statsTxInSlot = 0 def checkSegmentToNeigh(self, rID, cID, neigh): From 23af30e381b7be97e7d44cf75cc2cc53849313a6 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 21 Mar 2023 08:34:28 +0100 Subject: [PATCH 15/43] add generalized metrics collection Signed-off-by: Csaba Kiraly --- DAS/observer.py | 28 +++++++++++++++------------- DAS/results.py | 4 ++++ DAS/simulator.py | 9 ++++++--- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index 6af4507..1bdf4d0 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -1,6 +1,6 @@ #!/bin/python3 -from statistics import mean +import numpy as np from DAS.block import * class Observer: @@ -68,16 +68,18 @@ class Observer: return missingSamples, sampleProgress, nodeProgress, validatorProgress def getTrafficStats(self, validators): - statsTxInSlot = [v.statsTxInSlot for v in validators] - statsRxInSlot = [v.statsRxInSlot for v in validators] - statsRxDupInSlot = [v.statsRxDupInSlot for v in validators] - TX_prod = statsTxInSlot[0] - RX_prod = statsRxInSlot[0] - TX_avg = mean(statsTxInSlot[1:]) - TX_max = max(statsTxInSlot[1:]) - Rx_avg = mean(statsRxInSlot[1:]) - Rx_max = max(statsRxInSlot[1:]) - RxDup_avg = mean(statsRxDupInSlot[1:]) - RxDup_max = max(statsRxDupInSlot[1:]) + def maxOrNan(l): + return np.max(l) if l else np.NaN - return (TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max, RxDup_avg, RxDup_max) + trafficStats = {} + for cl in range(0,3): + Tx = [v.statsTxInSlot for v in validators if v.nodeClass == cl] + Rx = [v.statsRxInSlot for v in validators if v.nodeClass == cl] + RxDup = [v.statsRxDupInSlot for v in validators if v.nodeClass == cl] + trafficStats[cl] = { + "Tx": {"mean": np.mean(Tx), "max": maxOrNan(Tx)}, + "Rx": {"mean": np.mean(Rx), "max": maxOrNan(Rx)}, + "RxDup": {"mean": np.mean(RxDup), "max": maxOrNan(RxDup)}, + } + + return trafficStats diff --git a/DAS/results.py b/DAS/results.py index 0efe26d..61f4f04 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -13,6 +13,7 @@ class Result: self.blockAvailable = -1 self.tta = -1 self.missingVector = [] + self.metrics = {} def populate(self, shape, missingVector): """It populates part of the result data inside a vector.""" @@ -26,6 +27,9 @@ class Result: self.blockAvailable = 0 self.tta = -1 + def addMetric(self, name, metric): + self.metrics[name] = metric + def dump(self, execID): """It dumps the results of the simulation in an XML file.""" if not os.path.exists("results"): diff --git a/DAS/simulator.py b/DAS/simulator.py index e860d17..4bd0242 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -168,6 +168,7 @@ class Simulator: arrived, expected, ready, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingVector = [] + trafficStatsVector = [] steps = 0 while(True): missingVector.append(missingSamples) @@ -188,11 +189,12 @@ class Simulator: self.validators[i].logColumns() # log TX and RX statistics - TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max, RxDup_avg, RxDup_max = self.glob.getTrafficStats(self.validators) - self.logger.info("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f, RxDup_avg=%.1f, RxDup_max=%.1f" % - (steps, TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max ,RxDup_avg, RxDup_max), extra=self.format) + trafficStats = self.glob.getTrafficStats(self.validators) + self.logger.debug("step %d: %s" % + (steps, trafficStats), extra=self.format) for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() + trafficStatsVector.append(trafficStats) missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators) self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" @@ -209,5 +211,6 @@ class Simulator: steps += 1 self.result.populate(self.shape, missingVector) + self.result.addMetric("trafficStats", trafficStatsVector) return self.result From eb4f451303e3fc14bbf6aa3f0c6de4ea5039d63f Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 27 Mar 2023 23:11:40 +0200 Subject: [PATCH 16/43] save progress and traffic statistics to XML Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 31 ++++++++++++++++++++++++++++++- config_example.py | 3 +++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 4bd0242..677a260 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -2,6 +2,7 @@ import networkx as nx import logging, random +import pandas as pd from functools import partial, partialmethod from datetime import datetime from DAS.tools import * @@ -168,6 +169,7 @@ class Simulator: arrived, expected, ready, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingVector = [] + progressVector = [] trafficStatsVector = [] steps = 0 while(True): @@ -199,6 +201,31 @@ class Simulator: missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators) self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" % (steps, sampleProgress*100, nodeProgress*100, validatorProgress*100), extra=self.format) + + cnS = "samples received" + cnN = "nodes ready" + cnV = "validators ready" + cnT0 = "TX builder mean" + cnT1 = "TX class1 mean" + cnT2 = "TX class2 mean" + cnR1 = "RX class1 mean" + cnR2 = "RX class2 mean" + cnD1 = "Dup class1 mean" + cnD2 = "Dup class2 mean" + + progressVector.append({ + cnS:sampleProgress, + cnN:nodeProgress, + cnV:validatorProgress, + cnT0: trafficStats[0]["Tx"]["mean"], + cnT1: trafficStats[1]["Tx"]["mean"], + cnT2: trafficStats[2]["Tx"]["mean"], + cnR1: trafficStats[1]["Rx"]["mean"], + cnR2: trafficStats[2]["Rx"]["mean"], + cnD1: trafficStats[1]["RxDup"]["mean"], + cnD2: trafficStats[2]["RxDup"]["mean"], + }) + if missingSamples == oldMissingSamples: self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) missingVector.append(missingSamples) @@ -210,7 +237,9 @@ class Simulator: else: steps += 1 + progress = pd.DataFrame(progressVector) + if self.config.saveProgress: + self.result.addMetric("progress", progress.to_dict(orient='list')) self.result.populate(self.shape, missingVector) - self.result.addMetric("trafficStats", trafficStatsVector) return self.result diff --git a/config_example.py b/config_example.py index af55fc2..5b1a396 100644 --- a/config_example.py +++ b/config_example.py @@ -19,6 +19,9 @@ import numpy as np from DAS.shape import Shape dumpXML = 1 + +# save progress vectors to XML +saveProgress = 1 visualization = 1 logLevel = logging.INFO From dc51727b329ede185e7008e6d5a0f88b4e611025 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 27 Mar 2023 23:16:05 +0200 Subject: [PATCH 17/43] plot progress per run Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 14 +++++++++++++- config_example.py | 4 ++++ study.py | 2 +- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 677a260..c2f4654 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -3,6 +3,7 @@ import networkx as nx import logging, random import pandas as pd +import matplotlib from functools import partial, partialmethod from datetime import datetime from DAS.tools import * @@ -13,7 +14,7 @@ from DAS.validator import * class Simulator: """This class implements the main DAS simulator.""" - def __init__(self, shape, config): + def __init__(self, shape, config, execID): """It initializes the simulation with a set of parameters (shape).""" self.shape = shape self.config = config @@ -24,6 +25,7 @@ class Simulator: self.logLevel = config.logLevel self.proposerID = 0 self.glob = [] + self.execID = execID # In GossipSub the initiator might push messages without participating in the mesh. # proposerPublishOnly regulates this behavior. If set to true, the proposer is not @@ -240,6 +242,16 @@ class Simulator: progress = pd.DataFrame(progressVector) if self.config.saveProgress: self.result.addMetric("progress", progress.to_dict(orient='list')) + if self.config.plotProgress: + progress.plot.line(subplots = [[cnS, cnN, cnV], [cnT0], [cnT1, cnR1, cnD1], [cnT2, cnR2, cnD2]], + title = str(self.shape)) + if not os.path.exists("results"): + os.makedirs("results") + if not os.path.exists("results/"+self.execID): + os.makedirs("results/"+self.execID) + filePath = "results/"+self.execID+"/"+str(self.shape)+".png" + matplotlib.pyplot.savefig(filePath) + self.result.populate(self.shape, missingVector) return self.result diff --git a/config_example.py b/config_example.py index 5b1a396..461ee36 100644 --- a/config_example.py +++ b/config_example.py @@ -22,6 +22,10 @@ dumpXML = 1 # save progress vectors to XML saveProgress = 1 + +# plot progress for each run to PNG +plotProgress = 1 + visualization = 1 logLevel = logging.INFO diff --git a/study.py b/study.py index fde8099..aa27a5f 100644 --- a/study.py +++ b/study.py @@ -28,7 +28,7 @@ def runOnce(config, shape, execID): shape.setSeed(config.randomSeed+"-"+str(shape)) random.seed(shape.randomSeed) - sim = Simulator(shape, config) + sim = Simulator(shape, config, execID) sim.initLogger() sim.initValidators() sim.initNetwork() From 04ad03f1751168368d17eaadd6da70e04d24d2dd Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 27 Mar 2023 23:27:49 +0200 Subject: [PATCH 18/43] fixup: avoid warning on mean if empty Signed-off-by: Csaba Kiraly --- DAS/observer.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index 1bdf4d0..18c0461 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -70,6 +70,8 @@ class Observer: def getTrafficStats(self, validators): def maxOrNan(l): return np.max(l) if l else np.NaN + def meanOrNan(l): + return np.mean(l) if l else np.NaN trafficStats = {} for cl in range(0,3): @@ -77,9 +79,9 @@ class Observer: Rx = [v.statsRxInSlot for v in validators if v.nodeClass == cl] RxDup = [v.statsRxDupInSlot for v in validators if v.nodeClass == cl] trafficStats[cl] = { - "Tx": {"mean": np.mean(Tx), "max": maxOrNan(Tx)}, - "Rx": {"mean": np.mean(Rx), "max": maxOrNan(Rx)}, - "RxDup": {"mean": np.mean(RxDup), "max": maxOrNan(RxDup)}, + "Tx": {"mean": meanOrNan(Tx), "max": maxOrNan(Tx)}, + "Rx": {"mean": meanOrNan(Rx), "max": maxOrNan(Rx)}, + "RxDup": {"mean": meanOrNan(RxDup), "max": maxOrNan(RxDup)}, } return trafficStats From 75a9b484e915f5a804486936a50d25acc1c54793 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 27 Mar 2023 23:28:12 +0200 Subject: [PATCH 19/43] fixup: close plot to release memory Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/DAS/simulator.py b/DAS/simulator.py index c2f4654..04d1eca 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -251,6 +251,7 @@ class Simulator: os.makedirs("results/"+self.execID) filePath = "results/"+self.execID+"/"+str(self.shape)+".png" matplotlib.pyplot.savefig(filePath) + matplotlib.pyplot.close() self.result.populate(self.shape, missingVector) return self.result From f85cdb401bf3682c916812d79c35f10bb6738406 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 29 Mar 2023 15:49:52 +0200 Subject: [PATCH 20/43] fix line allocation when evenLineDistribution=True vector should have chi elements for each validator Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index a6788b8..c58b20f 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -46,8 +46,9 @@ class Simulator: lightVal = int(self.shape.numberNodes * self.shape.class1ratio * self.shape.vpn1) heavyVal = int(self.shape.numberNodes * (1-self.shape.class1ratio) * self.shape.vpn2) totalValidators = lightVal + heavyVal - rows = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1) - columns = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1) + totalRows = totalValidators * self.shape.chi + rows = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1) + columns = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1) offset = heavyVal*self.shape.chi random.shuffle(rows) random.shuffle(columns) From 9f3089c232d941b897a8393a5671c0bb557ff5b1 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 29 Mar 2023 15:54:44 +0200 Subject: [PATCH 21/43] rowIDs and columnIDs are sets Fixes issue 29, where multiple instances of an ID in rowIDs created a topology with nodes with a huge degree. This huge degree then created lots of duplicates, eating up available bandwidth. Signed-off-by: Csaba Kiraly --- DAS/validator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 49165a0..32ef149 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -69,8 +69,8 @@ class Validator: # random.seed(self.ID) self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2 self.vpn = self.shape.vpn1 if (self.nodeClass == 1) else self.shape.vpn2 - self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) - self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) + self.rowIDs = set(rows) if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) + self.columnIDs = set(columns) if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) From cb0a3ea1bac38810d065a6d5e6346ffa6ca71671 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 27 Mar 2023 23:27:49 +0200 Subject: [PATCH 22/43] fixup: avoid warning on mean if empty Signed-off-by: Csaba Kiraly --- DAS/observer.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index 1bdf4d0..18c0461 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -70,6 +70,8 @@ class Observer: def getTrafficStats(self, validators): def maxOrNan(l): return np.max(l) if l else np.NaN + def meanOrNan(l): + return np.mean(l) if l else np.NaN trafficStats = {} for cl in range(0,3): @@ -77,9 +79,9 @@ class Observer: Rx = [v.statsRxInSlot for v in validators if v.nodeClass == cl] RxDup = [v.statsRxDupInSlot for v in validators if v.nodeClass == cl] trafficStats[cl] = { - "Tx": {"mean": np.mean(Tx), "max": maxOrNan(Tx)}, - "Rx": {"mean": np.mean(Rx), "max": maxOrNan(Rx)}, - "RxDup": {"mean": np.mean(RxDup), "max": maxOrNan(RxDup)}, + "Tx": {"mean": meanOrNan(Tx), "max": maxOrNan(Tx)}, + "Rx": {"mean": meanOrNan(Rx), "max": maxOrNan(Rx)}, + "RxDup": {"mean": meanOrNan(RxDup), "max": maxOrNan(RxDup)}, } return trafficStats From e285890fa734fcc2f9db7e06a4155a88de34071a Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Wed, 29 Mar 2023 16:42:09 +0200 Subject: [PATCH 23/43] Fixes allocation bug, remove duplicates in rowIDs and columnIDs, add diagnostics when the block is not available. Add number of steps without progress to stop condition. --- DAS/simulator.py | 78 ++++++++++++++++++++++++++++++++++++++--------- config_example.py | 6 ++++ 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 04d1eca..6ec3bfa 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -48,11 +48,19 @@ class Simulator: lightVal = int(self.shape.numberNodes * self.shape.class1ratio * self.shape.vpn1) heavyVal = int(self.shape.numberNodes * (1-self.shape.class1ratio) * self.shape.vpn2) totalValidators = lightVal + heavyVal - rows = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1) - columns = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1) + totalRows = totalValidators * self.shape.chi + rows = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1) + columns = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1) offset = heavyVal*self.shape.chi random.shuffle(rows) random.shuffle(columns) + self.logger.debug("There is a total of %d validators" % totalValidators, extra=self.format) + self.logger.debug("Shuffling a total of %d rows/columns" % len(rows), extra=self.format) + self.logger.debug("Shuffled rows: %s" % str(rows), extra=self.format) + self.logger.debug("Shuffled columns: %s" % str(columns), extra=self.format) + + assignatedRows = [] + assignatedCols = [] for i in range(self.shape.numberNodes): if self.config.evenLineDistribution: if i < int(heavyVal/self.shape.vpn2): # First start with the heavy nodes @@ -62,9 +70,17 @@ class Simulator: j = i - int(heavyVal/self.shape.vpn2) start = offset+( j *self.shape.chi) end = offset+((j+1)*self.shape.chi) - r = rows[start:end] - c = columns[start:end] + # Remove duplicates + r = list(dict.fromkeys(rows[start:end])) + c = list(dict.fromkeys(columns[start:end])) + r.sort() + c.sort() val = Validator(i, int(not i!=0), self.logger, self.shape, r, c) + self.logger.debug("Validators %d row IDs: %s" % (val.ID, val.rowIDs), extra=self.format) + self.logger.debug("Validators %d column IDs: %s" % (val.ID, val.columnIDs), extra=self.format) + assignatedRows = assignatedRows + r + assignatedCols = assignatedCols + c + else: val = Validator(i, int(not i!=0), self.logger, self.shape) if i == self.proposerID: @@ -72,6 +88,11 @@ class Simulator: else: val.logIDs() self.validators.append(val) + + assignatedRows.sort() + assignatedCols.sort() + self.logger.debug("Rows assignated: %s" % str(assignatedRows), extra=self.format) + self.logger.debug("Columns assignated: %s" % str(assignatedCols), extra=self.format) self.logger.debug("Validators initialized.", extra=self.format) def initNetwork(self): @@ -86,12 +107,14 @@ class Simulator: columnChannels[id].append(v) # Check rows/columns distribution - #totalR = 0 - #totalC = 0 - #for r in rowChannels: - # totalR += len(r) - #for c in columnChannels: - # totalC += len(c) + distR = [] + distC = [] + for r in rowChannels: + distR.append(len(r)) + for c in columnChannels: + distC.append(len(c)) + self.logger.debug("Number of validators per row; Min: %d, Max: %d" % (min(distR), max(distR)), extra=self.format) + self.logger.debug("Number of validators per column; Min: %d, Max: %d" % (min(distC), max(distC)), extra=self.format) for id in range(self.shape.blockSize): @@ -164,6 +187,29 @@ class Simulator: logger.addHandler(ch) self.logger = logger + def printDiagnostics(self): + """Print all required diagnostics to check when a block does not become available""" + for val in self.validators: + (a, e) = val.checkStatus() + if e-a > 0 and val.ID != 0: + self.logger.warning("Node %d is missing %d samples" % (val.ID, e-a), extra=self.format) + for r in val.rowIDs: + row = val.getRow(r) + if row.count() < len(row): + self.logger.debug("Row %d: %s" % (r, str(row)), extra=self.format) + neiR = val.rowNeighbors[r] + for nr in neiR: + self.logger.debug("Row %d, Neighbor %d sent: %s" % (r, val.rowNeighbors[r][nr].node.ID, val.rowNeighbors[r][nr].received), extra=self.format) + self.logger.debug("Row %d, Neighbor %d has: %s" % (r, val.rowNeighbors[r][nr].node.ID, self.validators[val.rowNeighbors[r][nr].node.ID].getRow(r)), extra=self.format) + for c in val.columnIDs: + col = val.getColumn(c) + if col.count() < len(col): + self.logger.debug("Column %d: %s" % (c, str(col)), extra=self.format) + neiC = val.columnNeighbors[c] + for nc in neiC: + self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format) + self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format) + def run(self): """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) @@ -194,14 +240,14 @@ class Simulator: # log TX and RX statistics trafficStats = self.glob.getTrafficStats(self.validators) - self.logger.debug("step %d: %s" % + self.logger.debug("step %d: %s" % (steps, trafficStats), extra=self.format) for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() trafficStatsVector.append(trafficStats) missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators) - self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" + self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" % (steps, sampleProgress*100, nodeProgress*100, validatorProgress*100), extra=self.format) cnS = "samples received" @@ -229,9 +275,13 @@ class Simulator: }) if missingSamples == oldMissingSamples: - self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) + if len(missingVector) > self.config.steps4StopCondition: + if missingSamples == missingVector[len(missingVector)-1-self.config.steps4StopCondition]: + self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) + if self.config.diagnostics: + self.printDiagnostics() + break missingVector.append(missingSamples) - break elif missingSamples == 0: #self.logger.info("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) missingVector.append(missingSamples) diff --git a/config_example.py b/config_example.py index 461ee36..d07a899 100644 --- a/config_example.py +++ b/config_example.py @@ -74,6 +74,12 @@ deterministic = False # If your run is deterministic you can decide the random seed. This is ignore otherwise. randomSeed = "DAS" +# If True, print diagnostics when the block is not available +diagnostics = False + +# Number of steps without progress to stop simulation +steps4StopCondition = 7 + def nextShape(): for run, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( runs, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): From 98db10f7a6a095036416df4f991bbdd816127e17 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 30 Mar 2023 00:01:06 +0200 Subject: [PATCH 24/43] Add more documentation Signed-off-by: Csaba Kiraly --- DAS/observer.py | 12 ++++++++++++ DAS/results.py | 1 + 2 files changed, 13 insertions(+) diff --git a/DAS/observer.py b/DAS/observer.py index 18c0461..235ed60 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -58,6 +58,17 @@ class Observer: return (arrived, expected, ready, validated) def getProgress(self, validators): + """Calculate current simulation progress with different metrics. + + Returns: + - missingSamples: overall number of sample instances missing in nodes. + Sample are counted on both rows and columns, so intersections of interest are counted twice. + - sampleProgress: previous expressed as progress ratio + - nodeProgress: ratio of nodes having all segments interested in + - validatorProgress: same as above, but vpn weighted average. I.e. it counts per validator, + but counts a validator only if its support node's all validators see all interesting segments + TODO: add real per validator progress counter + """ arrived, expected, ready, validated = self.checkStatus(validators) missingSamples = expected - arrived sampleProgress = arrived / expected @@ -68,6 +79,7 @@ class Observer: return missingSamples, sampleProgress, nodeProgress, validatorProgress def getTrafficStats(self, validators): + """Summary statistics of traffic measurements in a timestep.""" def maxOrNan(l): return np.max(l) if l else np.NaN def meanOrNan(l): diff --git a/DAS/results.py b/DAS/results.py index 61f4f04..c5687a5 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -28,6 +28,7 @@ class Result: self.tta = -1 def addMetric(self, name, metric): + """Generic function to add a metric to the results.""" self.metrics[name] = metric def dump(self, execID): From 9800161ac91576a63ce99121c454c2361400d9ed Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Mon, 27 Mar 2023 15:29:39 +0200 Subject: [PATCH 25/43] Switch from time steps to miliseconds. Rebase to traffic progress. --- DAS/results.py | 15 ++++++++------- DAS/simulator.py | 13 +++++++------ DAS/visualizer.py | 6 ++++-- config_example.py | 15 +++++++++------ study.py | 4 ++-- 5 files changed, 30 insertions(+), 23 deletions(-) diff --git a/DAS/results.py b/DAS/results.py index c5687a5..48512cd 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -7,22 +7,23 @@ from dicttoxml import dicttoxml class Result: """This class stores and process/store the results of a simulation.""" - def __init__(self, shape): + def __init__(self, shape, execID): """It initializes the instance with a specific shape.""" self.shape = shape + self.execID = execID self.blockAvailable = -1 self.tta = -1 self.missingVector = [] self.metrics = {} - def populate(self, shape, missingVector): + def populate(self, shape, config, missingVector): """It populates part of the result data inside a vector.""" self.shape = shape self.missingVector = missingVector missingSamples = missingVector[-1] if missingSamples == 0: self.blockAvailable = 1 - self.tta = len(missingVector) + self.tta = len(missingVector) * (1000/config.stepDuration) else: self.blockAvailable = 0 self.tta = -1 @@ -31,12 +32,12 @@ class Result: """Generic function to add a metric to the results.""" self.metrics[name] = metric - def dump(self, execID): + def dump(self): """It dumps the results of the simulation in an XML file.""" if not os.path.exists("results"): os.makedirs("results") - if not os.path.exists("results/"+execID): - os.makedirs("results/"+execID) + if not os.path.exists("results/"+self.execID): + os.makedirs("results/"+self.execID) resd1 = self.shape.__dict__ resd2 = self.__dict__.copy() resd2.pop("shape") @@ -44,6 +45,6 @@ class Result: resXml = dicttoxml(resd1) xmlstr = minidom.parseString(resXml) xmlPretty = xmlstr.toprettyxml() - filePath = "results/"+execID+"/"+str(self.shape)+".xml" + filePath = "results/"+self.execID+"/"+str(self.shape)+".xml" with open(filePath, "w") as f: f.write(xmlPretty) diff --git a/DAS/simulator.py b/DAS/simulator.py index 677a260..4acf3f5 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -13,12 +13,13 @@ from DAS.validator import * class Simulator: """This class implements the main DAS simulator.""" - def __init__(self, shape, config): + def __init__(self, shape, config, execID): """It initializes the simulation with a set of parameters (shape).""" self.shape = shape self.config = config self.format = {"entity": "Simulator"} - self.result = Result(self.shape) + self.execID = execID + self.result = Result(self.shape, self.execID) self.validators = [] self.logger = [] self.logLevel = config.logLevel @@ -192,14 +193,14 @@ class Simulator: # log TX and RX statistics trafficStats = self.glob.getTrafficStats(self.validators) - self.logger.debug("step %d: %s" % + self.logger.debug("step %d: %s" % (steps, trafficStats), extra=self.format) for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() trafficStatsVector.append(trafficStats) missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators) - self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" + self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" % (steps, sampleProgress*100, nodeProgress*100, validatorProgress*100), extra=self.format) cnS = "samples received" @@ -231,7 +232,7 @@ class Simulator: missingVector.append(missingSamples) break elif missingSamples == 0: - #self.logger.info("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) + self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) missingVector.append(missingSamples) break else: @@ -240,6 +241,6 @@ class Simulator: progress = pd.DataFrame(progressVector) if self.config.saveProgress: self.result.addMetric("progress", progress.to_dict(orient='list')) - self.result.populate(self.shape, missingVector) + self.result.populate(self.shape, self.config, missingVector) return self.result diff --git a/DAS/visualizer.py b/DAS/visualizer.py index 8afd006..db4b2d5 100644 --- a/DAS/visualizer.py +++ b/DAS/visualizer.py @@ -36,7 +36,7 @@ class Visualizer: bwUplinkProd = int(root.find('bwUplinkProd').text) bwUplink1 = int(root.find('bwUplink1').text) bwUplink2 = int(root.find('bwUplink2').text) - tta = int(root.find('tta').text) + tta = float(root.find('tta').text) # Loop over all possible combinations of of the parameters minus two for combination in combinations(self.parameters, len(self.parameters)-2): @@ -120,7 +120,7 @@ class Visualizer: hist, xedges, yedges = np.histogram2d(data[key][labels[0]], data[key][labels[1]], bins=(len(xlabels), len(ylabels)), weights=data[key]['ttas']) hist = hist.T fig, ax = plt.subplots(figsize=(10, 6)) - sns.heatmap(hist, xticklabels=xlabels, yticklabels=ylabels, cmap='Purples', cbar_kws={'label': 'Time to block availability'}, linecolor='black', linewidths=0.3, annot=True, fmt=".2f", ax=ax) + sns.heatmap(hist, xticklabels=xlabels, yticklabels=ylabels, cmap='Purples', cbar_kws={'label': 'Time to block availability (ms)'}, linecolor='black', linewidths=0.3, annot=True, fmt=".2f", ax=ax) plt.xlabel(self.formatLabel(labels[0])) plt.ylabel(self.formatLabel(labels[1])) filename = "" @@ -131,6 +131,8 @@ class Visualizer: filename += f"{key[paramValueCnt]}" formattedTitle = self.formatTitle(key[paramValueCnt]) title += formattedTitle + if (paramValueCnt+1) % 5 == 0: + title += "\n" paramValueCnt += 1 title_obj = plt.title(title) font_size = 16 * fig.get_size_inches()[0] / 10 diff --git a/config_example.py b/config_example.py index 5b1a396..3ff5ae8 100644 --- a/config_example.py +++ b/config_example.py @@ -27,14 +27,14 @@ logLevel = logging.INFO # number of parallel workers. -1: all cores; 1: sequential # for more details, see joblib.Parallel -numJobs = 3 +numJobs = -1 # distribute rows/columns evenly between validators (True) # or generate it using local randomness (False) evenLineDistribution = True # Number of simulation runs with the same parameters for statistical relevance -runs = range(10) +runs = range(2) # Number of validators numberNodes = range(256, 513, 128) @@ -49,14 +49,14 @@ blockSizes = range(32,65,16) netDegrees = range(6, 9, 2) # number of rows and columns a validator is interested in -chis = range(1, 5, 2) +chis = range(2, 5, 2) # ratio of class1 nodes (see below for parameters per class) -class1ratios = np.arange(0, 1, .2) +class1ratios = [0.8, 0.9] # Number of validators per beacon node validatorsPerNode1 = [1] -validatorsPerNode2 = [2, 4, 8, 16, 32] +validatorsPerNode2 = [500] # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 @@ -64,8 +64,11 @@ bwUplinksProd = [2200] bwUplinks1 = [110] bwUplinks2 = [2200] +# Step duration in miliseconds (Classic RTT is about 100ms) +stepDuration = 50 + # Set to True if you want your run to be deterministic, False if not -deterministic = False +deterministic = True # If your run is deterministic you can decide the random seed. This is ignore otherwise. randomSeed = "DAS" diff --git a/study.py b/study.py index fde8099..74dba63 100644 --- a/study.py +++ b/study.py @@ -28,7 +28,7 @@ def runOnce(config, shape, execID): shape.setSeed(config.randomSeed+"-"+str(shape)) random.seed(shape.randomSeed) - sim = Simulator(shape, config) + sim = Simulator(shape, config, execID) sim.initLogger() sim.initValidators() sim.initNetwork() @@ -36,7 +36,7 @@ def runOnce(config, shape, execID): sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) if config.dumpXML: - result.dump(execID) + result.dump() return result From 41e83991591eab9d9d8ea2e787c0064628c10cd0 Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Mon, 27 Mar 2023 21:24:25 +0200 Subject: [PATCH 26/43] Add Tx and Rx stats to resultsi. Rebase to traffic progress. --- DAS/results.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/DAS/results.py b/DAS/results.py index 48512cd..469810c 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -16,10 +16,16 @@ class Result: self.missingVector = [] self.metrics = {} - def populate(self, shape, config, missingVector): + def populate(self, shape, config, missingVector, bandwidthVector): """It populates part of the result data inside a vector.""" self.shape = shape self.missingVector = missingVector + self.proTx = bandwidthVector[0] + self.proRx = bandwidthVector[1] + self.aveTx = bandwidthVector[2] + self.maxTx = bandwidthVector[3] + self.aveRx = bandwidthVector[4] + self.maxRx = bandwidthVector[5] missingSamples = missingVector[-1] if missingSamples == 0: self.blockAvailable = 1 From b5390b9f1b8d777f8df75a1174fe757e759255a8 Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Wed, 29 Mar 2023 17:01:28 +0200 Subject: [PATCH 27/43] Remove traffic statsi. Rebase to traffic progress. --- DAS/results.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/DAS/results.py b/DAS/results.py index 469810c..48512cd 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -16,16 +16,10 @@ class Result: self.missingVector = [] self.metrics = {} - def populate(self, shape, config, missingVector, bandwidthVector): + def populate(self, shape, config, missingVector): """It populates part of the result data inside a vector.""" self.shape = shape self.missingVector = missingVector - self.proTx = bandwidthVector[0] - self.proRx = bandwidthVector[1] - self.aveTx = bandwidthVector[2] - self.maxTx = bandwidthVector[3] - self.aveRx = bandwidthVector[4] - self.maxRx = bandwidthVector[5] missingSamples = missingVector[-1] if missingSamples == 0: self.blockAvailable = 1 From 795bb1d10d189315c310014ba857f959df8d2c8b Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Thu, 30 Mar 2023 13:15:42 +0200 Subject: [PATCH 28/43] Move set to simulator for future diagnostic purposes --- DAS/simulator.py | 6 ++++-- DAS/validator.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index c58b20f..5aac495 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -61,8 +61,10 @@ class Simulator: j = i - int(heavyVal/self.shape.vpn2) start = offset+( j *self.shape.chi) end = offset+((j+1)*self.shape.chi) - r = rows[start:end] - c = columns[start:end] + r = list(set(rows[start:end])) + c = list(set(columns[start:end])) + r.sort() + c.sort() val = Validator(i, int(not i!=0), self.logger, self.shape, r, c) else: val = Validator(i, int(not i!=0), self.logger, self.shape) diff --git a/DAS/validator.py b/DAS/validator.py index 32ef149..49165a0 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -69,8 +69,8 @@ class Validator: # random.seed(self.ID) self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2 self.vpn = self.shape.vpn1 if (self.nodeClass == 1) else self.shape.vpn2 - self.rowIDs = set(rows) if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) - self.columnIDs = set(columns) if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) + self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) + self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) From 296c4fb762be2b8409c3521ae561533454ab1a97 Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Thu, 30 Mar 2023 13:36:48 +0200 Subject: [PATCH 29/43] Rows and columns to sets --- DAS/simulator.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 5aac495..99cb5a5 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -61,10 +61,8 @@ class Simulator: j = i - int(heavyVal/self.shape.vpn2) start = offset+( j *self.shape.chi) end = offset+((j+1)*self.shape.chi) - r = list(set(rows[start:end])) - c = list(set(columns[start:end])) - r.sort() - c.sort() + r = set(rows[start:end]) + c = set(columns[start:end]) val = Validator(i, int(not i!=0), self.logger, self.shape, r, c) else: val = Validator(i, int(not i!=0), self.logger, self.shape) From 7719f84a10a7f5d867aa608876cb29c03fa6b535 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 30 Mar 2023 13:41:50 +0200 Subject: [PATCH 30/43] Switch from time steps to miliseconds (#30) Switch from time steps to miliseconds --- DAS/results.py | 13 +++++++------ DAS/simulator.py | 13 +++++++------ DAS/visualizer.py | 6 ++++-- config_example.py | 15 +++++++++------ study.py | 2 +- 5 files changed, 28 insertions(+), 21 deletions(-) diff --git a/DAS/results.py b/DAS/results.py index c5687a5..a11da24 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -7,22 +7,23 @@ from dicttoxml import dicttoxml class Result: """This class stores and process/store the results of a simulation.""" - def __init__(self, shape): + def __init__(self, shape, execID): """It initializes the instance with a specific shape.""" self.shape = shape + self.execID = execID self.blockAvailable = -1 self.tta = -1 self.missingVector = [] self.metrics = {} - def populate(self, shape, missingVector): + def populate(self, shape, config, missingVector): """It populates part of the result data inside a vector.""" self.shape = shape self.missingVector = missingVector missingSamples = missingVector[-1] if missingSamples == 0: self.blockAvailable = 1 - self.tta = len(missingVector) + self.tta = len(missingVector) * (config.stepDuration) else: self.blockAvailable = 0 self.tta = -1 @@ -35,8 +36,8 @@ class Result: """It dumps the results of the simulation in an XML file.""" if not os.path.exists("results"): os.makedirs("results") - if not os.path.exists("results/"+execID): - os.makedirs("results/"+execID) + if not os.path.exists("results/"+self.execID): + os.makedirs("results/"+self.execID) resd1 = self.shape.__dict__ resd2 = self.__dict__.copy() resd2.pop("shape") @@ -44,6 +45,6 @@ class Result: resXml = dicttoxml(resd1) xmlstr = minidom.parseString(resXml) xmlPretty = xmlstr.toprettyxml() - filePath = "results/"+execID+"/"+str(self.shape)+".xml" + filePath = "results/"+self.execID+"/"+str(self.shape)+".xml" with open(filePath, "w") as f: f.write(xmlPretty) diff --git a/DAS/simulator.py b/DAS/simulator.py index 677a260..4acf3f5 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -13,12 +13,13 @@ from DAS.validator import * class Simulator: """This class implements the main DAS simulator.""" - def __init__(self, shape, config): + def __init__(self, shape, config, execID): """It initializes the simulation with a set of parameters (shape).""" self.shape = shape self.config = config self.format = {"entity": "Simulator"} - self.result = Result(self.shape) + self.execID = execID + self.result = Result(self.shape, self.execID) self.validators = [] self.logger = [] self.logLevel = config.logLevel @@ -192,14 +193,14 @@ class Simulator: # log TX and RX statistics trafficStats = self.glob.getTrafficStats(self.validators) - self.logger.debug("step %d: %s" % + self.logger.debug("step %d: %s" % (steps, trafficStats), extra=self.format) for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() trafficStatsVector.append(trafficStats) missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators) - self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" + self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" % (steps, sampleProgress*100, nodeProgress*100, validatorProgress*100), extra=self.format) cnS = "samples received" @@ -231,7 +232,7 @@ class Simulator: missingVector.append(missingSamples) break elif missingSamples == 0: - #self.logger.info("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) + self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) missingVector.append(missingSamples) break else: @@ -240,6 +241,6 @@ class Simulator: progress = pd.DataFrame(progressVector) if self.config.saveProgress: self.result.addMetric("progress", progress.to_dict(orient='list')) - self.result.populate(self.shape, missingVector) + self.result.populate(self.shape, self.config, missingVector) return self.result diff --git a/DAS/visualizer.py b/DAS/visualizer.py index 8afd006..db4b2d5 100644 --- a/DAS/visualizer.py +++ b/DAS/visualizer.py @@ -36,7 +36,7 @@ class Visualizer: bwUplinkProd = int(root.find('bwUplinkProd').text) bwUplink1 = int(root.find('bwUplink1').text) bwUplink2 = int(root.find('bwUplink2').text) - tta = int(root.find('tta').text) + tta = float(root.find('tta').text) # Loop over all possible combinations of of the parameters minus two for combination in combinations(self.parameters, len(self.parameters)-2): @@ -120,7 +120,7 @@ class Visualizer: hist, xedges, yedges = np.histogram2d(data[key][labels[0]], data[key][labels[1]], bins=(len(xlabels), len(ylabels)), weights=data[key]['ttas']) hist = hist.T fig, ax = plt.subplots(figsize=(10, 6)) - sns.heatmap(hist, xticklabels=xlabels, yticklabels=ylabels, cmap='Purples', cbar_kws={'label': 'Time to block availability'}, linecolor='black', linewidths=0.3, annot=True, fmt=".2f", ax=ax) + sns.heatmap(hist, xticklabels=xlabels, yticklabels=ylabels, cmap='Purples', cbar_kws={'label': 'Time to block availability (ms)'}, linecolor='black', linewidths=0.3, annot=True, fmt=".2f", ax=ax) plt.xlabel(self.formatLabel(labels[0])) plt.ylabel(self.formatLabel(labels[1])) filename = "" @@ -131,6 +131,8 @@ class Visualizer: filename += f"{key[paramValueCnt]}" formattedTitle = self.formatTitle(key[paramValueCnt]) title += formattedTitle + if (paramValueCnt+1) % 5 == 0: + title += "\n" paramValueCnt += 1 title_obj = plt.title(title) font_size = 16 * fig.get_size_inches()[0] / 10 diff --git a/config_example.py b/config_example.py index 5b1a396..3ff5ae8 100644 --- a/config_example.py +++ b/config_example.py @@ -27,14 +27,14 @@ logLevel = logging.INFO # number of parallel workers. -1: all cores; 1: sequential # for more details, see joblib.Parallel -numJobs = 3 +numJobs = -1 # distribute rows/columns evenly between validators (True) # or generate it using local randomness (False) evenLineDistribution = True # Number of simulation runs with the same parameters for statistical relevance -runs = range(10) +runs = range(2) # Number of validators numberNodes = range(256, 513, 128) @@ -49,14 +49,14 @@ blockSizes = range(32,65,16) netDegrees = range(6, 9, 2) # number of rows and columns a validator is interested in -chis = range(1, 5, 2) +chis = range(2, 5, 2) # ratio of class1 nodes (see below for parameters per class) -class1ratios = np.arange(0, 1, .2) +class1ratios = [0.8, 0.9] # Number of validators per beacon node validatorsPerNode1 = [1] -validatorsPerNode2 = [2, 4, 8, 16, 32] +validatorsPerNode2 = [500] # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 @@ -64,8 +64,11 @@ bwUplinksProd = [2200] bwUplinks1 = [110] bwUplinks2 = [2200] +# Step duration in miliseconds (Classic RTT is about 100ms) +stepDuration = 50 + # Set to True if you want your run to be deterministic, False if not -deterministic = False +deterministic = True # If your run is deterministic you can decide the random seed. This is ignore otherwise. randomSeed = "DAS" diff --git a/study.py b/study.py index fde8099..aa27a5f 100644 --- a/study.py +++ b/study.py @@ -28,7 +28,7 @@ def runOnce(config, shape, execID): shape.setSeed(config.randomSeed+"-"+str(shape)) random.seed(shape.randomSeed) - sim = Simulator(shape, config) + sim = Simulator(shape, config, execID) sim.initLogger() sim.initValidators() sim.initNetwork() From 98423d29c056ee3424d65850be5551d0cf7e9072 Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Thu, 30 Mar 2023 13:49:01 +0200 Subject: [PATCH 31/43] Typos and sets --- DAS/simulator.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 6ec3bfa..83ba876 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -59,8 +59,8 @@ class Simulator: self.logger.debug("Shuffled rows: %s" % str(rows), extra=self.format) self.logger.debug("Shuffled columns: %s" % str(columns), extra=self.format) - assignatedRows = [] - assignatedCols = [] + assignedRows = [] + assignedCols = [] for i in range(self.shape.numberNodes): if self.config.evenLineDistribution: if i < int(heavyVal/self.shape.vpn2): # First start with the heavy nodes @@ -71,15 +71,13 @@ class Simulator: start = offset+( j *self.shape.chi) end = offset+((j+1)*self.shape.chi) # Remove duplicates - r = list(dict.fromkeys(rows[start:end])) - c = list(dict.fromkeys(columns[start:end])) - r.sort() - c.sort() + r = set(rows[start:end]) + c = set(columns[start:end]) val = Validator(i, int(not i!=0), self.logger, self.shape, r, c) self.logger.debug("Validators %d row IDs: %s" % (val.ID, val.rowIDs), extra=self.format) self.logger.debug("Validators %d column IDs: %s" % (val.ID, val.columnIDs), extra=self.format) - assignatedRows = assignatedRows + r - assignatedCols = assignatedCols + c + assignedRows = assignedRows + list(r) + assignedCols = assignedCols + list(c) else: val = Validator(i, int(not i!=0), self.logger, self.shape) @@ -89,10 +87,10 @@ class Simulator: val.logIDs() self.validators.append(val) - assignatedRows.sort() - assignatedCols.sort() - self.logger.debug("Rows assignated: %s" % str(assignatedRows), extra=self.format) - self.logger.debug("Columns assignated: %s" % str(assignatedCols), extra=self.format) + assignedRows.sort() + assignedCols.sort() + self.logger.debug("Rows assigned: %s" % str(assignedRows), extra=self.format) + self.logger.debug("Columns assigned: %s" % str(assignedCols), extra=self.format) self.logger.debug("Validators initialized.", extra=self.format) def initNetwork(self): From 4b7bf81cee8b7125facf544d3c1fa402db4e3fda Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Thu, 30 Mar 2023 13:52:40 +0200 Subject: [PATCH 32/43] Remove stop condition fix --- DAS/simulator.py | 10 ++++------ config_example.py | 3 --- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 83ba876..b7b7dff 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -273,13 +273,11 @@ class Simulator: }) if missingSamples == oldMissingSamples: - if len(missingVector) > self.config.steps4StopCondition: - if missingSamples == missingVector[len(missingVector)-1-self.config.steps4StopCondition]: - self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) - if self.config.diagnostics: - self.printDiagnostics() - break + self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) + if self.config.diagnostics: + self.printDiagnostics() missingVector.append(missingSamples) + break elif missingSamples == 0: #self.logger.info("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) missingVector.append(missingSamples) diff --git a/config_example.py b/config_example.py index d07a899..f799c20 100644 --- a/config_example.py +++ b/config_example.py @@ -77,9 +77,6 @@ randomSeed = "DAS" # If True, print diagnostics when the block is not available diagnostics = False -# Number of steps without progress to stop simulation -steps4StopCondition = 7 - def nextShape(): for run, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( runs, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): From d2a653b07c637204929473da3ef9ea42a86d9b27 Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Thu, 30 Mar 2023 14:11:12 +0200 Subject: [PATCH 33/43] Remove plotting config --- DAS/simulator.py | 1 - config_example.py | 4 ---- 2 files changed, 5 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 674c1e7..b0ea19f 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -3,7 +3,6 @@ import networkx as nx import logging, random import pandas as pd -import matplotlib from functools import partial, partialmethod from datetime import datetime from DAS.tools import * diff --git a/config_example.py b/config_example.py index 2187108..e048028 100644 --- a/config_example.py +++ b/config_example.py @@ -22,10 +22,6 @@ dumpXML = 1 # save progress vectors to XML saveProgress = 1 - -# plot progress for each run to PNG -plotProgress = 1 - visualization = 1 logLevel = logging.INFO From ae36844e7f5a66450e64701af01007a8f9732bb7 Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Thu, 30 Mar 2023 14:26:14 +0200 Subject: [PATCH 34/43] Add stop condition fix --- DAS/simulator.py | 6 ++++-- config_example.py | 3 +++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index a8d5509..6561ea0 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -229,9 +229,11 @@ class Simulator: }) if missingSamples == oldMissingSamples: - self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) + if len(missingVector) > self.config.steps4StopCondition: + if missingSamples == missingVector[len(missingVector)-1-self.config.steps4StopCondition]: + self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) + break missingVector.append(missingSamples) - break elif missingSamples == 0: self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) missingVector.append(missingSamples) diff --git a/config_example.py b/config_example.py index 3ff5ae8..6725280 100644 --- a/config_example.py +++ b/config_example.py @@ -73,6 +73,9 @@ deterministic = True # If your run is deterministic you can decide the random seed. This is ignore otherwise. randomSeed = "DAS" +# Number of steps without progress to stop simulation +steps4StopCondition = 7 + def nextShape(): for run, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( runs, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): From c4ad3cf80e64a1c14c8ff022cafb2c6180cabe49 Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Thu, 30 Mar 2023 16:10:10 +0200 Subject: [PATCH 35/43] Small fix --- DAS/simulator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 6561ea0..734f0d3 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -230,7 +230,7 @@ class Simulator: if missingSamples == oldMissingSamples: if len(missingVector) > self.config.steps4StopCondition: - if missingSamples == missingVector[len(missingVector)-1-self.config.steps4StopCondition]: + if missingSamples == missingVector[-self.config.steps4StopCondition]: self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) break missingVector.append(missingSamples) From 73e02a132db23adeff65efaa235ec7fb97b2e85f Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 30 Mar 2023 16:22:00 +0200 Subject: [PATCH 36/43] fix formatting Signed-off-by: Csaba Kiraly --- config_example.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config_example.py b/config_example.py index 6725280..a17477f 100644 --- a/config_example.py +++ b/config_example.py @@ -75,7 +75,7 @@ randomSeed = "DAS" # Number of steps without progress to stop simulation steps4StopCondition = 7 - + def nextShape(): for run, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( runs, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): From ea93e9412f297865942f35c5ed6704ca7b89a398 Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Thu, 13 Apr 2023 15:22:50 +0200 Subject: [PATCH 37/43] Changing the success condition --- DAS/results.py | 4 +++- config_example.py | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/DAS/results.py b/DAS/results.py index 7134ff2..c6a87f1 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -21,7 +21,9 @@ class Result: self.shape = shape self.missingVector = missingVector missingSamples = missingVector[-1] - if missingSamples == 0: + validatorsReady = self.metrics["progress"]["validators ready"][-1] + #print("There are %05.3f%% validators ready" % (validatorsReady*100)) + if validatorsReady > config.successCondition: self.blockAvailable = 1 self.tta = len(missingVector) * (config.stepDuration) else: diff --git a/config_example.py b/config_example.py index 39a0591..7263ca1 100644 --- a/config_example.py +++ b/config_example.py @@ -76,6 +76,9 @@ randomSeed = "DAS" # Number of steps without progress to stop simulation steps4StopCondition = 7 +# Number of validators ready to asume block is available +successCondition = 0.9 + # If True, print diagnostics when the block is not available diagnostics = False From 9855f1b8c4550cb00c40c944b987f56b354b0f1e Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 13 Apr 2023 23:28:40 +0200 Subject: [PATCH 38/43] calcualate tta based on successCondition Signed-off-by: Csaba Kiraly --- DAS/results.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/DAS/results.py b/DAS/results.py index c6a87f1..60cfc81 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -1,6 +1,7 @@ #!/bin/python3 import os +import bisect from xml.dom import minidom from dicttoxml import dicttoxml @@ -20,12 +21,11 @@ class Result: """It populates part of the result data inside a vector.""" self.shape = shape self.missingVector = missingVector - missingSamples = missingVector[-1] - validatorsReady = self.metrics["progress"]["validators ready"][-1] - #print("There are %05.3f%% validators ready" % (validatorsReady*100)) - if validatorsReady > config.successCondition: + v = self.metrics["progress"]["validators ready"] + tta = bisect.bisect(v, config.successCondition) + if tta != len(v): self.blockAvailable = 1 - self.tta = len(missingVector) * (config.stepDuration) + self.tta = tta * (config.stepDuration) else: self.blockAvailable = 0 self.tta = -1 From b4f3d35f91aa869217936db1b62f3d32e4897278 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 13 Apr 2023 06:01:04 +0200 Subject: [PATCH 39/43] save config and code state for reproducibility Signed-off-by: Csaba Kiraly --- study.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/study.py b/study.py index 74dba63..8d6c088 100644 --- a/study.py +++ b/study.py @@ -2,6 +2,7 @@ import time, sys, random, copy import importlib +import subprocess from joblib import Parallel, delayed from DAS import * @@ -63,6 +64,18 @@ def study(): now = datetime.now() execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999)) + # save config and code state for reproducibility + if not os.path.exists("results"): + os.makedirs("results") + dir = "results/"+execID + if not os.path.exists(dir): + os.makedirs(dir) + with open(dir+"/git.diff", 'w') as f: + subprocess.run(["git", "diff"], stdout=f) + with open(dir+"/git.describe", 'w') as f: + subprocess.run(["git", "describe", "--always"], stdout=f) + subprocess.run(["cp", sys.argv[1], dir+"/"]) + logger.info("Starting simulations:", extra=format) start = time.time() results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape()) From f3a9e4b8fdd941ba403dddeac7244a87d7bf5ea2 Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Mon, 17 Apr 2023 10:56:10 +0200 Subject: [PATCH 40/43] Fix corner case when last iteration is equal to success condition --- DAS/results.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DAS/results.py b/DAS/results.py index 60cfc81..76d96d1 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -23,7 +23,7 @@ class Result: self.missingVector = missingVector v = self.metrics["progress"]["validators ready"] tta = bisect.bisect(v, config.successCondition) - if tta != len(v): + if v[-1] >= config.successCondition: self.blockAvailable = 1 self.tta = tta * (config.stepDuration) else: From ebe1a4c87b1f6b37e2ca598bad49c217a67b47ca Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Mon, 17 Apr 2023 12:14:39 +0200 Subject: [PATCH 41/43] Make git save an option for when code is downloaded instead of cloned --- config_example.py | 3 +++ study.py | 9 +++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/config_example.py b/config_example.py index 39a0591..7c2267c 100644 --- a/config_example.py +++ b/config_example.py @@ -79,6 +79,9 @@ steps4StopCondition = 7 # If True, print diagnostics when the block is not available diagnostics = False +# True to save git diff and git commit +saveGit = False + def nextShape(): for run, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( runs, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): diff --git a/study.py b/study.py index 8d6c088..6fb9340 100644 --- a/study.py +++ b/study.py @@ -70,10 +70,11 @@ def study(): dir = "results/"+execID if not os.path.exists(dir): os.makedirs(dir) - with open(dir+"/git.diff", 'w') as f: - subprocess.run(["git", "diff"], stdout=f) - with open(dir+"/git.describe", 'w') as f: - subprocess.run(["git", "describe", "--always"], stdout=f) + if config.saveGit: + with open(dir+"/git.diff", 'w') as f: + subprocess.run(["git", "diff"], stdout=f) + with open(dir+"/git.describe", 'w') as f: + subprocess.run(["git", "describe", "--always"], stdout=f) subprocess.run(["cp", sys.argv[1], dir+"/"]) logger.info("Starting simulations:", extra=format) From b74ac19557536212d55a4f6f5fe28b70234f5ede Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 10 Apr 2023 22:05:48 +0200 Subject: [PATCH 42/43] fixup: increase step counter while steps4StopCondition Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 6a9db26..34ed8d7 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -283,8 +283,7 @@ class Simulator: self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) missingVector.append(missingSamples) break - else: - steps += 1 + steps += 1 progress = pd.DataFrame(progressVector) if self.config.saveProgress: From 1638a185071ec93e26a85bd1e1bf61e1064a0e54 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Sun, 9 Apr 2023 01:13:28 +0200 Subject: [PATCH 43/43] make failureRate exact simplify code and make sure failureRate is the exact portion of segments missing, not just a probability per sample. Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 1 - DAS/validator.py | 23 ++++------------------- 2 files changed, 4 insertions(+), 20 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 6a9db26..c61818d 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -210,7 +210,6 @@ class Simulator: def run(self): """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) - self.validators[self.proposerID].broadcastBlock() arrived, expected, ready, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingVector = [] diff --git a/DAS/validator.py b/DAS/validator.py index cfa7fac..79d9ea2 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -113,33 +113,18 @@ class Validator: def initBlock(self): """It initializes the block for the proposer.""" - if self.amIproposer == 1: - self.logger.debug("I am a block proposer.", extra=self.format) - self.block = Block(self.shape.blockSize) - self.block.fill() - #self.block.print() - else: - self.logger.warning("I am not a block proposer."% self.ID) - - def broadcastBlock(self): - """The block proposer broadcasts the block to all validators.""" if self.amIproposer == 0: self.logger.warning("I am not a block proposer", extra=self.format) else: - self.logger.debug("Broadcasting my block...", extra=self.format) + self.logger.debug("Creating block...", extra=self.format) order = [i for i in range(self.shape.blockSize * self.shape.blockSize)] - random.shuffle(order) - while(order): - i = order.pop() - if (random.randint(0,99) >= self.shape.failureRate): - self.block.data[i] = 1 - else: - self.block.data[i] = 0 + order = random.sample(order, int((1 - self.shape.failureRate/100) * len(order))) + for i in order: + self.block.data[i] = 1 nbFailures = self.block.data.count(0) measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) - #broadcasted.print() def getColumn(self, index): """It returns a given column."""