Merge branch 'develop' into vis

This commit is contained in:
HajarZaiz 2023-03-26 13:01:07 +00:00 committed by GitHub
commit aa99601826
19 changed files with 944 additions and 301 deletions

.gitignore vendored (4 changes)

@@ -1,5 +1,7 @@
*.swp
*.pyc
results/*
myenv
doc/_build
!results/plots.py
Frontend/


@@ -1,4 +1,3 @@
from DAS.simulator import *
from DAS.configuration import *
from DAS.shape import *
from DAS.visualizer import *


@@ -5,43 +5,73 @@ from bitarray import bitarray
from bitarray.util import zeros
class Block:
blockSize = 0
data = bitarray()
"""This class represents a block in the Ethereum blockchain."""
def __init__(self, blockSize):
"""Initialize the block with a data array of blocksize^2 zeros."""
self.blockSize = blockSize
self.data = zeros(self.blockSize*self.blockSize)
def fill(self):
"""It fills the block data with ones."""
self.data.setall(1)
def merge(self, merged):
"""It merges (OR) the existing block with the received one."""
self.data |= merged.data
def getSegment(self, rowID, columnID):
"""Check whether a segment is included"""
return self.data[rowID*self.blockSize + columnID]
def setSegment(self, rowID, columnID, value = 1):
"""Set value for a segment (default 1)"""
self.data[rowID*self.blockSize + columnID] = value
def getColumn(self, columnID):
"""It returns the block column corresponding to columnID."""
return self.data[columnID::self.blockSize]
def mergeColumn(self, columnID, column):
"""It merges (OR) the existing column with the received one."""
self.data[columnID::self.blockSize] |= column
def repairColumn(self, id):
success = self.data[id::self.blockSize].count(1)
"""It repairs the entire column if it has at least blockSize/2 ones.
Returns: list of repaired segments
"""
line = self.data[id::self.blockSize]
success = line.count(1)
if success >= self.blockSize/2:
ret = ~line
self.data[id::self.blockSize] = 1
else:
ret = zeros(self.blockSize)
return ret
def getRow(self, rowID):
"""It returns the block row corresponding to rowID."""
return self.data[rowID*self.blockSize:(rowID+1)*self.blockSize]
def mergeRow(self, rowID, row):
"""It merges (OR) the existing row with the received one."""
self.data[rowID*self.blockSize:(rowID+1)*self.blockSize] |= row
def repairRow(self, id):
success = self.data[id*self.blockSize:(id+1)*self.blockSize].count(1)
"""It repairs the entire row if it has at least blockSize/2 ones.
Returns: list of repaired segments.
"""
line = self.data[id*self.blockSize:(id+1)*self.blockSize]
success = line.count(1)
if success >= self.blockSize/2:
ret = ~line
self.data[id*self.blockSize:(id+1)*self.blockSize] = 1
else:
ret = zeros(self.blockSize)
return ret
def print(self):
"""It prints the block in the terminal (outside of the logger rules))."""
dash = "-" * (self.blockSize+2)
print(dash)
for i in range(self.blockSize):
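To see the repair threshold in action, here is a small sketch (illustrative values, not part of the commit), assuming the Block class above:

from DAS.block import Block

b = Block(4)               # 4x4 block, 16 segments, all zero
b.setSegment(0, 0)
b.setSegment(0, 1)         # row 0 now holds blockSize/2 = 2 ones
repaired = b.repairRow(0)  # threshold met, so the row gets repaired
print(repaired)            # bitarray('0011'): segments 2 and 3 were restored
print(b.getRow(0))         # bitarray('1111')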


@@ -1,49 +0,0 @@
#!/bin/python3
import configparser
class Configuration:
deterministic = 0
def __init__(self, fileName):
config = configparser.RawConfigParser()
config.read(fileName)
self.nvStart = int(config.get("Simulation Space", "numberValidatorStart"))
self.nvStop = int(config.get("Simulation Space", "numberValidatorStop"))
self.nvStep = int(config.get("Simulation Space", "numberValidatorStep"))
self.blockSizeStart = int(config.get("Simulation Space", "blockSizeStart"))
self.blockSizeStop = int(config.get("Simulation Space", "blockSizeStop"))
self.blockSizeStep = int(config.get("Simulation Space", "blockSizeStep"))
self.netDegreeStart = int(config.get("Simulation Space", "netDegreeStart"))
self.netDegreeStop = int(config.get("Simulation Space", "netDegreeStop"))
self.netDegreeStep = int(config.get("Simulation Space", "netDegreeStep"))
self.failureRateStart = int(config.get("Simulation Space", "failureRateStart"))
self.failureRateStop = int(config.get("Simulation Space", "failureRateStop"))
self.failureRateStep = int(config.get("Simulation Space", "failureRateStep"))
self.chiStart = int(config.get("Simulation Space", "chiStart"))
self.chiStop = int(config.get("Simulation Space", "chiStop"))
self.chiStep = int(config.get("Simulation Space", "chiStep"))
self.numberRuns = int(config.get("Advanced", "numberRuns"))
self.deterministic = config.get("Advanced", "deterministic")
self.dumpXML = config.get("Advanced", "dumpXML")
if self.nvStop < (self.blockSizeStart*4):
print("ERROR: The number of validators cannot be lower than the block size * 4")
exit(1)
if self.chiStart < 1:
print("Chi has to be greater than 0")
exit(1)
if self.chiStop > self.blockSizeStart:
print("Chi (%d) has to be smaller or equal to block the size (%d)" % (self.chiStop, self.blockSizeStart))
exit(1)


@@ -3,21 +3,22 @@
from DAS.block import *
class Observer:
block = []
rows = []
columns = []
goldenData = []
broadcasted = []
config = []
logger = []
"""This class gathers global data from the simulation, like an 'all-seen god'."""
def __init__(self, logger, config):
"""It initializes the observer with a logger and given configuration."""
self.config = config
self.format = {"entity": "Observer"}
self.logger = logger
self.block = []
self.rows = []
self.columns = []
self.goldenData = []
self.broadcasted = []
def reset(self):
"""It resets all the gathered data to zeros."""
self.block = [0] * self.config.blockSize * self.config.blockSize
self.goldenData = [0] * self.config.blockSize * self.config.blockSize
self.rows = [0] * self.config.blockSize
@@ -25,6 +26,7 @@ class Observer:
self.broadcasted = Block(self.config.blockSize)
def checkRowsColumns(self, validators):
"""It checks how many validators have been assigned to each row and column."""
for val in validators:
if val.amIproposer == 0:
for r in val.rowIDs:
@@ -38,10 +40,12 @@ class Observer:
self.logger.warning("There is a row/column that has not been assigned", extra=self.format)
def setGoldenData(self, block):
"""Stores the original real data to compare it with future situations."""
for i in range(self.config.blockSize*self.config.blockSize):
self.goldenData[i] = block.data[i]
def checkBroadcasted(self):
"""It checks how many broadcasted samples are still missing in the network."""
zeros = 0
for i in range(self.config.blockSize * self.config.blockSize):
if self.broadcasted.data[i] == 0:
@@ -51,6 +55,7 @@ class Observer:
return zeros
def checkStatus(self, validators):
"""It checks the status of how many expected and arrived samples globally."""
arrived = 0
expected = 0
for val in validators:


@@ -1,3 +1,7 @@
bitarray==2.6.0
DAS==0.28.7
DAS==0.29.0
dicttoxml==1.7.16
matplotlib==3.6.2
networkx==3.0
numpy==1.23.5
seaborn==0.12.2


@@ -5,19 +5,17 @@ from xml.dom import minidom
from dicttoxml import dicttoxml
class Result:
shape = []
missingVector = []
blockAvailable = -1
tta = -1
"""This class stores and process/store the results of a simulation."""
def __init__(self, shape):
"""It initializes the instance with a specific shape."""
self.shape = shape
self.blockAvailable = -1
self.tta = -1
self.missingVector = []
def populate(self, shape, missingVector):
"""It populates part of the result data inside a vector."""
self.shape = shape
self.missingVector = missingVector
missingSamples = missingVector[-1]
@@ -29,6 +27,7 @@ class Result:
self.tta = -1
def dump(self, execID):
"""It dumps the results of the simulation in an XML file."""
if not os.path.exists("results"):
os.makedirs("results")
if not os.path.exists("results/"+execID):
@@ -40,11 +39,6 @@ class Result:
resXml = dicttoxml(resd1)
xmlstr = minidom.parseString(resXml)
xmlPretty = xmlstr.toprettyxml()
filePath = "results/"+execID+"/nbv-"+str(self.shape.numberValidators)+\
"-bs-"+str(self.shape.blockSize)+\
"-nd-"+str(self.shape.netDegree)+\
"-fr-"+str(self.shape.failureRate)+\
"-chi-"+str(self.shape.chi)+\
"-r-"+str(self.shape.run)+".xml"
filePath = "results/"+execID+"/"+str(self.shape)+".xml"
with open(filePath, "w") as f:
f.write(xmlPretty)


@@ -1,21 +1,42 @@
#!/bin/python3
class Shape:
run = 0
numberValidators = 0
blockSize = 0
failureRate = 0
netDegree = 0
chi = 0
"""This class represents a set of parameters for a specific simulation."""
def __init__(self, blockSize, numberValidators, failureRate, chi, netDegree, run):
def __init__(self, blockSize, numberNodes, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run):
"""Initializes the shape with the parameters passed in argument."""
self.run = run
self.numberValidators = numberValidators
self.numberNodes = numberNodes
self.blockSize = blockSize
self.failureRate = failureRate
self.netDegree = netDegree
self.class1ratio = class1ratio
self.chi = chi
self.vpn1 = vpn1
self.vpn2 = vpn2
self.bwUplinkProd = bwUplinkProd
self.bwUplink1 = bwUplink1
self.bwUplink2 = bwUplink2
self.randomSeed = ""
def __repr__(self):
"""Returns a printable representation of the shape"""
shastr = ""
shastr += "bs-"+str(self.blockSize)
shastr += "-nn-"+str(self.numberNodes)
shastr += "-fr-"+str(self.failureRate)
shastr += "-c1r-"+str(self.class1ratio)
shastr += "-chi-"+str(self.chi)
shastr += "-vpn1-"+str(self.vpn1)
shastr += "-vpn2-"+str(self.vpn2)
shastr += "-bwupprod-"+str(self.bwUplinkProd)
shastr += "-bwup1-"+str(self.bwUplink1)
shastr += "-bwup2-"+str(self.bwUplink2)
shastr += "-nd-"+str(self.netDegree)
shastr += "-r-"+str(self.run)
return shastr
def setSeed(self, seed):
"""Adds the random seed to the shape"""
self.randomSeed = seed
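As an illustration (hypothetical values, not from the commit), a Shape stringifies into the compact form that Result.dump() uses as the results filename:

from DAS.shape import Shape

s = Shape(32, 256, 10, 0.8, 2, 1, 8, 6, 2200, 110, 2200, 0)
print(s)
# bs-32-nn-256-fr-10-c1r-0.8-chi-2-vpn1-1-vpn2-8-bwupprod-2200-bwup1-110-bwup2-2200-nd-6-r-0
# Result.dump() then writes results/<execID>/<str(shape)>.xml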


@@ -3,96 +3,175 @@
import networkx as nx
import logging, random
from datetime import datetime
from statistics import mean
from DAS.tools import *
from DAS.results import *
from DAS.observer import *
from DAS.validator import *
class Simulator:
"""This class implements the main DAS simulator."""
proposerID = 0
logLevel = logging.INFO
validators = []
glob = []
result = []
shape = []
logger = []
format = {}
def __init__(self, shape):
def __init__(self, shape, config):
"""It initializes the simulation with a set of parameters (shape)."""
self.shape = shape
self.config = config
self.format = {"entity": "Simulator"}
self.result = Result(self.shape)
self.validators = []
self.logger = []
self.logLevel = config.logLevel
self.proposerID = 0
self.glob = []
def initValidators(self):
"""It initializes all the validators in the network."""
self.glob = Observer(self.logger, self.shape)
self.glob.reset()
self.validators = []
rows = list(range(self.shape.blockSize)) * int(self.shape.chi*self.shape.numberValidators/self.shape.blockSize)
columns = list(range(self.shape.blockSize)) * int(self.shape.chi*self.shape.numberValidators/self.shape.blockSize)
random.shuffle(rows)
random.shuffle(columns)
for i in range(self.shape.numberValidators):
val = Validator(i, int(not i!=0), self.logger, self.shape, rows, columns)
if self.config.evenLineDistribution:
lightVal = int(self.shape.numberNodes * self.shape.class1ratio * self.shape.vpn1)
heavyVal = int(self.shape.numberNodes * (1-self.shape.class1ratio) * self.shape.vpn2)
totalValidators = lightVal + heavyVal
rows = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1)
columns = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1)
offset = heavyVal*self.shape.chi
random.shuffle(rows)
random.shuffle(columns)
for i in range(self.shape.numberNodes):
if self.config.evenLineDistribution:
if i < int(heavyVal/self.shape.vpn2): # First start with the heavy nodes
start = i *self.shape.chi*self.shape.vpn2
end = (i+1)*self.shape.chi*self.shape.vpn2
else: # Then the solo stakers
j = i - int(heavyVal/self.shape.vpn2)
start = offset+( j *self.shape.chi)
end = offset+((j+1)*self.shape.chi)
r = rows[start:end]
c = columns[start:end]
val = Validator(i, int(not i!=0), self.logger, self.shape, r, c)
else:
val = Validator(i, int(not i!=0), self.logger, self.shape)
if i == self.proposerID:
val.initBlock()
self.glob.setGoldenData(val.block)
else:
val.logIDs()
self.validators.append(val)
self.logger.debug("Validators initialized.", extra=self.format)
def initNetwork(self):
"""It initializes the simulated network."""
rowChannels = [[] for i in range(self.shape.blockSize)]
columnChannels = [[] for i in range(self.shape.blockSize)]
for v in self.validators:
for id in v.rowIDs:
rowChannels[id].append(v)
for id in v.columnIDs:
columnChannels[id].append(v)
if not (self.proposerPublishOnly and v.amIproposer):
for id in v.rowIDs:
rowChannels[id].append(v)
for id in v.columnIDs:
columnChannels[id].append(v)
# Check rows/columns distribution
#totalR = 0
#totalC = 0
#for r in rowChannels:
# totalR += len(r)
#for c in columnChannels:
# totalC += len(c)
for id in range(self.shape.blockSize):
if (len(rowChannels[id]) < self.shape.netDegree):
self.logger.error("Graph degree higher than %d" % len(rowChannels[id]), extra=self.format)
G = nx.random_regular_graph(self.shape.netDegree, len(rowChannels[id]))
# If the number of nodes in a channel is smaller or equal to the
# requested degree, a fully connected graph is used. For n>d, a random
# d-regular graph is set up. (For n=d+1, the two are the same.)
if not rowChannels[id]:
self.logger.error("No nodes for row %d !" % id, extra=self.format)
continue
elif (len(rowChannels[id]) <= self.shape.netDegree):
self.logger.debug("Graph fully connected with degree %d !" % (len(rowChannels[id]) - 1), extra=self.format)
G = nx.complete_graph(len(rowChannels[id]))
else:
G = nx.random_regular_graph(self.shape.netDegree, len(rowChannels[id]))
if not nx.is_connected(G):
self.logger.error("Graph not connected for row %d !" % id, extra=self.format)
for u, v in G.edges:
val1=rowChannels[id][u]
val2=rowChannels[id][v]
val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)})
val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)})
val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSize)})
val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSize)})
if (len(columnChannels[id]) < self.shape.netDegree):
self.logger.error("Graph degree higher than %d" % len(columnChannels[id]), extra=self.format)
G = nx.random_regular_graph(self.shape.netDegree, len(columnChannels[id]))
if not columnChannels[id]:
self.logger.error("No nodes for column %d !" % id, extra=self.format)
continue
elif (len(columnChannels[id]) <= self.shape.netDegree):
self.logger.debug("Graph fully connected with degree %d !" % (len(columnChannels[id]) - 1), extra=self.format)
G = nx.complete_graph(len(columnChannels[id]))
else:
G = nx.random_regular_graph(self.shape.netDegree, len(columnChannels[id]))
if not nx.is_connected(G):
self.logger.error("Graph not connected for column %d !" % id, extra=self.format)
for u, v in G.edges:
val1=columnChannels[id][u]
val2=columnChannels[id][v]
val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)})
val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)})
val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSize)})
val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSize)})
for v in self.validators:
if (self.proposerPublishOnly and v.amIproposer):
for id in v.rowIDs:
count = min(self.proposerPublishTo, len(rowChannels[id]))
publishTo = random.sample(rowChannels[id], count)
for vi in publishTo:
v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSize)})
for id in v.columnIDs:
count = min(self.proposerPublishTo, len(columnChannels[id]))
publishTo = random.sample(columnChannels[id], count)
for vi in publishTo:
v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSize)})
if self.logger.isEnabledFor(logging.DEBUG):
for i in range(0, self.shape.numberNodes):
self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format)
self.logger.debug("Val %d : colN %s", i, self.validators[i].columnNeighbors, extra=self.format)
def initLogger(self):
"""It initializes the logger."""
logger = logging.getLogger("DAS")
logger.setLevel(self.logLevel)
ch = logging.StreamHandler()
ch.setLevel(self.logLevel)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
if len(logger.handlers) == 0:
logger.setLevel(self.logLevel)
ch = logging.StreamHandler()
ch.setLevel(self.logLevel)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
self.logger = logger
def resetShape(self, shape):
"""It resets the parameters of the simulation."""
self.shape = shape
self.result = Result(self.shape)
for val in self.validators:
val.shape.failureRate = shape.failureRate
val.shape.chi = shape.chi
val.shape.vpn1 = shape.vpn1
val.shape.vpn2 = shape.vpn2
# In GossipSub the initiator might push messages without participating in the mesh.
# proposerPublishOnly regulates this behavior. If set to true, the proposer is not
# part of the p2p distribution graph, only pushes segments to it. If false, the proposer
# might get back segments from other peers since links are symmetric.
self.proposerPublishOnly = True
# If proposerPublishOnly == True, this regulates how many copies of each segment are
# pushed out by the proposer.
# 1: the data is sent out exactly once on rows and once on columns (2 copies in total)
# self.shape.netDegree: default behavior similar (but not same) to previous code
self.proposerPublishTo = self.shape.netDegree
def run(self):
"""It runs the main simulation until the block is available or it gets stucked."""
self.glob.checkRowsColumns(self.validators)
self.validators[self.proposerID].broadcastBlock()
arrived, expected = self.glob.checkStatus(self.validators)
@@ -102,17 +181,29 @@ class Simulator:
while(True):
missingVector.append(missingSamples)
oldMissingSamples = missingSamples
for i in range(0,self.shape.numberValidators):
self.validators[i].sendRows()
self.validators[i].sendColumns()
for i in range(1,self.shape.numberValidators):
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
for i in range(0,self.shape.numberNodes):
self.validators[i].send()
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
self.validators[i].receiveRowsColumns()
for i in range(1,self.shape.numberValidators):
self.logger.debug("PHASE RESTORE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
self.validators[i].restoreRows()
self.validators[i].restoreColumns()
for i in range(0,self.shape.numberValidators):
self.logger.debug("PHASE LOG %d" % steps, extra=self.format)
for i in range(0,self.shape.numberNodes):
self.validators[i].logRows()
self.validators[i].logColumns()
# log TX and RX statistics
statsTxInSlot = [v.statsTxInSlot for v in self.validators]
statsRxInSlot = [v.statsRxInSlot for v in self.validators]
self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" %
(steps, statsTxInSlot[0], statsRxInSlot[0],
mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]),
mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format)
for i in range(0,self.shape.numberNodes):
self.validators[i].updateStats()
arrived, expected = self.glob.checkStatus(self.validators)
@@ -120,7 +211,7 @@
missingRate = missingSamples*100/expected
self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format)
if missingSamples == oldMissingSamples:
#self.logger.info("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format)
self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format)
missingVector.append(missingSamples)
break
elif missingSamples == 0:


@@ -1,27 +1,87 @@
#!/bin/python3
import logging
import sys
import random
from bitarray.util import zeros
class CustomFormatter():
"""This class defines the terminal output formatting."""
class CustomFormatter(logging.Formatter):
blue = "\x1b[34;20m"
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
format = "%(levelname)s : %(entity)s : %(message)s"
FORMATS = {
logging.DEBUG: grey + format + reset,
logging.INFO: blue + format + reset,
logging.WARNING: yellow + format + reset,
logging.ERROR: red + format + reset,
logging.CRITICAL: bold_red + format + reset
}
def __init__(self):
"""Initializes 5 different formats for logging with different colors."""
self.blue = "\x1b[34;20m"
self.grey = "\x1b[38;20m"
self.yellow = "\x1b[33;20m"
self.red = "\x1b[31;20m"
self.bold_red = "\x1b[31;1m"
self.reset = "\x1b[0m"
self.reformat = "%(levelname)s : %(entity)s : %(message)s"
self.FORMATS = {
logging.DEBUG: self.grey + self.reformat + self.reset,
logging.INFO: self.blue + self.reformat + self.reset,
logging.WARNING: self.yellow + self.reformat + self.reset,
logging.ERROR: self.red + self.reformat + self.reset,
logging.CRITICAL: self.bold_red + self.reformat + self.reset
}
def format(self, record):
"""Returns the formatter with the format corresponding to record."""
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
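A minimal usage sketch for the formatter (assuming it lives in DAS.tools, as the imports elsewhere suggest; the 'entity' field must be passed via extra):

import logging
from DAS.tools import CustomFormatter

logger = logging.getLogger("example")
ch = logging.StreamHandler()
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
logger.setLevel(logging.INFO)
logger.info("block received", extra={"entity": "Val 3"})
# prints, in blue: INFO : Val 3 : block received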
def shuffled(lis, shuffle=True):
"""Generator yielding list in shuffled order."""
# based on https://stackoverflow.com/a/60342323
if shuffle:
for index in random.sample(range(len(lis)), len(lis)):
yield lis[index]
else:
for v in lis:
yield v
def shuffledDict(d, shuffle=True):
"""Generator yielding dictionary in shuffled order.
Shuffling can be disabled via the optional parameter (useful for experiment setup).
"""
if shuffle:
lis = list(d.items())
for index in random.sample(range(len(d)), len(d)):
yield lis[index]
else:
for kv in d.items():
yield kv
def sampleLine(line, limit):
"""Sample up to 'limit' bits from a bitarray.
Since this is quite expensive, we use a number of heuristics to get it fast.
"""
if limit == sys.maxsize :
return line
else:
w = line.count(1)
if limit >= w :
return line
else:
l = len(line)
r = zeros(l)
if w < l/10 or limit > l/2 :
indices = [ i for i in range(l) if line[i] ]
sample = random.sample(indices, limit)
for i in sample:
r[i] = 1
return r
else:
while limit:
i = random.randrange(0, l)
if line[i] and not r[i]:
r[i] = 1
limit -= 1
return r
def unionOfSamples(population, sampleSize, times):
"""Returns the union of 'times' random samples, each of 'sampleSize' elements."""
selected = set()
for t in range(times):
selected |= set(random.sample(population, sampleSize))
return selected
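For illustration (hypothetical values), sampleLine returns a mask over at most 'limit' of the set bits, and unionOfSamples a set of unique picks:

from bitarray import bitarray
from DAS.tools import sampleLine, unionOfSamples

line = bitarray('1011001010')          # 5 set bits
s = sampleLine(line, 3)                # keep exactly 3 of them
assert s.count(1) == 3 and not (s & ~line).any()

ids = unionOfSamples(range(32), 2, 4)  # union of 4 draws of 2 elements each
assert 2 <= len(ids) <= 8              # duplicates across draws collapse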


@@ -4,57 +4,71 @@ import random
import collections
import logging
from DAS.block import *
from bitarray import bitarray
from DAS.tools import shuffled, shuffledDict, unionOfSamples
from bitarray.util import zeros
from collections import deque
from itertools import chain
class Neighbor:
"""This class implements a node neighbor to monitor sent and received data.
It represents one side of a P2P link in the overlay. Sent and received
segments are monitored to avoid sending twice or sending back what was
received from a link.
"""
def __repr__(self):
return "%d:%d/%d" % (self.node.ID, self.sent.count(1), self.received.count(1))
"""It returns the amount of sent and received data."""
return "%d:%d/%d, q:%d" % (self.node.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue))
def __init__(self, v, blockSize):
def __init__(self, v, dim, blockSize):
"""It initializes the neighbor with the node and sets counters to zero."""
self.node = v
self.dim = dim # 0:row 1:col
self.receiving = zeros(blockSize)
self.received = zeros(blockSize)
self.sent = zeros(blockSize)
self.sendQueue = deque()
class Validator:
ID = 0
amIproposer = 0
shape = []
format = {}
logger = []
"""This class implements a validator/node in the network."""
def __repr__(self):
"""It returns the validator ID."""
return str(self.ID)
def __init__(self, ID, amIproposer, logger, shape, rows, columns):
def __init__(self, ID, amIproposer, logger, shape, rows = None, columns = None):
"""It initializes the validator with the logger shape and rows/columns.
If rows/columns are specified these are observed, otherwise (default)
chi rows and columns are selected randomly.
"""
self.shape = shape
FORMAT = "%(levelname)s : %(entity)s : %(message)s"
self.ID = ID
self.format = {"entity": "Val "+str(self.ID)}
self.block = Block(self.shape.blockSize)
self.receivedBlock = Block(self.shape.blockSize)
self.receivedQueue = deque()
self.sendQueue = deque()
self.amIproposer = amIproposer
self.logger = logger
if self.shape.chi < 1:
self.logger.error("Chi has to be greater than 0", extra=self.format)
elif self.shape.chi > self.shape.blockSize:
self.logger.error("Chi has to be smaller than %d" % blockSize, extra=self.format)
self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format)
else:
if amIproposer:
self.rowIDs = range(shape.blockSize)
self.columnIDs = range(shape.blockSize)
else:
self.rowIDs = rows[(self.ID*self.shape.chi):(self.ID*self.shape.chi + self.shape.chi)]
self.columnIDs = rows[(self.ID*self.shape.chi):(self.ID*self.shape.chi + self.shape.chi)]
#if shape.deterministic:
# random.seed(self.ID)
#self.rowIDs = random.sample(range(self.shape.blockSize), self.shape.chi)
#self.columnIDs = random.sample(range(self.shape.blockSize), self.shape.chi)
self.changedRow = {id:False for id in self.rowIDs}
self.changedColumn = {id:False for id in self.columnIDs}
vpn = self.shape.vpn1 if (self.ID <= shape.numberNodes * shape.class1ratio) else self.shape.vpn2
self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn)
self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn)
self.rowNeighbors = collections.defaultdict(dict)
self.columnNeighbors = collections.defaultdict(dict)
@@ -64,7 +78,29 @@ class Validator:
self.statsRxInSlot = 0
self.statsRxPerSlot = []
# Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?)
# 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11
# TODO: this should be a parameter
if self.amIproposer:
self.bwUplink = shape.bwUplinkProd
elif self.ID <= shape.numberNodes * shape.class1ratio:
self.bwUplink = shape.bwUplink1
else:
self.bwUplink = shape.bwUplink2
self.repairOnTheFly = True
self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed
self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node
self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send
self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor
self.dumbRandomScheduler = False # dumb random scheduler
self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
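Since these flags are plain attributes, a run can switch strategy after construction; a hypothetical tweak (logger and shape objects assumed to exist):

val = Validator(1, 0, logger, shape)  # hypothetical logger/shape
val.segmentShuffleScheduler = False   # skip the default scheduler
val.dumbRandomScheduler = True        # fall back to the random baseline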
def logIDs(self):
"""It logs the assigned rows and columns."""
if self.amIproposer == 1:
self.logger.warning("I am a block proposer."% self.ID)
else:
@@ -72,14 +108,19 @@
self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format)
def initBlock(self):
self.logger.debug("I am a block proposer.", extra=self.format)
self.block = Block(self.shape.blockSize)
self.block.fill()
#self.block.print()
"""It initializes the block for the proposer."""
if self.amIproposer == 1:
self.logger.debug("I am a block proposer.", extra=self.format)
self.block = Block(self.shape.blockSize)
self.block.fill()
#self.block.print()
else:
self.logger.warning("I am not a block proposer."% self.ID)
def broadcastBlock(self):
"""The block proposer broadcasts the block to all validators."""
if self.amIproposer == 0:
self.logger.error("I am NOT a block proposer", extra=self.format)
self.logger.warning("I am not a block proposer", extra=self.format)
else:
self.logger.debug("Broadcasting my block...", extra=self.format)
order = [i for i in range(self.shape.blockSize * self.shape.blockSize)]
@@ -91,132 +132,352 @@
else:
self.block.data[i] = 0
self.changedRow = {id:True for id in self.rowIDs}
self.changedColumn = {id:True for id in self.columnIDs}
nbFailures = self.block.data.count(0)
measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
#broadcasted.print()
def getColumn(self, index):
"""It returns a given column."""
return self.block.getColumn(index)
def getRow(self, index):
"""It returns a given row."""
return self.block.getRow(index)
def receiveColumn(self, id, column, src):
if id in self.columnIDs:
# register receive so that we are not sending back
self.columnNeighbors[id][src].receiving |= column
self.receivedBlock.mergeColumn(id, column)
self.statsRxInSlot += column.count(1)
def receiveSegment(self, rID, cID, src):
"""Receive a segment, register it, and queue for forwarding as needed."""
# register receive so that we are not sending back
if rID in self.rowIDs:
if src in self.rowNeighbors[rID]:
self.rowNeighbors[rID][src].receiving[cID] = 1
if cID in self.columnIDs:
if src in self.columnNeighbors[cID]:
self.columnNeighbors[cID][src].receiving[rID] = 1
if not self.receivedBlock.getSegment(rID, cID):
self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
self.receivedBlock.setSegment(rID, cID)
if self.perNodeQueue or self.perNeighborQueue:
self.receivedQueue.append((rID, cID))
else:
pass
self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
# self.statsRxDuplicateInSlot += 1
self.statsRxInSlot += 1
def receiveRow(self, id, row, src):
if id in self.rowIDs:
# register receive so that we are not sending back
self.rowNeighbors[id][src].receiving |= row
self.receivedBlock.mergeRow(id, row)
self.statsRxInSlot += row.count(1)
else:
pass
def addToSendQueue(self, rID, cID):
"""Queue a segment for forwarding."""
if self.perNodeQueue:
self.sendQueue.append((rID, cID))
if self.perNeighborQueue:
if rID in self.rowIDs:
for neigh in self.rowNeighbors[rID].values():
neigh.sendQueue.append(cID)
if cID in self.columnIDs:
for neigh in self.columnNeighbors[cID].values():
neigh.sendQueue.append(rID)
def receiveRowsColumns(self):
"""Finalize time step by merging newly received segments in state."""
if self.amIproposer == 1:
self.logger.error("I am a block proposer", extra=self.format)
else:
self.logger.debug("Receiving the data...", extra=self.format)
#self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format)
self.changedRow = { id:
self.getRow(id) != self.receivedBlock.getRow(id)
for id in self.rowIDs
}
self.changedColumn = { id:
self.getColumn(id) != self.receivedBlock.getColumn(id)
for id in self.columnIDs
}
self.block.merge(self.receivedBlock)
for neighs in self.rowNeighbors.values():
for neighs in chain (self.rowNeighbors.values(), self.columnNeighbors.values()):
for neigh in neighs.values():
neigh.received |= neigh.receiving
neigh.receiving.setall(0)
for neighs in self.columnNeighbors.values():
for neigh in neighs.values():
neigh.received |= neigh.receiving
neigh.receiving.setall(0)
# add newly received segments to the send queue
if self.perNodeQueue or self.perNeighborQueue:
while self.receivedQueue:
(rID, cID) = self.receivedQueue.popleft()
self.addToSendQueue(rID, cID)
def updateStats(self):
"""It updates the stats related to sent and received data."""
self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format)
self.statsRxPerSlot.append(self.statsRxInSlot)
self.statsTxPerSlot.append(self.statsTxInSlot)
self.statsRxInSlot = 0
self.statsTxInSlot = 0
def checkSegmentToNeigh(self, rID, cID, neigh):
"""Check if a segment should be sent to a neighbor."""
if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil:
return False # sent enough, other side can restore
i = rID if neigh.dim else cID
if not neigh.sent[i] and not neigh.received[i] :
return True
else:
return False # received or already sent
def sendColumn(self, columnID):
line = self.getColumn(columnID)
if line.any():
self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format)
for n in self.columnNeighbors[columnID].values():
def sendSegmentToNeigh(self, rID, cID, neigh):
"""Send segment to a neighbor (without checks)."""
self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format)
i = rID if neigh.dim else cID
neigh.sent[i] = 1
neigh.node.receiveSegment(rID, cID, self.ID)
self.statsTxInSlot += 1
# if there is anything new to send, send it
toSend = line & ~n.sent & ~n.received
if (toSend).any():
n.sent |= toSend;
n.node.receiveColumn(columnID, toSend, self.ID)
self.statsTxInSlot += toSend.count(1)
def checkSendSegmentToNeigh(self, rID, cID, neigh):
"""Check and send a segment to a neighbor if needed."""
if self.checkSegmentToNeigh(rID, cID, neigh):
self.sendSegmentToNeigh(rID, cID, neigh)
return True
else:
return False
def sendRow(self, rowID):
line = self.getRow(rowID)
if line.any():
self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format)
for n in self.rowNeighbors[rowID].values():
def processSendQueue(self):
"""Send out segments from queue until bandwidth limit reached.
# if there is anything new to send, send it
toSend = line & ~n.sent & ~n.received
if (toSend).any():
n.sent |= toSend;
n.node.receiveRow(rowID, toSend, self.ID)
self.statsTxInSlot += toSend.count(1)
SendQueue is a centralized queue from which segments are sent out
in FIFO order to all interested neighbors.
"""
while self.sendQueue:
(rID, cID) = self.sendQueue[0]
def sendRows(self):
self.logger.debug("Sending restored rows...", extra=self.format)
for r in self.rowIDs:
if self.changedRow[r]:
self.sendRow(r)
if rID in self.rowIDs:
for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors):
self.checkSendSegmentToNeigh(rID, cID, neigh)
def sendColumns(self):
self.logger.debug("Sending restored columns...", extra=self.format)
for c in self.columnIDs:
if self.changedColumn[c]:
self.sendColumn(c)
if self.statsTxInSlot >= self.bwUplink:
return
if cID in self.columnIDs:
for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors):
self.checkSendSegmentToNeigh(rID, cID, neigh)
if self.statsTxInSlot >= self.bwUplink:
return
self.sendQueue.popleft()
def processPerNeighborSendQueue(self):
"""Send out segments from per-neighbor queues until bandwidth limit reached.
Segments are dispatched from per-neighbor transmission queues in a shuffled
round-robin order, emulating a type of fair queuing. Since neighborhood is
handled at the topic (column or row) level, fair queuing is also at the level
of flows per topic and per peer. A per-peer model might be closer to the
reality of libp2p implementations where topics between two nodes are
multiplexed over the same transport.
"""
progress = True
while (progress):
progress = False
queues = []
# collect and shuffle
for rID, neighs in self.rowNeighbors.items():
for neigh in neighs.values():
if (neigh.sendQueue):
queues.append((0, rID, neigh))
for cID, neighs in self.columnNeighbors.items():
for neigh in neighs.values():
if (neigh.sendQueue):
queues.append((1, cID, neigh))
for dim, lineID, neigh in shuffled(queues, self.shuffleQueues):
if dim == 0:
self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh)
else:
self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh)
progress = True
if self.statsTxInSlot >= self.bwUplink:
return
def runSegmentShuffleScheduler(self):
""" Schedule chunks for sending.
This scheduler checks which owned segments need sending (at least
one neighbor needs them), then sends each segment worth sending once,
in shuffled order. This is repeated until the bandwidth limit is reached.
"""
def collectSegmentsToSend():
# yields list of segments to send as (dim, lineID, id)
segmentsToSend = []
for rID, neighs in self.rowNeighbors.items():
line = self.getRow(rID)
needed = zeros(self.shape.blockSize)
for neigh in neighs.values():
sentOrReceived = neigh.received | neigh.sent
if sentOrReceived.count(1) < self.sendLineUntil:
needed |= ~sentOrReceived
needed &= line
if (needed).any():
for i in range(len(needed)):
if needed[i]:
segmentsToSend.append((0, rID, i))
for cID, neighs in self.columnNeighbors.items():
line = self.getColumn(cID)
needed = zeros(self.shape.blockSize)
for neigh in neighs.values():
sentOrReceived = neigh.received | neigh.sent
if sentOrReceived.count(1) < self.sendLineUntil:
needed |= ~sentOrReceived
needed &= line
if (needed).any():
for i in range(len(needed)):
if needed[i]:
segmentsToSend.append((1, cID, i))
return segmentsToSend
def nextSegment():
while True:
# send each collected segment once
if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None:
for dim, lineID, id in self.segmentShuffleGen:
if dim == 0:
for _, neigh in shuffledDict(self.rowNeighbors[lineID], self.shuffleNeighbors):
if self.checkSegmentToNeigh(lineID, id, neigh):
yield((lineID, id, neigh))
break
else:
for _, neigh in shuffledDict(self.columnNeighbors[lineID], self.shuffleNeighbors):
if self.checkSegmentToNeigh(id, lineID, neigh):
yield((id, lineID, neigh))
break
# collect segments for next round
segmentsToSend = collectSegmentsToSend()
# finish if empty or set up shuffled generator based on collected segments
if not segmentsToSend:
break
else:
self.segmentShuffleGen = shuffled(segmentsToSend, self.shuffleLines)
for rid, cid, neigh in nextSegment():
# segments are checked just before yield, so we can send directly
self.sendSegmentToNeigh(rid, cid, neigh)
if self.statsTxInSlot >= self.bwUplink:
if not self.segmentShuffleSchedulerPersist:
# remove scheduler state before leaving
self.segmentShuffleGen = None
return
def runDumbRandomScheduler(self, tries = 100):
"""Random scheduler picking segments at random.
This scheduler implements a simple random scheduling order picking
segments at random and peers potentially interested in that segment
also at random.
It serves more as a performance baseline than as a realistic model.
"""
def nextSegment():
t = tries
while t:
if self.rowIDs:
rID = random.choice(self.rowIDs)
cID = random.randrange(0, self.shape.blockSize)
if self.block.getSegment(rID, cID) :
neigh = random.choice(list(self.rowNeighbors[rID].values()))
if self.checkSegmentToNeigh(rID, cID, neigh):
yield(rID, cID, neigh)
t = tries
if self.columnIDs:
cID = random.choice(self.columnIDs)
rID = random.randrange(0, self.shape.blockSize)
if self.block.getSegment(rID, cID) :
neigh = random.choice(list(self.columnNeighbors[cID].values()))
if self.checkSegmentToNeigh(rID, cID, neigh):
yield(rID, cID, neigh)
t = tries
t -= 1
for rid, cid, neigh in nextSegment():
# segments are checked just before yield, so we can send directly
self.sendSegmentToNeigh(rid, cid, neigh)
if self.statsTxInSlot >= self.bwUplink:
return
def send(self):
""" Send as much as we can in the timestep, limited by bwUplink."""
# process node level send queue
self.processSendQueue()
if self.statsTxInSlot >= self.bwUplink:
return
# process neighbor level send queues in shuffled breadth-first order
self.processPerNeighborSendQueue()
if self.statsTxInSlot >= self.bwUplink:
return
# process possible segments to send in shuffled breadth-first order
if self.segmentShuffleScheduler:
self.runSegmentShuffleScheduler()
if self.statsTxInSlot >= self.bwUplink:
return
if self.dumbRandomScheduler:
self.runDumbRandomScheduler()
if self.statsTxInSlot >= self.bwUplink:
return
def logRows(self):
"""It logs the rows assigned to the validator."""
if self.logger.isEnabledFor(logging.DEBUG):
for id in self.rowIDs:
self.logger.debug("Row %d: %s", id, self.getRow(id), extra=self.format)
def logColumns(self):
"""It logs the columns assigned to the validator."""
if self.logger.isEnabledFor(logging.DEBUG):
for id in self.columnIDs:
self.logger.debug("Column %d: %s", id, self.getColumn(id), extra=self.format)
def restoreRows(self):
for id in self.rowIDs:
self.block.repairRow(id)
"""It restores the rows assigned to the validator, that can be repaired."""
if self.repairOnTheFly:
for id in self.rowIDs:
self.restoreRow(id)
def restoreRow(self, id):
"""Restore a given row if repairable."""
rep = self.block.repairRow(id)
if (rep.any()):
# If operation is based on send queues, segments should
# be queued after successful repair.
for i in range(len(rep)):
if rep[i]:
self.logger.debug("Rep: %d,%d", id, i, extra=self.format)
self.addToSendQueue(id, i)
# self.statsRepairInSlot += rep.count(1)
def restoreColumns(self):
for id in self.columnIDs:
self.block.repairColumn(id)
"""It restores the columns assigned to the validator, that can be repaired."""
if self.repairOnTheFly:
for id in self.columnIDs:
self.restoreColumn(id)
def restoreColumn(self, id):
"""Restore a given column if repairable."""
rep = self.block.repairColumn(id)
if (rep.any()):
# If operation is based on send queues, segments should
# be queued after successful repair.
for i in range(len(rep)):
if rep[i]:
self.logger.debug("Rep: %d,%d", i, id, extra=self.format)
self.addToSendQueue(i, id)
# self.statsRepairInSlot += rep.count(1)
def checkStatus(self):
"""It checks how many expected/arrived samples are for each assigned row/column."""
arrived = 0
expected = 0
for id in self.columnIDs:


@@ -12,7 +12,8 @@ class Visualizer:
def __init__(self, execID):
self.execID = execID
self.folderPath = "results/"+self.execID
self.parameters = ['run', 'blockSize', 'failureRate', 'numberValidators', 'netDegree', 'chi']
self.parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree',
'chi', 'vpn1', 'vpn2', 'bwUplinkProd', 'bwUplink1', 'bwUplink2']
self.minimumDataPoints = 2
def plottingData(self):
@@ -27,22 +28,28 @@
run = int(root.find('run').text)
blockSize = int(root.find('blockSize').text)
failureRate = int(root.find('failureRate').text)
numberValidators = int(root.find('numberValidators').text)
numberNodes = int(root.find('numberNodes').text)
netDegree = int(root.find('netDegree').text)
chi = int(root.find('chi').text)
vpn1 = int(root.find('vpn1').text)
vpn2 = int(root.find('vpn2').text)
bwUplinkProd = int(root.find('bwUplinkProd').text)
bwUplink1 = int(root.find('bwUplink1').text)
bwUplink2 = int(root.find('bwUplink2').text)
tta = int(root.find('tta').text)
"""Loop over all possible combinations of length 4 of the parameters"""
for combination in combinations(self.parameters, 4):
"""Get the indices and values of the parameters in the combination"""
for combination in combinations(self.parameters, len(self.parameters)-2):
# Get the indices and values of the parameters in the combination
indices = [self.parameters.index(element) for element in combination]
selectedValues = [run, blockSize, failureRate, numberValidators, netDegree, chi]
selectedValues = [run, blockSize, failureRate, numberNodes, netDegree, chi, vpn1, vpn2, bwUplinkProd, bwUplink1, bwUplink2]
values = [selectedValues[index] for index in indices]
names = [self.parameters[i] for i in indices]
keyComponents = [f"{name}_{value}" for name, value in zip(names, values)]
key = tuple(keyComponents[:4])
key = tuple(keyComponents[:len(self.parameters)-2])
"""Get the names of the other 2 parameters that are not included in the key"""
otherParams = [self.parameters[i] for i in range(6) if i not in indices]
otherParams = [self.parameters[i] for i in range(len(self.parameters)) if i not in indices]
"""Append the values of the other 2 parameters and the ttas to the lists for the key"""
otherIndices = [i for i in range(len(self.parameters)) if i not in indices]
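The key construction can be reproduced in isolation (a toy sketch of the loop above, not the class itself):

from itertools import combinations

parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree',
              'chi', 'vpn1', 'vpn2', 'bwUplinkProd', 'bwUplink1', 'bwUplink2']
for combination in combinations(parameters, len(parameters) - 2):
    otherParams = [p for p in parameters if p not in combination]
    print(otherParams)  # first iteration: ['bwUplink1', 'bwUplink2'] become the axes
    break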
@@ -119,7 +126,7 @@ class Visualizer:
"""Label formatting for the figures"""
result = ''.join([f" {char}" if char.isupper() else char for char in label])
return result.title()
def formatTitle(self, key):
"""Title formatting for the figures"""
name = ''.join([f" {char}" if char.isupper() else char for char in key.split('_')[0]])
@@ -132,7 +139,7 @@ class Visualizer:
data = self.averageRuns(data)
filteredKeys = self.similarKeys(data)
print("Plotting heatmaps...")
"""Create the directory if it doesn't exist already"""
heatmapsFolder = self.folderPath + '/heatmaps'
if not os.path.exists(heatmapsFolder):


@@ -1,28 +0,0 @@
[Simulation Space]
numberValidatorStart = 256
numberValidatorStop = 512
numberValidatorStep = 128
failureRateStart = 10
failureRateStop = 90
failureRateStep = 40
blockSizeStart = 32
blockSizeStop = 64
blockSizeStep = 16
netDegreeStart = 6
netDegreeStop = 8
netDegreeStep = 1
chiStart = 4
chiStop = 8
chiStep = 2
[Advanced]
deterministic = 0
numberRuns = 2
dumpXML = 1

config_example.py (new file, 76 lines)

@@ -0,0 +1,76 @@
"""Example configuration file
This file illustrates how to define options and simulation parameter ranges.
It also defines the traversal order of the simulation space. As the file
extension suggests, configuration is pure python code, allowing complex
setups. Use at your own risk.
To use this example, run
python3 study.py config_example
Otherwise copy it and modify as needed. The default traversal order defined
in the nested loop of nextShape() is good for most cases, but customizable
if needed.
"""
import logging
import itertools
import numpy as np
from DAS.shape import Shape
dumpXML = 1
visualization = 1
logLevel = logging.INFO
# number of parallel workers. -1: all cores; 1: sequential
# for more details, see joblib.Parallel
numJobs = 3
# distribute rows/columns evenly between validators (True)
# or generate them using local randomness (False)
evenLineDistribution = True
# Number of simulation runs with the same parameters for statistical relevance
runs = range(10)
# Number of nodes in the network
numberNodes = range(256, 513, 128)
# Percentage of block not released by producer
failureRates = range(10, 91, 40)
# Block size in one dimension in segments. Block is blockSizes * blockSizes segments.
blockSizes = range(32,65,16)
# Per-topic mesh neighborhood size
netDegrees = range(6, 9, 2)
# number of rows and columns a validator is interested in
chis = range(1, 5, 2)
# ratio of class1 nodes (see below for parameters per class)
class1ratios = np.arange(0, 1, .2)
# Number of validators per beacon node
validatorsPerNode1 = [1]
validatorsPerNode2 = [2, 4, 8, 16, 32]
# Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?)
# 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11
bwUplinksProd = [2200]
bwUplinks1 = [110]
bwUplinks2 = [2200]
# Set to True if you want your run to be deterministic, False if not
deterministic = False
# If your run is deterministic you can decide the random seed. This is ignored otherwise.
randomSeed = "DAS"
def nextShape():
for run, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product(
runs, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2):
# Network Degree has to be an even number
if netDegree % 2 == 0:
shape = Shape(blockSize, nn, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run)
yield shape
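A configuration module is consumed by iterating nextShape(); for instance (a sketch, assuming this file is importable as config_example):

import itertools
import config_example

for shape in itertools.islice(config_example.nextShape(), 3):
    print(shape)  # filename-style representation of each Shape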

doc/Makefile (new file, 20 lines)

@@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

doc/conf.py (new file, 64 lines)

@@ -0,0 +1,64 @@
# Configuration file for the Sphinx documentation builder.
#
# This file only contains a selection of the most common options. For a full
# list see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('../DAS'))
# -- Project information -----------------------------------------------------
project = 'DAS simulator'
copyright = '2023, Leonardo A. Bautista-Gomez, Csaba Kiraly'
author = 'Leonardo A. Bautista-Gomez, Csaba Kiraly'
# The full version, including alpha/beta/rc tags
release = '1'
# -- General configuration ---------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = ['sphinx.ext.autodoc'
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store', 'myenv']
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# -- Options for autodoc -------------------------------------------------
autodoc_mock_imports = ["django", "dicttoxml", "bitarray", "DAS", "networkx"]

doc/index.rst (new file, 44 lines)

@@ -0,0 +1,44 @@
.. DAS simulator documentation master file, created by
sphinx-quickstart on Wed Feb 8 20:56:44 2023.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to DAS simulator's documentation!
=========================================
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
.. toctree::
:maxdepth: 2
:caption: Contents:
.. automodule:: block
:members:
.. automodule:: configuration
:members:
.. automodule:: observer
:members:
.. automodule:: results
:members:
.. automodule:: shape
:members:
.. automodule:: simulator
:members:
.. automodule:: tools
:members:
.. automodule:: validator
:members:

doc/make.bat (new file, 35 lines)

@@ -0,0 +1,35 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
if "%1" == "" goto help
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.https://www.sphinx-doc.org/
exit /b 1
)
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd


@@ -1,58 +1,65 @@
#! /bin/python3
import time, sys, random, copy
import importlib
from joblib import Parallel, delayed
from DAS import *
# Parallel execution:
# The code currently uses 'joblib' to execute on multiple cores. For other options such as 'ray', see
# https://stackoverflow.com/questions/9786102/how-do-i-parallelize-a-simple-python-loop
# For fixing logging issues in parallel execution, see
# https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls
# and https://github.com/joblib/joblib/issues/1017
def runOnce(sim, config, shape):
if config.deterministic:
shape.setSeed(config.randomSeed+"-"+str(shape))
random.seed(shape.randomSeed)
sim.initLogger()
sim.resetShape(shape)
sim.initValidators()
sim.initNetwork()
result = sim.run()
sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
return result
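The joblib pattern referenced in the comments above boils down to the following generic sketch (not the simulator call itself):

from joblib import Parallel, delayed

def square(x):
    return x * x

results = Parallel(n_jobs=3)(delayed(square)(i) for i in range(5))
print(results)  # [0, 1, 4, 9, 16]; study() applies the same pattern to runOnce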
def study():
if len(sys.argv) < 2:
print("You need to pass a configuration file in parameter")
exit(1)
config = Configuration(sys.argv[1])
shape = Shape(0, 0, 0, 0, 0, 0)
sim = Simulator(shape)
try:
config = importlib.import_module(sys.argv[1])
except ModuleNotFoundError as e:
try:
config = importlib.import_module(str(sys.argv[1]).replace(".py", ""))
except ModuleNotFoundError as e:
print(e)
print("You need to pass a configuration file in parameter")
exit(1)
shape = Shape(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
sim = Simulator(shape, config)
sim.initLogger()
results = []
simCnt = 0
now = datetime.now()
execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999))
sim.logger.info("Starting simulations:", extra=sim.format)
start = time.time()
for run in range(config.numberRuns):
for nv in range(config.nvStart, config.nvStop+1, config.nvStep):
for blockSize in range(config.blockSizeStart, config.blockSizeStop+1, config.blockSizeStep):
for fr in range(config.failureRateStart, config.failureRateStop+1, config.failureRateStep):
for netDegree in range(config.netDegreeStart, config.netDegreeStop+1, config.netDegreeStep):
for chi in range(config.chiStart, config.chiStop+1, config.chiStep):
if not config.deterministic:
random.seed(datetime.now())
# Network Degree has to be an even number
if netDegree % 2 == 0:
shape = Shape(blockSize, nv, fr, chi, netDegree, run)
sim.resetShape(shape)
sim.initValidators()
sim.initNetwork()
result = sim.run()
sim.logger.info("Shape: %s ... Block Available: %d" % (str(sim.shape.__dict__), result.blockAvailable), extra=sim.format)
results.append(copy.deepcopy(result))
simCnt += 1
results = Parallel(config.numJobs)(delayed(runOnce)(sim, config, shape) for shape in config.nextShape())
end = time.time()
sim.logger.info("A total of %d simulations ran in %d seconds" % (simCnt, end-start), extra=sim.format)
sim.logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=sim.format)
if config.dumpXML:
for res in results:
res.dump(execID)
sim.logger.info("Results dumped into results/%s/" % (execID), extra=sim.format)
visualization = 1
if visualization:
if config.visualization:
vis = Visualizer(execID)
vis.plotHeatmaps()