Merge pull request #48 from status-im/develop

promote current development branch to master
Csaba Kiraly 2023-05-06 11:25:56 +02:00 committed by GitHub
commit e88c2f310b
19 changed files with 1552 additions and 346 deletions

.gitignore

@@ -1,2 +1,7 @@
*.swp
*.pyc
results/*
myenv
doc/_build
!results/plots.py
Frontend/

DAS/__init__.py

@@ -1,3 +1,3 @@
from DAS.simulator import *
from DAS.configuration import *
from DAS.shape import *
from DAS.visualizer import *

DAS/block.py

@@ -5,43 +5,73 @@ from bitarray import bitarray
from bitarray.util import zeros
class Block:
blockSize = 0
data = bitarray()
"""This class represents a block in the Ethereum blockchain."""
def __init__(self, blockSize):
"""Initialize the block with a data array of blocksize^2 zeros."""
self.blockSize = blockSize
self.data = zeros(self.blockSize*self.blockSize)
def fill(self):
"""It fills the block data with ones."""
self.data.setall(1)
def merge(self, merged):
"""It merges (OR) the existing block with the received one."""
self.data |= merged.data
def getSegment(self, rowID, columnID):
"""Check whether a segment is included"""
return self.data[rowID*self.blockSize + columnID]
def setSegment(self, rowID, columnID, value = 1):
"""Set value for a segment (default 1)"""
self.data[rowID*self.blockSize + columnID] = value
def getColumn(self, columnID):
"""It returns the block column corresponding to columnID."""
return self.data[columnID::self.blockSize]
def mergeColumn(self, columnID, column):
"""It merges (OR) the existing column with the received one."""
self.data[columnID::self.blockSize] |= column
def repairColumn(self, id):
success = self.data[id::self.blockSize].count(1)
"""It repairs the entire column if it has at least blockSize/2 ones.
Returns: list of repaired segments
"""
line = self.data[id::self.blockSize]
success = line.count(1)
if success >= self.blockSize/2:
ret = ~line
self.data[id::self.blockSize] = 1
else:
ret = zeros(self.blockSize)
return ret
def getRow(self, rowID):
"""It returns the block row corresponding to rowID."""
return self.data[rowID*self.blockSize:(rowID+1)*self.blockSize]
def mergeRow(self, rowID, row):
"""It merges (OR) the existing row with the received one."""
self.data[rowID*self.blockSize:(rowID+1)*self.blockSize] |= row
def repairRow(self, id):
success = self.data[id*self.blockSize:(id+1)*self.blockSize].count(1)
"""It repairs the entire row if it has at least blockSize/2 ones.
Returns: list of repaired segments.
"""
line = self.data[id*self.blockSize:(id+1)*self.blockSize]
success = line.count(1)
if success >= self.blockSize/2:
ret = ~line
self.data[id*self.blockSize:(id+1)*self.blockSize] = 1
else:
ret = zeros(self.blockSize)
return ret
def print(self):
"""It prints the block in the terminal (outside of the logger rules))."""
dash = "-" * (self.blockSize+2)
print(dash)
for i in range(self.blockSize):

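The repair threshold above models the erasure coding used for DAS: any half of a row's (or column's) segments suffices to reconstruct the whole line, and repairRow/repairColumn return the bitmap of segments recovered that way. A standalone sketch of that behavior, assuming the bitarray package and a block size of 8 for illustration:

from bitarray.util import zeros

blockSize = 8
row = zeros(blockSize)
row[0:4] = 1                       # half of the segments have arrived

if row.count(1) >= blockSize / 2:  # the repair threshold used above
    repaired = ~row                # bitmap of newly recovered segments
    row.setall(1)                  # the whole line becomes available
    print(repaired)                # bitarray('00001111')
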
DAS/configuration.py

@@ -1,48 +0,0 @@
#!/bin/python3
import configparser
class Configuration:
deterministic = 0
def __init__(self, fileName):
config = configparser.RawConfigParser()
config.read(fileName)
self.nvStart = int(config.get("Simulation Space", "numberValidatorStart"))
self.nvStop = int(config.get("Simulation Space", "numberValidatorStop"))
self.nvStep = int(config.get("Simulation Space", "numberValidatorStep"))
self.blockSizeStart = int(config.get("Simulation Space", "blockSizeStart"))
self.blockSizeStop = int(config.get("Simulation Space", "blockSizeStop"))
self.blockSizeStep = int(config.get("Simulation Space", "blockSizeStep"))
self.netDegreeStart = int(config.get("Simulation Space", "netDegreeStart"))
self.netDegreeStop = int(config.get("Simulation Space", "netDegreeStop"))
self.netDegreeStep = int(config.get("Simulation Space", "netDegreeStep"))
self.failureRateStart = int(config.get("Simulation Space", "failureRateStart"))
self.failureRateStop = int(config.get("Simulation Space", "failureRateStop"))
self.failureRateStep = int(config.get("Simulation Space", "failureRateStep"))
self.chiStart = int(config.get("Simulation Space", "chiStart"))
self.chiStop = int(config.get("Simulation Space", "chiStop"))
self.chiStep = int(config.get("Simulation Space", "chiStep"))
self.numberRuns = int(config.get("Advanced", "numberRuns"))
self.deterministic = config.get("Advanced", "deterministic")
if self.nvStop < (self.blockSizeStart*4):
print("ERROR: The number of validators cannot be lower than the block size * 4")
exit(1)
if self.chiStart < 1:
print("Chi has to be greater than 0")
exit(1)
if self.chiStop > self.blockSizeStart:
print("Chi (%d) has to be smaller or equal to block the size (%d)" % (self.chiStop, self.blockSizeStart))
exit(1)

DAS/observer.py

@@ -1,30 +1,24 @@
#!/bin/python3
import numpy as np
from DAS.block import *
class Observer:
block = []
rows = []
columns = []
goldenData = []
broadcasted = []
config = []
logger = []
"""This class gathers global data from the simulation, like an 'all-seen god'."""
def __init__(self, logger, config):
"""It initializes the observer with a logger and given configuration."""
self.config = config
self.format = {"entity": "Observer"}
self.logger = logger
def reset(self):
self.block = [0] * self.config.blockSize * self.config.blockSize
self.goldenData = [0] * self.config.blockSize * self.config.blockSize
self.rows = [0] * self.config.blockSize
self.columns = [0] * self.config.blockSize
self.broadcasted = Block(self.config.blockSize)
def checkRowsColumns(self, validators):
"""It checks how many validators have been assigned to each row and column."""
for val in validators:
if val.amIproposer == 0:
for r in val.rowIDs:
@@ -37,11 +31,8 @@ class Observer:
if self.rows[i] == 0 or self.columns[i] == 0:
self.logger.warning("There is a row/column that has not been assigned", extra=self.format)
def setGoldenData(self, block):
for i in range(self.config.blockSize*self.config.blockSize):
self.goldenData[i] = block.data[i]
def checkBroadcasted(self):
"""It checks how many broadcasted samples are still missing in the network."""
zeros = 0
for i in range(self.config.blockSize * self.config.blockSize):
if self.broadcasted.data[i] == 0:
@@ -51,11 +42,61 @@
return zeros
def checkStatus(self, validators):
"""It checks the status of how many expected and arrived samples globally."""
arrived = 0
expected = 0
ready = 0
validatedall = 0
validated = 0
for val in validators:
if val.amIproposer == 0:
(a, e) = val.checkStatus()
(a, e, v) = val.checkStatus()
arrived += a
expected += e
return (arrived, expected)
if a == e:
ready += 1
validatedall += val.vpn
validated += v
return (arrived, expected, ready, validatedall, validated)
def getProgress(self, validators):
"""Calculate current simulation progress with different metrics.
Returns:
- missingSamples: overall number of sample instances missing in nodes.
Samples are counted on both rows and columns, so intersections of interest are counted twice.
- sampleProgress: the above, expressed as a progress ratio
- nodeProgress: ratio of nodes that have all the segments they are interested in
- validatorProgress: same as above, but as a vpn-weighted average, i.e. counted per validator;
a validator is counted only if all validators of its supporting node see all segments of interest
TODO: add real per validator progress counter
"""
arrived, expected, ready, validatedall, validated = self.checkStatus(validators)
missingSamples = expected - arrived
sampleProgress = arrived / expected
nodeProgress = ready / (len(validators)-1)
validatorCnt = sum([v.vpn for v in validators[1:]])
validatorAllProgress = validatedall / validatorCnt
validatorProgress = validated / validatorCnt
return missingSamples, sampleProgress, nodeProgress, validatorAllProgress, validatorProgress
def getTrafficStats(self, validators):
"""Summary statistics of traffic measurements in a timestep."""
def maxOrNan(l):
return np.max(l) if l else np.NaN
def meanOrNan(l):
return np.mean(l) if l else np.NaN
trafficStats = {}
for cl in range(0,3):
Tx = [v.statsTxInSlot for v in validators if v.nodeClass == cl]
Rx = [v.statsRxInSlot for v in validators if v.nodeClass == cl]
RxDup = [v.statsRxDupInSlot for v in validators if v.nodeClass == cl]
trafficStats[cl] = {
"Tx": {"mean": meanOrNan(Tx), "max": maxOrNan(Tx)},
"Rx": {"mean": meanOrNan(Rx), "max": maxOrNan(Rx)},
"RxDup": {"mean": meanOrNan(RxDup), "max": maxOrNan(RxDup)},
}
return trafficStats

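A numeric illustration of getProgress() may help (values assumed, not from a real run): with 100 non-proposer nodes of 2 validators each, the checkStatus() aggregates map to the returned ratios as follows.

arrived, expected = 180000, 200000  # sample instances arrived vs. expected
ready = 40                          # nodes holding all their samples
validatedall, validated = 85, 92    # node-complete vs. per-validator counts
nodes = 101                         # len(validators), proposer included
validatorCnt = 200                  # sum of vpn over non-proposer nodes

missingSamples = expected - arrived                  # 20000
sampleProgress = arrived / expected                  # 0.9
nodeProgress = ready / (nodes - 1)                   # 0.4
validatorAllProgress = validatedall / validatorCnt   # 0.425
validatorProgress = validated / validatorCnt         # 0.46
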
requirements.txt

@@ -1,3 +1,8 @@
bitarray==2.6.0
DAS==0.28.7
dicttoxml==1.7.16
matplotlib==3.6.2
mplfinance==0.12.9b7
networkx==3.0
numpy==1.23.5
seaborn==0.12.2
joblib==1.2.0

DAS/results.py

@@ -1,17 +1,52 @@
#!/bin/python3
import os
import bisect
from xml.dom import minidom
from dicttoxml import dicttoxml
class Result:
"""This class stores and process/store the results of a simulation."""
config = []
missingVector = []
blockAvailable = -1
def __init__(self, config):
self.config = config
def __init__(self, shape, execID):
"""It initializes the instance with a specific shape."""
self.shape = shape
self.execID = execID
self.blockAvailable = -1
self.tta = -1
self.missingVector = []
self.metrics = {}
def addMissing(self, missingVector):
def populate(self, shape, config, missingVector):
"""It populates part of the result data inside a vector."""
self.shape = shape
self.missingVector = missingVector
v = self.metrics["progress"]["validators ready"]
tta = bisect.bisect(v, config.successCondition)
if v[-1] >= config.successCondition:
self.blockAvailable = 1
self.tta = tta * (config.stepDuration)
else:
self.blockAvailable = 0
self.tta = -1
def addMetric(self, name, metric):
"""Generic function to add a metric to the results."""
self.metrics[name] = metric
def dump(self):
"""It dumps the results of the simulation in an XML file."""
if not os.path.exists("results"):
os.makedirs("results")
if not os.path.exists("results/"+self.execID):
os.makedirs("results/"+self.execID)
resd1 = self.shape.__dict__
resd2 = self.__dict__.copy()
resd2.pop("shape")
resd1.update(resd2)
resXml = dicttoxml(resd1)
xmlstr = minidom.parseString(resXml)
xmlPretty = xmlstr.toprettyxml()
filePath = "results/"+self.execID+"/"+str(self.shape)+".xml"
with open(filePath, "w") as f:
f.write(xmlPretty)

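The time-to-availability logic in populate() is compact enough to be worth a standalone sketch (successCondition and stepDuration are assumed config values): bisect finds the first step at which the "validators ready" ratio exceeds the threshold, and that step index is scaled by the step duration.

import bisect

successCondition = 0.9            # assumed config.successCondition
stepDuration = 50                 # assumed config.stepDuration, in ms

v = [0.1, 0.4, 0.7, 0.92, 1.0]    # "validators ready" metric per step
if v[-1] >= successCondition:
    tta = bisect.bisect(v, successCondition) * stepDuration  # 3 * 50 = 150
else:
    tta = -1                      # the block never became available
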
DAS/shape.py

@@ -1,19 +1,44 @@
#!/bin/python3
class Shape:
numberValidators = 0
failureRate = 0
blockSize = 0
netDegree = 0
chi = 0
"""This class represents a set of parameters for a specific simulation."""
def __init__(self, blockSize, numberValidators, failureRate, chi, netDegree):
self.numberValidators = numberValidators
self.failureRate = failureRate
def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run):
"""Initializes the shape with the parameters passed in argument."""
self.run = run
self.numberNodes = numberNodes
self.blockSize = blockSize
self.failureModel = failureModel
self.failureRate = failureRate
self.netDegree = netDegree
self.class1ratio = class1ratio
self.chi = chi
self.vpn1 = vpn1
self.vpn2 = vpn2
self.bwUplinkProd = bwUplinkProd
self.bwUplink1 = bwUplink1
self.bwUplink2 = bwUplink2
self.randomSeed = ""
def __repr__(self):
"""Returns a printable representation of the shape"""
shastr = ""
shastr += "bs-"+str(self.blockSize)
shastr += "-nn-"+str(self.numberNodes)
shastr += "-fm-"+str(self.failureModel)
shastr += "-fr-"+str(self.failureRate)
shastr += "-c1r-"+str(self.class1ratio)
shastr += "-chi-"+str(self.chi)
shastr += "-vpn1-"+str(self.vpn1)
shastr += "-vpn2-"+str(self.vpn2)
shastr += "-bwupprod-"+str(self.bwUplinkProd)
shastr += "-bwup1-"+str(self.bwUplink1)
shastr += "-bwup2-"+str(self.bwUplink2)
shastr += "-nd-"+str(self.netDegree)
shastr += "-r-"+str(self.run)
return shastr
def setSeed(self, seed):
"""Adds the random seed to the shape"""
self.randomSeed = seed

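Since __repr__ encodes every parameter, it doubles as the per-run file name that results are stored under (see Result.dump above). An example, with all parameter values assumed for illustration:

shape = Shape(blockSize=64, numberNodes=128, failureModel="random",
              failureRate=40, class1ratio=0.8, chi=2, vpn1=1, vpn2=8,
              netDegree=6, bwUplinkProd=2200, bwUplink1=110, bwUplink2=2200,
              run=0)
print(shape)
# bs-64-nn-128-fm-random-fr-40-c1r-0.8-chi-2-vpn1-1-vpn2-8-bwupprod-2200-bwup1-110-bwup2-2200-nd-6-r-0
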
DAS/simulator.py

@@ -2,6 +2,8 @@
import networkx as nx
import logging, random
import pandas as pd
from functools import partial, partialmethod
from datetime import datetime
from DAS.tools import *
from DAS.results import *
@@ -9,130 +11,299 @@ from DAS.observer import *
from DAS.validator import *
class Simulator:
"""This class implements the main DAS simulator."""
proposerID = 0
logLevel = logging.INFO
validators = []
glob = []
result = []
shape = []
logger = []
format = {}
def __init__(self, shape):
def __init__(self, shape, config, execID):
"""It initializes the simulation with a set of parameters (shape)."""
self.shape = shape
self.config = config
self.format = {"entity": "Simulator"}
self.result = Result(self.shape)
self.execID = execID
self.result = Result(self.shape, self.execID)
self.validators = []
self.logger = []
self.logLevel = config.logLevel
self.proposerID = 0
self.glob = []
self.execID = execID
self.distR = []
self.distC = []
self.nodeRows = []
self.nodeColumns = []
# In GossipSub the initiator might push messages without participating in the mesh.
# proposerPublishOnly regulates this behavior. If set to true, the proposer is not
# part of the p2p distribution graph, only pushes segments to it. If false, the proposer
# might get back segments from other peers since links are symmetric.
self.proposerPublishOnly = True
# If proposerPublishOnly == True, this regulates how many copies of each segment are
# pushed out by the proposer.
# 1: the data is sent out exactly once on rows and once on columns (2 copies in total)
# self.shape.netDegree: default behavior similar (but not same) to previous code
self.proposerPublishTo = self.shape.netDegree
def initValidators(self):
"""It initializes all the validators in the network."""
self.glob = Observer(self.logger, self.shape)
self.glob.reset()
self.validators = []
rows = list(range(self.shape.blockSize)) * int(self.shape.chi*self.shape.numberValidators/self.shape.blockSize)
columns = list(range(self.shape.blockSize)) * int(self.shape.chi*self.shape.numberValidators/self.shape.blockSize)
random.shuffle(rows)
random.shuffle(columns)
for i in range(self.shape.numberValidators):
val = Validator(i, int(not i!=0), self.logger, self.shape, rows, columns)
if self.config.evenLineDistribution:
lightNodes = int(self.shape.numberNodes * self.shape.class1ratio)
heavyNodes = self.shape.numberNodes - lightNodes
lightVal = lightNodes * self.shape.vpn1
heavyVal = heavyNodes * self.shape.vpn2
totalValidators = lightVal + heavyVal
totalRows = totalValidators * self.shape.chi
rows = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1)
columns = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1)
rows = rows[0:totalRows]
columns = columns[0:totalRows]
random.shuffle(rows)
random.shuffle(columns)
offset = lightVal*self.shape.chi
self.logger.debug("There is a total of %d nodes, %d light and %d heavy." % (self.shape.numberNodes, lightNodes, heavyNodes), extra=self.format)
self.logger.debug("There is a total of %d validators, %d in light nodes and %d in heavy nodes" % (totalValidators, lightVal, heavyVal), extra=self.format)
self.logger.debug("Shuffling a total of %d rows/columns to be assigned (X=%d)" % (len(rows), self.shape.chi), extra=self.format)
self.logger.debug("Shuffled rows: %s" % str(rows), extra=self.format)
self.logger.debug("Shuffled columns: %s" % str(columns), extra=self.format)
assignedRows = []
assignedCols = []
for i in range(self.shape.numberNodes):
if self.config.evenLineDistribution:
if i < int(lightVal/self.shape.vpn1): # First start with the light nodes
start = i *self.shape.chi*self.shape.vpn1
end = (i+1)*self.shape.chi*self.shape.vpn1
else:
j = i - int(lightVal/self.shape.vpn1)
start = offset+( j *self.shape.chi*self.shape.vpn2)
end = offset+((j+1)*self.shape.chi*self.shape.vpn2)
r = rows[start:end]
c = columns[start:end]
val = Validator(i, int(not i!=0), self.logger, self.shape, self.config, r, c)
self.logger.debug("Node %d has row IDs: %s" % (val.ID, val.rowIDs), extra=self.format)
self.logger.debug("Node %d has column IDs: %s" % (val.ID, val.columnIDs), extra=self.format)
assignedRows = assignedRows + list(r)
assignedCols = assignedCols + list(c)
self.nodeRows.append(val.rowIDs)
self.nodeColumns.append(val.columnIDs)
else:
val = Validator(i, int(not i!=0), self.logger, self.shape, self.config)
if i == self.proposerID:
val.initBlock()
self.glob.setGoldenData(val.block)
else:
val.logIDs()
self.validators.append(val)
assignedRows.sort()
assignedCols.sort()
self.logger.debug("Rows assigned: %s" % str(assignedRows), extra=self.format)
self.logger.debug("Columns assigned: %s" % str(assignedCols), extra=self.format)
self.logger.debug("Validators initialized.", extra=self.format)
def initNetwork(self):
self.shape.netDegree = 6
"""It initializes the simulated network."""
rowChannels = [[] for i in range(self.shape.blockSize)]
columnChannels = [[] for i in range(self.shape.blockSize)]
for v in self.validators:
for id in v.rowIDs:
rowChannels[id].append(v)
for id in v.columnIDs:
columnChannels[id].append(v)
if not (self.proposerPublishOnly and v.amIproposer):
for id in v.rowIDs:
rowChannels[id].append(v)
for id in v.columnIDs:
columnChannels[id].append(v)
# Check rows/columns distribution
for r in rowChannels:
self.distR.append(len(r))
for c in columnChannels:
self.distC.append(len(c))
self.logger.debug("Number of validators per row; Min: %d, Max: %d" % (min(self.distR), max(self.distR)), extra=self.format)
self.logger.debug("Number of validators per column; Min: %d, Max: %d" % (min(self.distC), max(self.distC)), extra=self.format)
for id in range(self.shape.blockSize):
if (len(rowChannels[id]) < self.shape.netDegree):
self.logger.error("Graph degree higher than %d" % len(rowChannels[id]), extra=self.format)
G = nx.random_regular_graph(self.shape.netDegree, len(rowChannels[id]))
# If the number of nodes in a channel is smaller or equal to the
# requested degree, a fully connected graph is used. For n>d, a random
# d-regular graph is set up. (For n=d+1, the two are the same.)
if not rowChannels[id]:
self.logger.error("No nodes for row %d !" % id, extra=self.format)
continue
elif (len(rowChannels[id]) <= self.shape.netDegree):
self.logger.debug("Graph fully connected with degree %d !" % (len(rowChannels[id]) - 1), extra=self.format)
G = nx.complete_graph(len(rowChannels[id]))
else:
G = nx.random_regular_graph(self.shape.netDegree, len(rowChannels[id]))
if not nx.is_connected(G):
self.logger.error("Graph not connected for row %d !" % id, extra=self.format)
for u, v in G.edges:
val1=rowChannels[id][u]
val2=rowChannels[id][v]
val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)})
val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)})
val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSize)})
val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSize)})
if (len(columnChannels[id]) < self.shape.netDegree):
self.logger.error("Graph degree higher than %d" % len(columnChannels[id]), extra=self.format)
G = nx.random_regular_graph(self.shape.netDegree, len(columnChannels[id]))
if not columnChannels[id]:
self.logger.error("No nodes for column %d !" % id, extra=self.format)
continue
elif (len(columnChannels[id]) <= self.shape.netDegree):
self.logger.debug("Graph fully connected with degree %d !" % (len(columnChannels[id]) - 1), extra=self.format)
G = nx.complete_graph(len(columnChannels[id]))
else:
G = nx.random_regular_graph(self.shape.netDegree, len(columnChannels[id]))
if not nx.is_connected(G):
self.logger.error("Graph not connected for column %d !" % id, extra=self.format)
for u, v in G.edges:
val1=columnChannels[id][u]
val2=columnChannels[id][v]
val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)})
val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)})
val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSize)})
val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSize)})
for v in self.validators:
if (self.proposerPublishOnly and v.amIproposer):
for id in v.rowIDs:
count = min(self.proposerPublishTo, len(rowChannels[id]))
publishTo = random.sample(rowChannels[id], count)
for vi in publishTo:
v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSize)})
for id in v.columnIDs:
count = min(self.proposerPublishTo, len(columnChannels[id]))
publishTo = random.sample(columnChannels[id], count)
for vi in publishTo:
v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSize)})
if self.logger.isEnabledFor(logging.DEBUG):
for i in range(0, self.shape.numberNodes):
self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format)
self.logger.debug("Val %d : colN %s", i, self.validators[i].columnNeighbors, extra=self.format)
def initLogger(self):
"""It initializes the logger."""
logging.TRACE = 5
logging.addLevelName(logging.TRACE, 'TRACE')
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
logging.trace = partial(logging.log, logging.TRACE)
logger = logging.getLogger("DAS")
logger.setLevel(self.logLevel)
ch = logging.StreamHandler()
ch.setLevel(self.logLevel)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
if len(logger.handlers) == 0:
logger.setLevel(self.logLevel)
ch = logging.StreamHandler()
ch.setLevel(self.logLevel)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
self.logger = logger
def resetShape(self, shape):
self.shape = shape
def printDiagnostics(self):
"""Print all required diagnostics to check when a block does not become available"""
for val in self.validators:
val.shape.failureRate = shape.failureRate
val.shape.chi = shape.chi
(a, e, v) = val.checkStatus()
if e-a > 0 and val.ID != 0:
self.logger.warning("Node %d is missing %d samples" % (val.ID, e-a), extra=self.format)
for r in val.rowIDs:
row = val.getRow(r)
if row.count() < len(row):
self.logger.debug("Row %d: %s" % (r, str(row)), extra=self.format)
neiR = val.rowNeighbors[r]
for nr in neiR:
self.logger.debug("Row %d, Neighbor %d sent: %s" % (r, val.rowNeighbors[r][nr].node.ID, val.rowNeighbors[r][nr].received), extra=self.format)
self.logger.debug("Row %d, Neighbor %d has: %s" % (r, val.rowNeighbors[r][nr].node.ID, self.validators[val.rowNeighbors[r][nr].node.ID].getRow(r)), extra=self.format)
for c in val.columnIDs:
col = val.getColumn(c)
if col.count() < len(col):
self.logger.debug("Column %d: %s" % (c, str(col)), extra=self.format)
neiC = val.columnNeighbors[c]
for nc in neiC:
self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format)
self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format)
def run(self):
"""It runs the main simulation until the block is available or it gets stucked."""
self.glob.checkRowsColumns(self.validators)
self.validators[self.proposerID].broadcastBlock()
arrived, expected = self.glob.checkStatus(self.validators)
for i in range(0,self.shape.numberNodes):
if i == self.proposerID:
self.validators[i].initBlock()
else:
self.validators[i].logIDs()
arrived, expected, ready, validatedall, validated = self.glob.checkStatus(self.validators)
missingSamples = expected - arrived
missingVector = []
progressVector = []
trafficStatsVector = []
steps = 0
while(True):
missingVector.append(missingSamples)
oldMissingSamples = missingSamples
for i in range(0,self.shape.numberValidators):
self.validators[i].sendRows()
self.validators[i].sendColumns()
for i in range(1,self.shape.numberValidators):
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
for i in range(0,self.shape.numberNodes):
self.validators[i].send()
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
self.validators[i].receiveRowsColumns()
for i in range(1,self.shape.numberValidators):
self.logger.debug("PHASE RESTORE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
self.validators[i].restoreRows()
self.validators[i].restoreColumns()
for i in range(0,self.shape.numberValidators):
self.logger.debug("PHASE LOG %d" % steps, extra=self.format)
for i in range(0,self.shape.numberNodes):
self.validators[i].logRows()
self.validators[i].logColumns()
# log TX and RX statistics
trafficStats = self.glob.getTrafficStats(self.validators)
self.logger.debug("step %d: %s" %
(steps, trafficStats), extra=self.format)
for i in range(0,self.shape.numberNodes):
self.validators[i].updateStats()
trafficStatsVector.append(trafficStats)
missingSamples, sampleProgress, nodeProgress, validatorAllProgress, validatorProgress = self.glob.getProgress(self.validators)
self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validatedall %0.02f %%, , validated %0.02f %%"
% (steps, sampleProgress*100, nodeProgress*100, validatorAllProgress*100, validatorProgress*100), extra=self.format)
cnS = "samples received"
cnN = "nodes ready"
cnV = "validators ready"
cnT0 = "TX builder mean"
cnT1 = "TX class1 mean"
cnT2 = "TX class2 mean"
cnR1 = "RX class1 mean"
cnR2 = "RX class2 mean"
cnD1 = "Dup class1 mean"
cnD2 = "Dup class2 mean"
progressVector.append({
cnS:sampleProgress,
cnN:nodeProgress,
cnV:validatorProgress,
cnT0: trafficStats[0]["Tx"]["mean"],
cnT1: trafficStats[1]["Tx"]["mean"],
cnT2: trafficStats[2]["Tx"]["mean"],
cnR1: trafficStats[1]["Rx"]["mean"],
cnR2: trafficStats[2]["Rx"]["mean"],
cnD1: trafficStats[1]["RxDup"]["mean"],
cnD2: trafficStats[2]["RxDup"]["mean"],
})
arrived, expected = self.glob.checkStatus(self.validators)
missingSamples = expected - arrived
missingRate = missingSamples*100/expected
self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format)
if missingSamples == oldMissingSamples:
break
if len(missingVector) > self.config.steps4StopCondition:
if missingSamples == missingVector[-self.config.steps4StopCondition]:
self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format)
if self.config.diagnostics:
self.printDiagnostics()
break
missingVector.append(missingSamples)
elif missingSamples == 0:
self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format)
missingVector.append(missingSamples)
break
else:
steps += 1
steps += 1
self.result.addMissing(missingVector)
if missingSamples == 0:
self.result.blockAvailable = 1
self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format)
return self.result
else:
self.result.blockAvailable = 0
self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format)
return self.result
progress = pd.DataFrame(progressVector)
if self.config.saveRCdist:
self.result.addMetric("rowDist", self.distR)
self.result.addMetric("columnDist", self.distC)
if self.config.saveProgress:
self.result.addMetric("progress", progress.to_dict(orient='list'))
self.result.populate(self.shape, self.config, missingVector)
return self.result

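For reference, a single simulation is driven by initializing the logger, the validators, and the per-line channel graphs (complete graphs for channels with at most netDegree nodes, random netDegree-regular graphs otherwise), then calling run() until a stop condition fires. A sketch of the expected driver, with config and shape construction elided and execID being any unique string:

from DAS.simulator import Simulator

def runOnce(config, shape, execID):
    sim = Simulator(shape, config, execID)
    sim.initLogger()
    sim.initValidators()
    sim.initNetwork()
    result = sim.run()
    result.dump()   # writes results/<execID>/<shape>.xml
    return result
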
DAS/tools.py

@@ -1,27 +1,87 @@
#!/bin/python3
import logging
import sys
import random
from bitarray.util import zeros
class CustomFormatter():
"""This class defines the terminal output formatting."""
class CustomFormatter(logging.Formatter):
blue = "\x1b[34;20m"
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
format = "%(levelname)s : %(entity)s : %(message)s"
FORMATS = {
logging.DEBUG: grey + format + reset,
logging.INFO: blue + format + reset,
logging.WARNING: yellow + format + reset,
logging.ERROR: red + format + reset,
logging.CRITICAL: bold_red + format + reset
}
def __init__(self):
"""Initializes 5 different formats for logging with different colors."""
self.blue = "\x1b[34;20m"
self.grey = "\x1b[38;20m"
self.yellow = "\x1b[33;20m"
self.red = "\x1b[31;20m"
self.bold_red = "\x1b[31;1m"
self.reset = "\x1b[0m"
self.reformat = "%(levelname)s : %(entity)s : %(message)s"
self.FORMATS = {
logging.DEBUG: self.grey + self.reformat + self.reset,
logging.INFO: self.blue + self.reformat + self.reset,
logging.WARNING: self.yellow + self.reformat + self.reset,
logging.ERROR: self.red + self.reformat + self.reset,
logging.CRITICAL: self.bold_red + self.reformat + self.reset
}
def format(self, record):
"""Returns the formatter with the format corresponding to record."""
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
def shuffled(lis, shuffle=True):
"""Generator yielding list in shuffled order."""
# based on https://stackoverflow.com/a/60342323
if shuffle:
for index in random.sample(range(len(lis)), len(lis)):
yield lis[index]
else:
for v in lis:
yield v
def shuffledDict(d, shuffle=True):
"""Generator yielding dictionary in shuffled order.
Shuffling can be turned off with the optional parameter (useful for experiment setup).
"""
if shuffle:
lis = list(d.items())
for index in random.sample(range(len(d)), len(d)):
yield lis[index]
else:
for kv in d.items():
yield kv
def sampleLine(line, limit):
"""Sample up to 'limit' bits from a bitarray.
Since this is quite expensive, we use a number of heuristics to get it fast.
"""
if limit == sys.maxsize :
return line
else:
w = line.count(1)
if limit >= w :
return line
else:
l = len(line)
r = zeros(l)
if w < l/10 or limit > l/2 :
indices = [ i for i in range(l) if line[i] ]
sample = random.sample(indices, limit)
for i in sample:
r[i] = 1
return r
else:
while limit:
i = random.randrange(0, l)
if line[i] and not r[i]:
r[i] = 1
limit -= 1
return r
def unionOfSamples(population, sampleSize, times):
selected = set()
for t in range(times):
selected |= set(random.sample(population, sampleSize))
return selected

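The two branches of sampleLine are only performance heuristics; semantically it always returns a bitarray with at most 'limit' of the input's set bits, chosen at random. A quick usage sketch:

from bitarray import bitarray
from DAS.tools import sampleLine

line = bitarray('10110111')         # 6 segments available out of 8
sampled = sampleLine(line, 3)       # keep 3 of them, picked at random
assert sampled.count(1) == 3
assert not (sampled & ~line).any()  # sampling never invents segments
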
DAS/validator.py

@@ -4,57 +4,78 @@ import random
import collections
import logging
from DAS.block import *
from bitarray import bitarray
from DAS.tools import shuffled, shuffledDict, unionOfSamples
from bitarray.util import zeros
from collections import deque
from itertools import chain
class Neighbor:
"""This class implements a node neighbor to monitor sent and received data.
It represents one side of a P2P link in the overlay. Sent and received
segments are monitored to avoid sending twice or sending back what was
received from a link.
"""
def __repr__(self):
return "%d:%d/%d" % (self.node.ID, self.sent.count(1), self.received.count(1))
"""It returns the amount of sent and received data."""
return "%d:%d/%d, q:%d" % (self.node.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue))
def __init__(self, v, blockSize):
def __init__(self, v, dim, blockSize):
"""It initializes the neighbor with the node and sets counters to zero."""
self.node = v
self.dim = dim # 0:row 1:col
self.receiving = zeros(blockSize)
self.received = zeros(blockSize)
self.sent = zeros(blockSize)
self.sendQueue = deque()
class Validator:
ID = 0
amIproposer = 0
shape = []
format = {}
logger = []
"""This class implements a validator/node in the network."""
def __repr__(self):
"""It returns the validator ID."""
return str(self.ID)
def __init__(self, ID, amIproposer, logger, shape, rows, columns):
def __init__(self, ID, amIproposer, logger, shape, config, rows = None, columns = None):
"""It initializes the validator with the logger shape and rows/columns.
If rows/columns are specified these are observed, otherwise (default)
chi rows and columns are selected randomly.
"""
self.shape = shape
FORMAT = "%(levelname)s : %(entity)s : %(message)s"
self.ID = ID
self.format = {"entity": "Val "+str(self.ID)}
self.block = Block(self.shape.blockSize)
self.receivedBlock = Block(self.shape.blockSize)
self.receivedQueue = deque()
self.sendQueue = deque()
self.amIproposer = amIproposer
self.logger = logger
if self.shape.chi < 1:
self.logger.error("Chi has to be greater than 0", extra=self.format)
elif self.shape.chi > self.shape.blockSize:
self.logger.error("Chi has to be smaller than %d" % blockSize, extra=self.format)
self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format)
else:
if amIproposer:
self.nodeClass = 0
self.rowIDs = range(shape.blockSize)
self.columnIDs = range(shape.blockSize)
else:
self.rowIDs = rows[(self.ID*self.shape.chi):(self.ID*self.shape.chi + self.shape.chi)]
self.columnIDs = rows[(self.ID*self.shape.chi):(self.ID*self.shape.chi + self.shape.chi)]
#if shape.deterministic:
# random.seed(self.ID)
#self.rowIDs = random.sample(range(self.shape.blockSize), self.shape.chi)
#self.columnIDs = random.sample(range(self.shape.blockSize), self.shape.chi)
self.changedRow = {id:False for id in self.rowIDs}
self.changedColumn = {id:False for id in self.columnIDs}
self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2
self.vpn = self.shape.vpn1 if (self.nodeClass == 1) else self.shape.vpn2
self.vRowIDs = []
self.vColumnIDs = []
for i in range(self.vpn):
self.vRowIDs.append(set(rows[i*self.shape.chi:(i+1)*self.shape.chi]) if rows else set(random.sample(range(self.shape.blockSize), self.shape.chi)))
self.vColumnIDs.append(set(columns[i*self.shape.chi:(i+1)*self.shape.chi]) if columns else set(random.sample(range(self.shape.blockSize), self.shape.chi)))
self.rowIDs = set.union(*self.vRowIDs)
self.columnIDs = set.union(*self.vColumnIDs)
self.rowNeighbors = collections.defaultdict(dict)
self.columnNeighbors = collections.defaultdict(dict)
@@ -63,8 +84,33 @@ class Validator:
self.statsTxPerSlot = []
self.statsRxInSlot = 0
self.statsRxPerSlot = []
self.statsRxDupInSlot = 0
self.statsRxDupPerSlot = []
# Set uplink bandwidth.
# Assuming segments of ~560 bytes and timesteps of 50ms, we get
# 1 Mbps ~= 1e6 bps * 0.050 s / (560*8) bits ~= 11 segments/timestep
if self.amIproposer:
self.bwUplink = shape.bwUplinkProd
elif self.nodeClass == 1:
self.bwUplink = shape.bwUplink1
else:
self.bwUplink = shape.bwUplink2
self.bwUplink *= 1e3 / 8 * config.stepDuration / config.segmentSize
self.repairOnTheFly = True
self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed
self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node
self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send
self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor
self.dumbRandomScheduler = False # dumb random scheduler
self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
def logIDs(self):
"""It logs the assigned rows and columns."""
if self.amIproposer == 1:
self.logger.warning("I am a block proposer."% self.ID)
else:
@@ -72,161 +118,429 @@
self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format)
def initBlock(self):
self.logger.debug("I am a block proposer.", extra=self.format)
self.block = Block(self.shape.blockSize)
self.block.fill()
#self.block.print()
def broadcastBlock(self):
"""It initializes the block for the proposer."""
if self.amIproposer == 0:
self.logger.error("I am NOT a block proposer", extra=self.format)
self.logger.warning("I am not a block proposer", extra=self.format)
else:
self.logger.debug("Broadcasting my block...", extra=self.format)
order = [i for i in range(self.shape.blockSize * self.shape.blockSize)]
random.shuffle(order)
while(order):
i = order.pop()
if (random.randint(0,99) >= self.shape.failureRate):
self.logger.debug("Creating block...", extra=self.format)
if self.shape.failureModel == "random":
order = [i for i in range(self.shape.blockSize * self.shape.blockSize)]
order = random.sample(order, int((1 - self.shape.failureRate/100) * len(order)))
for i in order:
self.block.data[i] = 1
else:
self.block.data[i] = 0
self.changedRow = {id:True for id in self.rowIDs}
self.changedColumn = {id:True for id in self.columnIDs}
elif self.shape.failureModel == "sequential":
order = [i for i in range(self.shape.blockSize * self.shape.blockSize)]
order = order[:int((1 - self.shape.failureRate/100) * len(order))]
for i in order:
self.block.data[i] = 1
elif self.shape.failureModel == "MEP": # Minimal size non-recoverable Erasure Pattern
for r in range(self.shape.blockSize):
for c in range(self.shape.blockSize):
k = self.shape.blockSize/2
if r > k or c > k:
self.block.setSegment(r,c)
elif self.shape.failureModel == "MEP+1": # MEP +1 segment to make it recoverable
for r in range(self.shape.blockSize):
for c in range(self.shape.blockSize):
k = self.shape.blockSize/2
if r > k or c > k:
self.block.setSegment(r,c)
self.block.setSegment(0, 0)
elif self.shape.failureModel == "DEP":
for r in range(self.shape.blockSize):
for c in range(self.shape.blockSize):
k = self.shape.blockSize/2
if (r+c) % self.shape.blockSize > k:
self.block.setSegment(r,c)
elif self.shape.failureModel == "DEP+1":
for r in range(self.shape.blockSize):
for c in range(self.shape.blockSize):
k = self.shape.blockSize/2
if (r+c) % self.shape.blockSize > k:
self.block.setSegment(r,c)
self.block.setSegment(0, 0)
elif self.shape.failureModel == "MREP": # Minimum size Recoverable Erasure Pattern
for r in range(self.shape.blockSize):
for c in range(self.shape.blockSize):
k = self.shape.blockSize/2
if r < k and c < k:
self.block.setSegment(r,c)
elif self.shape.failureModel == "MREP-1": # make MREP non-recoverable
for r in range(self.shape.blockSize):
for c in range(self.shape.blockSize):
k = self.shape.blockSize/2
if r < k and c < k:
self.block.setSegment(r,c)
self.block.setSegment(0, 0, 0)
nbFailures = self.block.data.count(0)
measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
#broadcasted.print()
def getColumn(self, index):
"""It returns a given column."""
return self.block.getColumn(index)
def getRow(self, index):
"""It returns a given row."""
return self.block.getRow(index)
def receiveColumn(self, id, column, src):
if id in self.columnIDs:
# register receive so that we are not sending back
self.columnNeighbors[id][src].receiving |= column
self.receivedBlock.mergeColumn(id, column)
self.statsRxInSlot += column.count(1)
def receiveSegment(self, rID, cID, src):
"""Receive a segment, register it, and queue for forwarding as needed."""
# register receive so that we are not sending back
if rID in self.rowIDs:
if src in self.rowNeighbors[rID]:
self.rowNeighbors[rID][src].receiving[cID] = 1
if cID in self.columnIDs:
if src in self.columnNeighbors[cID]:
self.columnNeighbors[cID][src].receiving[rID] = 1
if not self.receivedBlock.getSegment(rID, cID):
self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
self.receivedBlock.setSegment(rID, cID)
if self.perNodeQueue or self.perNeighborQueue:
self.receivedQueue.append((rID, cID))
else:
pass
self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
self.statsRxDupInSlot += 1
self.statsRxInSlot += 1
def receiveRow(self, id, row, src):
if id in self.rowIDs:
# register receive so that we are not sending back
self.rowNeighbors[id][src].receiving |= row
self.receivedBlock.mergeRow(id, row)
self.statsRxInSlot += row.count(1)
else:
pass
def addToSendQueue(self, rID, cID):
"""Queue a segment for forwarding."""
if self.perNodeQueue:
self.sendQueue.append((rID, cID))
if self.perNeighborQueue:
if rID in self.rowIDs:
for neigh in self.rowNeighbors[rID].values():
neigh.sendQueue.append(cID)
if cID in self.columnIDs:
for neigh in self.columnNeighbors[cID].values():
neigh.sendQueue.append(rID)
def receiveRowsColumns(self):
"""Finalize time step by merging newly received segments in state."""
if self.amIproposer == 1:
self.logger.error("I am a block proposer", extra=self.format)
else:
self.logger.debug("Receiving the data...", extra=self.format)
self.logger.trace("Receiving the data...", extra=self.format)
#self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format)
self.changedRow = { id:
self.getRow(id) != self.receivedBlock.getRow(id)
for id in self.rowIDs
}
self.changedColumn = { id:
self.getColumn(id) != self.receivedBlock.getColumn(id)
for id in self.columnIDs
}
self.block.merge(self.receivedBlock)
for neighs in self.rowNeighbors.values():
for neighs in chain (self.rowNeighbors.values(), self.columnNeighbors.values()):
for neigh in neighs.values():
neigh.received |= neigh.receiving
neigh.receiving.setall(0)
for neighs in self.columnNeighbors.values():
for neigh in neighs.values():
neigh.received |= neigh.receiving
neigh.receiving.setall(0)
# add newly received segments to the send queue
if self.perNodeQueue or self.perNeighborQueue:
while self.receivedQueue:
(rID, cID) = self.receivedQueue.popleft()
self.addToSendQueue(rID, cID)
def updateStats(self):
"""It updates the stats related to sent and received data."""
self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format)
self.statsRxPerSlot.append(self.statsRxInSlot)
self.statsRxDupPerSlot.append(self.statsRxDupInSlot)
self.statsTxPerSlot.append(self.statsTxInSlot)
self.statsRxInSlot = 0
self.statsRxDupInSlot = 0
self.statsTxInSlot = 0
def checkSegmentToNeigh(self, rID, cID, neigh):
"""Check if a segment should be sent to a neighbor."""
if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil:
return False # sent enough, other side can restore
i = rID if neigh.dim else cID
if not neigh.sent[i] and not neigh.received[i] :
return True
else:
return False # received or already sent
def sendColumn(self, columnID):
line = self.getColumn(columnID)
if line.any():
self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format)
for n in self.columnNeighbors[columnID].values():
def sendSegmentToNeigh(self, rID, cID, neigh):
"""Send segment to a neighbor (without checks)."""
self.logger.trace("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format)
i = rID if neigh.dim else cID
neigh.sent[i] = 1
neigh.node.receiveSegment(rID, cID, self.ID)
self.statsTxInSlot += 1
# if there is anything new to send, send it
toSend = line & ~n.sent & ~n.received
if (toSend).any():
n.sent |= toSend;
n.node.receiveColumn(columnID, toSend, self.ID)
self.statsTxInSlot += toSend.count(1)
def checkSendSegmentToNeigh(self, rID, cID, neigh):
"""Check and send a segment to a neighbor if needed."""
if self.checkSegmentToNeigh(rID, cID, neigh):
self.sendSegmentToNeigh(rID, cID, neigh)
return True
else:
return False
def sendRow(self, rowID):
line = self.getRow(rowID)
if line.any():
self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format)
for n in self.rowNeighbors[rowID].values():
def processSendQueue(self):
"""Send out segments from queue until bandwidth limit reached.
# if there is anything new to send, send it
toSend = line & ~n.sent & ~n.received
if (toSend).any():
n.sent |= toSend;
n.node.receiveRow(rowID, toSend, self.ID)
self.statsTxInSlot += toSend.count(1)
SendQueue is a centralized queue from which segments are sent out
in FIFO order to all interested neighbors.
"""
while self.sendQueue:
(rID, cID) = self.sendQueue[0]
def sendRows(self):
self.logger.debug("Sending restored rows...", extra=self.format)
for r in self.rowIDs:
if self.changedRow[r]:
self.sendRow(r)
if rID in self.rowIDs:
for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors):
self.checkSendSegmentToNeigh(rID, cID, neigh)
def sendColumns(self):
self.logger.debug("Sending restored columns...", extra=self.format)
for c in self.columnIDs:
if self.changedColumn[c]:
self.sendColumn(c)
if self.statsTxInSlot >= self.bwUplink:
return
if cID in self.columnIDs:
for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors):
self.checkSendSegmentToNeigh(rID, cID, neigh)
if self.statsTxInSlot >= self.bwUplink:
return
self.sendQueue.popleft()
def processPerNeighborSendQueue(self):
"""Send out segments from per-neighbor queues until bandwidth limit reached.
Segments are dispatched from per-neighbor transmission queues in a shuffled
round-robin order, emulating a type of fair queuing. Since neighborhood is
handled at the topic (column or row) level, fair queuing is also at the level
of flows per topic and per peer. A per-peer model might be closer to the
reality of libp2p implementations where topics between two nodes are
multiplexed over the same transport.
"""
progress = True
while (progress):
progress = False
queues = []
# collect and shuffle
for rID, neighs in self.rowNeighbors.items():
for neigh in neighs.values():
if (neigh.sendQueue):
queues.append((0, rID, neigh))
for cID, neighs in self.columnNeighbors.items():
for neigh in neighs.values():
if (neigh.sendQueue):
queues.append((1, cID, neigh))
for dim, lineID, neigh in shuffled(queues, self.shuffleQueues):
if dim == 0:
self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh)
else:
self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh)
progress = True
if self.statsTxInSlot >= self.bwUplink:
return
def runSegmentShuffleScheduler(self):
""" Schedule chunks for sending.
This scheduler checks which owned segments need sending (at least
one neighbor needs it), then sends each segment worth sending once,
in shuffled order. This is repeated until the bandwidth limit.
"""
def collectSegmentsToSend():
# returns a list of segments to send as (dim, lineID, id)
segmentsToSend = []
for rID, neighs in self.rowNeighbors.items():
line = self.getRow(rID)
needed = zeros(self.shape.blockSize)
for neigh in neighs.values():
sentOrReceived = neigh.received | neigh.sent
if sentOrReceived.count(1) < self.sendLineUntil:
needed |= ~sentOrReceived
needed &= line
if (needed).any():
for i in range(len(needed)):
if needed[i]:
segmentsToSend.append((0, rID, i))
for cID, neighs in self.columnNeighbors.items():
line = self.getColumn(cID)
needed = zeros(self.shape.blockSize)
for neigh in neighs.values():
sentOrReceived = neigh.received | neigh.sent
if sentOrReceived.count(1) < self.sendLineUntil:
needed |= ~sentOrReceived
needed &= line
if (needed).any():
for i in range(len(needed)):
if needed[i]:
segmentsToSend.append((1, cID, i))
return segmentsToSend
def nextSegment():
while True:
# send each collected segment once
if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None:
for dim, lineID, id in self.segmentShuffleGen:
if dim == 0:
for _, neigh in shuffledDict(self.rowNeighbors[lineID], self.shuffleNeighbors):
if self.checkSegmentToNeigh(lineID, id, neigh):
yield((lineID, id, neigh))
break
else:
for _, neigh in shuffledDict(self.columnNeighbors[lineID], self.shuffleNeighbors):
if self.checkSegmentToNeigh(id, lineID, neigh):
yield((id, lineID, neigh))
break
# collect segments for next round
segmentsToSend = collectSegmentsToSend()
# finish if empty or set up shuffled generator based on collected segments
if not segmentsToSend:
break
else:
self.segmentShuffleGen = shuffled(segmentsToSend, self.shuffleLines)
for rid, cid, neigh in nextSegment():
# segments are checked just before yield, so we can send directly
self.sendSegmentToNeigh(rid, cid, neigh)
if self.statsTxInSlot >= self.bwUplink:
if not self.segmentShuffleSchedulerPersist:
# remove scheduler state before leaving
self.segmentShuffleGen = None
return
def runDumbRandomScheduler(self, tries = 100):
"""Random scheduler picking segments at random.
This scheduler implements a simple random scheduling order picking
segments at random and peers potentially interested in that segment
also at random.
It serves more as a performance baseline than as a realistic model.
"""
def nextSegment():
t = tries
while t:
if self.rowIDs:
rID = random.choice(list(self.rowIDs))
cID = random.randrange(0, self.shape.blockSize)
if self.block.getSegment(rID, cID) :
neigh = random.choice(list(self.rowNeighbors[rID].values()))
if self.checkSegmentToNeigh(rID, cID, neigh):
yield(rID, cID, neigh)
t = tries
if self.columnIDs:
cID = random.choice(list(self.columnIDs))
rID = random.randrange(0, self.shape.blockSize)
if self.block.getSegment(rID, cID) :
neigh = random.choice(list(self.columnNeighbors[cID].values()))
if self.checkSegmentToNeigh(rID, cID, neigh):
yield(rID, cID, neigh)
t = tries
t -= 1
for rid, cid, neigh in nextSegment():
# segments are checked just before yield, so we can send directly
self.sendSegmentToNeigh(rid, cid, neigh)
if self.statsTxInSlot >= self.bwUplink:
return
def send(self):
""" Send as much as we can in the timestep, limited by bwUplink."""
# process node level send queue
self.processSendQueue()
if self.statsTxInSlot >= self.bwUplink:
return
# process neighbor level send queues in shuffled breadth-first order
self.processPerNeighborSendQueue()
if self.statsTxInSlot >= self.bwUplink:
return
# process possible segments to send in shuffled breadth-first order
if self.segmentShuffleScheduler:
self.runSegmentShuffleScheduler()
if self.statsTxInSlot >= self.bwUplink:
return
if self.dumbRandomScheduler:
self.runDumbRandomScheduler()
if self.statsTxInSlot >= self.bwUplink:
return
def logRows(self):
"""It logs the rows assigned to the validator."""
if self.logger.isEnabledFor(logging.DEBUG):
for id in self.rowIDs:
self.logger.debug("Row %d: %s", id, self.getRow(id), extra=self.format)
def logColumns(self):
"""It logs the columns assigned to the validator."""
if self.logger.isEnabledFor(logging.DEBUG):
for id in self.columnIDs:
self.logger.debug("Column %d: %s", id, self.getColumn(id), extra=self.format)
def restoreRows(self):
for id in self.rowIDs:
self.block.repairRow(id)
"""It restores the rows assigned to the validator, that can be repaired."""
if self.repairOnTheFly:
for id in self.rowIDs:
self.restoreRow(id)
def restoreRow(self, id):
"""Restore a given row if repairable."""
rep = self.block.repairRow(id)
if (rep.any()):
# If operation is based on send queues, segments should
# be queued after successful repair.
for i in range(len(rep)):
if rep[i]:
self.logger.trace("Rep: %d,%d", id, i, extra=self.format)
self.addToSendQueue(id, i)
# self.statsRepairInSlot += rep.count(1)
def restoreColumns(self):
for id in self.columnIDs:
self.block.repairColumn(id)
"""It restores the columns assigned to the validator, that can be repaired."""
if self.repairOnTheFly:
for id in self.columnIDs:
self.restoreColumn(id)
def restoreColumn(self, id):
"""Restore a given column if repairable."""
rep = self.block.repairColumn(id)
if (rep.any()):
# If operation is based on send queues, segments should
# be queued after successful repair.
for i in range(len(rep)):
if rep[i]:
self.logger.trace("Rep: %d,%d", i, id, extra=self.format)
self.addToSendQueue(i, id)
# self.statsRepairInSlot += rep.count(1)
def checkStatus(self):
arrived = 0
expected = 0
for id in self.columnIDs:
line = self.getColumn(id)
arrived += line.count(1)
expected += len(line)
for id in self.rowIDs:
line = self.getRow(id)
arrived += line.count(1)
expected += len(line)
"""It checks how many expected/arrived samples are for each assigned row/column."""
def checkStatus(columnIDs, rowIDs):
arrived = 0
expected = 0
for id in columnIDs:
line = self.getColumn(id)
arrived += line.count(1)
expected += len(line)
for id in rowIDs:
line = self.getRow(id)
arrived += line.count(1)
expected += len(line)
return arrived, expected
arrived, expected = checkStatus(self.columnIDs, self.rowIDs)
self.logger.debug("status: %d / %d", arrived, expected, extra=self.format)
return (arrived, expected)
validated = 0
for i in range(self.vpn):
a, e = checkStatus(self.vColumnIDs[i], self.vRowIDs[i])
if a == e:
validated+=1
return arrived, expected, validated

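The uplink conversion in the Validator constructor is worth spelling out: shape bandwidths are given in Mbps, while the send schedulers budget in segments per timestep. With the assumed ~560 byte segments and 50 ms timesteps, the conversion works out as follows:

bwUplinkMbps = 1      # uplink bandwidth as stored in the shape, in Mbps
stepDuration = 50     # timestep length in ms (config.stepDuration)
segmentSize = 560     # segment size in bytes (config.segmentSize)

# Mbps -> bytes/ms is *1e3/8; times ms per step, divided by bytes/segment:
segmentsPerStep = bwUplinkMbps * 1e3 / 8 * stepDuration / segmentSize
print(segmentsPerStep)   # ~11.16, the "~11 segments/timestep" quoted above
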
DAS/visualizer.py

@@ -0,0 +1,277 @@
#!/bin/python3
import os, sys
import time
import xml.etree.ElementTree as ET
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from itertools import combinations
from mplfinance.original_flavor import candlestick_ohlc
class Visualizer:
def __init__(self, execID, config):
self.execID = execID
self.config = config
self.folderPath = "results/"+self.execID
self.parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree', 'chi', 'vpn1', 'vpn2', 'class1ratio', 'bwUplinkProd', 'bwUplink1', 'bwUplink2']
self.minimumDataPoints = 2
self.maxTTA = 11000
def plottingData(self):
"""Store data with a unique key for each params combination"""
data = {}
bw = []
print("Getting data from the folder...")
"""Loop over the xml files in the folder"""
for filename in os.listdir(self.folderPath):
"""Loop over the xmls and store the data in variables"""
if filename.endswith('.xml'):
tree = ET.parse(os.path.join(self.folderPath, filename))
root = tree.getroot()
run = int(root.find('run').text)
blockSize = int(root.find('blockSize').text)
failureRate = int(root.find('failureRate').text)
numberNodes = int(root.find('numberNodes').text)
class1ratio = float(root.find('class1ratio').text)
netDegree = int(root.find('netDegree').text)
chi = int(root.find('chi').text)
vpn1 = int(root.find('vpn1').text)
vpn2 = int(root.find('vpn2').text)
bwUplinkProd = int(root.find('bwUplinkProd').text)
bwUplink1 = int(root.find('bwUplink1').text)
bwUplink2 = int(root.find('bwUplink2').text)
tta = float(root.find('tta').text)
"""Store BW"""
bw.append(bwUplinkProd)
"""Loop over all possible combinations of length of the parameters minus x, y params"""
for combination in combinations(self.parameters, len(self.parameters)-2):
# Get the indices and values of the parameters in the combination
indices = [self.parameters.index(element) for element in combination]
selectedValues = [run, blockSize, failureRate, numberNodes, netDegree, chi, vpn1, vpn2, class1ratio, bwUplinkProd, bwUplink1, bwUplink2]
values = [selectedValues[index] for index in indices]
names = [self.parameters[i] for i in indices]
keyComponents = [f"{name}_{value}" for name, value in zip(names, values)]
key = tuple(keyComponents[:len(self.parameters)-2])
"""Get the names of the other parameters that are not included in the key"""
otherParams = [self.parameters[i] for i in range(len(self.parameters)) if i not in indices]
"""Append the values of the other parameters and the ttas to the lists for the key"""
otherIndices = [i for i in range(len(self.parameters)) if i not in indices]
"""Initialize the dictionary for the key if it doesn't exist yet"""
if key not in data:
data[key] = {}
"""Initialize lists for the other parameters and the ttas with the key"""
data[key][otherParams[0]] = []
data[key][otherParams[1]] = []
data[key]['ttas'] = []
if otherParams[0] in data[key]:
data[key][otherParams[0]].append(selectedValues[otherIndices[0]])
else:
data[key][otherParams[0]] = [selectedValues[otherIndices[0]]]
if otherParams[1] in data[key]:
data[key][otherParams[1]].append(selectedValues[otherIndices[1]])
else:
data[key][otherParams[1]] = [selectedValues[otherIndices[1]]]
data[key]['ttas'].append(tta)
return data
def averageRuns(self, data, runs):
"""Get the average of all runs for each key"""
newData = {}
print("Getting the average of the runs...")
for key, value in data.items():
runExists = False
"""Check if the key contains 'run_' with a numerical value"""
for item in key:
if item.startswith('run_'):
runExists = True
break
if runExists:
ps = list(data[key].keys())
for item in key:
"""Create a new key with the other items in the tuple"""
if item.startswith('run_'):
newKey = tuple([x for x in key if x != item])
"""Average the similar key values"""
tta_sums = {}
nbRuns = {}
ttRuns = []
total = []
p0 = []
p1 = []
p2 = []
p3 = []
for i in range(runs):
key0 = (f'run_{i}',) + newKey
#Create a dictionary to store the sums of ttas for each unique pair of values in subkeys
for i in range(len(data[key0][ps[0]])):
keyPair = (data[key0][ps[0]][i], data[key0][ps[1]][i])
if data[key0]["ttas"][i] == -1:
data[key0]["ttas"][i] = self.maxTTA
try:
tta_sums[keyPair] += data[key0]['ttas'][i]
if data[key0]["ttas"][i] != self.maxTTA:
nbRuns[keyPair] += 1
except KeyError:
tta_sums[keyPair] = data[key0]['ttas'][i]
if data[key0]["ttas"][i] != self.maxTTA:
nbRuns[keyPair] = 1
else:
nbRuns[keyPair] = 0
for k, tta in tta_sums.items():
p0.append(k[0])
p1.append(k[1])
total.append(tta)
for k, run in nbRuns.items():
p2.append(k[0])
p3.append(k[1])
ttRuns.append(run)
for i in range(len(total)):
if(ttRuns[i] == 0): # All tta = -1
total[i] = self.maxTTA
elif ttRuns[i] < runs: # Some tta = -1
total[i] -= (runs-ttRuns[i]) * self.maxTTA
total[i] = total[i]/ttRuns[i]
else: # No tta = -1
total[i] = total[i]/ttRuns[i]
averages = {}
averages[ps[0]] = p0
averages[ps[1]] = p1
averages['ttas'] = total
newData[newKey] = averages
return newData
def similarKeys(self, data):
    """Group the keys of all data series that share the same x and y labels"""
    filteredKeys = {}
    for key, value in data.items():
        subKeys = list(value.keys())
        filteredKeys.setdefault((subKeys[0], subKeys[1]), []).append(key)
    print("Getting filtered keys from data...")
    return filteredKeys
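# e.g. (hypothetical): if two data series both have 'failureRate' and 'chi' as
# their free parameters, they end up grouped under the same label tuple:
#   filteredKeys[('failureRate', 'chi')] = [key1, key2]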
def formatLabel(self, label):
"""Label formatting for the figures"""
result = ''.join([f" {char}" if char.isupper() else char for char in label])
return result.title()
def formatTitle(self, key):
"""Title formatting for the figures"""
name = ''.join([f" {char}" if char.isupper() else char for char in key.split('_')[0]])
number = key.split('_')[1]
return f"{name.title()}: {number} "
def plotHeatmaps(self):
"""Plot and store the 2D heatmaps in subfolders"""
data = self.plottingData()
"""Average the runs if needed"""
if len(self.config.runs) > 1:
    data = self.averageRuns(data, len(self.config.runs))
filteredKeys = self.similarKeys(data)
vmin, vmax = 0, self.maxTTA+1000
print("Plotting heatmaps...")
"""Create the directory if it doesn't exist already"""
heatmapsFolder = self.folderPath + '/heatmaps'
if not os.path.exists(heatmapsFolder):
os.makedirs(heatmapsFolder)
"""Plot"""
for labels, keys in filteredKeys.items():
for key in keys:
xlabels = np.sort(np.unique(data[key][labels[0]]))
ylabels = np.sort(np.unique(data[key][labels[1]]))
if len(xlabels) < self.minimumDataPoints or len(ylabels) < self.minimumDataPoints:
continue
hist, xedges, yedges = np.histogram2d(data[key][labels[0]], data[key][labels[1]], bins=(len(xlabels), len(ylabels)), weights=data[key]['ttas'])
hist = hist.T
fig, ax = plt.subplots(figsize=(10, 6))
sns.heatmap(hist, xticklabels=xlabels, yticklabels=ylabels, cmap='hot_r', cbar_kws={'label': 'Time to block availability (ms)'}, linecolor='black', linewidths=0.3, annot=True, fmt=".2f", ax=ax, vmin=vmin, vmax=vmax)
plt.xlabel(self.formatLabel(labels[0]))
plt.ylabel(self.formatLabel(labels[1]))
filename = ""
title = ""
paramValueCnt = 0
for param in self.parameters:
if param != labels[0] and param != labels[1] and param != 'run':
filename += f"{key[paramValueCnt]}"
formattedTitle = self.formatTitle(key[paramValueCnt])
title += formattedTitle
if (paramValueCnt+1) % 5 == 0:
title += "\n"
paramValueCnt += 1
title = "Time to Block Availability (ms)"
title_obj = plt.title(title)
font_size = 16 * fig.get_size_inches()[0] / 10
title_obj.set_fontsize(font_size)
filename = filename + ".png"
targetFolder = os.path.join(heatmapsFolder, f"{labels[0]}Vs{labels[1]}")
if not os.path.exists(targetFolder):
os.makedirs(targetFolder)
plt.savefig(os.path.join(targetFolder, filename))
plt.close()
plt.clf()
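# Output layout produced by plotHeatmaps (assuming folderPath points at the
# results directory of this execution, e.g. results/<execID>):
#   <folderPath>/heatmaps/<xlabel>Vs<ylabel>/<fixed-parameter-values>.png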
def plotHist(self, bandwidth):
"""Plot Bandwidth Frequency Histogram"""
plt.hist(bandwidth, bins=5)
plt.xlabel('Bandwidth')
plt.ylabel('Frequency')
plt.title('Bandwidth Histogram')
"""Create the directory if it doesn't exist already"""
histogramFolder = self.folderPath + '/histogram'
if not os.path.exists(histogramFolder):
os.makedirs(histogramFolder)
filename = os.path.join(histogramFolder, 'histogram.png')
plt.savefig(filename)
plt.clf()
def plotCandleStick(self, TX_prod, TX_avg, TX_max):
    """Plot a candlestick chart of the transmitted data per step"""
    # x-axis corresponding to steps
    steps = range(len(TX_prod))
    # Build the quotes; candlestick_ohlc expects (time, open, high, low, close)
    # tuples, so the average is used for both low and close (assumption: no
    # per-step minimum is tracked)
    ohlc = []
    for i in range(len(TX_prod)):
        ohlc.append([steps[i], TX_prod[i], TX_max[i], TX_avg[i], TX_avg[i]])
    fig, ax = plt.subplots()
    candlestick_ohlc(ax, ohlc, width=0.6, colorup='green', colordown='red')
    # Ticks, title and labels
    plt.xticks(steps, ['step{}'.format(i) for i in steps], rotation=45)
    plt.title('Candlestick Chart')
    plt.xlabel('Step')
    plt.ylabel('Transmitted data')
    plt.show()
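# Example usage (hypothetical per-step aggregates of transmitted segments):
#   vis.plotCandleStick(TX_prod=[120, 80], TX_avg=[60, 50], TX_max=[200, 150])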

View File

@ -1,27 +0,0 @@
[Simulation Space]
numberValidatorStart = 256
numberValidatorStop = 512
numberValidatorStep = 128
failureRateStart = 10
failureRateStop = 90
failureRateStep = 40
blockSizeStart = 32
blockSizeStop = 64
blockSizeStep = 16
netDegreeStart = 6
netDegreeStop = 8
netDegreeStep = 1
chiStart = 4
chiStop = 8
chiStep = 2
[Advanced]
deterministic = 0
numberRuns = 2

20
doc/Makefile Normal file
View File

@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
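# Typical usage: any Sphinx builder name works as a target, e.g.
#   make html    # builds the HTML docs into _build/html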

64
doc/conf.py Normal file
View File

@ -0,0 +1,64 @@
# Configuration file for the Sphinx documentation builder.
#
# This file only contains a selection of the most common options. For a full
# list see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('../DAS'))
# -- Project information -----------------------------------------------------
project = 'DAS simulator'
copyright = '2023, Leonardo A. Bautista-Gomez, Csaba Kiraly'
author = 'Leonardo A. Bautista-Gomez, Csaba Kiraly'
# The full version, including alpha/beta/rc tags
release = '1'
# -- General configuration ---------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = ['sphinx.ext.autodoc']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store', 'myenv']
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# -- Options for autodoc -------------------------------------------------
autodoc_mock_imports = ["django", "dicttoxml", "bitarray", "DAS", "networkx"]
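# Note: mocking these imports lets autodoc import and document the DAS modules
# even when the third-party dependencies are not installed in the docs environment.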

44
doc/index.rst Normal file
View File

@ -0,0 +1,44 @@
.. DAS simulator documentation master file, created by
sphinx-quickstart on Wed Feb 8 20:56:44 2023.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to DAS simulator's documentation!
=========================================
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
.. toctree::
:maxdepth: 2
:caption: Contents:
.. automodule:: block
:members:
.. automodule:: configuration
:members:
.. automodule:: observer
:members:
.. automodule:: results
:members:
.. automodule:: shape
:members:
.. automodule:: simulator
:members:
.. automodule:: tools
:members:
.. automodule:: validator
:members:

35
doc/make.bat Normal file
View File

@ -0,0 +1,35 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
if "%1" == "" goto help
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.https://www.sphinx-doc.org/
exit /b 1
)
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd

110
smallConf.py Normal file
View File

@ -0,0 +1,110 @@
"""Example configuration file
This file illustrates how to define options and simulation parameter ranges.
It also defines the traversal order of the simulation space. As the file
extension suggests, configuration is pure python code, allowing complex
setups. Use at your own risk.
To use this example, run
python3 study.py smallConf
Otherwise copy it and modify as needed. The default traversal order defined
in the nested loop of nextShape() is good for most cases, but customizable
if needed.
"""
import logging
import itertools
import numpy as np
from DAS.shape import Shape
# Dump results into XML files
dumpXML = 1
# save progress and row/column distribution vectors to XML
saveProgress = 1
# plot progress for each run to PNG
plotProgress = 1
# Save row and column distributions
saveRCdist = 1
# Plot all figures
visualization = 1
# Verbosity level
logLevel = logging.INFO
# number of parallel workers. -1: all cores; 1: sequential
# for more details, see joblib.Parallel
numJobs = -1
# distribute rows/columns evenly between validators (True)
# or generate it using local randomness (False)
evenLineDistribution = True
# Number of simulation runs with the same parameters for statistical relevance
runs = range(3)
# Number of validators
numberNodes = range(128, 513, 128)
# select failure model between: "random, sequential, MEP, MEP+1, DEP, DEP+1, MREP, MREP-1"
failureModels = ["random"]
# Percentage of block not released by producer
failureRates = range(40, 81, 20)
# Block size in one dimension in segments. Block is blockSizes * blockSizes segments.
blockSizes = range(64, 113, 128)
# Per-topic mesh neighborhood size
netDegrees = range(8, 9, 2)
# number of rows and columns a validator is interested in
chis = range(2, 3, 2)
# ratio of class1 nodes (see below for parameters per class)
class1ratios = [0.8]
# Number of validators per beacon node
validatorsPerNode1 = [1]
validatorsPerNode2 = [500]
# Set uplink bandwidth in megabits/second
bwUplinksProd = [200]
bwUplinks1 = [10]
bwUplinks2 = [200]
# Step duration in milliseconds (Classic RTT is about 100ms)
stepDuration = 50
# Segment size in bytes (with proof)
segmentSize = 560
# Set to True if you want your run to be deterministic, False if not
deterministic = True
# If your run is deterministic you can decide the random seed. It is ignored otherwise.
randomSeed = "DAS"
# Number of steps without progress to stop simulation
steps4StopCondition = 7
# Fraction of validators needed to assume the block is available
successCondition = 0.9
# If True, print diagnostics when the block is not available
diagnostics = False
# True to save git diff and git commit
saveGit = False
def nextShape():
for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product(
runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2):
# Network Degree has to be an even number
if netDegree % 2 == 0:
shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run)
yield shape
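# Example: study.py consumes this module as its configuration and iterates the
# generator above, one simulation per yielded shape (sketch):
#   import smallConf as config
#   for shape in config.nextShape():
#       ...  # run one simulation with this parameter combination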

105
study.py
View File

@ -1,46 +1,91 @@
#! /bin/python3
import time, sys
import time, sys, random, copy
import importlib
import subprocess
from joblib import Parallel, delayed
from DAS import *
# Parallel execution:
# The code currently uses 'joblib' to execute on multiple cores. For other options such as 'ray', see
# https://stackoverflow.com/questions/9786102/how-do-i-parallelize-a-simple-python-loop
# For fixing logging issues in parallel execution, see
# https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls
# and https://github.com/joblib/joblib/issues/1017
def initLogger(config):
"""It initializes the logger."""
logger = logging.getLogger("Study")
logger.setLevel(config.logLevel)
ch = logging.StreamHandler()
ch.setLevel(config.logLevel)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
return logger
def runOnce(config, shape, execID):
if config.deterministic:
shape.setSeed(config.randomSeed+"-"+str(shape))
random.seed(shape.randomSeed)
sim = Simulator(shape, config, execID)
sim.initLogger()
sim.initValidators()
sim.initNetwork()
result = sim.run()
sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
if config.dumpXML:
result.dump()
return result
def study():
if len(sys.argv) < 2:
print("You need to pass a configuration file in parameter")
exit(1)
config = Configuration(sys.argv[1])
sim = Simulator(config)
sim.initLogger()
try:
config = importlib.import_module(sys.argv[1])
except ModuleNotFoundError as e:
try:
config = importlib.import_module(str(sys.argv[1]).replace(".py", ""))
except ModuleNotFoundError as e:
print(e)
print("You need to pass a configuration file in parameter")
exit(1)
logger = initLogger(config)
format = {"entity": "Study"}
results = []
simCnt = 0
sim.logger.info("Starting simulations:", extra=sim.format)
now = datetime.now()
execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999))
# save config and code state for reproducibility
if not os.path.exists("results"):
os.makedirs("results")
dir = "results/"+execID
if not os.path.exists(dir):
os.makedirs(dir)
if config.saveGit:
with open(dir+"/git.diff", 'w') as f:
subprocess.run(["git", "diff"], stdout=f)
with open(dir+"/git.describe", 'w') as f:
subprocess.run(["git", "describe", "--always"], stdout=f)
subprocess.run(["cp", sys.argv[1], dir+"/"])
logger.info("Starting simulations:", extra=format)
start = time.time()
for run in range(config.numberRuns):
for fr in range(config.failureRateStart, config.failureRateStop+1, config.failureRateStep):
for chi in range(config.chiStart, config.chiStop+1, config.chiStep):
for blockSize in range(config.blockSizeStart, config.blockSizeStop+1, config.blockSizeStep):
for nv in range(config.nvStart, config.nvStop+1, config.nvStep):
for netDegree in range(config.netDegreeStart, config.netDegreeStop+1, config.netDegreeStep):
if not config.deterministic:
random.seed(datetime.now())
shape = Shape(blockSize, nv, fr, chi, netDegree)
sim.resetShape(shape)
sim.initValidators()
sim.initNetwork()
result = sim.run()
sim.logger.info("Run %d, FR: %d %%, Chi: %d, BlockSize: %d, Nb.Val: %d, netDegree: %d ... Block Available: %d" % (run, fr, chi, blockSize, nv, netDegree, result.blockAvailable), extra=sim.format)
results.append(result)
simCnt += 1
results = Parallel(config.numJobs)(delayed(runOnce)(config, shape, execID) for shape in config.nextShape())
end = time.time()
sim.logger.info("A total of %d simulations ran in %d seconds" % (simCnt, end-start), extra=sim.format)
logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)
if config.visualization:
vis = Visualizer(execID, config)
vis.plotHeatmaps()
study()
if __name__ == "__main__":
study()
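# Example invocation (sketch, using the smallConf.py example configuration above):
#   python3 study.py smallConf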