diff --git a/DAS/observer.py b/DAS/observer.py index af5866a..235ed60 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -1,5 +1,6 @@ #!/bin/python3 +import numpy as np from DAS.block import * class Observer: @@ -10,21 +11,12 @@ class Observer: self.config = config self.format = {"entity": "Observer"} self.logger = logger - self.block = [] - self.rows = [] - self.columns = [] - self.goldenData = [] - self.broadcasted = [] - - - def reset(self): - """It resets all the gathered data to zeros.""" self.block = [0] * self.config.blockSize * self.config.blockSize - self.goldenData = [0] * self.config.blockSize * self.config.blockSize self.rows = [0] * self.config.blockSize self.columns = [0] * self.config.blockSize self.broadcasted = Block(self.config.blockSize) + def checkRowsColumns(self, validators): """It checks how many validators have been assigned to each row and column.""" for val in validators: @@ -39,11 +31,6 @@ class Observer: if self.rows[i] == 0 or self.columns[i] == 0: self.logger.warning("There is a row/column that has not been assigned", extra=self.format) - def setGoldenData(self, block): - """Stores the original real data to compare it with future situations.""" - for i in range(self.config.blockSize*self.config.blockSize): - self.goldenData[i] = block.data[i] - def checkBroadcasted(self): """It checks how many broadcasted samples are still missing in the network.""" zeros = 0 @@ -58,9 +45,55 @@ class Observer: """It checks the status of how many expected and arrived samples globally.""" arrived = 0 expected = 0 + ready = 0 + validated = 0 for val in validators: if val.amIproposer == 0: (a, e) = val.checkStatus() arrived += a expected += e - return (arrived, expected) + if a == e: + ready += 1 + validated += val.vpn + return (arrived, expected, ready, validated) + + def getProgress(self, validators): + """Calculate current simulation progress with different metrics. + + Returns: + - missingSamples: overall number of sample instances missing in nodes. + Sample are counted on both rows and columns, so intersections of interest are counted twice. + - sampleProgress: previous expressed as progress ratio + - nodeProgress: ratio of nodes having all segments interested in + - validatorProgress: same as above, but vpn weighted average. I.e. it counts per validator, + but counts a validator only if its support node's all validators see all interesting segments + TODO: add real per validator progress counter + """ + arrived, expected, ready, validated = self.checkStatus(validators) + missingSamples = expected - arrived + sampleProgress = arrived / expected + nodeProgress = ready / (len(validators)-1) + validatorCnt = sum([v.vpn for v in validators[1:]]) + validatorProgress = validated / validatorCnt + + return missingSamples, sampleProgress, nodeProgress, validatorProgress + + def getTrafficStats(self, validators): + """Summary statistics of traffic measurements in a timestep.""" + def maxOrNan(l): + return np.max(l) if l else np.NaN + def meanOrNan(l): + return np.mean(l) if l else np.NaN + + trafficStats = {} + for cl in range(0,3): + Tx = [v.statsTxInSlot for v in validators if v.nodeClass == cl] + Rx = [v.statsRxInSlot for v in validators if v.nodeClass == cl] + RxDup = [v.statsRxDupInSlot for v in validators if v.nodeClass == cl] + trafficStats[cl] = { + "Tx": {"mean": meanOrNan(Tx), "max": maxOrNan(Tx)}, + "Rx": {"mean": meanOrNan(Rx), "max": maxOrNan(Rx)}, + "RxDup": {"mean": meanOrNan(RxDup), "max": maxOrNan(RxDup)}, + } + + return trafficStats diff --git a/DAS/requirements.txt b/DAS/requirements.txt index 83eca18..76b14c7 100644 --- a/DAS/requirements.txt +++ b/DAS/requirements.txt @@ -1,8 +1,8 @@ bitarray==2.6.0 -DAS==0.30.0 dicttoxml==1.7.16 matplotlib==3.6.2 mplfinance==0.12.9b7 networkx==3.0 numpy==1.23.5 seaborn==0.12.2 +joblib==1.2.0 diff --git a/DAS/results.py b/DAS/results.py index 0efe26d..76d96d1 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -1,37 +1,45 @@ #!/bin/python3 import os +import bisect from xml.dom import minidom from dicttoxml import dicttoxml class Result: """This class stores and process/store the results of a simulation.""" - def __init__(self, shape): + def __init__(self, shape, execID): """It initializes the instance with a specific shape.""" self.shape = shape + self.execID = execID self.blockAvailable = -1 self.tta = -1 self.missingVector = [] + self.metrics = {} - def populate(self, shape, missingVector): + def populate(self, shape, config, missingVector): """It populates part of the result data inside a vector.""" self.shape = shape self.missingVector = missingVector - missingSamples = missingVector[-1] - if missingSamples == 0: + v = self.metrics["progress"]["validators ready"] + tta = bisect.bisect(v, config.successCondition) + if v[-1] >= config.successCondition: self.blockAvailable = 1 - self.tta = len(missingVector) + self.tta = tta * (config.stepDuration) else: self.blockAvailable = 0 self.tta = -1 - def dump(self, execID): + def addMetric(self, name, metric): + """Generic function to add a metric to the results.""" + self.metrics[name] = metric + + def dump(self): """It dumps the results of the simulation in an XML file.""" if not os.path.exists("results"): os.makedirs("results") - if not os.path.exists("results/"+execID): - os.makedirs("results/"+execID) + if not os.path.exists("results/"+self.execID): + os.makedirs("results/"+self.execID) resd1 = self.shape.__dict__ resd2 = self.__dict__.copy() resd2.pop("shape") @@ -39,6 +47,6 @@ class Result: resXml = dicttoxml(resd1) xmlstr = minidom.parseString(resXml) xmlPretty = xmlstr.toprettyxml() - filePath = "results/"+execID+"/"+str(self.shape)+".xml" + filePath = "results/"+self.execID+"/"+str(self.shape)+".xml" with open(filePath, "w") as f: f.write(xmlPretty) diff --git a/DAS/simulator.py b/DAS/simulator.py index 30c0d86..8bebd89 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -2,8 +2,9 @@ import networkx as nx import logging, random +import pandas as pd +from functools import partial, partialmethod from datetime import datetime -from statistics import mean from DAS.tools import * from DAS.results import * from DAS.observer import * @@ -12,33 +13,54 @@ from DAS.validator import * class Simulator: """This class implements the main DAS simulator.""" - def __init__(self, shape, config): + def __init__(self, shape, config, execID): """It initializes the simulation with a set of parameters (shape).""" self.shape = shape self.config = config self.format = {"entity": "Simulator"} - self.result = Result(self.shape) + self.execID = execID + self.result = Result(self.shape, self.execID) self.validators = [] self.logger = [] self.logLevel = config.logLevel self.proposerID = 0 self.glob = [] + self.execID = execID + + # In GossipSub the initiator might push messages without participating in the mesh. + # proposerPublishOnly regulates this behavior. If set to true, the proposer is not + # part of the p2p distribution graph, only pushes segments to it. If false, the proposer + # might get back segments from other peers since links are symmetric. + self.proposerPublishOnly = True + + # If proposerPublishOnly == True, this regulates how many copies of each segment are + # pushed out by the proposer. + # 1: the data is sent out exactly once on rows and once on columns (2 copies in total) + # self.shape.netDegree: default behavior similar (but not same) to previous code + self.proposerPublishTo = self.shape.netDegree def initValidators(self): """It initializes all the validators in the network.""" self.glob = Observer(self.logger, self.shape) - self.glob.reset() self.validators = [] if self.config.evenLineDistribution: lightVal = int(self.shape.numberNodes * self.shape.class1ratio * self.shape.vpn1) heavyVal = int(self.shape.numberNodes * (1-self.shape.class1ratio) * self.shape.vpn2) totalValidators = lightVal + heavyVal - rows = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1) - columns = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1) + totalRows = totalValidators * self.shape.chi + rows = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1) + columns = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1) offset = heavyVal*self.shape.chi random.shuffle(rows) random.shuffle(columns) + self.logger.debug("There is a total of %d validators" % totalValidators, extra=self.format) + self.logger.debug("Shuffling a total of %d rows/columns" % len(rows), extra=self.format) + self.logger.debug("Shuffled rows: %s" % str(rows), extra=self.format) + self.logger.debug("Shuffled columns: %s" % str(columns), extra=self.format) + + assignedRows = [] + assignedCols = [] for i in range(self.shape.numberNodes): if self.config.evenLineDistribution: if i < int(heavyVal/self.shape.vpn2): # First start with the heavy nodes @@ -48,17 +70,26 @@ class Simulator: j = i - int(heavyVal/self.shape.vpn2) start = offset+( j *self.shape.chi) end = offset+((j+1)*self.shape.chi) - r = rows[start:end] - c = columns[start:end] + r = set(rows[start:end]) + c = set(columns[start:end]) val = Validator(i, int(not i!=0), self.logger, self.shape, r, c) + self.logger.debug("Validators %d row IDs: %s" % (val.ID, val.rowIDs), extra=self.format) + self.logger.debug("Validators %d column IDs: %s" % (val.ID, val.columnIDs), extra=self.format) + assignedRows = assignedRows + list(r) + assignedCols = assignedCols + list(c) + else: val = Validator(i, int(not i!=0), self.logger, self.shape) if i == self.proposerID: val.initBlock() - self.glob.setGoldenData(val.block) else: val.logIDs() self.validators.append(val) + + assignedRows.sort() + assignedCols.sort() + self.logger.debug("Rows assigned: %s" % str(assignedRows), extra=self.format) + self.logger.debug("Columns assigned: %s" % str(assignedCols), extra=self.format) self.logger.debug("Validators initialized.", extra=self.format) def initNetwork(self): @@ -73,12 +104,14 @@ class Simulator: columnChannels[id].append(v) # Check rows/columns distribution - #totalR = 0 - #totalC = 0 - #for r in rowChannels: - # totalR += len(r) - #for c in columnChannels: - # totalC += len(c) + distR = [] + distC = [] + for r in rowChannels: + distR.append(len(r)) + for c in columnChannels: + distC.append(len(c)) + self.logger.debug("Number of validators per row; Min: %d, Max: %d" % (min(distR), max(distR)), extra=self.format) + self.logger.debug("Number of validators per column; Min: %d, Max: %d" % (min(distC), max(distC)), extra=self.format) for id in range(self.shape.blockSize): @@ -137,6 +170,11 @@ class Simulator: def initLogger(self): """It initializes the logger.""" + logging.TRACE = 5 + logging.addLevelName(logging.TRACE, 'TRACE') + logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) + logging.trace = partial(logging.log, logging.TRACE) + logger = logging.getLogger("DAS") if len(logger.handlers) == 0: logger.setLevel(self.logLevel) @@ -146,37 +184,37 @@ class Simulator: logger.addHandler(ch) self.logger = logger - - def resetShape(self, shape): - """It resets the parameters of the simulation.""" - self.shape = shape - self.result = Result(self.shape) + def printDiagnostics(self): + """Print all required diagnostics to check when a block does not become available""" for val in self.validators: - val.shape.failureRate = shape.failureRate - val.shape.chi = shape.chi - val.shape.vpn1 = shape.vpn1 - val.shape.vpn2 = shape.vpn2 - - # In GossipSub the initiator might push messages without participating in the mesh. - # proposerPublishOnly regulates this behavior. If set to true, the proposer is not - # part of the p2p distribution graph, only pushes segments to it. If false, the proposer - # might get back segments from other peers since links are symmetric. - self.proposerPublishOnly = True - - # If proposerPublishOnly == True, this regulates how many copies of each segment are - # pushed out by the proposer. - # 1: the data is sent out exactly once on rows and once on columns (2 copies in total) - # self.shape.netDegree: default behavior similar (but not same) to previous code - self.proposerPublishTo = self.shape.netDegree - + (a, e) = val.checkStatus() + if e-a > 0 and val.ID != 0: + self.logger.warning("Node %d is missing %d samples" % (val.ID, e-a), extra=self.format) + for r in val.rowIDs: + row = val.getRow(r) + if row.count() < len(row): + self.logger.debug("Row %d: %s" % (r, str(row)), extra=self.format) + neiR = val.rowNeighbors[r] + for nr in neiR: + self.logger.debug("Row %d, Neighbor %d sent: %s" % (r, val.rowNeighbors[r][nr].node.ID, val.rowNeighbors[r][nr].received), extra=self.format) + self.logger.debug("Row %d, Neighbor %d has: %s" % (r, val.rowNeighbors[r][nr].node.ID, self.validators[val.rowNeighbors[r][nr].node.ID].getRow(r)), extra=self.format) + for c in val.columnIDs: + col = val.getColumn(c) + if col.count() < len(col): + self.logger.debug("Column %d: %s" % (c, str(col)), extra=self.format) + neiC = val.columnNeighbors[c] + for nc in neiC: + self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format) + self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format) def run(self): """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) - self.validators[self.proposerID].broadcastBlock() - arrived, expected = self.glob.checkStatus(self.validators) + arrived, expected, ready, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingVector = [] + progressVector = [] + trafficStatsVector = [] steps = 0 while(True): missingVector.append(missingSamples) @@ -197,30 +235,58 @@ class Simulator: self.validators[i].logColumns() # log TX and RX statistics - statsTxInSlot = [v.statsTxInSlot for v in self.validators] - statsRxInSlot = [v.statsRxInSlot for v in self.validators] - self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % - (steps, statsTxInSlot[0], statsRxInSlot[0], - mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]), - mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format) + trafficStats = self.glob.getTrafficStats(self.validators) + self.logger.debug("step %d: %s" % + (steps, trafficStats), extra=self.format) for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() + trafficStatsVector.append(trafficStats) + + missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators) + self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" + % (steps, sampleProgress*100, nodeProgress*100, validatorProgress*100), extra=self.format) + + cnS = "samples received" + cnN = "nodes ready" + cnV = "validators ready" + cnT0 = "TX builder mean" + cnT1 = "TX class1 mean" + cnT2 = "TX class2 mean" + cnR1 = "RX class1 mean" + cnR2 = "RX class2 mean" + cnD1 = "Dup class1 mean" + cnD2 = "Dup class2 mean" + + progressVector.append({ + cnS:sampleProgress, + cnN:nodeProgress, + cnV:validatorProgress, + cnT0: trafficStats[0]["Tx"]["mean"], + cnT1: trafficStats[1]["Tx"]["mean"], + cnT2: trafficStats[2]["Tx"]["mean"], + cnR1: trafficStats[1]["Rx"]["mean"], + cnR2: trafficStats[2]["Rx"]["mean"], + cnD1: trafficStats[1]["RxDup"]["mean"], + cnD2: trafficStats[2]["RxDup"]["mean"], + }) - arrived, expected = self.glob.checkStatus(self.validators) - missingSamples = expected - arrived - missingRate = missingSamples*100/expected - self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format) if missingSamples == oldMissingSamples: - self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) + if len(missingVector) > self.config.steps4StopCondition: + if missingSamples == missingVector[-self.config.steps4StopCondition]: + self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) + if self.config.diagnostics: + self.printDiagnostics() + break missingVector.append(missingSamples) - break elif missingSamples == 0: - #self.logger.info("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) + self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) missingVector.append(missingSamples) break - else: - steps += 1 + steps += 1 - self.result.populate(self.shape, missingVector) + progress = pd.DataFrame(progressVector) + if self.config.saveProgress: + self.result.addMetric("progress", progress.to_dict(orient='list')) + self.result.populate(self.shape, self.config, missingVector) return self.result diff --git a/DAS/validator.py b/DAS/validator.py index f869171..79d9ea2 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -61,14 +61,16 @@ class Validator: self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format) else: if amIproposer: + self.nodeClass = 0 self.rowIDs = range(shape.blockSize) self.columnIDs = range(shape.blockSize) else: #if shape.deterministic: # random.seed(self.ID) - vpn = self.shape.vpn1 if (self.ID <= shape.numberNodes * shape.class1ratio) else self.shape.vpn2 - self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn) - self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn) + self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2 + self.vpn = self.shape.vpn1 if (self.nodeClass == 1) else self.shape.vpn2 + self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) + self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn) self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) @@ -77,13 +79,15 @@ class Validator: self.statsTxPerSlot = [] self.statsRxInSlot = 0 self.statsRxPerSlot = [] + self.statsRxDupInSlot = 0 + self.statsRxDupPerSlot = [] # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 # TODO: this should be a parameter if self.amIproposer: self.bwUplink = shape.bwUplinkProd - elif self.ID <= shape.numberNodes * shape.class1ratio: + elif self.nodeClass == 1: self.bwUplink = shape.bwUplink1 else: self.bwUplink = shape.bwUplink2 @@ -109,33 +113,18 @@ class Validator: def initBlock(self): """It initializes the block for the proposer.""" - if self.amIproposer == 1: - self.logger.debug("I am a block proposer.", extra=self.format) - self.block = Block(self.shape.blockSize) - self.block.fill() - #self.block.print() - else: - self.logger.warning("I am not a block proposer."% self.ID) - - def broadcastBlock(self): - """The block proposer broadcasts the block to all validators.""" if self.amIproposer == 0: self.logger.warning("I am not a block proposer", extra=self.format) else: - self.logger.debug("Broadcasting my block...", extra=self.format) + self.logger.debug("Creating block...", extra=self.format) order = [i for i in range(self.shape.blockSize * self.shape.blockSize)] - random.shuffle(order) - while(order): - i = order.pop() - if (random.randint(0,99) >= self.shape.failureRate): - self.block.data[i] = 1 - else: - self.block.data[i] = 0 + order = random.sample(order, int((1 - self.shape.failureRate/100) * len(order))) + for i in order: + self.block.data[i] = 1 nbFailures = self.block.data.count(0) measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) - #broadcasted.print() def getColumn(self, index): """It returns a given column.""" @@ -155,13 +144,13 @@ class Validator: if src in self.columnNeighbors[cID]: self.columnNeighbors[cID][src].receiving[rID] = 1 if not self.receivedBlock.getSegment(rID, cID): - self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) + self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.receivedBlock.setSegment(rID, cID) if self.perNodeQueue or self.perNeighborQueue: self.receivedQueue.append((rID, cID)) else: - self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) - # self.statsRxDuplicateInSlot += 1 + self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) + self.statsRxDupInSlot += 1 self.statsRxInSlot += 1 def addToSendQueue(self, rID, cID): @@ -183,7 +172,7 @@ class Validator: if self.amIproposer == 1: self.logger.error("I am a block proposer", extra=self.format) else: - self.logger.debug("Receiving the data...", extra=self.format) + self.logger.trace("Receiving the data...", extra=self.format) #self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format) self.block.merge(self.receivedBlock) @@ -203,8 +192,10 @@ class Validator: """It updates the stats related to sent and received data.""" self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format) self.statsRxPerSlot.append(self.statsRxInSlot) + self.statsRxDupPerSlot.append(self.statsRxDupInSlot) self.statsTxPerSlot.append(self.statsTxInSlot) self.statsRxInSlot = 0 + self.statsRxDupInSlot = 0 self.statsTxInSlot = 0 def checkSegmentToNeigh(self, rID, cID, neigh): @@ -219,7 +210,7 @@ class Validator: def sendSegmentToNeigh(self, rID, cID, neigh): """Send segment to a neighbor (without checks).""" - self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) + self.logger.trace("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) i = rID if neigh.dim else cID neigh.sent[i] = 1 neigh.node.receiveSegment(rID, cID, self.ID) @@ -454,7 +445,7 @@ class Validator: # be queued after successful repair. for i in range(len(rep)): if rep[i]: - self.logger.debug("Rep: %d,%d", id, i, extra=self.format) + self.logger.trace("Rep: %d,%d", id, i, extra=self.format) self.addToSendQueue(id, i) # self.statsRepairInSlot += rep.count(1) @@ -472,7 +463,7 @@ class Validator: # be queued after successful repair. for i in range(len(rep)): if rep[i]: - self.logger.debug("Rep: %d,%d", i, id, extra=self.format) + self.logger.trace("Rep: %d,%d", i, id, extra=self.format) self.addToSendQueue(i, id) # self.statsRepairInSlot += rep.count(1) diff --git a/DAS/visualizer.py b/DAS/visualizer.py index 621f579..cf095c1 100644 --- a/DAS/visualizer.py +++ b/DAS/visualizer.py @@ -18,7 +18,7 @@ class Visualizer: self.folderPath = "results/"+self.execID self.parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree', 'chi', 'vpn1', 'vpn2', 'class1ratio', 'bwUplinkProd', 'bwUplink1', 'bwUplink2'] self.minimumDataPoints = 2 - self.maxTTA = 50 + self.maxTTA = 11000 def plottingData(self): """Store data with a unique key for each params combination""" @@ -43,7 +43,7 @@ class Visualizer: bwUplinkProd = int(root.find('bwUplinkProd').text) bwUplink1 = int(root.find('bwUplink1').text) bwUplink2 = int(root.find('bwUplink2').text) - tta = int(root.find('tta').text) + tta = float(root.find('tta').text) """Store BW""" bw.append(bwUplinkProd) @@ -182,7 +182,7 @@ class Visualizer: if(len(self.config.runs) > 1): data = self.averageRuns(data, len(self.config.runs)) filteredKeys = self.similarKeys(data) - vmin, vmax = 0, self.maxTTA+10 + vmin, vmax = 0, self.maxTTA+1000 print("Plotting heatmaps...") """Create the directory if it doesn't exist already""" @@ -200,7 +200,7 @@ class Visualizer: hist, xedges, yedges = np.histogram2d(data[key][labels[0]], data[key][labels[1]], bins=(len(xlabels), len(ylabels)), weights=data[key]['ttas']) hist = hist.T fig, ax = plt.subplots(figsize=(10, 6)) - sns.heatmap(hist, xticklabels=xlabels, yticklabels=ylabels, cmap='hot_r', cbar_kws={'label': 'Time to block availability'}, linecolor='black', linewidths=0.3, annot=True, fmt=".2f", ax=ax, vmin=vmin, vmax=vmax) + sns.heatmap(hist, xticklabels=xlabels, yticklabels=ylabels, cmap='hot_r', cbar_kws={'label': 'Time to block availability (ms)'}, linecolor='black', linewidths=0.3, annot=True, fmt=".2f", ax=ax, vmin=vmin, vmax=vmax) plt.xlabel(self.formatLabel(labels[0])) plt.ylabel(self.formatLabel(labels[1])) filename = "" @@ -209,10 +209,12 @@ class Visualizer: for param in self.parameters: if param != labels[0] and param != labels[1] and param != 'run': filename += f"{key[paramValueCnt]}" - #formattedTitle = self.formatTitle(key[paramValueCnt]) - formattedTitle = "Time to block availability" + formattedTitle = self.formatTitle(key[paramValueCnt]) title += formattedTitle + if (paramValueCnt+1) % 5 == 0: + title += "\n" paramValueCnt += 1 + title = "Time to Block Availability (ms)" title_obj = plt.title(title) font_size = 16 * fig.get_size_inches()[0] / 10 title_obj.set_fontsize(font_size) diff --git a/smallConf.py b/smallConf.py index 5711380..1e4b3a8 100644 --- a/smallConf.py +++ b/smallConf.py @@ -18,6 +18,7 @@ import itertools import numpy as np from DAS.shape import Shape +# Dump results into XML files dumpXML = 1 # save progress vectors to XML @@ -26,7 +27,13 @@ saveProgress = 1 # plot progress for each run to PNG plotProgress = 1 +# Save row and column distributions +saveRCdist = 1 + +# Plot all figures visualization = 1 + +# Verbosity level logLevel = logging.INFO # number of parallel workers. -1: all cores; 1: sequential @@ -59,8 +66,8 @@ chis = range(2, 3, 2) class1ratios = [0.8] # Number of validators per beacon node -validatorsPerNode1 = [2] -validatorsPerNode2 = [4] +validatorsPerNode1 = [1] +validatorsPerNode2 = [500] # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 @@ -68,26 +75,27 @@ bwUplinksProd = [2200] bwUplinks1 = [110] bwUplinks2 = [2200] +# Step duration in miliseconds (Classic RTT is about 100ms) +stepDuration = 50 + # Set to True if you want your run to be deterministic, False if not deterministic = True # If your run is deterministic you can decide the random seed. This is ignore otherwise. randomSeed = "DAS" -saveProgress = 1 -saveRCdist = 1 +# Number of steps without progress to stop simulation +steps4StopCondition = 7 + +# Number of validators ready to asume block is available +successCondition = 0.9 # If True, print diagnostics when the block is not available diagnostics = False -# Number of steps without progress to stop simulation -steps4StopCondition = 7 - +# True to save git diff and git commit saveGit = False -successCondition = 0.9 -stepDuration = 50 - def nextShape(): for run, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( runs, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): diff --git a/study.py b/study.py index a15e7b2..badb7f3 100644 --- a/study.py +++ b/study.py @@ -2,6 +2,7 @@ import time, sys, random, copy import importlib +import subprocess from joblib import Parallel, delayed from DAS import * @@ -12,17 +13,32 @@ from DAS import * # https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls # and https://github.com/joblib/joblib/issues/1017 -def runOnce(sim, config, shape): +def initLogger(config): + """It initializes the logger.""" + logger = logging.getLogger("Study") + logger.setLevel(config.logLevel) + ch = logging.StreamHandler() + ch.setLevel(config.logLevel) + ch.setFormatter(CustomFormatter()) + logger.addHandler(ch) + return logger + +def runOnce(config, shape, execID): + if config.deterministic: shape.setSeed(config.randomSeed+"-"+str(shape)) random.seed(shape.randomSeed) + sim = Simulator(shape, config, execID) sim.initLogger() - sim.resetShape(shape) sim.initValidators() sim.initNetwork() result = sim.run() sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) + + if config.dumpXML: + result.dump() + return result def study(): @@ -40,29 +56,36 @@ def study(): print("You need to pass a configuration file in parameter") exit(1) - shape = Shape(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - sim = Simulator(shape, config) - sim.initLogger() + logger = initLogger(config) + format = {"entity": "Study"} + results = [] now = datetime.now() execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999)) - sim.logger.info("Starting simulations:", extra=sim.format) - start = time.time() - results = Parallel(config.numJobs)(delayed(runOnce)(sim, config, shape) for shape in config.nextShape()) - end = time.time() - sim.logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=sim.format) + # save config and code state for reproducibility + if not os.path.exists("results"): + os.makedirs("results") + dir = "results/"+execID + if not os.path.exists(dir): + os.makedirs(dir) + if config.saveGit: + with open(dir+"/git.diff", 'w') as f: + subprocess.run(["git", "diff"], stdout=f) + with open(dir+"/git.describe", 'w') as f: + subprocess.run(["git", "describe", "--always"], stdout=f) + subprocess.run(["cp", sys.argv[1], dir+"/"]) - if config.dumpXML: - for res in results: - res.dump(execID) - sim.logger.info("Results dumped into results/%s/" % (execID), extra=sim.format) + logger.info("Starting simulations:", extra=format) + start = time.time() + results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape()) + end = time.time() + logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format) if config.visualization: vis = Visualizer(execID, config) vis.plotHeatmaps() - -study() - +if __name__ == "__main__": + study()