diff --git a/.gitignore b/.gitignore index 42fd1ff..492044a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ *.swp *.pyc results/* +myenv +doc/_build !results/plots.py -Frontend/ \ No newline at end of file +Frontend/ diff --git a/DAS/__init__.py b/DAS/__init__.py index c7fc7c0..67af3ae 100644 --- a/DAS/__init__.py +++ b/DAS/__init__.py @@ -1,4 +1,3 @@ from DAS.simulator import * -from DAS.configuration import * from DAS.shape import * from DAS.visualizer import * diff --git a/DAS/block.py b/DAS/block.py index aa5fe01..f76a944 100644 --- a/DAS/block.py +++ b/DAS/block.py @@ -5,43 +5,73 @@ from bitarray import bitarray from bitarray.util import zeros class Block: - - blockSize = 0 - data = bitarray() + """This class represents a block of the Ethereum blockchain, stored as a blockSize*blockSize array of segments.""" def __init__(self, blockSize): + """Initialize the block with a data array of blockSize^2 zeros.""" self.blockSize = blockSize self.data = zeros(self.blockSize*self.blockSize) def fill(self): + """It fills the block data with ones.""" self.data.setall(1) def merge(self, merged): + """It merges (OR) the existing block with the received one.""" self.data |= merged.data + def getSegment(self, rowID, columnID): + """It checks whether a segment is included.""" + return self.data[rowID*self.blockSize + columnID] + + def setSegment(self, rowID, columnID, value = 1): + """It sets the value of a segment (default 1).""" + self.data[rowID*self.blockSize + columnID] = value + def getColumn(self, columnID): + """It returns the block column corresponding to columnID.""" return self.data[columnID::self.blockSize] def mergeColumn(self, columnID, column): + """It merges (OR) the existing column with the received one.""" self.data[columnID::self.blockSize] |= column def repairColumn(self, id): - success = self.data[id::self.blockSize].count(1) + """It repairs the entire column if it has at least blockSize/2 ones. + Returns: bitarray of the repaired segments. + """ + line = self.data[id::self.blockSize] + success = line.count(1) if success >= self.blockSize/2: + ret = ~line self.data[id::self.blockSize] = 1 + else: + ret = zeros(self.blockSize) + return ret def getRow(self, rowID): + """It returns the block row corresponding to rowID.""" return self.data[rowID*self.blockSize:(rowID+1)*self.blockSize] def mergeRow(self, rowID, row): + """It merges (OR) the existing row with the received one.""" self.data[rowID*self.blockSize:(rowID+1)*self.blockSize] |= row def repairRow(self, id): - success = self.data[id*self.blockSize:(id+1)*self.blockSize].count(1) + """It repairs the entire row if it has at least blockSize/2 ones. + Returns: bitarray of the repaired segments. + """ + line = self.data[id*self.blockSize:(id+1)*self.blockSize] + success = line.count(1) if success >= self.blockSize/2: + ret = ~line self.data[id*self.blockSize:(id+1)*self.blockSize] = 1 + else: + ret = zeros(self.blockSize) + return ret def print(self): + """It prints the block in the terminal (outside of the logger rules).""" dash = "-" * (self.blockSize+2) print(dash) for i in range(self.blockSize):
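
(Not part of the patch: a minimal usage sketch of the Block API added above; the 4x4 size and the indices are arbitrary illustration values.)

    from DAS.block import Block

    b = Block(4)                    # 4x4 block, 16 segments, all zero
    b.setSegment(2, 1)              # mark the segment at row 2, column 1 as available
    assert b.getSegment(2, 1) == 1  # row-major layout: data[rowID*blockSize + columnID]

    # A line holding at least blockSize/2 segments counts as repairable
    # (erasure-coding assumption); repairRow() completes it and returns
    # the newly repaired positions as a bitarray.
    b.setSegment(3, 0)
    b.setSegment(3, 1)              # 2 of 4 segments present in row 3
    repaired = b.repairRow(3)
    assert repaired.count(1) == 2 and b.getRow(3).count(1) == 4
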
+ """ + line = self.data[id*self.blockSize:(id+1)*self.blockSize] + success = line.count(1) if success >= self.blockSize/2: + ret = ~line self.data[id*self.blockSize:(id+1)*self.blockSize] = 1 + else: + ret = zeros(self.blockSize) + return ret def print(self): + """It prints the block in the terminal (outside of the logger rules)).""" dash = "-" * (self.blockSize+2) print(dash) for i in range(self.blockSize): diff --git a/DAS/configuration.py b/DAS/configuration.py deleted file mode 100644 index a2e614e..0000000 --- a/DAS/configuration.py +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/python3 - -import configparser - -class Configuration: - - deterministic = 0 - - def __init__(self, fileName): - - config = configparser.RawConfigParser() - config.read(fileName) - - self.nvStart = int(config.get("Simulation Space", "numberValidatorStart")) - self.nvStop = int(config.get("Simulation Space", "numberValidatorStop")) - self.nvStep = int(config.get("Simulation Space", "numberValidatorStep")) - - self.blockSizeStart = int(config.get("Simulation Space", "blockSizeStart")) - self.blockSizeStop = int(config.get("Simulation Space", "blockSizeStop")) - self.blockSizeStep = int(config.get("Simulation Space", "blockSizeStep")) - - self.netDegreeStart = int(config.get("Simulation Space", "netDegreeStart")) - self.netDegreeStop = int(config.get("Simulation Space", "netDegreeStop")) - self.netDegreeStep = int(config.get("Simulation Space", "netDegreeStep")) - - self.failureRateStart = int(config.get("Simulation Space", "failureRateStart")) - self.failureRateStop = int(config.get("Simulation Space", "failureRateStop")) - self.failureRateStep = int(config.get("Simulation Space", "failureRateStep")) - - self.chiStart = int(config.get("Simulation Space", "chiStart")) - self.chiStop = int(config.get("Simulation Space", "chiStop")) - self.chiStep = int(config.get("Simulation Space", "chiStep")) - - self.numberRuns = int(config.get("Advanced", "numberRuns")) - self.deterministic = config.get("Advanced", "deterministic") - self.dumpXML = config.get("Advanced", "dumpXML") - - if self.nvStop < (self.blockSizeStart*4): - print("ERROR: The number of validators cannot be lower than the block size * 4") - exit(1) - if self.chiStart < 1: - print("Chi has to be greater than 0") - exit(1) - if self.chiStop > self.blockSizeStart: - print("Chi (%d) has to be smaller or equal to block the size (%d)" % (self.chiStop, self.blockSizeStart)) - exit(1) - - - diff --git a/DAS/observer.py b/DAS/observer.py index ad9052b..af5866a 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -3,21 +3,22 @@ from DAS.block import * class Observer: - - block = [] - rows = [] - columns = [] - goldenData = [] - broadcasted = [] - config = [] - logger = [] + """This class gathers global data from the simulation, like an 'all-seen god'.""" def __init__(self, logger, config): + """It initializes the observer with a logger and given configuration.""" self.config = config self.format = {"entity": "Observer"} self.logger = logger + self.block = [] + self.rows = [] + self.columns = [] + self.goldenData = [] + self.broadcasted = [] + def reset(self): + """It resets all the gathered data to zeros.""" self.block = [0] * self.config.blockSize * self.config.blockSize self.goldenData = [0] * self.config.blockSize * self.config.blockSize self.rows = [0] * self.config.blockSize @@ -25,6 +26,7 @@ class Observer: self.broadcasted = Block(self.config.blockSize) def checkRowsColumns(self, validators): + """It checks how many validators have been assigned to each row and 
column.""" for val in validators: if val.amIproposer == 0: for r in val.rowIDs: @@ -38,10 +40,12 @@ class Observer: self.logger.warning("There is a row/column that has not been assigned", extra=self.format) def setGoldenData(self, block): + """Stores the original real data to compare it with future situations.""" for i in range(self.config.blockSize*self.config.blockSize): self.goldenData[i] = block.data[i] def checkBroadcasted(self): + """It checks how many broadcasted samples are still missing in the network.""" zeros = 0 for i in range(self.blockSize * self.blockSize): if self.broadcasted.data[i] == 0: @@ -51,6 +55,7 @@ class Observer: return zeros def checkStatus(self, validators): + """It checks the status of how many expected and arrived samples globally.""" arrived = 0 expected = 0 for val in validators: diff --git a/DAS/requirements.txt b/DAS/requirements.txt index d0bb457..da7dcc7 100644 --- a/DAS/requirements.txt +++ b/DAS/requirements.txt @@ -1,3 +1,7 @@ bitarray==2.6.0 -DAS==0.28.7 +DAS==0.29.0 +dicttoxml==1.7.16 +matplotlib==3.6.2 networkx==3.0 +numpy==1.23.5 +seaborn==0.12.2 diff --git a/DAS/results.py b/DAS/results.py index 48b6cbc..0efe26d 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -5,19 +5,17 @@ from xml.dom import minidom from dicttoxml import dicttoxml class Result: - - shape = [] - missingVector = [] - blockAvailable = -1 - tta = -1 + """This class stores and process/store the results of a simulation.""" def __init__(self, shape): + """It initializes the instance with a specific shape.""" self.shape = shape self.blockAvailable = -1 self.tta = -1 self.missingVector = [] def populate(self, shape, missingVector): + """It populates part of the result data inside a vector.""" self.shape = shape self.missingVector = missingVector missingSamples = missingVector[-1] @@ -29,6 +27,7 @@ class Result: self.tta = -1 def dump(self, execID): + """It dumps the results of the simulation in an XML file.""" if not os.path.exists("results"): os.makedirs("results") if not os.path.exists("results/"+execID): @@ -40,11 +39,6 @@ class Result: resXml = dicttoxml(resd1) xmlstr = minidom.parseString(resXml) xmlPretty = xmlstr.toprettyxml() - filePath = "results/"+execID+"/nbv-"+str(self.shape.numberValidators)+\ - "-bs-"+str(self.shape.blockSize)+\ - "-nd-"+str(self.shape.netDegree)+\ - "-fr-"+str(self.shape.failureRate)+\ - "-chi-"+str(self.shape.chi)+\ - "-r-"+str(self.shape.run)+".xml" + filePath = "results/"+execID+"/"+str(self.shape)+".xml" with open(filePath, "w") as f: f.write(xmlPretty) diff --git a/DAS/shape.py b/DAS/shape.py index 243ae8e..d83351d 100644 --- a/DAS/shape.py +++ b/DAS/shape.py @@ -1,21 +1,42 @@ #!/bin/python3 class Shape: - run = 0 - numberValidators = 0 - blockSize = 0 - failureRate = 0 - netDegree = 0 - chi = 0 + """This class represents a set of parameters for a specific simulation.""" - def __init__(self, blockSize, numberValidators, failureRate, chi, netDegree, run): + def __init__(self, blockSize, numberNodes, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run): + """Initializes the shape with the parameters passed in argument.""" self.run = run - self.numberValidators = numberValidators + self.numberNodes = numberNodes self.blockSize = blockSize self.failureRate = failureRate self.netDegree = netDegree + self.class1ratio = class1ratio self.chi = chi + self.vpn1 = vpn1 + self.vpn2 = vpn2 + self.bwUplinkProd = bwUplinkProd + self.bwUplink1 = bwUplink1 + self.bwUplink2 = bwUplink2 + self.randomSeed = "" + def 
diff --git a/DAS/simulator.py b/DAS/simulator.py index 20ecb19..30c0d86 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -3,96 +3,175 @@ import networkx as nx import logging, random from datetime import datetime +from statistics import mean from DAS.tools import * from DAS.results import * from DAS.observer import * from DAS.validator import * class Simulator: + """This class implements the main DAS simulator.""" - proposerID = 0 - logLevel = logging.INFO - validators = [] - glob = [] - result = [] - shape = [] - logger = [] - format = {} - - def __init__(self, shape): + def __init__(self, shape, config): + """It initializes the simulation with a set of parameters (shape).""" self.shape = shape + self.config = config self.format = {"entity": "Simulator"} self.result = Result(self.shape) + self.validators = [] + self.logger = [] + self.logLevel = config.logLevel + self.proposerID = 0 + self.glob = [] def initValidators(self): + """It initializes all the validators in the network.""" self.glob = Observer(self.logger, self.shape) self.glob.reset() self.validators = [] - rows = list(range(self.shape.blockSize)) * int(self.shape.chi*self.shape.numberValidators/self.shape.blockSize) - columns = list(range(self.shape.blockSize)) * int(self.shape.chi*self.shape.numberValidators/self.shape.blockSize) - random.shuffle(rows) - random.shuffle(columns) - for i in range(self.shape.numberValidators): - val = Validator(i, int(not i!=0), self.logger, self.shape, rows, columns) + if self.config.evenLineDistribution: + + lightVal = int(self.shape.numberNodes * self.shape.class1ratio * self.shape.vpn1) + heavyVal = int(self.shape.numberNodes * (1-self.shape.class1ratio) * self.shape.vpn2) + totalValidators = lightVal + heavyVal + rows = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1) + columns = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1) + offset = heavyVal*self.shape.chi + random.shuffle(rows) + random.shuffle(columns) + for i in range(self.shape.numberNodes): + if self.config.evenLineDistribution: + if i < int(heavyVal/self.shape.vpn2): # First start with the heavy nodes + start = i *self.shape.chi*self.shape.vpn2 + end = (i+1)*self.shape.chi*self.shape.vpn2 + else: # Then the solo stakers + j = i - int(heavyVal/self.shape.vpn2) + start = offset+( j *self.shape.chi) + end = offset+((j+1)*self.shape.chi) + r = rows[start:end] + c = columns[start:end] + val = Validator(i, int(i == 0), self.logger, self.shape, r, c) + else: + val = Validator(i, int(i == 0), self.logger, self.shape) if i == self.proposerID: val.initBlock() self.glob.setGoldenData(val.block) else: val.logIDs() self.validators.append(val) + self.logger.debug("Validators initialized.", extra=self.format) def initNetwork(self): + """It initializes the simulated network.""" rowChannels = [[] for i in 
range(self.shape.blockSize)] columnChannels = [[] for i in range(self.shape.blockSize)] for v in self.validators: - for id in v.rowIDs: - rowChannels[id].append(v) - for id in v.columnIDs: - columnChannels[id].append(v) + if not (self.proposerPublishOnly and v.amIproposer): + for id in v.rowIDs: + rowChannels[id].append(v) + for id in v.columnIDs: + columnChannels[id].append(v) + + # Check rows/columns distribution + #totalR = 0 + #totalC = 0 + #for r in rowChannels: + # totalR += len(r) + #for c in columnChannels: + # totalC += len(c) for id in range(self.shape.blockSize): - if (len(rowChannels[id]) < self.shape.netDegree): - self.logger.error("Graph degree higher than %d" % len(rowChannels[id]), extra=self.format) - G = nx.random_regular_graph(self.shape.netDegree, len(rowChannels[id])) + # If the number of nodes in a channel is smaller or equal to the + # requested degree, a fully connected graph is used. For n>d, a random + # d-regular graph is set up. (For n=d+1, the two are the same.) + if not rowChannels[id]: + self.logger.error("No nodes for row %d !" % id, extra=self.format) + continue + elif (len(rowChannels[id]) <= self.shape.netDegree): + self.logger.debug("Graph fully connected with degree %d !" % (len(rowChannels[id]) - 1), extra=self.format) + G = nx.complete_graph(len(rowChannels[id])) + else: + G = nx.random_regular_graph(self.shape.netDegree, len(rowChannels[id])) if not nx.is_connected(G): self.logger.error("Graph not connected for row %d !" % id, extra=self.format) for u, v in G.edges: val1=rowChannels[id][u] val2=rowChannels[id][v] - val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)}) - val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)}) + val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSize)}) + val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSize)}) - if (len(columnChannels[id]) < self.shape.netDegree): - self.logger.error("Graph degree higher than %d" % len(columnChannels[id]), extra=self.format) - G = nx.random_regular_graph(self.shape.netDegree, len(columnChannels[id])) + if not columnChannels[id]: + self.logger.error("No nodes for column %d !" % id, extra=self.format) + continue + elif (len(columnChannels[id]) <= self.shape.netDegree): + self.logger.debug("Graph fully connected with degree %d !" % (len(columnChannels[id]) - 1), extra=self.format) + G = nx.complete_graph(len(columnChannels[id])) + else: + G = nx.random_regular_graph(self.shape.netDegree, len(columnChannels[id])) if not nx.is_connected(G): self.logger.error("Graph not connected for column %d !" 
% id, extra=self.format) for u, v in G.edges: val1=columnChannels[id][u] val2=columnChannels[id][v] - val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)}) - val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)}) + val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSize)}) + val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSize)}) + + for v in self.validators: + if (self.proposerPublishOnly and v.amIproposer): + for id in v.rowIDs: + count = min(self.proposerPublishTo, len(rowChannels[id])) + publishTo = random.sample(rowChannels[id], count) + for vi in publishTo: + v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSize)}) + for id in v.columnIDs: + count = min(self.proposerPublishTo, len(columnChannels[id])) + publishTo = random.sample(columnChannels[id], count) + for vi in publishTo: + v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSize)}) + + if self.logger.isEnabledFor(logging.DEBUG): + for i in range(0, self.shape.numberNodes): + self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format) + self.logger.debug("Val %d : colN %s", i, self.validators[i].columnNeighbors, extra=self.format) def initLogger(self): + """It initializes the logger.""" logger = logging.getLogger("DAS") - logger.setLevel(self.logLevel) - ch = logging.StreamHandler() - ch.setLevel(self.logLevel) - ch.setFormatter(CustomFormatter()) - logger.addHandler(ch) + if len(logger.handlers) == 0: + logger.setLevel(self.logLevel) + ch = logging.StreamHandler() + ch.setLevel(self.logLevel) + ch.setFormatter(CustomFormatter()) + logger.addHandler(ch) self.logger = logger def resetShape(self, shape): + """It resets the parameters of the simulation.""" self.shape = shape self.result = Result(self.shape) for val in self.validators: val.shape.failureRate = shape.failureRate val.shape.chi = shape.chi + val.shape.vpn1 = shape.vpn1 + val.shape.vpn2 = shape.vpn2 + + # In GossipSub the initiator might push messages without participating in the mesh. + # proposerPublishOnly regulates this behavior. If set to true, the proposer is not + # part of the p2p distribution graph, only pushes segments to it. If false, the proposer + # might get back segments from other peers since links are symmetric. + self.proposerPublishOnly = True + + # If proposerPublishOnly == True, this regulates how many copies of each segment are + # pushed out by the proposer. 
+ # 1: the data is sent out exactly once on rows and once on columns (2 copies in total) + # self.shape.netDegree: default behavior similar (but not same) to previous code + self.proposerPublishTo = self.shape.netDegree def run(self): + """It runs the main simulation until the block is available or it gets stuck.""" self.glob.checkRowsColumns(self.validators) self.validators[self.proposerID].broadcastBlock() arrived, expected = self.glob.checkStatus(self.validators) @@ -102,17 +181,29 @@ class Simulator: while(True): missingVector.append(missingSamples) oldMissingSamples = missingSamples - for i in range(0,self.shape.numberValidators): - self.validators[i].sendRows() - self.validators[i].sendColumns() - for i in range(1,self.shape.numberValidators): + self.logger.debug("PHASE SEND %d" % steps, extra=self.format) + for i in range(0,self.shape.numberNodes): + self.validators[i].send() + self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) + for i in range(1,self.shape.numberNodes): self.validators[i].receiveRowsColumns() - for i in range(1,self.shape.numberValidators): + self.logger.debug("PHASE RESTORE %d" % steps, extra=self.format) + for i in range(1,self.shape.numberNodes): self.validators[i].restoreRows() self.validators[i].restoreColumns() - for i in range(0,self.shape.numberValidators): + self.logger.debug("PHASE LOG %d" % steps, extra=self.format) + for i in range(0,self.shape.numberNodes): self.validators[i].logRows() self.validators[i].logColumns() + + # log TX and RX statistics + statsTxInSlot = [v.statsTxInSlot for v in self.validators] + statsRxInSlot = [v.statsRxInSlot for v in self.validators] + self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, RX_avg=%.1f, RX_max=%.1f" % (steps, statsTxInSlot[0], statsRxInSlot[0], mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]), mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format) + for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() arrived, expected = self.glob.checkStatus(self.validators) @@ -120,7 +211,7 @@ class Simulator: missingRate = missingSamples*100/expected self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format) if missingSamples == oldMissingSamples: - #self.logger.info("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) + self.logger.debug("The block cannot be recovered, failure rate %d!" 
% self.shape.failureRate, extra=self.format) missingVector.append(missingSamples) break elif missingSamples == 0: diff --git a/DAS/tools.py b/DAS/tools.py index b9e4a8e..2b47b11 100644 --- a/DAS/tools.py +++ b/DAS/tools.py @@ -1,27 +1,87 @@ #!/bin/python3 import logging +import sys +import random +from bitarray.util import zeros +class CustomFormatter(): + """This class defines the terminal output formatting.""" -class CustomFormatter(logging.Formatter): - - blue = "\x1b[34;20m" - grey = "\x1b[38;20m" - yellow = "\x1b[33;20m" - red = "\x1b[31;20m" - bold_red = "\x1b[31;1m" - reset = "\x1b[0m" - format = "%(levelname)s : %(entity)s : %(message)s" - - FORMATS = { - logging.DEBUG: grey + format + reset, - logging.INFO: blue + format + reset, - logging.WARNING: yellow + format + reset, - logging.ERROR: red + format + reset, - logging.CRITICAL: bold_red + format + reset - } + def __init__(self): + """Initializes 5 different formats for logging with different colors.""" + self.blue = "\x1b[34;20m" + self.grey = "\x1b[38;20m" + self.yellow = "\x1b[33;20m" + self.red = "\x1b[31;20m" + self.bold_red = "\x1b[31;1m" + self.reset = "\x1b[0m" + self.reformat = "%(levelname)s : %(entity)s : %(message)s" + self.FORMATS = { + logging.DEBUG: self.grey + self.reformat + self.reset, + logging.INFO: self.blue + self.reformat + self.reset, + logging.WARNING: self.yellow + self.reformat + self.reset, + logging.ERROR: self.red + self.reformat + self.reset, + logging.CRITICAL: self.bold_red + self.reformat + self.reset + } def format(self, record): + """Returns the formatter with the format corresponding to record.""" log_fmt = self.FORMATS.get(record.levelno) formatter = logging.Formatter(log_fmt) return formatter.format(record) +def shuffled(lis, shuffle=True): + """Generator yielding list in shuffled order.""" + # based on https://stackoverflow.com/a/60342323 + if shuffle: + for index in random.sample(range(len(lis)), len(lis)): + yield lis[index] + else: + for v in lis: + yield v +def shuffledDict(d, shuffle=True): + """Generator yielding dictionary in shuffled order. + + Shuffles by default; pass shuffle=False to keep the original order (useful for experiment setup). + """ + if shuffle: + lis = list(d.items()) + for index in random.sample(range(len(d)), len(d)): + yield lis[index] + else: + for kv in d.items(): + yield kv + +def sampleLine(line, limit): + """Sample up to 'limit' bits from a bitarray. + + Since this is quite expensive, we use a number of heuristics to get it fast. 
+ """ + if limit == sys.maxsize : + return line + else: + w = line.count(1) + if limit >= w : + return line + else: + l = len(line) + r = zeros(l) + if w < l/10 or limit > l/2 : + indices = [ i for i in range(l) if line[i] ] + sample = random.sample(indices, limit) + for i in sample: + r[i] = 1 + return r + else: + while limit: + i = random.randrange(0, l) + if line[i] and not r[i]: + r[i] = 1 + limit -= 1 + return r + +def unionOfSamples(population, sampleSize, times): + selected = set() + for t in range(times): + selected |= set(random.sample(population, sampleSize)) + return selected diff --git a/DAS/validator.py b/DAS/validator.py index 950fdea..f869171 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -4,57 +4,71 @@ import random import collections import logging from DAS.block import * -from bitarray import bitarray +from DAS.tools import shuffled, shuffledDict, unionOfSamples from bitarray.util import zeros +from collections import deque +from itertools import chain class Neighbor: + """This class implements a node neighbor to monitor sent and received data. + + It represents one side of a P2P link in the overlay. Sent and received + segments are monitored to avoid sending twice or sending back what was + received from a link. + """ def __repr__(self): - return "%d:%d/%d" % (self.node.ID, self.sent.count(1), self.received.count(1)) + """It returns the amount of sent and received data.""" + return "%d:%d/%d, q:%d" % (self.node.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue)) - def __init__(self, v, blockSize): + def __init__(self, v, dim, blockSize): + """It initializes the neighbor with the node and sets counters to zero.""" self.node = v + self.dim = dim # 0:row 1:col self.receiving = zeros(blockSize) self.received = zeros(blockSize) self.sent = zeros(blockSize) + self.sendQueue = deque() + class Validator: - - ID = 0 - amIproposer = 0 - shape = [] - format = {} - logger = [] + """This class implements a validator/node in the network.""" def __repr__(self): + """It returns the validator ID.""" return str(self.ID) - def __init__(self, ID, amIproposer, logger, shape, rows, columns): + def __init__(self, ID, amIproposer, logger, shape, rows = None, columns = None): + """It initializes the validator with the logger shape and rows/columns. + + If rows/columns are specified these are observed, otherwise (default) + chi rows and columns are selected randomly. 
+ """ + self.shape = shape FORMAT = "%(levelname)s : %(entity)s : %(message)s" self.ID = ID self.format = {"entity": "Val "+str(self.ID)} self.block = Block(self.shape.blockSize) self.receivedBlock = Block(self.shape.blockSize) + self.receivedQueue = deque() + self.sendQueue = deque() self.amIproposer = amIproposer self.logger = logger if self.shape.chi < 1: self.logger.error("Chi has to be greater than 0", extra=self.format) elif self.shape.chi > self.shape.blockSize: - self.logger.error("Chi has to be smaller than %d" % blockSize, extra=self.format) + self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format) else: if amIproposer: self.rowIDs = range(shape.blockSize) self.columnIDs = range(shape.blockSize) else: - self.rowIDs = rows[(self.ID*self.shape.chi):(self.ID*self.shape.chi + self.shape.chi)] - self.columnIDs = rows[(self.ID*self.shape.chi):(self.ID*self.shape.chi + self.shape.chi)] #if shape.deterministic: # random.seed(self.ID) - #self.rowIDs = random.sample(range(self.shape.blockSize), self.shape.chi) - #self.columnIDs = random.sample(range(self.shape.blockSize), self.shape.chi) - self.changedRow = {id:False for id in self.rowIDs} - self.changedColumn = {id:False for id in self.columnIDs} + vpn = self.shape.vpn1 if (self.ID <= shape.numberNodes * shape.class1ratio) else self.shape.vpn2 + self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn) + self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn) self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) @@ -64,7 +78,29 @@ class Validator: self.statsRxInSlot = 0 self.statsRxPerSlot = [] + # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) 
+ # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 + # TODO: this should be a parameter + if self.amIproposer: + self.bwUplink = shape.bwUplinkProd + elif self.ID <= shape.numberNodes * shape.class1ratio: + self.bwUplink = shape.bwUplink1 + else: + self.bwUplink = shape.bwUplink2 + + self.repairOnTheFly = True + self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed + self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) + self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node + self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch + self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send + self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor + self.dumbRandomScheduler = False # dumb random scheduler + self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat + self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps + def logIDs(self): + """It logs the assigned rows and columns.""" if self.amIproposer == 1: self.logger.warning("I am a block proposer.", extra=self.format) else: @@ -72,14 +108,19 @@ self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format) def initBlock(self): - self.logger.debug("I am a block proposer.", extra=self.format) - self.block = Block(self.shape.blockSize) - self.block.fill() - #self.block.print() + """It initializes the block for the proposer.""" + if self.amIproposer == 1: + self.logger.debug("I am a block proposer.", extra=self.format) + self.block = Block(self.shape.blockSize) + self.block.fill() + #self.block.print() + else: + self.logger.warning("I am not a block proposer.", extra=self.format) def broadcastBlock(self): + """The block proposer broadcasts the block to all validators.""" if self.amIproposer == 0: - self.logger.error("I am NOT a block proposer", extra=self.format) + self.logger.warning("I am not a block proposer", extra=self.format) else: self.logger.debug("Broadcasting my block...", extra=self.format) order = [i for i in range(self.shape.blockSize * self.shape.blockSize)] @@ -91,132 +132,352 @@ class Validator: else: self.block.data[i] = 0 - self.changedRow = {id:True for id in self.rowIDs} - self.changedColumn = {id:True for id in self.columnIDs} - nbFailures = self.block.data.count(0) measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) #broadcasted.print() def getColumn(self, index): + """It returns a given column.""" return self.block.getColumn(index) def getRow(self, index): + """It returns a given row.""" return self.block.getRow(index) - def receiveColumn(self, id, column, src): - if id in self.columnIDs: - # register receive so that we are not sending back - self.columnNeighbors[id][src].receiving |= column - self.receivedBlock.mergeColumn(id, column) - self.statsRxInSlot += column.count(1) + def receiveSegment(self, rID, cID, src): + """Receive a segment, register it, and queue for forwarding as needed.""" + # register receive so that we are not sending back + if rID in self.rowIDs: + if src in self.rowNeighbors[rID]: + self.rowNeighbors[rID][src].receiving[cID] = 1 + if cID in self.columnIDs: + 
if src in self.columnNeighbors[cID]: + self.columnNeighbors[cID][src].receiving[rID] = 1 + if not self.receivedBlock.getSegment(rID, cID): + self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) + self.receivedBlock.setSegment(rID, cID) + if self.perNodeQueue or self.perNeighborQueue: + self.receivedQueue.append((rID, cID)) else: - pass + self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) + # self.statsRxDuplicateInSlot += 1 + self.statsRxInSlot += 1 - def receiveRow(self, id, row, src): - if id in self.rowIDs: - # register receive so that we are not sending back - self.rowNeighbors[id][src].receiving |= row - self.receivedBlock.mergeRow(id, row) - self.statsRxInSlot += row.count(1) - else: - pass + def addToSendQueue(self, rID, cID): + """Queue a segment for forwarding.""" + if self.perNodeQueue: + self.sendQueue.append((rID, cID)) + if self.perNeighborQueue: + if rID in self.rowIDs: + for neigh in self.rowNeighbors[rID].values(): + neigh.sendQueue.append(cID) + + if cID in self.columnIDs: + for neigh in self.columnNeighbors[cID].values(): + neigh.sendQueue.append(rID) def receiveRowsColumns(self): + """Finalize time step by merging newly received segments in state.""" if self.amIproposer == 1: self.logger.error("I am a block proposer", extra=self.format) else: self.logger.debug("Receiving the data...", extra=self.format) #self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format) - self.changedRow = { id: - self.getRow(id) != self.receivedBlock.getRow(id) - for id in self.rowIDs - } - - self.changedColumn = { id: - self.getColumn(id) != self.receivedBlock.getColumn(id) - for id in self.columnIDs - } - self.block.merge(self.receivedBlock) - for neighs in self.rowNeighbors.values(): + for neighs in chain (self.rowNeighbors.values(), self.columnNeighbors.values()): for neigh in neighs.values(): neigh.received |= neigh.receiving neigh.receiving.setall(0) - for neighs in self.columnNeighbors.values(): - for neigh in neighs.values(): - neigh.received |= neigh.receiving - neigh.receiving.setall(0) + # add newly received segments to the send queue + if self.perNodeQueue or self.perNeighborQueue: + while self.receivedQueue: + (rID, cID) = self.receivedQueue.popleft() + self.addToSendQueue(rID, cID) + def updateStats(self): + """It updates the stats related to sent and received data.""" self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format) self.statsRxPerSlot.append(self.statsRxInSlot) self.statsTxPerSlot.append(self.statsTxInSlot) self.statsRxInSlot = 0 self.statsTxInSlot = 0 + def checkSegmentToNeigh(self, rID, cID, neigh): + """Check if a segment should be sent to a neighbor.""" + if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil: + return False # sent enough, other side can restore + i = rID if neigh.dim else cID + if not neigh.sent[i] and not neigh.received[i] : + return True + else: + return False # received or already sent - def sendColumn(self, columnID): - line = self.getColumn(columnID) - if line.any(): - self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) - for n in self.columnNeighbors[columnID].values(): + def sendSegmentToNeigh(self, rID, cID, neigh): + """Send segment to a neighbor (without checks).""" + self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) + i = rID if neigh.dim else cID + neigh.sent[i] = 1 + neigh.node.receiveSegment(rID, cID, 
self.ID) + self.statsTxInSlot += 1 - # if there is anything new to send, send it - toSend = line & ~n.sent & ~n.received - if (toSend).any(): - n.sent |= toSend; - n.node.receiveColumn(columnID, toSend, self.ID) - self.statsTxInSlot += toSend.count(1) + def checkSendSegmentToNeigh(self, rID, cID, neigh): + """Check and send a segment to a neighbor if needed.""" + if self.checkSegmentToNeigh(rID, cID, neigh): + self.sendSegmentToNeigh(rID, cID, neigh) + return True + else: + return False - def sendRow(self, rowID): - line = self.getRow(rowID) - if line.any(): - self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) - for n in self.rowNeighbors[rowID].values(): + def processSendQueue(self): + """Send out segments from queue until bandwidth limit reached. - # if there is anything new to send, send it - toSend = line & ~n.sent & ~n.received - if (toSend).any(): - n.sent |= toSend; - n.node.receiveRow(rowID, toSend, self.ID) - self.statsTxInSlot += toSend.count(1) + SendQueue is a centralized queue from which segments are sent out + in FIFO order to all interested neighbors. + """ + while self.sendQueue: + (rID, cID) = self.sendQueue[0] - def sendRows(self): - self.logger.debug("Sending restored rows...", extra=self.format) - for r in self.rowIDs: - if self.changedRow[r]: - self.sendRow(r) + if rID in self.rowIDs: + for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors): + self.checkSendSegmentToNeigh(rID, cID, neigh) - def sendColumns(self): - self.logger.debug("Sending restored columns...", extra=self.format) - for c in self.columnIDs: - if self.changedColumn[c]: - self.sendColumn(c) + if self.statsTxInSlot >= self.bwUplink: + return + + if cID in self.columnIDs: + for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors): + self.checkSendSegmentToNeigh(rID, cID, neigh) + + if self.statsTxInSlot >= self.bwUplink: + return + + self.sendQueue.popleft() + + def processPerNeighborSendQueue(self): + """Send out segments from per-neighbor queues until bandwidth limit reached. + + Segments are dispatched from per-neighbor transmission queues in a shuffled + round-robin order, emulating a type of fair queuing. Since neighborhood is + handled at the topic (column or row) level, fair queuing is also at the level + of flows per topic and per peer. A per-peer model might be closer to the + reality of libp2p implementations where topics between two nodes are + multiplexed over the same transport. + """ + progress = True + while (progress): + progress = False + + queues = [] + # collect and shuffle + for rID, neighs in self.rowNeighbors.items(): + for neigh in neighs.values(): + if (neigh.sendQueue): + queues.append((0, rID, neigh)) + + for cID, neighs in self.columnNeighbors.items(): + for neigh in neighs.values(): + if (neigh.sendQueue): + queues.append((1, cID, neigh)) + + for dim, lineID, neigh in shuffled(queues, self.shuffleQueues): + if dim == 0: + self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh) + else: + self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh) + progress = True + if self.statsTxInSlot >= self.bwUplink: + return + + def runSegmentShuffleScheduler(self): + """ Schedule chunks for sending. + + This scheduler checks which owned segments need sending (at least + one neighbor needing them). Then it sends each segment that is worth sending + once, in shuffled order. This is repeated until the bandwidth limit is reached. 
+ """ + + def collectSegmentsToSend(): + # yields list of segments to send as (dim, lineID, id) + segmentsToSend = [] + for rID, neighs in self.rowNeighbors.items(): + line = self.getRow(rID) + needed = zeros(self.shape.blockSize) + for neigh in neighs.values(): + sentOrReceived = neigh.received | neigh.sent + if sentOrReceived.count(1) < self.sendLineUntil: + needed |= ~sentOrReceived + needed &= line + if (needed).any(): + for i in range(len(needed)): + if needed[i]: + segmentsToSend.append((0, rID, i)) + + for cID, neighs in self.columnNeighbors.items(): + line = self.getColumn(cID) + needed = zeros(self.shape.blockSize) + for neigh in neighs.values(): + sentOrReceived = neigh.received | neigh.sent + if sentOrReceived.count(1) < self.sendLineUntil: + needed |= ~sentOrReceived + needed &= line + if (needed).any(): + for i in range(len(needed)): + if needed[i]: + segmentsToSend.append((1, cID, i)) + + return segmentsToSend + + def nextSegment(): + while True: + # send each collected segment once + if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None: + for dim, lineID, id in self.segmentShuffleGen: + if dim == 0: + for _, neigh in shuffledDict(self.rowNeighbors[lineID], self.shuffleNeighbors): + if self.checkSegmentToNeigh(lineID, id, neigh): + yield((lineID, id, neigh)) + break + else: + for _, neigh in shuffledDict(self.columnNeighbors[lineID], self.shuffleNeighbors): + if self.checkSegmentToNeigh(id, lineID, neigh): + yield((id, lineID, neigh)) + break + + # collect segments for next round + segmentsToSend = collectSegmentsToSend() + + # finish if empty or set up shuffled generator based on collected segments + if not segmentsToSend: + break + else: + self.segmentShuffleGen = shuffled(segmentsToSend, self.shuffleLines) + + for rid, cid, neigh in nextSegment(): + # segments are checked just before yield, so we can send directly + self.sendSegmentToNeigh(rid, cid, neigh) + + if self.statsTxInSlot >= self.bwUplink: + if not self.segmentShuffleSchedulerPersist: + # remove scheduler state before leaving + self.segmentShuffleGen = None + return + + def runDumbRandomScheduler(self, tries = 100): + """Random scheduler picking segments at random. + + This scheduler implements a simple random scheduling order picking + segments at random and peers potentially interested in that segment + also at random. + It serves more as a performance baseline than as a realistic model. 
+ """ + + def nextSegment(): + t = tries + while t: + if self.rowIDs: + rID = random.choice(self.rowIDs) + cID = random.randrange(0, self.shape.blockSize) + if self.block.getSegment(rID, cID) : + neigh = random.choice(list(self.rowNeighbors[rID].values())) + if self.checkSegmentToNeigh(rID, cID, neigh): + yield(rID, cID, neigh) + t = tries + if self.columnIDs: + cID = random.choice(self.columnIDs) + rID = random.randrange(0, self.shape.blockSize) + if self.block.getSegment(rID, cID) : + neigh = random.choice(list(self.columnNeighbors[cID].values())) + if self.checkSegmentToNeigh(rID, cID, neigh): + yield(rID, cID, neigh) + t = tries + t -= 1 + + for rid, cid, neigh in nextSegment(): + # segments are checked just before yield, so we can send directly + self.sendSegmentToNeigh(rid, cid, neigh) + + if self.statsTxInSlot >= self.bwUplink: + return + + def send(self): + """ Send as much as we can in the timestep, limited by bwUplink.""" + + # process node level send queue + self.processSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return + + # process neighbor level send queues in shuffled breadth-first order + self.processPerNeighborSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return + + # process possible segments to send in shuffled breadth-first order + if self.segmentShuffleScheduler: + self.runSegmentShuffleScheduler() + if self.statsTxInSlot >= self.bwUplink: + return + + if self.dumbRandomScheduler: + self.runDumbRandomScheduler() + if self.statsTxInSlot >= self.bwUplink: + return def logRows(self): + """It logs the rows assigned to the validator.""" if self.logger.isEnabledFor(logging.DEBUG): for id in self.rowIDs: self.logger.debug("Row %d: %s", id, self.getRow(id), extra=self.format) def logColumns(self): + """It logs the columns assigned to the validator.""" if self.logger.isEnabledFor(logging.DEBUG): for id in self.columnIDs: self.logger.debug("Column %d: %s", id, self.getColumn(id), extra=self.format) def restoreRows(self): - for id in self.rowIDs: - self.block.repairRow(id) + """It restores the rows assigned to the validator, that can be repaired.""" + if self.repairOnTheFly: + for id in self.rowIDs: + self.restoreRow(id) + + def restoreRow(self, id): + """Restore a given row if repairable.""" + rep = self.block.repairRow(id) + if (rep.any()): + # If operation is based on send queues, segments should + # be queued after successful repair. + for i in range(len(rep)): + if rep[i]: + self.logger.debug("Rep: %d,%d", id, i, extra=self.format) + self.addToSendQueue(id, i) + # self.statsRepairInSlot += rep.count(1) def restoreColumns(self): - for id in self.columnIDs: - self.block.repairColumn(id) + """It restores the columns assigned to the validator, that can be repaired.""" + if self.repairOnTheFly: + for id in self.columnIDs: + self.restoreColumn(id) + + def restoreColumn(self, id): + """Restore a given column if repairable.""" + rep = self.block.repairColumn(id) + if (rep.any()): + # If operation is based on send queues, segments should + # be queued after successful repair. 
+ for i in range(len(rep)): + if rep[i]: + self.logger.debug("Rep: %d,%d", i, id, extra=self.format) + self.addToSendQueue(i, id) + # self.statsRepairInSlot += rep.count(1) def checkStatus(self): + """It checks how many of the expected samples have arrived, for each assigned row/column.""" arrived = 0 expected = 0 for id in self.columnIDs: diff --git a/DAS/visualizer.py b/DAS/visualizer.py index bb8c527..c44d32a 100644 --- a/DAS/visualizer.py +++ b/DAS/visualizer.py @@ -12,7 +12,8 @@ class Visualizer: def __init__(self, execID): self.execID = execID self.folderPath = "results/"+self.execID - self.parameters = ['run', 'blockSize', 'failureRate', 'numberValidators', 'netDegree', 'chi'] + self.parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree', + 'chi', 'vpn1', 'vpn2', 'bwUplinkProd', 'bwUplink1', 'bwUplink2'] self.minimumDataPoints = 2 def plottingData(self): @@ -27,22 +28,28 @@ class Visualizer: run = int(root.find('run').text) blockSize = int(root.find('blockSize').text) failureRate = int(root.find('failureRate').text) - numberValidators = int(root.find('numberValidators').text) + numberNodes = int(root.find('numberNodes').text) netDegree = int(root.find('netDegree').text) chi = int(root.find('chi').text) + vpn1 = int(root.find('vpn1').text) + vpn2 = int(root.find('vpn2').text) + bwUplinkProd = int(root.find('bwUplinkProd').text) + bwUplink1 = int(root.find('bwUplink1').text) + bwUplink2 = int(root.find('bwUplink2').text) tta = int(root.find('tta').text) """Loop over all combinations of len(parameters)-2 of the parameters""" - for combination in combinations(self.parameters, 4): - """Get the indices and values of the parameters in the combination""" + for combination in combinations(self.parameters, len(self.parameters)-2): + # Get the indices and values of the parameters in the combination + indices = [self.parameters.index(element) for element in combination] - selectedValues = [run, blockSize, failureRate, numberValidators, netDegree, chi] + selectedValues = [run, blockSize, failureRate, numberNodes, netDegree, chi, vpn1, vpn2, bwUplinkProd, bwUplink1, bwUplink2] values = [selectedValues[index] for index in indices] names = [self.parameters[i] for i in indices] keyComponents = [f"{name}_{value}" for name, value in zip(names, values)] - key = tuple(keyComponents[:4]) + key = tuple(keyComponents[:len(self.parameters)-2]) """Get the names of the other 2 parameters that are not included in the key""" - otherParams = [self.parameters[i] for i in range(6) if i not in indices] + otherParams = [self.parameters[i] for i in range(len(self.parameters)) if i not in indices] """Append the values of the other 2 parameters and the ttas to the lists for the key""" otherIndices = [i for i in range(len(self.parameters)) if i not in indices] @@ -119,7 +126,7 @@ class Visualizer: """Label formatting for the figures""" result = ''.join([f" {char}" if char.isupper() else char for char in label]) return result.title() - + def formatTitle(self, key): """Title formatting for the figures""" name = ''.join([f" {char}" if char.isupper() else char for char in key.split('_')[0]]) @@ -132,7 +139,7 @@ class Visualizer: data = self.averageRuns(data) filteredKeys = self.similarKeys(data) print("Plotting heatmaps...") - + """Create the directory if it doesn't exist already""" heatmapsFolder = self.folderPath + '/heatmaps' if not os.path.exists(heatmapsFolder):
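
(Not part of the patch: in plottingData() above, each combination fixes all but two parameters; the fixed ones form the key of one heatmap and the two free ones span its axes. A sketch:)

    from itertools import combinations

    parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree',
                  'chi', 'vpn1', 'vpn2', 'bwUplinkProd', 'bwUplink1', 'bwUplink2']
    for combination in combinations(parameters, len(parameters) - 2):
        free = [p for p in parameters if p not in combination]
        # e.g. the first combination fixes everything except ['bwUplink1', 'bwUplink2']
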
Space] -numberValidatorStart = 256 -numberValidatorStop = 512 -numberValidatorStep = 128 - -failureRateStart = 10 -failureRateStop = 90 -failureRateStep = 40 - -blockSizeStart = 32 -blockSizeStop = 64 -blockSizeStep = 16 - -netDegreeStart = 6 -netDegreeStop = 8 -netDegreeStep = 1 - -chiStart = 4 -chiStop = 8 -chiStep = 2 - - -[Advanced] - -deterministic = 0 -numberRuns = 2 -dumpXML = 1 diff --git a/config_example.py b/config_example.py new file mode 100644 index 0000000..af55fc2 --- /dev/null +++ b/config_example.py @@ -0,0 +1,76 @@ +"""Example configuration file + +This file illustrates how to define options and simulation parameter ranges. +It also defines the traversal order of the simulation space. As the file +extension suggests, configuration is pure python code, allowing complex +setups. Use at your own risk. + +To use this example, run + python3 study.py config_example + +Otherwise copy it and modify as needed. The default traversal order defined +in the nested loop of nextShape() is good for most cases, but customizable +if needed. +""" + +import logging +import itertools +import numpy as np +from DAS.shape import Shape + +dumpXML = 1 +visualization = 1 +logLevel = logging.INFO + +# number of parallel workers. -1: all cores; 1: sequential +# for more details, see joblib.Parallel +numJobs = 3 + +# distribute rows/columns evenly between validators (True) +# or generate it using local randomness (False) +evenLineDistribution = True + +# Number of simulation runs with the same parameters for statistical relevance +runs = range(10) + +# Number of nodes +numberNodes = range(256, 513, 128) + +# Percentage of block not released by producer +failureRates = range(10, 91, 40) + +# Block size in one dimension in segments. Block is blockSizes * blockSizes segments. +blockSizes = range(32,65,16) + +# Per-topic mesh neighborhood size +netDegrees = range(6, 9, 2) + +# number of rows and columns a validator is interested in +chis = range(1, 5, 2) + +# ratio of class1 nodes (see below for parameters per class) +class1ratios = np.arange(0, 1, .2) + +# Number of validators per beacon node +validatorsPerNode1 = [1] +validatorsPerNode2 = [2, 4, 8, 16, 32] + +# Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) +# 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 +bwUplinksProd = [2200] +bwUplinks1 = [110] +bwUplinks2 = [2200] + +# Set to True if you want your run to be deterministic, False if not +deterministic = False + +# If your run is deterministic you can decide the random seed. This is ignored otherwise. +randomSeed = "DAS" + +def nextShape(): + for run, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( + runs, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): + # Network Degree has to be an even number + if netDegree % 2 == 0: + shape = Shape(blockSize, nn, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run) + yield shape diff --git a/doc/Makefile b/doc/Makefile new file mode 100644 index 0000000..d4bb2cb --- /dev/null +++ b/doc/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = . +BUILDDIR = _build + +# Put it first so that "make" without argument is like "make help". 
+help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/doc/conf.py b/doc/conf.py new file mode 100644 index 0000000..7524d9c --- /dev/null +++ b/doc/conf.py @@ -0,0 +1,64 @@ +# Configuration file for the Sphinx documentation builder. +# +# This file only contains a selection of the most common options. For a full +# list see the documentation: +# https://www.sphinx-doc.org/en/master/usage/configuration.html + +# -- Path setup -------------------------------------------------------------- + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +# +import os +import sys +sys.path.insert(0, os.path.abspath('../DAS')) + + +# -- Project information ----------------------------------------------------- + +project = 'DAS simulator' +copyright = '2023, Leonardo A. Bautista-Gomez, Csaba Kiraly' +author = 'Leonardo A. Bautista-Gomez, Csaba Kiraly' + +# The full version, including alpha/beta/rc tags +release = '1' + + +# -- General configuration --------------------------------------------------- + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = ['sphinx.ext.autodoc' +] + + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +# This pattern also affects html_static_path and html_extra_path. +exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store', 'myenv'] + + +# -- Options for HTML output ------------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +# +html_theme = 'alabaster' + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ['_static'] + + +# -- Options for autodoc ------------------------------------------------- + +autodoc_mock_imports = ["django", "dicttoxml", "bitarray", "DAS", "networkx"] + + + diff --git a/doc/index.rst b/doc/index.rst new file mode 100644 index 0000000..e253201 --- /dev/null +++ b/doc/index.rst @@ -0,0 +1,44 @@ +.. DAS simulator documentation master file, created by + sphinx-quickstart on Wed Feb 8 20:56:44 2023. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +Welcome to DAS simulator's documentation! +========================================= + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + +.. automodule:: block + :members: + +.. automodule:: observer + :members: + +.. automodule:: results + :members: + +.. automodule:: shape + :members: + +.. automodule:: simulator + :members: + +.. 
automodule:: tools + :members: + +.. automodule:: validator + :members: + + diff --git a/doc/make.bat b/doc/make.bat new file mode 100644 index 0000000..153be5e --- /dev/null +++ b/doc/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=. +set BUILDDIR=_build + +if "%1" == "" goto help + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.https://www.sphinx-doc.org/ + exit /b 1 +) + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/study.py b/study.py index 438fba4..2b2aab6 100644 --- a/study.py +++ b/study.py @@ -1,58 +1,65 @@ #! /bin/python3 import time, sys, random, copy +import importlib +from joblib import Parallel, delayed from DAS import * +# Parallel execution: +# The code currently uses 'joblib' to execute on multiple cores. For other options such as 'ray', see +# https://stackoverflow.com/questions/9786102/how-do-i-parallelize-a-simple-python-loop +# For fixing logging issues in parallel execution, see +# https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls +# and https://github.com/joblib/joblib/issues/1017 + +def runOnce(sim, config, shape): + if config.deterministic: + shape.setSeed(config.randomSeed+"-"+str(shape)) + random.seed(shape.randomSeed) + + sim.initLogger() + sim.resetShape(shape) + sim.initValidators() + sim.initNetwork() + result = sim.run() + sim.logger.info("Shape: %s ... 
Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) + return result def study(): if len(sys.argv) < 2: print("You need to pass a configuration file in parameter") exit(1) - config = Configuration(sys.argv[1]) - shape = Shape(0, 0, 0, 0, 0, 0) - sim = Simulator(shape) + try: + config = importlib.import_module(sys.argv[1]) + except ModuleNotFoundError as e: + try: + config = importlib.import_module(str(sys.argv[1]).replace(".py", "")) + except ModuleNotFoundError as e: + print(e) + print("You need to pass a configuration file in parameter") + exit(1) + + shape = Shape(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + sim = Simulator(shape, config) sim.initLogger() results = [] - simCnt = 0 now = datetime.now() execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999)) sim.logger.info("Starting simulations:", extra=sim.format) start = time.time() - - for run in range(config.numberRuns): - for nv in range(config.nvStart, config.nvStop+1, config.nvStep): - for blockSize in range(config.blockSizeStart, config.blockSizeStop+1, config.blockSizeStep): - for fr in range(config.failureRateStart, config.failureRateStop+1, config.failureRateStep): - for netDegree in range(config.netDegreeStart, config.netDegreeStop+1, config.netDegreeStep): - for chi in range(config.chiStart, config.chiStop+1, config.chiStep): - - if not config.deterministic: - random.seed(datetime.now()) - - # Network Degree has to be an even number - if netDegree % 2 == 0: - shape = Shape(blockSize, nv, fr, chi, netDegree, run) - sim.resetShape(shape) - sim.initValidators() - sim.initNetwork() - result = sim.run() - sim.logger.info("Shape: %s ... Block Available: %d" % (str(sim.shape.__dict__), result.blockAvailable), extra=sim.format) - results.append(copy.deepcopy(result)) - simCnt += 1 - + results = Parallel(config.numJobs)(delayed(runOnce)(sim, config, shape) for shape in config.nextShape()) end = time.time() - sim.logger.info("A total of %d simulations ran in %d seconds" % (simCnt, end-start), extra=sim.format) + sim.logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=sim.format) if config.dumpXML: for res in results: res.dump(execID) sim.logger.info("Results dumped into results/%s/" % (execID), extra=sim.format) - visualization = 1 - if visualization: + if config.visualization: vis = Visualizer(execID) vis.plotHeatmaps()
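
(Not part of the patch: a hypothetical programmatic equivalent of `python3 study.py config_example`, run sequentially instead of through joblib; runOnce() is the helper defined in study.py above.)

    import importlib
    from DAS import *

    config = importlib.import_module("config_example")
    sim = Simulator(Shape(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), config)
    sim.initLogger()
    for shape in config.nextShape():
        result = runOnce(sim, config, shape)  # runs one simulation, returns a Result
    # results are then dumped to results/<execID>/<shape>.xml and plotted as heatmaps
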