diff --git a/DAS/observer.py b/DAS/observer.py
index af5866a..88fc8c4 100644
--- a/DAS/observer.py
+++ b/DAS/observer.py
@@ -10,21 +10,12 @@ class Observer:
         self.config = config
         self.format = {"entity": "Observer"}
         self.logger = logger
-        self.block = []
-        self.rows = []
-        self.columns = []
-        self.goldenData = []
-        self.broadcasted = []
-
-
-    def reset(self):
-        """It resets all the gathered data to zeros."""
         self.block = [0] * self.config.blockSize * self.config.blockSize
-        self.goldenData = [0] * self.config.blockSize * self.config.blockSize
         self.rows = [0] * self.config.blockSize
         self.columns = [0] * self.config.blockSize
         self.broadcasted = Block(self.config.blockSize)
+
     def checkRowsColumns(self, validators):
         """It checks how many validators have been assigned to each row and column."""
         for val in validators:
@@ -39,11 +30,6 @@ class Observer:
             if self.rows[i] == 0 or self.columns[i] == 0:
                 self.logger.warning("There is a row/column that has not been assigned", extra=self.format)
 
-    def setGoldenData(self, block):
-        """Stores the original real data to compare it with future situations."""
-        for i in range(self.config.blockSize*self.config.blockSize):
-            self.goldenData[i] = block.data[i]
-
     def checkBroadcasted(self):
         """It checks how many broadcasted samples are still missing in the network."""
         zeros = 0
diff --git a/DAS/requirements.txt b/DAS/requirements.txt
index da7dcc7..f8c9fe4 100644
--- a/DAS/requirements.txt
+++ b/DAS/requirements.txt
@@ -1,7 +1,7 @@
 bitarray==2.6.0
-DAS==0.29.0
 dicttoxml==1.7.16
 matplotlib==3.6.2
 networkx==3.0
 numpy==1.23.5
 seaborn==0.12.2
+joblib==1.2.0
diff --git a/DAS/simulator.py b/DAS/simulator.py
index 30c0d86..a6788b8 100644
--- a/DAS/simulator.py
+++ b/DAS/simulator.py
@@ -2,6 +2,7 @@
 
 import networkx as nx
 import logging, random
+from functools import partial, partialmethod
 from datetime import datetime
 from statistics import mean
 from DAS.tools import *
@@ -24,10 +25,21 @@ class Simulator:
         self.proposerID = 0
         self.glob = []
 
+        # In GossipSub the initiator might push messages without participating in the mesh.
+        # proposerPublishOnly regulates this behavior. If set to true, the proposer is not
+        # part of the p2p distribution graph, only pushes segments to it. If false, the proposer
+        # might get back segments from other peers since links are symmetric.
+        self.proposerPublishOnly = True
+
+        # If proposerPublishOnly == True, this regulates how many copies of each segment are
+        # pushed out by the proposer.
+        # 1: the data is sent out exactly once on rows and once on columns (2 copies in total)
+        # self.shape.netDegree: default behavior similar (but not same) to previous code
+        self.proposerPublishTo = self.shape.netDegree
+
     def initValidators(self):
         """It initializes all the validators in the network."""
         self.glob = Observer(self.logger, self.shape)
-        self.glob.reset()
         self.validators = []
         if self.config.evenLineDistribution:
@@ -55,7 +67,6 @@ class Simulator:
                 val = Validator(i, int(not i!=0), self.logger, self.shape)
                 if i == self.proposerID:
                     val.initBlock()
-                    self.glob.setGoldenData(val.block)
                 else:
                     val.logIDs()
                 self.validators.append(val)
@@ -137,6 +148,11 @@ class Simulator:
 
     def initLogger(self):
         """It initializes the logger."""
+        logging.TRACE = 5
+        logging.addLevelName(logging.TRACE, 'TRACE')
+        logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
+        logging.trace = partial(logging.log, logging.TRACE)
+
         logger = logging.getLogger("DAS")
         if len(logger.handlers) == 0:
             logger.setLevel(self.logLevel)
@@ -146,30 +162,6 @@ class Simulator:
             logger.addHandler(ch)
         self.logger = logger
 
-
-    def resetShape(self, shape):
-        """It resets the parameters of the simulation."""
-        self.shape = shape
-        self.result = Result(self.shape)
-        for val in self.validators:
-            val.shape.failureRate = shape.failureRate
-            val.shape.chi = shape.chi
-            val.shape.vpn1 = shape.vpn1
-            val.shape.vpn2 = shape.vpn2
-
-        # In GossipSub the initiator might push messages without participating in the mesh.
-        # proposerPublishOnly regulates this behavior. If set to true, the proposer is not
-        # part of the p2p distribution graph, only pushes segments to it. If false, the proposer
-        # might get back segments from other peers since links are symmetric.
-        self.proposerPublishOnly = True
-
-        # If proposerPublishOnly == True, this regulates how many copies of each segment are
-        # pushed out by the proposer.
-        # 1: the data is sent out exactly once on rows and once on columns (2 copies in total)
-        # self.shape.netDegree: default behavior similar (but not same) to previous code
-        self.proposerPublishTo = self.shape.netDegree
-
-
     def run(self):
         """It runs the main simulation until the block is available or it gets stucked."""
         self.glob.checkRowsColumns(self.validators)
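The initLogger hunk above registers a custom TRACE level (numeric value 5, below DEBUG at 10) so that per-segment events can be logged without flooding the normal DEBUG output. A minimal standalone sketch of the same pattern, using only the standard logging module and functools (the "demo" logger name is only an example, not part of the repo):

import logging
from functools import partial, partialmethod

# Register TRACE below DEBUG and expose logger.trace() / logging.trace().
logging.TRACE = 5
logging.addLevelName(logging.TRACE, 'TRACE')
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
logging.trace = partial(logging.log, logging.TRACE)

logging.basicConfig(level=logging.TRACE)
logging.getLogger("demo").trace("only visible when the level is lowered to TRACE")

This is what lets the validator.py hunks below downgrade the per-segment logger.debug calls to logger.trace.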
diff --git a/DAS/validator.py b/DAS/validator.py
index f869171..49165a0 100644
--- a/DAS/validator.py
+++ b/DAS/validator.py
@@ -61,14 +61,16 @@ class Validator:
             self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format)
         else:
             if amIproposer:
+                self.nodeClass = 0
                 self.rowIDs = range(shape.blockSize)
                 self.columnIDs = range(shape.blockSize)
             else:
                 #if shape.deterministic:
                 #    random.seed(self.ID)
-                vpn = self.shape.vpn1 if (self.ID <= shape.numberNodes * shape.class1ratio) else self.shape.vpn2
-                self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn)
-                self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn)
+                self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2
+                self.vpn = self.shape.vpn1 if (self.nodeClass == 1) else self.shape.vpn2
+                self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn)
+                self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn)
 
         self.rowNeighbors = collections.defaultdict(dict)
         self.columnNeighbors = collections.defaultdict(dict)
@@ -83,7 +85,7 @@ class Validator:
         # TODO: this should be a parameter
         if self.amIproposer:
             self.bwUplink = shape.bwUplinkProd
-        elif self.ID <= shape.numberNodes * shape.class1ratio:
+        elif self.nodeClass == 1:
            self.bwUplink = shape.bwUplink1
         else:
            self.bwUplink = shape.bwUplink2
@@ -155,12 +157,12 @@ class Validator:
         if src in self.columnNeighbors[cID]:
             self.columnNeighbors[cID][src].receiving[rID] = 1
         if not self.receivedBlock.getSegment(rID, cID):
-            self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
+            self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
             self.receivedBlock.setSegment(rID, cID)
             if self.perNodeQueue or self.perNeighborQueue:
                 self.receivedQueue.append((rID, cID))
         else:
-            self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
+            self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
             # self.statsRxDuplicateInSlot += 1
 
         self.statsRxInSlot += 1
@@ -183,7 +185,7 @@ class Validator:
         if self.amIproposer == 1:
             self.logger.error("I am a block proposer", extra=self.format)
         else:
-            self.logger.debug("Receiving the data...", extra=self.format)
+            self.logger.trace("Receiving the data...", extra=self.format)
             #self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format)
             self.block.merge(self.receivedBlock)
 
@@ -219,7 +221,7 @@ class Validator:
 
     def sendSegmentToNeigh(self, rID, cID, neigh):
         """Send segment to a neighbor (without checks)."""
-        self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format)
+        self.logger.trace("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format)
         i = rID if neigh.dim else cID
         neigh.sent[i] = 1
         neigh.node.receiveSegment(rID, cID, self.ID)
@@ -454,7 +456,7 @@ class Validator:
                 # be queued after successful repair.
                 for i in range(len(rep)):
                     if rep[i]:
-                        self.logger.debug("Rep: %d,%d", id, i, extra=self.format)
+                        self.logger.trace("Rep: %d,%d", id, i, extra=self.format)
                         self.addToSendQueue(id, i)
                 # self.statsRepairInSlot += rep.count(1)
@@ -472,7 +474,7 @@ class Validator:
                 # be queued after successful repair.
                 for i in range(len(rep)):
                     if rep[i]:
-                        self.logger.debug("Rep: %d,%d", i, id, extra=self.format)
+                        self.logger.trace("Rep: %d,%d", i, id, extra=self.format)
                         self.addToSendQueue(i, id)
                 # self.statsRepairInSlot += rep.count(1)
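In the first validator.py hunk above, each non-proposer node is assigned to class 1 or 2 by its ID, picks vpn1 or vpn2 accordingly, and, judging by the call signature, gets its rows and columns as the union of vpn chi-sized samples returned by unionOfSamples from DAS/tools.py. The sketch below only illustrates that selection logic; the unionOfSamples body and the parameter values are assumptions for illustration, not code taken from the repo:

import random

def unionOfSamples(population, k, times):
    # assumed behavior: union of `times` independent samples of size `k`
    union = set()
    for _ in range(times):
        union |= set(random.sample(list(population), k))
    return union

# illustrative parameter values, not from any config file
blockSize, numberNodes, class1ratio, chi, vpn1, vpn2 = 64, 100, 0.8, 2, 8, 2

for ID in (10, 95):
    nodeClass = 1 if ID <= numberNodes * class1ratio else 2
    vpn = vpn1 if nodeClass == 1 else vpn2
    rowIDs = unionOfSamples(range(blockSize), chi, vpn)
    print("node %d: class %d, %d distinct row IDs" % (ID, nodeClass, len(rowIDs)))

Storing nodeClass on the Validator also lets the bandwidth hunk above select bwUplink1 or bwUplink2 without repeating the ID comparison.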
diff --git a/study.py b/study.py
index 2b2aab6..fde8099 100644
--- a/study.py
+++ b/study.py
@@ -12,17 +12,32 @@ from DAS import *
 
 # https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls
 # and https://github.com/joblib/joblib/issues/1017
 
-def runOnce(sim, config, shape):
+def initLogger(config):
+    """It initializes the logger."""
+    logger = logging.getLogger("Study")
+    logger.setLevel(config.logLevel)
+    ch = logging.StreamHandler()
+    ch.setLevel(config.logLevel)
+    ch.setFormatter(CustomFormatter())
+    logger.addHandler(ch)
+    return logger
+
+def runOnce(config, shape, execID):
+
     if config.deterministic:
         shape.setSeed(config.randomSeed+"-"+str(shape))
         random.seed(shape.randomSeed)
+    sim = Simulator(shape, config)
     sim.initLogger()
-    sim.resetShape(shape)
     sim.initValidators()
     sim.initNetwork()
     result = sim.run()
     sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
+
+    if config.dumpXML:
+        result.dump(execID)
+
     return result
 
 def study():
@@ -40,29 +55,23 @@ def study():
         print("You need to pass a configuration file in parameter")
         exit(1)
 
-    shape = Shape(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
-    sim = Simulator(shape, config)
-    sim.initLogger()
+    logger = initLogger(config)
+    format = {"entity": "Study"}
+    results = []
     now = datetime.now()
     execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999))
-    sim.logger.info("Starting simulations:", extra=sim.format)
+    logger.info("Starting simulations:", extra=format)
     start = time.time()
-    results = Parallel(config.numJobs)(delayed(runOnce)(sim, config, shape) for shape in config.nextShape())
+    results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape())
    end = time.time()
-    sim.logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=sim.format)
-
-    if config.dumpXML:
-        for res in results:
-            res.dump(execID)
-        sim.logger.info("Results dumped into results/%s/" % (execID), extra=sim.format)
+    logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)
 
     if config.visualization:
         vis = Visualizer(execID)
         vis.plotHeatmaps()
 
-
-study()
-
+if __name__ == "__main__":
+    study()
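In study.py, every parallel job now builds its own Simulator and logger inside runOnce instead of reusing one pre-built instance, which lines up with the logging-with-joblib discussions already linked in the comments above runOnce, and the entry point is wrapped in an __main__ guard so that process-based joblib backends can import the module safely. A reduced sketch of that structure (the worker body and the n_jobs/seed values are placeholders, not code from this repo):

import logging, random
from joblib import Parallel, delayed

def runOnce(seed):
    # each job creates its own logger and state; nothing is shared with the parent process
    logger = logging.getLogger("Study-%d" % seed)
    random.seed(seed)
    return sum(random.random() for _ in range(1000))

if __name__ == "__main__":
    results = Parallel(n_jobs=2)(delayed(runOnce)(seed) for seed in range(4))
    print(results)

This is also why result.dump(execID) moved into runOnce: each job writes its own XML output as soon as it finishes, instead of the parent collecting Result objects and dumping them all at the end.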