diff --git a/.gitignore b/.gitignore
index b948985..492044a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,7 @@
 *.swp
 *.pyc
+results/*
+myenv
+doc/_build
+!results/plots.py
+Frontend/
diff --git a/DAS/__init__.py b/DAS/__init__.py
index 8dd11bf..67af3ae 100644
--- a/DAS/__init__.py
+++ b/DAS/__init__.py
@@ -1,3 +1,3 @@
 from DAS.simulator import *
-from DAS.configuration import *
 from DAS.shape import *
+from DAS.visualizer import *
diff --git a/DAS/block.py b/DAS/block.py
index aa5fe01..f76a944 100644
--- a/DAS/block.py
+++ b/DAS/block.py
@@ -5,43 +5,73 @@ from bitarray import bitarray
 from bitarray.util import zeros
 
 class Block:
-
-    blockSize = 0
-    data = bitarray()
+    """This class represents a block in the Ethereum blockchain."""
 
     def __init__(self, blockSize):
+        """Initialize the block with a data array of blockSize^2 zeros."""
         self.blockSize = blockSize
         self.data = zeros(self.blockSize*self.blockSize)
 
     def fill(self):
+        """It fills the block data with ones."""
         self.data.setall(1)
 
     def merge(self, merged):
+        """It merges (OR) the existing block with the received one."""
        self.data |= merged.data
 
+    def getSegment(self, rowID, columnID):
+        """Check whether a segment is included."""
+        return self.data[rowID*self.blockSize + columnID]
+
+    def setSegment(self, rowID, columnID, value = 1):
+        """Set the value of a segment (default: 1)."""
+        self.data[rowID*self.blockSize + columnID] = value
+
     def getColumn(self, columnID):
+        """It returns the block column corresponding to columnID."""
         return self.data[columnID::self.blockSize]
 
     def mergeColumn(self, columnID, column):
+        """It merges (OR) the existing column with the received one."""
         self.data[columnID::self.blockSize] |= column
 
     def repairColumn(self, id):
-        success = self.data[id::self.blockSize].count(1)
+        """It repairs the entire column if it has at least blockSize/2 ones.
+
+        Returns: bitarray marking the repaired segments.
+        """
+        line = self.data[id::self.blockSize]
+        success = line.count(1)
         if success >= self.blockSize/2:
+            ret = ~line
             self.data[id::self.blockSize] = 1
+        else:
+            ret = zeros(self.blockSize)
+        return ret
 
     def getRow(self, rowID):
+        """It returns the block row corresponding to rowID."""
         return self.data[rowID*self.blockSize:(rowID+1)*self.blockSize]
 
     def mergeRow(self, rowID, row):
+        """It merges (OR) the existing row with the received one."""
         self.data[rowID*self.blockSize:(rowID+1)*self.blockSize] |= row
 
     def repairRow(self, id):
-        success = self.data[id*self.blockSize:(id+1)*self.blockSize].count(1)
+        """It repairs the entire row if it has at least blockSize/2 ones.
+
+        Returns: bitarray marking the repaired segments.
+        """
+        line = self.data[id*self.blockSize:(id+1)*self.blockSize]
+        success = line.count(1)
         if success >= self.blockSize/2:
+            ret = ~line
             self.data[id*self.blockSize:(id+1)*self.blockSize] = 1
+        else:
+            ret = zeros(self.blockSize)
+        return ret
 
     def print(self):
+        """It prints the block in the terminal (outside of the logger rules)."""
         dash = "-" * (self.blockSize+2)
         print(dash)
         for i in range(self.blockSize):
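Note: repairRow/repairColumn now return the set of repaired segments instead of only filling them in, so callers can queue exactly the reconstructed bits. A minimal usage sketch (run from the repo root; the 50% threshold stands in for the erasure-coding property that half a line suffices to rebuild it):

    from DAS.block import Block

    b = Block(4)                 # 4x4 block, all segments missing
    for c in range(2):           # half of row 0 arrives...
        b.setSegment(0, c)
    rep = b.repairRow(0)         # ...so the row is repairable
    print(rep)                   # bitarray('0011'): segments 2,3 were restored
    print(b.getRow(0))           # bitarray('1111')
    print(b.repairColumn(3))     # only 1 of 4 ones: bitarray('0000'), no repair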
diff --git a/DAS/configuration.py b/DAS/configuration.py
deleted file mode 100644
index 91df16f..0000000
--- a/DAS/configuration.py
+++ /dev/null
@@ -1,48 +0,0 @@
-#!/bin/python3
-
-import configparser
-
-class Configuration:
-
-    deterministic = 0
-
-    def __init__(self, fileName):
-
-        config = configparser.RawConfigParser()
-        config.read(fileName)
-
-        self.nvStart = int(config.get("Simulation Space", "numberValidatorStart"))
-        self.nvStop = int(config.get("Simulation Space", "numberValidatorStop"))
-        self.nvStep = int(config.get("Simulation Space", "numberValidatorStep"))
-
-        self.blockSizeStart = int(config.get("Simulation Space", "blockSizeStart"))
-        self.blockSizeStop = int(config.get("Simulation Space", "blockSizeStop"))
-        self.blockSizeStep = int(config.get("Simulation Space", "blockSizeStep"))
-
-        self.netDegreeStart = int(config.get("Simulation Space", "netDegreeStart"))
-        self.netDegreeStop = int(config.get("Simulation Space", "netDegreeStop"))
-        self.netDegreeStep = int(config.get("Simulation Space", "netDegreeStep"))
-
-        self.failureRateStart = int(config.get("Simulation Space", "failureRateStart"))
-        self.failureRateStop = int(config.get("Simulation Space", "failureRateStop"))
-        self.failureRateStep = int(config.get("Simulation Space", "failureRateStep"))
-
-        self.chiStart = int(config.get("Simulation Space", "chiStart"))
-        self.chiStop = int(config.get("Simulation Space", "chiStop"))
-        self.chiStep = int(config.get("Simulation Space", "chiStep"))
-
-        self.numberRuns = int(config.get("Advanced", "numberRuns"))
-        self.deterministic = config.get("Advanced", "deterministic")
-
-        if self.nvStop < (self.blockSizeStart*4):
-            print("ERROR: The number of validators cannot be lower than the block size * 4")
-            exit(1)
-        if self.chiStart < 1:
-            print("Chi has to be greater than 0")
-            exit(1)
-        if self.chiStop > self.blockSizeStart:
-            print("Chi (%d) has to be smaller or equal to block the size (%d)" % (self.chiStop, self.blockSizeStart))
-            exit(1)
-
-
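Note: with the INI Start/Stop/Step triplets gone, a parameter sweep is presumably expressed directly in Python. A hypothetical sketch of the same idea (names and values are illustrative, not the repo's actual config format):

    from itertools import product

    blockSizes = [32, 64]
    failureRates = range(10, 91, 40)   # 10, 50, 90
    netDegrees = [6, 8]
    chis = [2, 4]

    for blockSize, failureRate, netDegree, chi in product(
            blockSizes, failureRates, netDegrees, chis):
        if chi < 1 or chi > blockSize:
            continue   # same sanity checks the old Configuration enforced
        print(blockSize, failureRate, netDegree, chi)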
diff --git a/DAS/observer.py b/DAS/observer.py
index ad9052b..beba4ad 100644
--- a/DAS/observer.py
+++ b/DAS/observer.py
@@ -1,30 +1,24 @@
 #!/bin/python3
 
+import numpy as np
 from DAS.block import *
 
 class Observer:
-
-    block = []
-    rows = []
-    columns = []
-    goldenData = []
-    broadcasted = []
-    config = []
-    logger = []
+    """This class gathers global data from the simulation, like an 'all-seeing god'."""
 
     def __init__(self, logger, config):
+        """It initializes the observer with a logger and a given configuration."""
         self.config = config
         self.format = {"entity": "Observer"}
         self.logger = logger
-
-    def reset(self):
         self.block = [0] * self.config.blockSize * self.config.blockSize
-        self.goldenData = [0] * self.config.blockSize * self.config.blockSize
         self.rows = [0] * self.config.blockSize
         self.columns = [0] * self.config.blockSize
         self.broadcasted = Block(self.config.blockSize)
+
     def checkRowsColumns(self, validators):
+        """It checks how many validators have been assigned to each row and column."""
         for val in validators:
             if val.amIproposer == 0:
                 for r in val.rowIDs:
@@ -37,11 +31,8 @@ class Observer:
             if self.rows[i] == 0 or self.columns[i] == 0:
                 self.logger.warning("There is a row/column that has not been assigned", extra=self.format)
 
-    def setGoldenData(self, block):
-        for i in range(self.config.blockSize*self.config.blockSize):
-            self.goldenData[i] = block.data[i]
-
     def checkBroadcasted(self):
+        """It checks how many broadcasted samples are still missing in the network."""
         zeros = 0
         for i in range(self.blockSize * self.blockSize):
             if self.broadcasted.data[i] == 0:
@@ -51,11 +42,61 @@ class Observer:
         return zeros
 
     def checkStatus(self, validators):
+        """It checks, globally, how many samples are expected and how many have arrived."""
         arrived = 0
         expected = 0
+        ready = 0
+        validatedall = 0
+        validated = 0
         for val in validators:
             if val.amIproposer == 0:
-                (a, e) = val.checkStatus()
+                (a, e, v) = val.checkStatus()
                 arrived += a
                 expected += e
-        return (arrived, expected)
+                if a == e:
+                    ready += 1
+                    validatedall += val.vpn
+                validated += v
+        return (arrived, expected, ready, validatedall, validated)
+
+    def getProgress(self, validators):
+        """Calculate current simulation progress with different metrics.
+
+        Returns:
+        - missingSamples: overall number of sample instances missing in nodes.
+          Samples are counted on both rows and columns, so intersections of interest are counted twice.
+        - sampleProgress: the above, expressed as a progress ratio
+        - nodeProgress: ratio of nodes that have all the segments they are interested in
+        - validatorProgress: same as above, but as a vpn-weighted average, i.e. it counts per validator,
+          but a validator is counted only if all validators on its supporting node see all segments of interest
+        TODO: add real per-validator progress counter
+        """
+        arrived, expected, ready, validatedall, validated = self.checkStatus(validators)
+        missingSamples = expected - arrived
+        sampleProgress = arrived / expected
+        nodeProgress = ready / (len(validators)-1)
+        validatorCnt = sum([v.vpn for v in validators[1:]])
+        validatorAllProgress = validatedall / validatorCnt
+        validatorProgress = validated / validatorCnt
+
+        return missingSamples, sampleProgress, nodeProgress, validatorAllProgress, validatorProgress
+
+    def getTrafficStats(self, validators):
+        """Summary statistics of traffic measurements in a timestep."""
+        def maxOrNan(l):
+            return np.max(l) if l else np.NaN
+        def meanOrNan(l):
+            return np.mean(l) if l else np.NaN
+
+        trafficStats = {}
+        for cl in range(0,3):
+            Tx = [v.statsTxInSlot for v in validators if v.nodeClass == cl]
+            Rx = [v.statsRxInSlot for v in validators if v.nodeClass == cl]
+            RxDup = [v.statsRxDupInSlot for v in validators if v.nodeClass == cl]
+            trafficStats[cl] = {
+                "Tx": {"mean": meanOrNan(Tx), "max": maxOrNan(Tx)},
+                "Rx": {"mean": meanOrNan(Rx), "max": maxOrNan(Rx)},
+                "RxDup": {"mean": meanOrNan(RxDup), "max": maxOrNan(RxDup)},
+            }
+
+        return trafficStats
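Note: getTrafficStats guards every aggregate with an empty-list check, so a node class with no members yields NaN instead of raising on an empty list. The pattern in isolation (counts invented for the demo):

    import numpy as np

    def meanOrNan(l):
        return np.mean(l) if l else np.NaN

    # class 0 is the lone block producer; a run may have no class-2 nodes at all
    txPerClass = {0: [110], 1: [11, 9, 12], 2: []}
    print({cl: meanOrNan(v) for cl, v in txPerClass.items()})
    # {0: 110.0, 1: 10.666666666666666, 2: nan}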
diff --git a/DAS/requirements.txt b/DAS/requirements.txt
index d0bb457..76b14c7 100644
--- a/DAS/requirements.txt
+++ b/DAS/requirements.txt
@@ -1,3 +1,8 @@
 bitarray==2.6.0
-DAS==0.28.7
+dicttoxml==1.7.16
+matplotlib==3.6.2
+mplfinance==0.12.9b7
 networkx==3.0
+numpy==1.23.5
+seaborn==0.12.2
+joblib==1.2.0
diff --git a/DAS/results.py b/DAS/results.py
index 8468b09..76d96d1 100644
--- a/DAS/results.py
+++ b/DAS/results.py
@@ -1,17 +1,52 @@
 #!/bin/python3
+import os
+import bisect
+from xml.dom import minidom
+from dicttoxml import dicttoxml
 
 class Result:
+    """This class stores and processes the results of a simulation."""
 
-    config = []
-    missingVector = []
-    blockAvailable = -1
-
-    def __init__(self, config):
-        self.config = config
+    def __init__(self, shape, execID):
+        """It initializes the instance with a specific shape and execution ID."""
+        self.shape = shape
+        self.execID = execID
         self.blockAvailable = -1
+        self.tta = -1
         self.missingVector = []
+        self.metrics = {}
 
-
-    def addMissing(self, missingVector):
+    def populate(self, shape, config, missingVector):
+        """It populates the result with the missing vector and the metrics derived from it."""
+        self.shape = shape
         self.missingVector = missingVector
+        v = self.metrics["progress"]["validators ready"]
+        tta = bisect.bisect(v, config.successCondition)
+        if v[-1] >= config.successCondition:
+            self.blockAvailable = 1
+            self.tta = tta * (config.stepDuration)
+        else:
+            self.blockAvailable = 0
+            self.tta = -1
+
+    def addMetric(self, name, metric):
+        """Generic function to add a metric to the results."""
+        self.metrics[name] = metric
+
+    def dump(self):
+        """It dumps the results of the simulation in an XML file."""
+        if not os.path.exists("results"):
+            os.makedirs("results")
+        if not os.path.exists("results/"+self.execID):
+            os.makedirs("results/"+self.execID)
+        resd1 = self.shape.__dict__
+        resd2 = self.__dict__.copy()
+        resd2.pop("shape")
+        resd1.update(resd2)
+        resXml = dicttoxml(resd1)
+        xmlstr = minidom.parseString(resXml)
+        xmlPretty = xmlstr.toprettyxml()
+        filePath = "results/"+self.execID+"/"+str(self.shape)+".xml"
+        with open(filePath, "w") as f:
+            f.write(xmlPretty)
diff --git a/DAS/shape.py b/DAS/shape.py
index 2918422..9f6d573 100644
--- a/DAS/shape.py
+++ b/DAS/shape.py
@@ -1,19 +1,44 @@
 #!/bin/python3
 
 class Shape:
-    numberValidators = 0
-    failureRate = 0
-    blockSize = 0
-    netDegree = 0
-    chi = 0
+    """This class represents a set of parameters for a specific simulation."""
 
-    def __init__(self, blockSize, numberValidators, failureRate, chi, netDegree):
-        self.numberValidators = numberValidators
-        self.failureRate = failureRate
+    def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run):
+        """Initializes the shape with the parameters passed in argument."""
+        self.run = run
+        self.numberNodes = numberNodes
         self.blockSize = blockSize
+        self.failureModel = failureModel
+        self.failureRate = failureRate
         self.netDegree = netDegree
+        self.class1ratio = class1ratio
         self.chi = chi
+        self.vpn1 = vpn1
+        self.vpn2 = vpn2
+        self.bwUplinkProd = bwUplinkProd
+        self.bwUplink1 = bwUplink1
+        self.bwUplink2 = bwUplink2
+        self.randomSeed = ""
 
+    def __repr__(self):
+        """Returns a printable representation of the shape."""
+        shastr = ""
+        shastr += "bs-"+str(self.blockSize)
+        shastr += "-nn-"+str(self.numberNodes)
+        shastr += "-fm-"+str(self.failureModel)
+        shastr += "-fr-"+str(self.failureRate)
+        shastr += "-c1r-"+str(self.class1ratio)
+        shastr += "-chi-"+str(self.chi)
+        shastr += "-vpn1-"+str(self.vpn1)
+        shastr += "-vpn2-"+str(self.vpn2)
+        shastr += "-bwupprod-"+str(self.bwUplinkProd)
+        shastr += "-bwup1-"+str(self.bwUplink1)
+        shastr += "-bwup2-"+str(self.bwUplink2)
+        shastr += "-nd-"+str(self.netDegree)
+        shastr += "-r-"+str(self.run)
+        return shastr
-
+    def setSeed(self, seed):
+        """Adds the random seed to the shape."""
+        self.randomSeed = seed
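Note: str(shape) doubles as the result file name, so each parameter combination maps to a unique XML under results/<execID>/. A quick sketch (values arbitrary; as an aside, simulator.py below imports pandas, which requirements.txt does not pin):

    from DAS.shape import Shape

    shape = Shape(blockSize=64, numberNodes=256, failureModel="random",
                  failureRate=30, class1ratio=0.8, chi=2, vpn1=1, vpn2=50,
                  netDegree=6, bwUplinkProd=2200, bwUplink1=110, bwUplink2=2200,
                  run=0)
    print("results/myExecID/" + str(shape) + ".xml")   # where Result.dump() writes
    # results/myExecID/bs-64-nn-256-fm-random-fr-30-c1r-0.8-chi-2-vpn1-1-vpn2-50-bwupprod-2200-bwup1-110-bwup2-2200-nd-6-r-0.xml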
diff --git a/DAS/simulator.py b/DAS/simulator.py
index 676b891..b46a340 100644
--- a/DAS/simulator.py
+++ b/DAS/simulator.py
@@ -2,6 +2,8 @@
 
 import networkx as nx
 import logging, random
+import pandas as pd
+from functools import partial, partialmethod
 from datetime import datetime
 from DAS.tools import *
 from DAS.results import *
@@ -9,130 +11,299 @@ from DAS.observer import *
 from DAS.validator import *
 
 class Simulator:
+    """This class implements the main DAS simulator."""
 
-    proposerID = 0
-    logLevel = logging.INFO
-    validators = []
-    glob = []
-    result = []
-    shape = []
-    logger = []
-    format = {}
-
-    def __init__(self, shape):
+    def __init__(self, shape, config, execID):
+        """It initializes the simulation with a set of parameters (shape)."""
         self.shape = shape
+        self.config = config
         self.format = {"entity": "Simulator"}
-        self.result = Result(self.shape)
+        self.execID = execID
+        self.result = Result(self.shape, self.execID)
+        self.validators = []
+        self.logger = []
+        self.logLevel = config.logLevel
+        self.proposerID = 0
+        self.glob = []
+        self.distR = []
+        self.distC = []
+        self.nodeRows = []
+        self.nodeColumns = []
+
+        # In GossipSub the initiator might push messages without participating in the mesh.
+        # proposerPublishOnly regulates this behavior. If set to true, the proposer is not
+        # part of the p2p distribution graph, it only pushes segments into it. If false, the proposer
+        # might get back segments from other peers since links are symmetric.
+        self.proposerPublishOnly = True
+
+        # If proposerPublishOnly == True, this regulates how many copies of each segment are
+        # pushed out by the proposer.
+        # 1: the data is sent out exactly once on rows and once on columns (2 copies in total)
+        # self.shape.netDegree: default behavior, similar (but not identical) to the previous code
+        self.proposerPublishTo = self.shape.netDegree
 
     def initValidators(self):
+        """It initializes all the validators in the network."""
         self.glob = Observer(self.logger, self.shape)
-        self.glob.reset()
         self.validators = []
-        rows = list(range(self.shape.blockSize)) * int(self.shape.chi*self.shape.numberValidators/self.shape.blockSize)
-        columns = list(range(self.shape.blockSize)) * int(self.shape.chi*self.shape.numberValidators/self.shape.blockSize)
-        random.shuffle(rows)
-        random.shuffle(columns)
-        for i in range(self.shape.numberValidators):
-            val = Validator(i, int(not i!=0), self.logger, self.shape, rows, columns)
+        if self.config.evenLineDistribution:
+
+            lightNodes = int(self.shape.numberNodes * self.shape.class1ratio)
+            heavyNodes = self.shape.numberNodes - lightNodes
+            lightVal = lightNodes * self.shape.vpn1
+            heavyVal = heavyNodes * self.shape.vpn2
+            totalValidators = lightVal + heavyVal
+            totalRows = totalValidators * self.shape.chi
+            rows = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1)
+            columns = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1)
+            rows = rows[0:totalRows]
+            columns = columns[0:totalRows]
+            random.shuffle(rows)
+            random.shuffle(columns)
+            offset = lightVal*self.shape.chi
+            self.logger.debug("There is a total of %d nodes, %d light and %d heavy." % (self.shape.numberNodes, lightNodes, heavyNodes), extra=self.format)
+            self.logger.debug("There is a total of %d validators, %d in light nodes and %d in heavy nodes" % (totalValidators, lightVal, heavyVal), extra=self.format)
+            self.logger.debug("Shuffling a total of %d rows/columns to be assigned (X=%d)" % (len(rows), self.shape.chi), extra=self.format)
+            self.logger.debug("Shuffled rows: %s" % str(rows), extra=self.format)
+            self.logger.debug("Shuffled columns: %s" % str(columns), extra=self.format)
+
+        assignedRows = []
+        assignedCols = []
+        for i in range(self.shape.numberNodes):
+            if self.config.evenLineDistribution:
+                if i < int(lightVal/self.shape.vpn1):  # First start with the light nodes
+                    start =  i   *self.shape.chi*self.shape.vpn1
+                    end   = (i+1)*self.shape.chi*self.shape.vpn1
+                else:
+                    j = i - int(lightVal/self.shape.vpn1)
+                    start = offset+( j   *self.shape.chi*self.shape.vpn2)
+                    end   = offset+((j+1)*self.shape.chi*self.shape.vpn2)
+                r = rows[start:end]
+                c = columns[start:end]
+                val = Validator(i, int(not i!=0), self.logger, self.shape, self.config, r, c)
+                self.logger.debug("Node %d has row IDs: %s" % (val.ID, val.rowIDs), extra=self.format)
+                self.logger.debug("Node %d has column IDs: %s" % (val.ID, val.columnIDs), extra=self.format)
+                assignedRows = assignedRows + list(r)
+                assignedCols = assignedCols + list(c)
+                self.nodeRows.append(val.rowIDs)
+                self.nodeColumns.append(val.columnIDs)
+
+            else:
+                val = Validator(i, int(not i!=0), self.logger, self.shape, self.config)
             if i == self.proposerID:
                 val.initBlock()
-                self.glob.setGoldenData(val.block)
             else:
                 val.logIDs()
             self.validators.append(val)
+
+        assignedRows.sort()
+        assignedCols.sort()
+        self.logger.debug("Rows assigned: %s" % str(assignedRows), extra=self.format)
+        self.logger.debug("Columns assigned: %s" % str(assignedCols), extra=self.format)
+        self.logger.debug("Validators initialized.", extra=self.format)
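Note: the even line distribution reduces to slicing one long shuffled list of row (and column) IDs: light nodes first, heavy nodes after a fixed offset. A compact standalone sketch (sizes invented for the demo):

    import random

    blockSize, chi = 8, 2
    lightVal, vpn1 = 4, 1      # 4 light nodes with 1 validator each
    heavyVal, vpn2 = 4, 2      # 2 heavy nodes with 2 validators each
    totalRows = (lightVal + heavyVal) * chi

    rows = (list(range(blockSize)) * (totalRows // blockSize + 1))[:totalRows]
    random.shuffle(rows)       # near-even coverage of every row ID

    offset = lightVal * chi
    for i in range(4):         # light nodes take chi*vpn1 IDs each
        print("light", i, rows[i*chi*vpn1:(i+1)*chi*vpn1])
    for j in range(2):         # heavy nodes take chi*vpn2 IDs each, after the offset
        print("heavy", j, rows[offset + j*chi*vpn2:offset + (j+1)*chi*vpn2])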
     def initNetwork(self):
-        self.shape.netDegree = 6
+        """It initializes the simulated network."""
         rowChannels = [[] for i in range(self.shape.blockSize)]
         columnChannels = [[] for i in range(self.shape.blockSize)]
         for v in self.validators:
-            for id in v.rowIDs:
-                rowChannels[id].append(v)
-            for id in v.columnIDs:
-                columnChannels[id].append(v)
+            if not (self.proposerPublishOnly and v.amIproposer):
+                for id in v.rowIDs:
+                    rowChannels[id].append(v)
+                for id in v.columnIDs:
+                    columnChannels[id].append(v)
+
+        # Check rows/columns distribution
+        for r in rowChannels:
+            self.distR.append(len(r))
+        for c in columnChannels:
+            self.distC.append(len(c))
+        self.logger.debug("Number of validators per row; Min: %d, Max: %d" % (min(self.distR), max(self.distR)), extra=self.format)
+        self.logger.debug("Number of validators per column; Min: %d, Max: %d" % (min(self.distC), max(self.distC)), extra=self.format)
 
         for id in range(self.shape.blockSize):
 
-            if (len(rowChannels[id]) < self.shape.netDegree):
-                self.logger.error("Graph degree higher than %d" % len(rowChannels[id]), extra=self.format)
-            G = nx.random_regular_graph(self.shape.netDegree, len(rowChannels[id]))
+            # If the number of nodes in a channel is smaller than or equal to the
+            # requested degree, a fully connected graph is used. For n>d, a random
+            # d-regular graph is set up. (For n=d+1, the two are the same.)
+            if not rowChannels[id]:
+                self.logger.error("No nodes for row %d !" % id, extra=self.format)
+                continue
+            elif (len(rowChannels[id]) <= self.shape.netDegree):
+                self.logger.debug("Graph fully connected with degree %d !" % (len(rowChannels[id]) - 1), extra=self.format)
+                G = nx.complete_graph(len(rowChannels[id]))
+            else:
+                G = nx.random_regular_graph(self.shape.netDegree, len(rowChannels[id]))
             if not nx.is_connected(G):
                 self.logger.error("Graph not connected for row %d !" % id, extra=self.format)
             for u, v in G.edges:
                 val1=rowChannels[id][u]
                 val2=rowChannels[id][v]
-                val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)})
-                val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)})
+                val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSize)})
+                val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSize)})
 
-            if (len(columnChannels[id]) < self.shape.netDegree):
-                self.logger.error("Graph degree higher than %d" % len(columnChannels[id]), extra=self.format)
-            G = nx.random_regular_graph(self.shape.netDegree, len(columnChannels[id]))
+            if not columnChannels[id]:
+                self.logger.error("No nodes for column %d !" % id, extra=self.format)
+                continue
+            elif (len(columnChannels[id]) <= self.shape.netDegree):
+                self.logger.debug("Graph fully connected with degree %d !" % (len(columnChannels[id]) - 1), extra=self.format)
+                G = nx.complete_graph(len(columnChannels[id]))
+            else:
+                G = nx.random_regular_graph(self.shape.netDegree, len(columnChannels[id]))
             if not nx.is_connected(G):
                 self.logger.error("Graph not connected for column %d !" % id, extra=self.format)
             for u, v in G.edges:
                 val1=columnChannels[id][u]
                 val2=columnChannels[id][v]
-                val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)})
-                val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)})
+                val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSize)})
+                val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSize)})
+
+        for v in self.validators:
+            if (self.proposerPublishOnly and v.amIproposer):
+                for id in v.rowIDs:
+                    count = min(self.proposerPublishTo, len(rowChannels[id]))
+                    publishTo = random.sample(rowChannels[id], count)
+                    for vi in publishTo:
+                        v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSize)})
+                for id in v.columnIDs:
+                    count = min(self.proposerPublishTo, len(columnChannels[id]))
+                    publishTo = random.sample(columnChannels[id], count)
+                    for vi in publishTo:
+                        v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSize)})
+
+        if self.logger.isEnabledFor(logging.DEBUG):
+            for i in range(0, self.shape.numberNodes):
+                self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format)
+                self.logger.debug("Val %d : colN %s", i, self.validators[i].columnNeighbors, extra=self.format)
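Note: the per-channel topology choice, in isolation (networkx only; channel sizes arbitrary). random_regular_graph(d, n) requires n > d and n*d even, hence the complete-graph fallback for small channels:

    import networkx as nx

    netDegree = 6
    for n in (4, 7, 20):       # nodes subscribed to one row/column channel
        if n <= netDegree:
            G = nx.complete_graph(n)                    # degree n-1
        else:
            G = nx.random_regular_graph(netDegree, n)   # needs n*d even
        print(n, G.number_of_edges(), nx.is_connected(G))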
     def initLogger(self):
+        """It initializes the logger."""
+        logging.TRACE = 5
+        logging.addLevelName(logging.TRACE, 'TRACE')
+        logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
+        logging.trace = partial(logging.log, logging.TRACE)
+
         logger = logging.getLogger("DAS")
-        logger.setLevel(self.logLevel)
-        ch = logging.StreamHandler()
-        ch.setLevel(self.logLevel)
-        ch.setFormatter(CustomFormatter())
-        logger.addHandler(ch)
+        if len(logger.handlers) == 0:
+            logger.setLevel(self.logLevel)
+            ch = logging.StreamHandler()
+            ch.setLevel(self.logLevel)
+            ch.setFormatter(CustomFormatter())
+            logger.addHandler(ch)
         self.logger = logger
 
-    def resetShape(self, shape):
-        self.shape = shape
+    def printDiagnostics(self):
+        """Print all required diagnostics to check why a block does not become available."""
         for val in self.validators:
-            val.shape.failureRate = shape.failureRate
-            val.shape.chi = shape.chi
-
+            (a, e, v) = val.checkStatus()
+            if e-a > 0 and val.ID != 0:
+                self.logger.warning("Node %d is missing %d samples" % (val.ID, e-a), extra=self.format)
+                for r in val.rowIDs:
+                    row = val.getRow(r)
+                    if row.count() < len(row):
+                        self.logger.debug("Row %d: %s" % (r, str(row)), extra=self.format)
+                        neiR = val.rowNeighbors[r]
+                        for nr in neiR:
+                            self.logger.debug("Row %d, Neighbor %d sent: %s" % (r, val.rowNeighbors[r][nr].node.ID, val.rowNeighbors[r][nr].received), extra=self.format)
+                            self.logger.debug("Row %d, Neighbor %d has: %s" % (r, val.rowNeighbors[r][nr].node.ID, self.validators[val.rowNeighbors[r][nr].node.ID].getRow(r)), extra=self.format)
+                for c in val.columnIDs:
+                    col = val.getColumn(c)
+                    if col.count() < len(col):
+                        self.logger.debug("Column %d: %s" % (c, str(col)), extra=self.format)
+                        neiC = val.columnNeighbors[c]
+                        for nc in neiC:
+                            self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format)
+                            self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format)
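Note: getLogger("DAS") returns the same logger object for every Simulator instance, so the len(logger.handlers) == 0 guard keeps repeated runs from stacking StreamHandlers and printing each line several times. The TRACE registration follows the usual partialmethod recipe; a minimal standalone check:

    import logging
    from functools import partial, partialmethod

    logging.TRACE = 5
    logging.addLevelName(logging.TRACE, 'TRACE')
    logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
    logging.trace = partial(logging.log, logging.TRACE)

    logging.basicConfig(level=logging.TRACE)
    logging.getLogger("demo").trace("segment-level noise, hidden at DEBUG and above")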
     def run(self):
+        """It runs the main simulation until the block is available or the simulation gets stuck."""
         self.glob.checkRowsColumns(self.validators)
-        self.validators[self.proposerID].broadcastBlock()
-        arrived, expected = self.glob.checkStatus(self.validators)
+        for i in range(0,self.shape.numberNodes):
+            if i == self.proposerID:
+                self.validators[i].initBlock()
+            else:
+                self.validators[i].logIDs()
+        arrived, expected, ready, validatedall, validated = self.glob.checkStatus(self.validators)
         missingSamples = expected - arrived
         missingVector = []
+        progressVector = []
+        trafficStatsVector = []
         steps = 0
         while(True):
             missingVector.append(missingSamples)
             oldMissingSamples = missingSamples
-            for i in range(0,self.shape.numberValidators):
-                self.validators[i].sendRows()
-                self.validators[i].sendColumns()
-            for i in range(1,self.shape.numberValidators):
+            self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
+            for i in range(0,self.shape.numberNodes):
+                self.validators[i].send()
+            self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
+            for i in range(1,self.shape.numberNodes):
                 self.validators[i].receiveRowsColumns()
-            for i in range(1,self.shape.numberValidators):
+            self.logger.debug("PHASE RESTORE %d" % steps, extra=self.format)
+            for i in range(1,self.shape.numberNodes):
                 self.validators[i].restoreRows()
                 self.validators[i].restoreColumns()
-            for i in range(0,self.shape.numberValidators):
+            self.logger.debug("PHASE LOG %d" % steps, extra=self.format)
+            for i in range(0,self.shape.numberNodes):
                 self.validators[i].logRows()
                 self.validators[i].logColumns()
+
+            # log TX and RX statistics
+            trafficStats = self.glob.getTrafficStats(self.validators)
+            self.logger.debug("step %d: %s" %
+                (steps, trafficStats), extra=self.format)
+            for i in range(0,self.shape.numberNodes):
                 self.validators[i].updateStats()
+            trafficStatsVector.append(trafficStats)
+
+            missingSamples, sampleProgress, nodeProgress, validatorAllProgress, validatorProgress = self.glob.getProgress(self.validators)
+            self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validatedall %0.02f %%, validated %0.02f %%"
+                              % (steps, sampleProgress*100, nodeProgress*100, validatorAllProgress*100, validatorProgress*100), extra=self.format)
+            cnS = "samples received"
+            cnN = "nodes ready"
+            cnV = "validators ready"
+            cnT0 = "TX builder mean"
+            cnT1 = "TX class1 mean"
+            cnT2 = "TX class2 mean"
+            cnR1 = "RX class1 mean"
+            cnR2 = "RX class2 mean"
+            cnD1 = "Dup class1 mean"
+            cnD2 = "Dup class2 mean"
+
+            progressVector.append({
+                cnS:sampleProgress,
+                cnN:nodeProgress,
+                cnV:validatorProgress,
+                cnT0: trafficStats[0]["Tx"]["mean"],
+                cnT1: trafficStats[1]["Tx"]["mean"],
+                cnT2: trafficStats[2]["Tx"]["mean"],
+                cnR1: trafficStats[1]["Rx"]["mean"],
+                cnR2: trafficStats[2]["Rx"]["mean"],
+                cnD1: trafficStats[1]["RxDup"]["mean"],
+                cnD2: trafficStats[2]["RxDup"]["mean"],
+            })
 
-            arrived, expected = self.glob.checkStatus(self.validators)
-            missingSamples = expected - arrived
-            missingRate = missingSamples*100/expected
-            self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format)
             if missingSamples == oldMissingSamples:
-                break
+                if len(missingVector) > self.config.steps4StopCondition:
+                    if missingSamples == missingVector[-self.config.steps4StopCondition]:
+                        self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format)
+                        if self.config.diagnostics:
+                            self.printDiagnostics()
+                        break
+                missingVector.append(missingSamples)
             elif missingSamples == 0:
+                self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format)
+                missingVector.append(missingSamples)
                 break
-            else:
-                steps += 1
+            steps += 1
 
-        self.result.addMissing(missingVector)
-        if missingSamples == 0:
-            self.result.blockAvailable = 1
-            self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format)
-            return self.result
-        else:
-            self.result.blockAvailable = 0
-            self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format)
-            return self.result
+        progress = pd.DataFrame(progressVector)
+        if self.config.saveRCdist:
+            self.result.addMetric("rowDist", self.distR)
+            self.result.addMetric("columnDist", self.distC)
+        if self.config.saveProgress:
+            self.result.addMetric("progress", progress.to_dict(orient='list'))
+        self.result.populate(self.shape, self.config, missingVector)
+        return self.result
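Note: stall detection does not abort on the first flat step; it requires missingSamples to be unchanged over a window of steps4StopCondition entries, since repair-only steps can leave the count flat while the run is still progressing. The window logic with an invented trace:

    steps4StopCondition = 3
    missingVector = [100, 80, 80, 80]      # flat for 3 consecutive entries
    missingSamples = 80

    stuck = (len(missingVector) > steps4StopCondition
             and missingSamples == missingVector[-steps4StopCondition])
    print(stuck)                           # True: give up and dump diagnostics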
diff --git a/DAS/tools.py b/DAS/tools.py
index b9e4a8e..2b47b11 100644
--- a/DAS/tools.py
+++ b/DAS/tools.py
@@ -1,27 +1,87 @@
 #!/bin/python3
 
 import logging
+import sys
+import random
+from bitarray.util import zeros
 
+class CustomFormatter():
+    """This class defines the terminal output formatting."""
 
-class CustomFormatter(logging.Formatter):
-
-    blue = "\x1b[34;20m"
-    grey = "\x1b[38;20m"
-    yellow = "\x1b[33;20m"
-    red = "\x1b[31;20m"
-    bold_red = "\x1b[31;1m"
-    reset = "\x1b[0m"
-    format = "%(levelname)s : %(entity)s : %(message)s"
-
-    FORMATS = {
-        logging.DEBUG: grey + format + reset,
-        logging.INFO: blue + format + reset,
-        logging.WARNING: yellow + format + reset,
-        logging.ERROR: red + format + reset,
-        logging.CRITICAL: bold_red + format + reset
-    }
+    def __init__(self):
+        """Initializes 5 different formats for logging with different colors."""
+        self.blue = "\x1b[34;20m"
+        self.grey = "\x1b[38;20m"
+        self.yellow = "\x1b[33;20m"
+        self.red = "\x1b[31;20m"
+        self.bold_red = "\x1b[31;1m"
+        self.reset = "\x1b[0m"
+        self.reformat = "%(levelname)s : %(entity)s : %(message)s"
+        self.FORMATS = {
+            logging.DEBUG: self.grey + self.reformat + self.reset,
+            logging.INFO: self.blue + self.reformat + self.reset,
+            logging.WARNING: self.yellow + self.reformat + self.reset,
+            logging.ERROR: self.red + self.reformat + self.reset,
+            logging.CRITICAL: self.bold_red + self.reformat + self.reset
+        }
 
     def format(self, record):
+        """Returns the formatter with the format corresponding to record."""
         log_fmt = self.FORMATS.get(record.levelno)
         formatter = logging.Formatter(log_fmt)
         return formatter.format(record)
 
+def shuffled(lis, shuffle=True):
+    """Generator yielding the list in shuffled order."""
+    # based on https://stackoverflow.com/a/60342323
+    if shuffle:
+        for index in random.sample(range(len(lis)), len(lis)):
+            yield lis[index]
+    else:
+        for v in lis:
+            yield v
+
+def shuffledDict(d, shuffle=True):
+    """Generator yielding the dictionary in shuffled order.
+
+    Shuffle, except if not (optional parameter useful for experiment setup).
+    """
+    if shuffle:
+        lis = list(d.items())
+        for index in random.sample(range(len(d)), len(d)):
+            yield lis[index]
+    else:
+        for kv in d.items():
+            yield kv
+
+def sampleLine(line, limit):
+    """Sample up to 'limit' bits from a bitarray.
+
+    Since this is quite expensive, we use a number of heuristics to get it fast.
+    """
+    if limit == sys.maxsize:
+        return line
+    else:
+        w = line.count(1)
+        if limit >= w:
+            return line
+        else:
+            l = len(line)
+            r = zeros(l)
+            if w < l/10 or limit > l/2:
+                indices = [i for i in range(l) if line[i]]
+                sample = random.sample(indices, limit)
+                for i in sample:
+                    r[i] = 1
+                return r
+            else:
+                while limit:
+                    i = random.randrange(0, l)
+                    if line[i] and not r[i]:
+                        r[i] = 1
+                        limit -= 1
+                return r
+
+def unionOfSamples(population, sampleSize, times):
+    selected = set()
+    for t in range(times):
+        selected |= set(random.sample(population, sampleSize))
+    return selected
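Note: sampleLine switches strategy on density: for sparse lines (or large limits) it enumerates the set bits once; for dense lines with a small limit it rejection-samples, avoiding the index list. A quick check through the public entry point (seeded, so the outcome is stable):

    import random
    from bitarray.util import urandom
    from DAS.tools import sampleLine

    random.seed(7)
    line = urandom(64)           # ~32 ones
    sub = sampleLine(line, 8)
    print(sub.count(1))          # 8, provided the line has at least 8 ones
    print((sub & line) == sub)   # True: a strict subset of the input bits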
diff --git a/DAS/validator.py b/DAS/validator.py
index 950fdea..4e8d350 100644
--- a/DAS/validator.py
+++ b/DAS/validator.py
@@ -4,57 +4,78 @@ import random
 import collections
 import logging
 from DAS.block import *
-from bitarray import bitarray
+from DAS.tools import shuffled, shuffledDict, unionOfSamples
 from bitarray.util import zeros
+from collections import deque
+from itertools import chain
 
 class Neighbor:
+    """This class implements a node neighbor to monitor sent and received data.
+
+    It represents one side of a P2P link in the overlay. Sent and received
+    segments are monitored to avoid sending twice or sending back what was
+    received from a link.
+    """
 
     def __repr__(self):
-        return "%d:%d/%d" % (self.node.ID, self.sent.count(1), self.received.count(1))
+        """It returns the amount of sent and received data."""
+        return "%d:%d/%d, q:%d" % (self.node.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue))
 
-    def __init__(self, v, blockSize):
+    def __init__(self, v, dim, blockSize):
+        """It initializes the neighbor with the node and sets counters to zero."""
         self.node = v
+        self.dim = dim  # 0:row 1:col
         self.receiving = zeros(blockSize)
         self.received = zeros(blockSize)
         self.sent = zeros(blockSize)
+        self.sendQueue = deque()
 
 
 class Validator:
-
-    ID = 0
-    amIproposer = 0
-    shape = []
-    format = {}
-    logger = []
+    """This class implements a validator/node in the network."""
 
     def __repr__(self):
+        """It returns the validator ID."""
         return str(self.ID)
     def __init__(self, ID, amIproposer, logger, shape, config, rows = None, columns = None):
+        """It initializes the validator with the logger, shape and rows/columns.
+
+        If rows/columns are specified, these are observed; otherwise (default)
+        chi rows and columns are selected randomly.
+        """
+        self.shape = shape
         FORMAT = "%(levelname)s : %(entity)s : %(message)s"
         self.ID = ID
         self.format = {"entity": "Val "+str(self.ID)}
         self.block = Block(self.shape.blockSize)
         self.receivedBlock = Block(self.shape.blockSize)
+        self.receivedQueue = deque()
+        self.sendQueue = deque()
         self.amIproposer = amIproposer
         self.logger = logger
         if self.shape.chi < 1:
             self.logger.error("Chi has to be greater than 0", extra=self.format)
         elif self.shape.chi > self.shape.blockSize:
-            self.logger.error("Chi has to be smaller than %d" % blockSize, extra=self.format)
+            self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format)
         else:
             if amIproposer:
+                self.nodeClass = 0
                 self.rowIDs = range(shape.blockSize)
                 self.columnIDs = range(shape.blockSize)
             else:
-                self.rowIDs = rows[(self.ID*self.shape.chi):(self.ID*self.shape.chi + self.shape.chi)]
-                self.columnIDs = rows[(self.ID*self.shape.chi):(self.ID*self.shape.chi + self.shape.chi)]
                 #if shape.deterministic:
                 #    random.seed(self.ID)
-                #self.rowIDs = random.sample(range(self.shape.blockSize), self.shape.chi)
-                #self.columnIDs = random.sample(range(self.shape.blockSize), self.shape.chi)
-        self.changedRow = {id:False for id in self.rowIDs}
-        self.changedColumn = {id:False for id in self.columnIDs}
+                self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2
+                self.vpn = self.shape.vpn1 if (self.nodeClass == 1) else self.shape.vpn2
+                self.vRowIDs = []
+                self.vColumnIDs = []
+                for i in range(self.vpn):
+                    self.vRowIDs.append(set(rows[i*self.shape.chi:(i+1)*self.shape.chi]) if rows else set(random.sample(range(self.shape.blockSize), self.shape.chi)))
+                    self.vColumnIDs.append(set(columns[i*self.shape.chi:(i+1)*self.shape.chi]) if columns else set(random.sample(range(self.shape.blockSize), self.shape.chi)))
+                self.rowIDs = set.union(*self.vRowIDs)
+                self.columnIDs = set.union(*self.vColumnIDs)
         self.rowNeighbors = collections.defaultdict(dict)
         self.columnNeighbors = collections.defaultdict(dict)
@@ -63,8 +84,33 @@ class Validator:
         self.statsTxPerSlot = []
         self.statsRxInSlot = 0
         self.statsRxPerSlot = []
+        self.statsRxDupInSlot = 0
+        self.statsRxDupPerSlot = []
+
+        # Set uplink bandwidth.
+        # Assuming segments of ~560 bytes and timesteps of 50ms, we get
+        # 1 Mbps ~= 1e6 bps * 0.050 s / (560*8) bits ~= 11 segments/timestep
+        if self.amIproposer:
+            self.bwUplink = shape.bwUplinkProd
+        elif self.nodeClass == 1:
+            self.bwUplink = shape.bwUplink1
+        else:
+            self.bwUplink = shape.bwUplink2
+        self.bwUplink *= 1e3 / 8 * config.stepDuration / config.segmentSize
+
+        self.repairOnTheFly = True
+        self.sendLineUntil = (self.shape.blockSize + 1) // 2  # stop sending on a p2p link if at least this amount of samples passed
+        self.perNeighborQueue = True  # queue incoming messages to outgoing connections on arrival (as typical GossipSub implementations)
+        self.shuffleQueues = True  # shuffle the order of picking from active queues of a sender node
+        self.perNodeQueue = False  # keep a global queue of incoming messages for later sequential dispatch
+        self.shuffleLines = True  # shuffle the order of rows/columns in each iteration while trying to send
+        self.shuffleNeighbors = True  # shuffle the order of neighbors when sending the same segment to each neighbor
+        self.dumbRandomScheduler = False  # dumb random scheduler
+        self.segmentShuffleScheduler = True  # send each segment that's worth sending once in shuffled order, then repeat
+        self.segmentShuffleSchedulerPersist = True  # persist scheduler state between timesteps
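Note: the Mbps-to-segments/timestep conversion, worked through with the comment's numbers (560-byte segments, 50 ms steps):

    bwUplink = 110          # Mbps, e.g. a class-1 node
    stepDuration = 50       # ms per timestep
    segmentSize = 560       # bytes per segment

    # Mbps -> bytes/ms is *1e3/8; times ms/step; divided by bytes/segment
    segmentsPerStep = bwUplink * 1e3 / 8 * stepDuration / segmentSize
    print(segmentsPerStep)  # ~1227.7, i.e. ~11.16 segments/step per Mbps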
     def logIDs(self):
+        """It logs the assigned rows and columns."""
         if self.amIproposer == 1:
             self.logger.warning("I am a block proposer."% self.ID)
         else:
@@ -72,161 +118,429 @@ class Validator:
             self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format)
 
     def initBlock(self):
-        self.logger.debug("I am a block proposer.", extra=self.format)
-        self.block = Block(self.shape.blockSize)
-        self.block.fill()
-        #self.block.print()
-
-    def broadcastBlock(self):
+        """It initializes the block for the proposer."""
         if self.amIproposer == 0:
-            self.logger.error("I am NOT a block proposer", extra=self.format)
+            self.logger.warning("I am not a block proposer", extra=self.format)
         else:
-            self.logger.debug("Broadcasting my block...", extra=self.format)
-            order = [i for i in range(self.shape.blockSize * self.shape.blockSize)]
-            random.shuffle(order)
-            while(order):
-                i = order.pop()
-                if (random.randint(0,99) >= self.shape.failureRate):
+            self.logger.debug("Creating block...", extra=self.format)
+            if self.shape.failureModel == "random":
+                order = [i for i in range(self.shape.blockSize * self.shape.blockSize)]
+                order = random.sample(order, int((1 - self.shape.failureRate/100) * len(order)))
+                for i in order:
                     self.block.data[i] = 1
-                else:
-                    self.block.data[i] = 0
-
-            self.changedRow = {id:True for id in self.rowIDs}
-            self.changedColumn = {id:True for id in self.columnIDs}
+            elif self.shape.failureModel == "sequential":
+                order = [i for i in range(self.shape.blockSize * self.shape.blockSize)]
+                order = order[:int((1 - self.shape.failureRate/100) * len(order))]
+                for i in order:
+                    self.block.data[i] = 1
+            elif self.shape.failureModel == "MEP":  # Minimal size non-recoverable Erasure Pattern
+                for r in range(self.shape.blockSize):
+                    for c in range(self.shape.blockSize):
+                        k = self.shape.blockSize/2
+                        if r > k or c > k:
+                            self.block.setSegment(r,c)
+            elif self.shape.failureModel == "MEP+1":  # MEP +1 segment to make it recoverable
+                for r in range(self.shape.blockSize):
+                    for c in range(self.shape.blockSize):
+                        k = self.shape.blockSize/2
+                        if r > k or c > k:
+                            self.block.setSegment(r,c)
+                self.block.setSegment(0, 0)
+            elif self.shape.failureModel == "DEP":
+                for r in range(self.shape.blockSize):
+                    for c in range(self.shape.blockSize):
+                        k = self.shape.blockSize/2
+                        if (r+c) % self.shape.blockSize > k:
+                            self.block.setSegment(r,c)
+            elif self.shape.failureModel == "DEP+1":
+                for r in range(self.shape.blockSize):
+                    for c in range(self.shape.blockSize):
+                        k = self.shape.blockSize/2
+                        if (r+c) % self.shape.blockSize > k:
+                            self.block.setSegment(r,c)
+                self.block.setSegment(0, 0)
+            elif self.shape.failureModel == "MREP":  # Minimum size Recoverable Erasure Pattern
+                for r in range(self.shape.blockSize):
+                    for c in range(self.shape.blockSize):
+                        k = self.shape.blockSize/2
+                        if r < k and c < k:
+                            self.block.setSegment(r,c)
+            elif self.shape.failureModel == "MREP-1":  # make MREP non-recoverable
+                for r in range(self.shape.blockSize):
+                    for c in range(self.shape.blockSize):
+                        k = self.shape.blockSize/2
+                        if r < k and c < k:
+                            self.block.setSegment(r,c)
+                self.block.setSegment(0, 0, 0)
 
             nbFailures = self.block.data.count(0)
             measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
             self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
-            #broadcasted.print()
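Note: initBlock sets the segments that are present, not the failures; e.g. MREP keeps only the top-left k x k quadrant, which is exactly recoverable by iterated row/column repair. A toy check of that property (uses the Block class above):

    from DAS.block import Block

    B = 8
    b = Block(B)
    k = B // 2
    for r in range(k):          # MREP: only the top-left k x k quadrant present
        for c in range(k):
            b.setSegment(r, c)

    # each of the first k rows holds exactly B/2 segments, so row repair fills
    # them; afterwards every column holds B/2 segments and fills too
    for r in range(B):
        b.repairRow(r)
    for c in range(B):
        b.repairColumn(c)
    print(b.data.count(1) == B * B)   # True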
     def getColumn(self, index):
+        """It returns a given column."""
         return self.block.getColumn(index)
 
     def getRow(self, index):
+        """It returns a given row."""
         return self.block.getRow(index)
 
-    def receiveColumn(self, id, column, src):
-        if id in self.columnIDs:
-            # register receive so that we are not sending back
-            self.columnNeighbors[id][src].receiving |= column
-            self.receivedBlock.mergeColumn(id, column)
-            self.statsRxInSlot += column.count(1)
-        else:
-            pass
+    def receiveSegment(self, rID, cID, src):
+        """Receive a segment, register it, and queue it for forwarding as needed."""
+        # register receive so that we are not sending back
+        if rID in self.rowIDs:
+            if src in self.rowNeighbors[rID]:
+                self.rowNeighbors[rID][src].receiving[cID] = 1
+        if cID in self.columnIDs:
+            if src in self.columnNeighbors[cID]:
+                self.columnNeighbors[cID][src].receiving[rID] = 1
+        if not self.receivedBlock.getSegment(rID, cID):
+            self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
+            self.receivedBlock.setSegment(rID, cID)
+            if self.perNodeQueue or self.perNeighborQueue:
+                self.receivedQueue.append((rID, cID))
+        else:
+            self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
+            self.statsRxDupInSlot += 1
+        self.statsRxInSlot += 1
 
-    def receiveRow(self, id, row, src):
-        if id in self.rowIDs:
-            # register receive so that we are not sending back
-            self.rowNeighbors[id][src].receiving |= row
-            self.receivedBlock.mergeRow(id, row)
-            self.statsRxInSlot += row.count(1)
-        else:
-            pass
+    def addToSendQueue(self, rID, cID):
+        """Queue a segment for forwarding."""
+        if self.perNodeQueue:
+            self.sendQueue.append((rID, cID))
+
+        if self.perNeighborQueue:
+            if rID in self.rowIDs:
+                for neigh in self.rowNeighbors[rID].values():
+                    neigh.sendQueue.append(cID)
+
+            if cID in self.columnIDs:
+                for neigh in self.columnNeighbors[cID].values():
+                    neigh.sendQueue.append(rID)
     def receiveRowsColumns(self):
+        """Finalize the time step by merging newly received segments into the state."""
         if self.amIproposer == 1:
             self.logger.error("I am a block proposer", extra=self.format)
         else:
-            self.logger.debug("Receiving the data...", extra=self.format)
+            self.logger.trace("Receiving the data...", extra=self.format)
             #self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format)
 
-            self.changedRow = { id:
-                self.getRow(id) != self.receivedBlock.getRow(id)
-                for id in self.rowIDs
-            }
-
-            self.changedColumn = { id:
-                self.getColumn(id) != self.receivedBlock.getColumn(id)
-                for id in self.columnIDs
-            }
-
             self.block.merge(self.receivedBlock)
 
-            for neighs in self.rowNeighbors.values():
+            for neighs in chain(self.rowNeighbors.values(), self.columnNeighbors.values()):
                 for neigh in neighs.values():
                     neigh.received |= neigh.receiving
                     neigh.receiving.setall(0)
 
-            for neighs in self.columnNeighbors.values():
-                for neigh in neighs.values():
-                    neigh.received |= neigh.receiving
-                    neigh.receiving.setall(0)
+            # add newly received segments to the send queue
+            if self.perNodeQueue or self.perNeighborQueue:
+                while self.receivedQueue:
+                    (rID, cID) = self.receivedQueue.popleft()
+                    self.addToSendQueue(rID, cID)
 
     def updateStats(self):
+        """It updates the stats related to sent and received data."""
         self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format)
         self.statsRxPerSlot.append(self.statsRxInSlot)
+        self.statsRxDupPerSlot.append(self.statsRxDupInSlot)
         self.statsTxPerSlot.append(self.statsTxInSlot)
         self.statsRxInSlot = 0
+        self.statsRxDupInSlot = 0
         self.statsTxInSlot = 0
 
+    def checkSegmentToNeigh(self, rID, cID, neigh):
+        """Check if a segment should be sent to a neighbor."""
+        if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil:
+            return False  # sent enough, the other side can restore
+        i = rID if neigh.dim else cID
+        if not neigh.sent[i] and not neigh.received[i]:
+            return True
+        else:
+            return False  # received or already sent
 
-    def sendColumn(self, columnID):
-        line = self.getColumn(columnID)
-        if line.any():
-            self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID], extra=self.format)
-            for n in self.columnNeighbors[columnID].values():
-
-                # if there is anything new to send, send it
-                toSend = line & ~n.sent & ~n.received
-                if (toSend).any():
-                    n.sent |= toSend;
-                    n.node.receiveColumn(columnID, toSend, self.ID)
-                    self.statsTxInSlot += toSend.count(1)
+    def sendSegmentToNeigh(self, rID, cID, neigh):
+        """Send a segment to a neighbor (without checks)."""
+        self.logger.trace("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format)
+        i = rID if neigh.dim else cID
+        neigh.sent[i] = 1
+        neigh.node.receiveSegment(rID, cID, self.ID)
+        self.statsTxInSlot += 1
 
-    def sendRow(self, rowID):
-        line = self.getRow(rowID)
-        if line.any():
-            self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format)
-            for n in self.rowNeighbors[rowID].values():
-
-                # if there is anything new to send, send it
-                toSend = line & ~n.sent & ~n.received
-                if (toSend).any():
-                    n.sent |= toSend;
-                    n.node.receiveRow(rowID, toSend, self.ID)
-                    self.statsTxInSlot += toSend.count(1)
+    def checkSendSegmentToNeigh(self, rID, cID, neigh):
+        """Check and send a segment to a neighbor if needed."""
+        if self.checkSegmentToNeigh(rID, cID, neigh):
+            self.sendSegmentToNeigh(rID, cID, neigh)
+            return True
+        else:
+            return False
 
-    def sendRows(self):
-        self.logger.debug("Sending restored rows...", extra=self.format)
-        for r in self.rowIDs:
-            if self.changedRow[r]:
-                self.sendRow(r)
+    def processSendQueue(self):
+        """Send out segments from the queue until the bandwidth limit is reached.
 
-    def sendColumns(self):
-        self.logger.debug("Sending restored columns...", extra=self.format)
-        for c in self.columnIDs:
-            if self.changedColumn[c]:
-                self.sendColumn(c)
+        sendQueue is a centralized queue from which segments are sent out
+        in FIFO order to all interested neighbors.
+        """
+        while self.sendQueue:
+            (rID, cID) = self.sendQueue[0]
+
+            if rID in self.rowIDs:
+                for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors):
+                    self.checkSendSegmentToNeigh(rID, cID, neigh)
+
+                if self.statsTxInSlot >= self.bwUplink:
+                    return
+
+            if cID in self.columnIDs:
+                for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors):
+                    self.checkSendSegmentToNeigh(rID, cID, neigh)
+
+                if self.statsTxInSlot >= self.bwUplink:
+                    return
+
+            self.sendQueue.popleft()
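Note: sendLineUntil applies the erasure-coding assumption at the link level: once half a line (rounded up) has crossed a link in either direction, the peer can repair the remainder locally and further sends on that line are wasted uplink. In miniature:

    from bitarray.util import zeros

    blockSize = 8
    sendLineUntil = (blockSize + 1) // 2    # 4

    sent, received = zeros(blockSize), zeros(blockSize)
    sent[0:3] = 1                           # 3 segments pushed out
    received[5] = 1                         # 1 came back from the peer
    print((sent | received).count(1) >= sendLineUntil)   # True: stop this line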
     def processPerNeighborSendQueue(self):
+        """Send out segments from per-neighbor queues until the bandwidth limit is reached.
+
+        Segments are dispatched from per-neighbor transmission queues in a shuffled
+        round-robin order, emulating a type of fair queuing. Since neighborhood is
+        handled at the topic (column or row) level, fair queuing is also at the level
+        of flows per topic and per peer. A per-peer model might be closer to the
+        reality of libp2p implementations, where topics between two nodes are
+        multiplexed over the same transport.
+        """
+        progress = True
+        while (progress):
+            progress = False
+
+            queues = []
+            # collect and shuffle
+            for rID, neighs in self.rowNeighbors.items():
+                for neigh in neighs.values():
+                    if (neigh.sendQueue):
+                        queues.append((0, rID, neigh))
+
+            for cID, neighs in self.columnNeighbors.items():
+                for neigh in neighs.values():
+                    if (neigh.sendQueue):
+                        queues.append((1, cID, neigh))
+
+            for dim, lineID, neigh in shuffled(queues, self.shuffleQueues):
+                if dim == 0:
+                    self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh)
+                else:
+                    self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh)
+                progress = True
+                if self.statsTxInSlot >= self.bwUplink:
+                    return
     def runSegmentShuffleScheduler(self):
+        """Schedule chunks for sending.
+
+        This scheduler checks which owned segments need sending (at least
+        one neighbor needing it). Then it sends each segment that's worth sending
+        once, in shuffled order. This is repeated until the bandwidth limit.
+        """
+
+        def collectSegmentsToSend():
+            # yields list of segments to send as (dim, lineID, id)
+            segmentsToSend = []
+            for rID, neighs in self.rowNeighbors.items():
+                line = self.getRow(rID)
+                needed = zeros(self.shape.blockSize)
+                for neigh in neighs.values():
+                    sentOrReceived = neigh.received | neigh.sent
+                    if sentOrReceived.count(1) < self.sendLineUntil:
+                        needed |= ~sentOrReceived
+                needed &= line
+                if (needed).any():
+                    for i in range(len(needed)):
+                        if needed[i]:
+                            segmentsToSend.append((0, rID, i))
+
+            for cID, neighs in self.columnNeighbors.items():
+                line = self.getColumn(cID)
+                needed = zeros(self.shape.blockSize)
+                for neigh in neighs.values():
+                    sentOrReceived = neigh.received | neigh.sent
+                    if sentOrReceived.count(1) < self.sendLineUntil:
+                        needed |= ~sentOrReceived
+                needed &= line
+                if (needed).any():
+                    for i in range(len(needed)):
+                        if needed[i]:
+                            segmentsToSend.append((1, cID, i))
+
+            return segmentsToSend
+
+        def nextSegment():
+            while True:
+                # send each collected segment once
+                if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None:
+                    for dim, lineID, id in self.segmentShuffleGen:
+                        if dim == 0:
+                            for _, neigh in shuffledDict(self.rowNeighbors[lineID], self.shuffleNeighbors):
+                                if self.checkSegmentToNeigh(lineID, id, neigh):
+                                    yield((lineID, id, neigh))
+                                    break
+                        else:
+                            for _, neigh in shuffledDict(self.columnNeighbors[lineID], self.shuffleNeighbors):
+                                if self.checkSegmentToNeigh(id, lineID, neigh):
+                                    yield((id, lineID, neigh))
+                                    break
+
+                # collect segments for the next round
+                segmentsToSend = collectSegmentsToSend()
+
+                # finish if empty, or set up a shuffled generator based on the collected segments
+                if not segmentsToSend:
+                    break
+                else:
+                    self.segmentShuffleGen = shuffled(segmentsToSend, self.shuffleLines)
+
+        for rid, cid, neigh in nextSegment():
+            # segments are checked just before yield, so we can send directly
+            self.sendSegmentToNeigh(rid, cid, neigh)
+
+            if self.statsTxInSlot >= self.bwUplink:
+                if not self.segmentShuffleSchedulerPersist:
+                    # remove scheduler state before leaving
+                    self.segmentShuffleGen = None
+                return
     def runDumbRandomScheduler(self, tries = 100):
+        """Random scheduler picking segments at random.
+
+        This scheduler implements a simple random scheduling order, picking
+        segments at random and peers potentially interested in those segments
+        also at random.
+        It serves more as a performance baseline than as a realistic model.
+        """
+
+        def nextSegment():
+            t = tries
+            while t:
+                if self.rowIDs:
+                    rID = random.choice(list(self.rowIDs))
+                    cID = random.randrange(0, self.shape.blockSize)
+                    if self.block.getSegment(rID, cID):
+                        neigh = random.choice(list(self.rowNeighbors[rID].values()))
+                        if self.checkSegmentToNeigh(rID, cID, neigh):
+                            yield(rID, cID, neigh)
+                            t = tries
+                if self.columnIDs:
+                    cID = random.choice(list(self.columnIDs))
+                    rID = random.randrange(0, self.shape.blockSize)
+                    if self.block.getSegment(rID, cID):
+                        neigh = random.choice(list(self.columnNeighbors[cID].values()))
+                        if self.checkSegmentToNeigh(rID, cID, neigh):
+                            yield(rID, cID, neigh)
+                            t = tries
+                t -= 1
+
+        for rid, cid, neigh in nextSegment():
+            # segments are checked just before yield, so we can send directly
+            self.sendSegmentToNeigh(rid, cid, neigh)
+
+            if self.statsTxInSlot >= self.bwUplink:
+                return
+
+    def send(self):
+        """Send as much as we can in the timestep, limited by bwUplink."""
+
+        # process node level send queue
+        self.processSendQueue()
+        if self.statsTxInSlot >= self.bwUplink:
+            return
+
+        # process neighbor level send queues in shuffled breadth-first order
+        self.processPerNeighborSendQueue()
+        if self.statsTxInSlot >= self.bwUplink:
+            return
+
+        # process possible segments to send in shuffled breadth-first order
+        if self.segmentShuffleScheduler:
+            self.runSegmentShuffleScheduler()
+        if self.statsTxInSlot >= self.bwUplink:
+            return
+
+        if self.dumbRandomScheduler:
+            self.runDumbRandomScheduler()
+        if self.statsTxInSlot >= self.bwUplink:
+            return
 
     def logRows(self):
+        """It logs the rows assigned to the validator."""
         if self.logger.isEnabledFor(logging.DEBUG):
             for id in self.rowIDs:
                 self.logger.debug("Row %d: %s", id, self.getRow(id), extra=self.format)
 
     def logColumns(self):
+        """It logs the columns assigned to the validator."""
         if self.logger.isEnabledFor(logging.DEBUG):
             for id in self.columnIDs:
                 self.logger.debug("Column %d: %s", id, self.getColumn(id), extra=self.format)
 
     def restoreRows(self):
-        for id in self.rowIDs:
-            self.block.repairRow(id)
+        """It restores the repairable rows assigned to the validator."""
+        if self.repairOnTheFly:
+            for id in self.rowIDs:
+                self.restoreRow(id)
+
+    def restoreRow(self, id):
+        """Restore a given row if it is repairable."""
+        rep = self.block.repairRow(id)
+        if (rep.any()):
+            # If operation is based on send queues, segments should
+            # be queued after successful repair.
+            for i in range(len(rep)):
+                if rep[i]:
+                    self.logger.trace("Rep: %d,%d", id, i, extra=self.format)
+                    self.addToSendQueue(id, i)
+            # self.statsRepairInSlot += rep.count(1)
 
     def restoreColumns(self):
-        for id in self.columnIDs:
-            self.block.repairColumn(id)
+        """It restores the repairable columns assigned to the validator."""
+        if self.repairOnTheFly:
+            for id in self.columnIDs:
+                self.restoreColumn(id)
+
+    def restoreColumn(self, id):
+        """Restore a given column if it is repairable."""
+        rep = self.block.repairColumn(id)
+        if (rep.any()):
+            # If operation is based on send queues, segments should
+            # be queued after successful repair.
+            for i in range(len(rep)):
+                if rep[i]:
+                    self.logger.trace("Rep: %d,%d", i, id, extra=self.format)
+                    self.addToSendQueue(i, id)
+            # self.statsRepairInSlot += rep.count(1)
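Note: repair and forwarding compose: restoreRow queues exactly the bits repairRow reports as reconstructed, so locally repaired segments propagate the same way received ones do. The flow in miniature (uses the Block class above):

    from DAS.block import Block

    b = Block(4)
    for c in (0, 1):
        b.setSegment(2, c)        # half of row 2 arrived over the wire

    rep = b.repairRow(2)          # bitarray('0011'): columns 2,3 reconstructed
    queued = [(2, i) for i in range(len(rep)) if rep[i]]
    print(queued)                 # [(2, 2), (2, 3)] -> addToSendQueue(...) each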
     def checkStatus(self):
-        arrived = 0
-        expected = 0
-        for id in self.columnIDs:
-            line = self.getColumn(id)
-            arrived += line.count(1)
-            expected += len(line)
-        for id in self.rowIDs:
-            line = self.getRow(id)
-            arrived += line.count(1)
-            expected += len(line)
+        """It checks how many of the expected samples have arrived, overall and per validator."""
+
+        def checkStatus(columnIDs, rowIDs):
+            arrived = 0
+            expected = 0
+            for id in columnIDs:
+                line = self.getColumn(id)
+                arrived += line.count(1)
+                expected += len(line)
+            for id in rowIDs:
+                line = self.getRow(id)
+                arrived += line.count(1)
+                expected += len(line)
+            return arrived, expected
+
+        arrived, expected = checkStatus(self.columnIDs, self.rowIDs)
         self.logger.debug("status: %d / %d", arrived, expected, extra=self.format)
-        return (arrived, expected)
+
+        validated = 0
+        for i in range(self.vpn):
+            a, e = checkStatus(self.vColumnIDs[i], self.vRowIDs[i])
+            if a == e:
+                validated+=1
+
+        return arrived, expected, validated
diff --git a/DAS/visualizer.py b/DAS/visualizer.py
new file mode 100644
index 0000000..cf095c1
--- /dev/null
+++ b/DAS/visualizer.py
@@ -0,0 +1,277 @@
+#!/bin/python3
+import os, sys
+import time
+import xml.etree.ElementTree as ET
+import matplotlib.pyplot as plt
+import numpy as np
+import seaborn as sns
+from itertools import combinations
+from mplfinance.original_flavor import candlestick_ohlc
+
+
+class Visualizer:
+
+    def __init__(self, execID, config):
+        self.execID = execID
+        self.config = config
+        self.folderPath = "results/"+self.execID
+        self.parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree', 'chi', 'vpn1', 'vpn2', 'class1ratio', 'bwUplinkProd', 'bwUplink1', 'bwUplink2']
+        self.minimumDataPoints = 2
+        self.maxTTA = 11000
+
+    def plottingData(self):
+        """Store data with a unique key for each parameter combination."""
+        data = {}
+        bw = []
+        print("Getting data from the folder...")
+        # Loop over the XML files in the folder
+        for filename in os.listdir(self.folderPath):
+            # Loop over the XMLs and store the data in variables
+            if filename.endswith('.xml'):
+                tree = ET.parse(os.path.join(self.folderPath, filename))
+                root = tree.getroot()
+                run = int(root.find('run').text)
+                blockSize = int(root.find('blockSize').text)
+                failureRate = int(root.find('failureRate').text)
+                numberNodes = int(root.find('numberNodes').text)
+                class1ratio = float(root.find('class1ratio').text)
+                netDegree = int(root.find('netDegree').text)
+                chi = int(root.find('chi').text)
+                vpn1 = int(root.find('vpn1').text)
+                vpn2 = int(root.find('vpn2').text)
+                bwUplinkProd = int(root.find('bwUplinkProd').text)
+                bwUplink1 = int(root.find('bwUplink1').text)
+                bwUplink2 = int(root.find('bwUplink2').text)
+                tta = float(root.find('tta').text)
+
+                # Store the bandwidth
+                bw.append(bwUplinkProd)
+
+                # Loop over all combinations of the parameters, minus the two x, y params
+                for combination in combinations(self.parameters, len(self.parameters)-2):
+                    # Get the indices and values of the parameters in the combination
+                    indices = [self.parameters.index(element) for element in combination]
+                    selectedValues = [run, blockSize, failureRate, numberNodes, netDegree, chi, vpn1, vpn2, class1ratio, bwUplinkProd, bwUplink1, bwUplink2]
+                    values = [selectedValues[index] for index in indices]
+                    names = [self.parameters[i] for i in indices]
+                    keyComponents = [f"{name}_{value}" for name, value in zip(names, values)]
+                    key = tuple(keyComponents[:len(self.parameters)-2])
+                    # Get the names of the other parameters that are not included in the key
+                    otherParams = [self.parameters[i] for i in range(len(self.parameters)) if i not in indices]
+                    # Append the values of the other parameters and the TTAs to the lists for the key
+                    otherIndices = [i for i in range(len(self.parameters)) if i not in indices]
+
+                    # Initialize the dictionary for the key if it doesn't exist yet
+                    if key not in data:
+                        data[key] = {}
+                        # Initialize lists for the other parameters and the TTAs with the key
+                        data[key][otherParams[0]] = []
+                        data[key][otherParams[1]] = []
+                        data[key]['ttas'] = []
+
+                    if otherParams[0] in data[key]:
+                        data[key][otherParams[0]].append(selectedValues[otherIndices[0]])
+                    else:
+                        data[key][otherParams[0]] = [selectedValues[otherIndices[0]]]
+                    if otherParams[1] in data[key]:
+                        data[key][otherParams[1]].append(selectedValues[otherIndices[1]])
+                    else:
+                        data[key][otherParams[1]] = [selectedValues[otherIndices[1]]]
+                    data[key]['ttas'].append(tta)
+        return data
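Note: plottingData keys each record by 10 of the 12 parameters; the two left out become the candidate x/y axes for that plot family. The scheme in miniature (three parameters, one left free):

    from itertools import combinations

    parameters = ['blockSize', 'failureRate', 'netDegree']
    values = {'blockSize': 64, 'failureRate': 30, 'netDegree': 6}

    for combo in combinations(parameters, len(parameters) - 1):
        key = tuple(f"{name}_{values[name]}" for name in combo)
        free = [p for p in parameters if p not in combo]
        print(key, "-> free axis:", free)
    # ('blockSize_64', 'failureRate_30') -> free axis: ['netDegree']
    # ('blockSize_64', 'netDegree_6')    -> free axis: ['failureRate']
    # ('failureRate_30', 'netDegree_6')  -> free axis: ['blockSize']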
[f"{name}_{value}" for name, value in zip(names, values)] + key = tuple(keyComponents[:len(self.parameters)-2]) + """Get the names of the other parameters that are not included in the key""" + otherParams = [self.parameters[i] for i in range(len(self.parameters)) if i not in indices] + """Append the values of the other parameters and the ttas to the lists for the key""" + otherIndices = [i for i in range(len(self.parameters)) if i not in indices] + + """Initialize the dictionary for the key if it doesn't exist yet""" + if key not in data: + data[key] = {} + """Initialize lists for the other parameters and the ttas with the key""" + data[key][otherParams[0]] = [] + data[key][otherParams[1]] = [] + data[key]['ttas'] = [] + + if otherParams[0] in data[key]: + data[key][otherParams[0]].append(selectedValues[otherIndices[0]]) + else: + data[key][otherParams[0]] = [selectedValues[otherIndices[0]]] + if otherParams[1] in data[key]: + data[key][otherParams[1]].append(selectedValues[otherIndices[1]]) + else: + data[key][otherParams[1]] = [selectedValues[otherIndices[1]]] + data[key]['ttas'].append(tta) + return data + + def averageRuns(self, data, runs): + """Get the average of all runs for each key""" + newData = {} + print("Getting the average of the runs...") + for key, value in data.items(): + runExists = False + """Check if the key contains 'run_' with a numerical value""" + for item in key: + if item.startswith('run_'): + runExists = True + break + if runExists: + ps = list(data[key].keys()) + for item in key: + """Create a new key with the other items in the tuple""" + if item.startswith('run_'): + newKey = tuple([x for x in key if x != item]) + """Average the similar key values""" + tta_sums = {} + nbRuns = {} + ttRuns = [] + total = [] + p0 = [] + p1 = [] + p2 = [] + p3 = [] + for i in range(runs): + key0 = (f'run_{i}',) + newKey + #Create a dictionary to store the sums of ttas for each unique pair of values in subkeys + for i in range(len(data[key0][ps[0]])): + keyPair = (data[key0][ps[0]][i], data[key0][ps[1]][i]) + if data[key0]["ttas"][i] == -1: + data[key0]["ttas"][i] = self.maxTTA + try: + tta_sums[keyPair] += data[key0]['ttas'][i] + if data[key0]["ttas"][i] != self.maxTTA: + nbRuns[keyPair] += 1 + except KeyError: + tta_sums[keyPair] = data[key0]['ttas'][i] + if data[key0]["ttas"][i] != self.maxTTA: + nbRuns[keyPair] = 1 + else: + nbRuns[keyPair] = 0 + for k, tta in tta_sums.items(): + p0.append(k[0]) + p1.append(k[1]) + total.append(tta) + for k, run in nbRuns.items(): + p2.append(k[0]) + p3.append(k[1]) + ttRuns.append(run) + for i in range(len(total)): + if(ttRuns[i] == 0): # All tta = -1 + total[i] = self.maxTTA + elif ttRuns[i] < runs: # Some tta = -1 + total[i] -= (runs-ttRuns[i]) * self.maxTTA + total[i] = total[i]/ttRuns[i] + else: # No tta = -1 + total[i] = total[i]/ttRuns[i] + averages = {} + averages[ps[0]] = p0 + averages[ps[1]] = p1 + averages['ttas'] = total + newData[newKey] = averages + return newData + + def similarKeys(self, data): + """Get the keys for all data with the same x and y labels""" + filteredKeys = {} + for key1, value1 in data.items(): + subKeys1 = list(value1.keys()) + filteredKeys[(subKeys1[0], subKeys1[1])] = [key1] + for key2, value2 in data.items(): + subKeys2 = list(value2.keys()) + if key1 != key2 and subKeys1[0] == subKeys2[0] and subKeys1[1] == subKeys2[1]: + try: + filteredKeys[(subKeys1[0], subKeys1[1])].append(key2) + except KeyError: + filteredKeys[(subKeys1[0], subKeys1[1])] = [key2] + print("Getting filtered keys from data...") + return 
+    def similarKeys(self, data):
+        """Get the keys for all data with the same x and y labels."""
+        filteredKeys = {}
+        for key1, value1 in data.items():
+            subKeys1 = list(value1.keys())
+            filteredKeys[(subKeys1[0], subKeys1[1])] = [key1]
+            for key2, value2 in data.items():
+                subKeys2 = list(value2.keys())
+                if key1 != key2 and subKeys1[0] == subKeys2[0] and subKeys1[1] == subKeys2[1]:
+                    try:
+                        filteredKeys[(subKeys1[0], subKeys1[1])].append(key2)
+                    except KeyError:
+                        filteredKeys[(subKeys1[0], subKeys1[1])] = [key2]
+        print("Getting filtered keys from data...")
+        return filteredKeys
+
+    def formatLabel(self, label):
+        """Label formatting for the figures."""
+        result = ''.join([f" {char}" if char.isupper() else char for char in label])
+        return result.title()
+
+    def formatTitle(self, key):
+        """Title formatting for the figures."""
+        name = ''.join([f" {char}" if char.isupper() else char for char in key.split('_')[0]])
+        number = key.split('_')[1]
+        return f"{name.title()}: {number} "
+
+    def plotHeatmaps(self):
+        """Plot and store the 2D heatmaps in subfolders."""
+        data = self.plottingData()
+        # Average the runs if needed
+        if len(self.config.runs) > 1:
+            data = self.averageRuns(data, len(self.config.runs))
+        filteredKeys = self.similarKeys(data)
+        vmin, vmax = 0, self.maxTTA+1000
+        print("Plotting heatmaps...")
+
+        # Create the directory if it doesn't exist already
+        heatmapsFolder = self.folderPath + '/heatmaps'
+        if not os.path.exists(heatmapsFolder):
+            os.makedirs(heatmapsFolder)
+
+        # Plot
+        for labels, keys in filteredKeys.items():
+            for key in keys:
+                xlabels = np.sort(np.unique(data[key][labels[0]]))
+                ylabels = np.sort(np.unique(data[key][labels[1]]))
+                if len(xlabels) < self.minimumDataPoints or len(ylabels) < self.minimumDataPoints:
+                    continue
+                hist, xedges, yedges = np.histogram2d(data[key][labels[0]], data[key][labels[1]], bins=(len(xlabels), len(ylabels)), weights=data[key]['ttas'])
+                hist = hist.T
+                fig, ax = plt.subplots(figsize=(10, 6))
+                sns.heatmap(hist, xticklabels=xlabels, yticklabels=ylabels, cmap='hot_r', cbar_kws={'label': 'Time to block availability (ms)'}, linecolor='black', linewidths=0.3, annot=True, fmt=".2f", ax=ax, vmin=vmin, vmax=vmax)
+                plt.xlabel(self.formatLabel(labels[0]))
+                plt.ylabel(self.formatLabel(labels[1]))
+                filename = ""
+                title = ""
+                paramValueCnt = 0
+                for param in self.parameters:
+                    if param != labels[0] and param != labels[1] and param != 'run':
+                        filename += f"{key[paramValueCnt]}"
+                        formattedTitle = self.formatTitle(key[paramValueCnt])
+                        title += formattedTitle
+                        if (paramValueCnt+1) % 5 == 0:
+                            title += "\n"
+                        paramValueCnt += 1
+                title = "Time to Block Availability (ms)\n" + title
+                title_obj = plt.title(title)
+                font_size = 16 * fig.get_size_inches()[0] / 10
+                title_obj.set_fontsize(font_size)
+                filename = filename + ".png"
+                targetFolder = os.path.join(heatmapsFolder, f"{labels[0]}Vs{labels[1]}")
+                if not os.path.exists(targetFolder):
+                    os.makedirs(targetFolder)
+                plt.savefig(os.path.join(targetFolder, filename))
+                plt.close()
+                plt.clf()
+
+    def plotHist(self, bandwidth):
+        """Plot Bandwidth Frequency Histogram."""
+        plt.hist(bandwidth, bins=5)
+        plt.xlabel('Bandwidth')
+        plt.ylabel('Frequency')
+        plt.title('Bandwidth Histogram')
+
+        # Create the directory if it doesn't exist already
+        histogramFolder = self.folderPath + '/histogram'
+        if not os.path.exists(histogramFolder):
+            os.makedirs(histogramFolder)
+        filename = os.path.join(histogramFolder, 'histogram.png')
+        plt.savefig(filename)
+        plt.clf()
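+    # Usage sketch for plotHist (above) and plotCandleStick (below); this is
+    # hypothetical wiring, nothing in this changeset calls them yet:
+    #   vis = Visualizer(execID, config)
+    #   vis.plotHist(bandwidths)                      # e.g. bwUplinkProd values
+    #   vis.plotCandleStick(TX_prod, TX_avg, TX_max)  # per-step TX statistics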
+    def plotCandleStick(self, TX_prod, TX_avg, TX_max):
+        # x-axis corresponding to steps
+        steps = range(len(TX_prod))
+
+        # Plot the candlestick chart
+        ohlc = []
+        for i in range(len(TX_prod)):
+            # candlestick_ohlc expects 5-tuples (time, open, high, low, close);
+            # min TX is not tracked, so 'close' is approximated with the
+            # average (an assumption, not part of the collected data).
+            ohlc.append([steps[i], TX_prod[i], TX_max[i], TX_avg[i], TX_avg[i]])
+        fig, ax = plt.subplots()
+        candlestick_ohlc(ax, ohlc, width=0.6, colorup='green', colordown='red')
+
+        # Ticks, title and labels
+        plt.xticks(steps, ['run{}'.format(i) for i in steps], rotation=45)
+        plt.title('Candlestick Chart')
+        plt.xlabel('Step')
+        plt.ylabel('TX')
+
+        # Test
+        plt.show()
diff --git a/config.das b/config.das
deleted file mode 100644
index f5d608e..0000000
--- a/config.das
+++ /dev/null
@@ -1,27 +0,0 @@
-[Simulation Space]
-
-numberValidatorStart = 256
-numberValidatorStop = 512
-numberValidatorStep = 128
-
-failureRateStart = 10
-failureRateStop = 90
-failureRateStep = 40
-
-blockSizeStart = 32
-blockSizeStop = 64
-blockSizeStep = 16
-
-netDegreeStart = 6
-netDegreeStop = 8
-netDegreeStep = 1
-
-chiStart = 4
-chiStop = 8
-chiStep = 2
-
-
-[Advanced]
-
-deterministic = 0
-numberRuns = 2
diff --git a/doc/Makefile b/doc/Makefile
new file mode 100644
index 0000000..d4bb2cb
--- /dev/null
+++ b/doc/Makefile
@@ -0,0 +1,20 @@
+# Minimal makefile for Sphinx documentation
+#
+
+# You can set these variables from the command line, and also
+# from the environment for the first two.
+SPHINXOPTS    ?=
+SPHINXBUILD   ?= sphinx-build
+SOURCEDIR     = .
+BUILDDIR      = _build
+
+# Put it first so that "make" without argument is like "make help".
+help:
+	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+
+.PHONY: help Makefile
+
+# Catch-all target: route all unknown targets to Sphinx using the new
+# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
+%: Makefile
+	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
diff --git a/doc/conf.py b/doc/conf.py
new file mode 100644
index 0000000..7524d9c
--- /dev/null
+++ b/doc/conf.py
@@ -0,0 +1,64 @@
+# Configuration file for the Sphinx documentation builder.
+#
+# This file only contains a selection of the most common options. For a full
+# list see the documentation:
+# https://www.sphinx-doc.org/en/master/usage/configuration.html
+
+# -- Path setup --------------------------------------------------------------
+
+# If extensions (or modules to document with autodoc) are in another directory,
+# add these directories to sys.path here. If the directory is relative to the
+# documentation root, use os.path.abspath to make it absolute, like shown here.
+#
+import os
+import sys
+sys.path.insert(0, os.path.abspath('../DAS'))
+
+
+# -- Project information -----------------------------------------------------
+
+project = 'DAS simulator'
+copyright = '2023, Leonardo A. Bautista-Gomez, Csaba Kiraly'
+author = 'Leonardo A. Bautista-Gomez, Csaba Kiraly'
+
+# The full version, including alpha/beta/rc tags
+release = '1'
+
+
+# -- General configuration ---------------------------------------------------
+
+# Add any Sphinx extension module names here, as strings. They can be
+# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
+# ones.
+extensions = ['sphinx.ext.autodoc']
+
+
+# Add any paths that contain templates here, relative to this directory.
+templates_path = ['_templates']
+
+# List of patterns, relative to source directory, that match files and
+# directories to ignore when looking for source files.
+# This pattern also affects html_static_path and html_extra_path.
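+# 'myenv' is presumably the local virtualenv (it is also git-ignored);
+# excluding it keeps Sphinx from scanning installed packages.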
+exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store', 'myenv']
+
+
+# -- Options for HTML output -------------------------------------------------
+
+# The theme to use for HTML and HTML Help pages. See the documentation for
+# a list of builtin themes.
+#
+html_theme = 'alabaster'
+
+# Add any paths that contain custom static files (such as style sheets) here,
+# relative to this directory. They are copied after the builtin static files,
+# so a file named "default.css" will overwrite the builtin "default.css".
+html_static_path = ['_static']
+
+
+# -- Options for autodoc -------------------------------------------------
+
+autodoc_mock_imports = ["django", "dicttoxml", "bitarray", "DAS", "networkx"]
diff --git a/doc/index.rst b/doc/index.rst
new file mode 100644
index 0000000..e253201
--- /dev/null
+++ b/doc/index.rst
@@ -0,0 +1,44 @@
+.. DAS simulator documentation master file, created by
+   sphinx-quickstart on Wed Feb 8 20:56:44 2023.
+   You can adapt this file completely to your liking, but it should at least
+   contain the root `toctree` directive.
+
+Welcome to DAS simulator's documentation!
+=========================================
+
+Indices and tables
+==================
+
+* :ref:`genindex`
+* :ref:`modindex`
+* :ref:`search`
+
+.. toctree::
+   :maxdepth: 2
+   :caption: Contents:
+
+.. automodule:: block
+   :members:
+
+.. automodule:: observer
+   :members:
+
+.. automodule:: results
+   :members:
+
+.. automodule:: shape
+   :members:
+
+.. automodule:: simulator
+   :members:
+
+.. automodule:: tools
+   :members:
+
+.. automodule:: validator
+   :members:
+
+.. automodule:: visualizer
+   :members:
diff --git a/doc/make.bat b/doc/make.bat
new file mode 100644
index 0000000..153be5e
--- /dev/null
+++ b/doc/make.bat
@@ -0,0 +1,35 @@
+@ECHO OFF
+
+pushd %~dp0
+
+REM Command file for Sphinx documentation
+
+if "%SPHINXBUILD%" == "" (
+	set SPHINXBUILD=sphinx-build
+)
+set SOURCEDIR=.
+set BUILDDIR=_build
+
+if "%1" == "" goto help
+
+%SPHINXBUILD% >NUL 2>NUL
+if errorlevel 9009 (
+	echo.
+	echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
+	echo.installed, then set the SPHINXBUILD environment variable to point
+	echo.to the full path of the 'sphinx-build' executable. Alternatively you
+	echo.may add the Sphinx directory to PATH.
+	echo.
+	echo.If you don't have Sphinx installed, grab it from
+	echo.https://www.sphinx-doc.org/
+	exit /b 1
+)
+
+%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
+goto end
+
+:help
+%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
+
+:end
+popd
diff --git a/smallConf.py b/smallConf.py
new file mode 100644
index 0000000..7ab3f44
--- /dev/null
+++ b/smallConf.py
@@ -0,0 +1,110 @@
+"""Example configuration file
+
+This file illustrates how to define options and simulation parameter ranges.
+It also defines the traversal order of the simulation space. As the file
+extension suggests, configuration is pure python code, allowing complex
+setups. Use at your own risk.
+
+To use this example, run
+    python3 study.py smallConf
+
+Otherwise copy it and modify as needed. The default traversal order defined
+in the nested loop of nextShape() is good for most cases, but customizable
+if needed.
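+(study.py accepts any generator that yields Shape objects, so nextShape()
+below may reorder, filter, or subsample the space.)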
+""" + +import logging +import itertools +import numpy as np +from DAS.shape import Shape + +# Dump results into XML files +dumpXML = 1 + +# save progress and row/column distribution vectors to XML +saveProgress = 1 + +# plot progress for each run to PNG +plotProgress = 1 + +# Save row and column distributions +saveRCdist = 1 + +# Plot all figures +visualization = 1 + +# Verbosity level +logLevel = logging.INFO + +# number of parallel workers. -1: all cores; 1: sequential +# for more details, see joblib.Parallel +numJobs = -1 + +# distribute rows/columns evenly between validators (True) +# or generate it using local randomness (False) +evenLineDistribution = True + +# Number of simulation runs with the same parameters for statistical relevance +runs = range(3) + +# Number of validators +numberNodes = range(128, 513, 128) + +# select failure model between: "random, sequential, MEP, MEP+1, DEP, DEP+1, MREP, MREP-1" +failureModels = ["random"] + +# Percentage of block not released by producer +failureRates = range(40, 81, 20) + +# Block size in one dimension in segments. Block is blockSizes * blockSizes segments. +blockSizes = range(64, 113, 128) + +# Per-topic mesh neighborhood size +netDegrees = range(8, 9, 2) + +# number of rows and columns a validator is interested in +chis = range(2, 3, 2) + +# ratio of class1 nodes (see below for parameters per class) +class1ratios = [0.8] + +# Number of validators per beacon node +validatorsPerNode1 = [1] +validatorsPerNode2 = [500] + +# Set uplink bandwidth in megabits/second +bwUplinksProd = [200] +bwUplinks1 = [10] +bwUplinks2 = [200] + +# Step duration in miliseconds (Classic RTT is about 100ms) +stepDuration = 50 + +# Segment size in bytes (with proof) +segmentSize = 560 + +# Set to True if you want your run to be deterministic, False if not +deterministic = True + +# If your run is deterministic you can decide the random seed. This is ignore otherwise. +randomSeed = "DAS" + +# Number of steps without progress to stop simulation +steps4StopCondition = 7 + +# Number of validators ready to asume block is available +successCondition = 0.9 + +# If True, print diagnostics when the block is not available +diagnostics = False + +# True to save git diff and git commit +saveGit = False + +def nextShape(): + for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( + runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): + # Network Degree has to be an even number + if netDegree % 2 == 0: + shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run) + yield shape diff --git a/study.py b/study.py index ae27e5c..badb7f3 100644 --- a/study.py +++ b/study.py @@ -1,46 +1,91 @@ #! /bin/python3 -import time, sys +import time, sys, random, copy +import importlib +import subprocess +from joblib import Parallel, delayed from DAS import * +# Parallel execution: +# The code currently uses 'joblib' to execute on multiple cores. 
diff --git a/study.py b/study.py
index ae27e5c..badb7f3 100644
--- a/study.py
+++ b/study.py
@@ -1,46 +1,91 @@
 #! /bin/python3

-import time, sys
+import time, sys, random, copy
+import importlib
+import subprocess
+from joblib import Parallel, delayed
 from DAS import *

+# Parallel execution:
+# The code currently uses 'joblib' to execute on multiple cores. For other
+# options such as 'ray', see
+# https://stackoverflow.com/questions/9786102/how-do-i-parallelize-a-simple-python-loop
+# For fixing logging issues in parallel execution, see
+# https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls
+# and https://github.com/joblib/joblib/issues/1017
+
+def initLogger(config):
+    """It initializes the logger."""
+    logger = logging.getLogger("Study")
+    logger.setLevel(config.logLevel)
+    ch = logging.StreamHandler()
+    ch.setLevel(config.logLevel)
+    ch.setFormatter(CustomFormatter())
+    logger.addHandler(ch)
+    return logger
+
+def runOnce(config, shape, execID):
+
+    if config.deterministic:
+        shape.setSeed(config.randomSeed+"-"+str(shape))
+        random.seed(shape.randomSeed)
+
+    sim = Simulator(shape, config, execID)
+    sim.initLogger()
+    sim.initValidators()
+    sim.initNetwork()
+    result = sim.run()
+    sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
+
+    if config.dumpXML:
+        result.dump()
+
+    return result
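+# runOnce is the unit of work that joblib distributes across cores; seeding
+# it per shape (randomSeed + "-" + str(shape)) keeps every parameter
+# combination reproducible regardless of scheduling order.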
Block Available: %d" % (run, fr, chi, blockSize, nv, netDegree, result.blockAvailable), extra=sim.format) - results.append(result) - simCnt += 1 - + results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape()) end = time.time() - sim.logger.info("A total of %d simulations ran in %d seconds" % (simCnt, end-start), extra=sim.format) + logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format) + if config.visualization: + vis = Visualizer(execID, config) + vis.plotHeatmaps() - -study() - +if __name__ == "__main__": + study()