From d38b8074f49087130b62dc4107c4e3bf11512fb8 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Mar 2023 10:09:04 +0100 Subject: [PATCH 01/10] use main Signed-off-by: Csaba Kiraly --- study.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/study.py b/study.py index 2b2aab6..2532d13 100644 --- a/study.py +++ b/study.py @@ -63,6 +63,5 @@ def study(): vis = Visualizer(execID) vis.plotHeatmaps() - -study() - +if __name__ == "__main__": + study() From 954d40e75876c0d822b2909c4efbdf5e07ccd276 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Mar 2023 10:10:13 +0100 Subject: [PATCH 02/10] use new simulator object instead of reset Signed-off-by: Csaba Kiraly # Conflicts: # study.py --- study.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/study.py b/study.py index 2532d13..3b956e8 100644 --- a/study.py +++ b/study.py @@ -12,7 +12,20 @@ from DAS import * # https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls # and https://github.com/joblib/joblib/issues/1017 -def runOnce(sim, config, shape): +def initLogger(config): + """It initializes the logger.""" + logger = logging.getLogger("Study") + logger.setLevel(config.logLevel) + ch = logging.StreamHandler() + ch.setLevel(config.logLevel) + ch.setFormatter(CustomFormatter()) + logger.addHandler(ch) + return logger + +def runOnce(config, shape): + + sim = Simulator(shape, config) + if config.deterministic: shape.setSeed(config.randomSeed+"-"+str(shape)) random.seed(shape.randomSeed) @@ -40,24 +53,24 @@ def study(): print("You need to pass a configuration file in parameter") exit(1) - shape = Shape(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - sim = Simulator(shape, config) - sim.initLogger() + logger = initLogger(config) + format = {"entity": "Study"} + results = [] now = datetime.now() execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999)) - sim.logger.info("Starting simulations:", extra=sim.format) + logger.info("Starting simulations:", extra=format) start = time.time() - results = Parallel(config.numJobs)(delayed(runOnce)(sim, config, shape) for shape in config.nextShape()) + results = Parallel(config.numJobs)(delayed(runOnce)(config, shape) for shape in config.nextShape()) end = time.time() - sim.logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=sim.format) + logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format) if config.dumpXML: for res in results: res.dump(execID) - sim.logger.info("Results dumped into results/%s/" % (execID), extra=sim.format) + logger.info("Results dumped into results/%s/" % (execID), extra=format) if config.visualization: vis = Visualizer(execID) From 952d191ccd018276ddc09b04c380306d45addee5 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Mar 2023 10:12:08 +0100 Subject: [PATCH 03/10] remove unused golden data Signed-off-by: Csaba Kiraly --- DAS/observer.py | 8 +------- DAS/simulator.py | 1 - 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index af5866a..ee0604b 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -13,18 +13,17 @@ class Observer: self.block = [] self.rows = [] self.columns = [] - self.goldenData = [] self.broadcasted = [] def reset(self): """It resets all the gathered data to zeros.""" self.block = [0] * self.config.blockSize * self.config.blockSize - self.goldenData = [0] * self.config.blockSize * self.config.blockSize self.rows = [0] * self.config.blockSize self.columns = [0] * self.config.blockSize self.broadcasted = Block(self.config.blockSize) + def checkRowsColumns(self, validators): """It checks how many validators have been assigned to each row and column.""" for val in validators: @@ -39,11 +38,6 @@ class Observer: if self.rows[i] == 0 or self.columns[i] == 0: self.logger.warning("There is a row/column that has not been assigned", extra=self.format) - def setGoldenData(self, block): - """Stores the original real data to compare it with future situations.""" - for i in range(self.config.blockSize*self.config.blockSize): - self.goldenData[i] = block.data[i] - def checkBroadcasted(self): """It checks how many broadcasted samples are still missing in the network.""" zeros = 0 diff --git a/DAS/simulator.py b/DAS/simulator.py index 30c0d86..900609c 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -55,7 +55,6 @@ class Simulator: val = Validator(i, int(not i!=0), self.logger, self.shape) if i == self.proposerID: val.initBlock() - self.glob.setGoldenData(val.block) else: val.logIDs() self.validators.append(val) From af124c07558ecb53beef94e2707d2e7cb6792c00 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Mar 2023 10:12:54 +0100 Subject: [PATCH 04/10] simplify code Signed-off-by: Csaba Kiraly --- DAS/observer.py | 8 -------- DAS/simulator.py | 1 - 2 files changed, 9 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index ee0604b..88fc8c4 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -10,14 +10,6 @@ class Observer: self.config = config self.format = {"entity": "Observer"} self.logger = logger - self.block = [] - self.rows = [] - self.columns = [] - self.broadcasted = [] - - - def reset(self): - """It resets all the gathered data to zeros.""" self.block = [0] * self.config.blockSize * self.config.blockSize self.rows = [0] * self.config.blockSize self.columns = [0] * self.config.blockSize diff --git a/DAS/simulator.py b/DAS/simulator.py index 900609c..bb27a8a 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -27,7 +27,6 @@ class Simulator: def initValidators(self): """It initializes all the validators in the network.""" self.glob = Observer(self.logger, self.shape) - self.glob.reset() self.validators = [] if self.config.evenLineDistribution: From 6e4b37a3d2e41c0da1e827071a1b6e40dbc9bfb4 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Mar 2023 11:16:43 +0100 Subject: [PATCH 05/10] adding log level TRACE Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 7 ++++++- DAS/validator.py | 12 ++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index bb27a8a..0bdbfd6 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -2,6 +2,7 @@ import networkx as nx import logging, random +from functools import partial, partialmethod from datetime import datetime from statistics import mean from DAS.tools import * @@ -135,6 +136,11 @@ class Simulator: def initLogger(self): """It initializes the logger.""" + logging.TRACE = 5 + logging.addLevelName(logging.TRACE, 'TRACE') + logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) + logging.trace = partial(logging.log, logging.TRACE) + logger = logging.getLogger("DAS") if len(logger.handlers) == 0: logger.setLevel(self.logLevel) @@ -144,7 +150,6 @@ class Simulator: logger.addHandler(ch) self.logger = logger - def resetShape(self, shape): """It resets the parameters of the simulation.""" self.shape = shape diff --git a/DAS/validator.py b/DAS/validator.py index f869171..9a83d42 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -155,12 +155,12 @@ class Validator: if src in self.columnNeighbors[cID]: self.columnNeighbors[cID][src].receiving[rID] = 1 if not self.receivedBlock.getSegment(rID, cID): - self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) + self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.receivedBlock.setSegment(rID, cID) if self.perNodeQueue or self.perNeighborQueue: self.receivedQueue.append((rID, cID)) else: - self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) + self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) # self.statsRxDuplicateInSlot += 1 self.statsRxInSlot += 1 @@ -183,7 +183,7 @@ class Validator: if self.amIproposer == 1: self.logger.error("I am a block proposer", extra=self.format) else: - self.logger.debug("Receiving the data...", extra=self.format) + self.logger.trace("Receiving the data...", extra=self.format) #self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format) self.block.merge(self.receivedBlock) @@ -219,7 +219,7 @@ class Validator: def sendSegmentToNeigh(self, rID, cID, neigh): """Send segment to a neighbor (without checks).""" - self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) + self.logger.trace("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) i = rID if neigh.dim else cID neigh.sent[i] = 1 neigh.node.receiveSegment(rID, cID, self.ID) @@ -454,7 +454,7 @@ class Validator: # be queued after successful repair. for i in range(len(rep)): if rep[i]: - self.logger.debug("Rep: %d,%d", id, i, extra=self.format) + self.logger.trace("Rep: %d,%d", id, i, extra=self.format) self.addToSendQueue(id, i) # self.statsRepairInSlot += rep.count(1) @@ -472,7 +472,7 @@ class Validator: # be queued after successful repair. for i in range(len(rep)): if rep[i]: - self.logger.debug("Rep: %d,%d", i, id, extra=self.format) + self.logger.trace("Rep: %d,%d", i, id, extra=self.format) self.addToSendQueue(i, id) # self.statsRepairInSlot += rep.count(1) From 122f6a8348c5cb80559ab6c62a8441e15dc3a9af Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 20 Mar 2023 11:29:06 +0100 Subject: [PATCH 06/10] remove resetShape Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 35 ++++++++++++----------------------- study.py | 4 +--- 2 files changed, 13 insertions(+), 26 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 0bdbfd6..a6788b8 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -25,6 +25,18 @@ class Simulator: self.proposerID = 0 self.glob = [] + # In GossipSub the initiator might push messages without participating in the mesh. + # proposerPublishOnly regulates this behavior. If set to true, the proposer is not + # part of the p2p distribution graph, only pushes segments to it. If false, the proposer + # might get back segments from other peers since links are symmetric. + self.proposerPublishOnly = True + + # If proposerPublishOnly == True, this regulates how many copies of each segment are + # pushed out by the proposer. + # 1: the data is sent out exactly once on rows and once on columns (2 copies in total) + # self.shape.netDegree: default behavior similar (but not same) to previous code + self.proposerPublishTo = self.shape.netDegree + def initValidators(self): """It initializes all the validators in the network.""" self.glob = Observer(self.logger, self.shape) @@ -150,29 +162,6 @@ class Simulator: logger.addHandler(ch) self.logger = logger - def resetShape(self, shape): - """It resets the parameters of the simulation.""" - self.shape = shape - self.result = Result(self.shape) - for val in self.validators: - val.shape.failureRate = shape.failureRate - val.shape.chi = shape.chi - val.shape.vpn1 = shape.vpn1 - val.shape.vpn2 = shape.vpn2 - - # In GossipSub the initiator might push messages without participating in the mesh. - # proposerPublishOnly regulates this behavior. If set to true, the proposer is not - # part of the p2p distribution graph, only pushes segments to it. If false, the proposer - # might get back segments from other peers since links are symmetric. - self.proposerPublishOnly = True - - # If proposerPublishOnly == True, this regulates how many copies of each segment are - # pushed out by the proposer. - # 1: the data is sent out exactly once on rows and once on columns (2 copies in total) - # self.shape.netDegree: default behavior similar (but not same) to previous code - self.proposerPublishTo = self.shape.netDegree - - def run(self): """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) diff --git a/study.py b/study.py index 3b956e8..e7e6ce6 100644 --- a/study.py +++ b/study.py @@ -24,14 +24,12 @@ def initLogger(config): def runOnce(config, shape): - sim = Simulator(shape, config) - if config.deterministic: shape.setSeed(config.randomSeed+"-"+str(shape)) random.seed(shape.randomSeed) + sim = Simulator(shape, config) sim.initLogger() - sim.resetShape(shape) sim.initValidators() sim.initNetwork() result = sim.run() From ff93161b8df53336e9852904805a68c8f2bd6ada Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 20 Mar 2023 12:04:00 +0100 Subject: [PATCH 07/10] set vpn in validator Signed-off-by: Csaba Kiraly --- DAS/validator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 9a83d42..5ae1cd5 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -66,9 +66,9 @@ class Validator: else: #if shape.deterministic: # random.seed(self.ID) - vpn = self.shape.vpn1 if (self.ID <= shape.numberNodes * shape.class1ratio) else self.shape.vpn2 - self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn) - self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn) + self.vpn = self.shape.vpn1 if (self.ID <= shape.numberNodes * shape.class1ratio) else self.shape.vpn2 + self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) + self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) From 894009b4145aa8ec0aee23402acc0937e398ac4d Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 21 Mar 2023 08:34:02 +0100 Subject: [PATCH 08/10] Validator node: add nodeClass property Signed-off-by: Csaba Kiraly --- DAS/validator.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 5ae1cd5..49165a0 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -61,12 +61,14 @@ class Validator: self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format) else: if amIproposer: + self.nodeClass = 0 self.rowIDs = range(shape.blockSize) self.columnIDs = range(shape.blockSize) else: #if shape.deterministic: # random.seed(self.ID) - self.vpn = self.shape.vpn1 if (self.ID <= shape.numberNodes * shape.class1ratio) else self.shape.vpn2 + self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2 + self.vpn = self.shape.vpn1 if (self.nodeClass == 1) else self.shape.vpn2 self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) self.rowNeighbors = collections.defaultdict(dict) @@ -83,7 +85,7 @@ class Validator: # TODO: this should be a parameter if self.amIproposer: self.bwUplink = shape.bwUplinkProd - elif self.ID <= shape.numberNodes * shape.class1ratio: + elif self.nodeClass == 1: self.bwUplink = shape.bwUplink1 else: self.bwUplink = shape.bwUplink2 From ec6ed7c1e8eda838f73072cc9f98d337f31c16e4 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 21 Mar 2023 14:53:48 +0100 Subject: [PATCH 09/10] dump results to XML after each run Signed-off-by: Csaba Kiraly --- study.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/study.py b/study.py index e7e6ce6..fde8099 100644 --- a/study.py +++ b/study.py @@ -22,7 +22,7 @@ def initLogger(config): logger.addHandler(ch) return logger -def runOnce(config, shape): +def runOnce(config, shape, execID): if config.deterministic: shape.setSeed(config.randomSeed+"-"+str(shape)) @@ -34,6 +34,10 @@ def runOnce(config, shape): sim.initNetwork() result = sim.run() sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) + + if config.dumpXML: + result.dump(execID) + return result def study(): @@ -61,15 +65,10 @@ def study(): logger.info("Starting simulations:", extra=format) start = time.time() - results = Parallel(config.numJobs)(delayed(runOnce)(config, shape) for shape in config.nextShape()) + results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape()) end = time.time() logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format) - if config.dumpXML: - for res in results: - res.dump(execID) - logger.info("Results dumped into results/%s/" % (execID), extra=format) - if config.visualization: vis = Visualizer(execID) vis.plotHeatmaps() From 3795948564e96c78fe95499528239e5f22e01aaf Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Sat, 25 Mar 2023 10:27:23 +0100 Subject: [PATCH 10/10] fix requirements.txt --- DAS/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DAS/requirements.txt b/DAS/requirements.txt index da7dcc7..f8c9fe4 100644 --- a/DAS/requirements.txt +++ b/DAS/requirements.txt @@ -1,7 +1,7 @@ bitarray==2.6.0 -DAS==0.29.0 dicttoxml==1.7.16 matplotlib==3.6.2 networkx==3.0 numpy==1.23.5 seaborn==0.12.2 +joblib==1.2.0