Merge develop into vis

This commit is contained in:
Leonardo Bautista-Gomez 2023-04-27 13:58:51 +02:00
commit 6eac3eb8e7
8 changed files with 275 additions and 144 deletions

View File

@ -1,5 +1,6 @@
#!/bin/python3 #!/bin/python3
import numpy as np
from DAS.block import * from DAS.block import *
class Observer: class Observer:
@ -10,21 +11,12 @@ class Observer:
self.config = config self.config = config
self.format = {"entity": "Observer"} self.format = {"entity": "Observer"}
self.logger = logger self.logger = logger
self.block = []
self.rows = []
self.columns = []
self.goldenData = []
self.broadcasted = []
def reset(self):
"""It resets all the gathered data to zeros."""
self.block = [0] * self.config.blockSize * self.config.blockSize self.block = [0] * self.config.blockSize * self.config.blockSize
self.goldenData = [0] * self.config.blockSize * self.config.blockSize
self.rows = [0] * self.config.blockSize self.rows = [0] * self.config.blockSize
self.columns = [0] * self.config.blockSize self.columns = [0] * self.config.blockSize
self.broadcasted = Block(self.config.blockSize) self.broadcasted = Block(self.config.blockSize)
def checkRowsColumns(self, validators): def checkRowsColumns(self, validators):
"""It checks how many validators have been assigned to each row and column.""" """It checks how many validators have been assigned to each row and column."""
for val in validators: for val in validators:
@ -39,11 +31,6 @@ class Observer:
if self.rows[i] == 0 or self.columns[i] == 0: if self.rows[i] == 0 or self.columns[i] == 0:
self.logger.warning("There is a row/column that has not been assigned", extra=self.format) self.logger.warning("There is a row/column that has not been assigned", extra=self.format)
def setGoldenData(self, block):
"""Stores the original real data to compare it with future situations."""
for i in range(self.config.blockSize*self.config.blockSize):
self.goldenData[i] = block.data[i]
def checkBroadcasted(self): def checkBroadcasted(self):
"""It checks how many broadcasted samples are still missing in the network.""" """It checks how many broadcasted samples are still missing in the network."""
zeros = 0 zeros = 0
@ -58,9 +45,55 @@ class Observer:
"""It checks the status of how many expected and arrived samples globally.""" """It checks the status of how many expected and arrived samples globally."""
arrived = 0 arrived = 0
expected = 0 expected = 0
ready = 0
validated = 0
for val in validators: for val in validators:
if val.amIproposer == 0: if val.amIproposer == 0:
(a, e) = val.checkStatus() (a, e) = val.checkStatus()
arrived += a arrived += a
expected += e expected += e
return (arrived, expected) if a == e:
ready += 1
validated += val.vpn
return (arrived, expected, ready, validated)
def getProgress(self, validators):
"""Calculate current simulation progress with different metrics.
Returns:
- missingSamples: overall number of sample instances missing in nodes.
Sample are counted on both rows and columns, so intersections of interest are counted twice.
- sampleProgress: previous expressed as progress ratio
- nodeProgress: ratio of nodes having all segments interested in
- validatorProgress: same as above, but vpn weighted average. I.e. it counts per validator,
but counts a validator only if its support node's all validators see all interesting segments
TODO: add real per validator progress counter
"""
arrived, expected, ready, validated = self.checkStatus(validators)
missingSamples = expected - arrived
sampleProgress = arrived / expected
nodeProgress = ready / (len(validators)-1)
validatorCnt = sum([v.vpn for v in validators[1:]])
validatorProgress = validated / validatorCnt
return missingSamples, sampleProgress, nodeProgress, validatorProgress
def getTrafficStats(self, validators):
"""Summary statistics of traffic measurements in a timestep."""
def maxOrNan(l):
return np.max(l) if l else np.NaN
def meanOrNan(l):
return np.mean(l) if l else np.NaN
trafficStats = {}
for cl in range(0,3):
Tx = [v.statsTxInSlot for v in validators if v.nodeClass == cl]
Rx = [v.statsRxInSlot for v in validators if v.nodeClass == cl]
RxDup = [v.statsRxDupInSlot for v in validators if v.nodeClass == cl]
trafficStats[cl] = {
"Tx": {"mean": meanOrNan(Tx), "max": maxOrNan(Tx)},
"Rx": {"mean": meanOrNan(Rx), "max": maxOrNan(Rx)},
"RxDup": {"mean": meanOrNan(RxDup), "max": maxOrNan(RxDup)},
}
return trafficStats

View File

@ -1,8 +1,8 @@
bitarray==2.6.0 bitarray==2.6.0
DAS==0.30.0
dicttoxml==1.7.16 dicttoxml==1.7.16
matplotlib==3.6.2 matplotlib==3.6.2
mplfinance==0.12.9b7 mplfinance==0.12.9b7
networkx==3.0 networkx==3.0
numpy==1.23.5 numpy==1.23.5
seaborn==0.12.2 seaborn==0.12.2
joblib==1.2.0

View File

@ -1,37 +1,45 @@
#!/bin/python3 #!/bin/python3
import os import os
import bisect
from xml.dom import minidom from xml.dom import minidom
from dicttoxml import dicttoxml from dicttoxml import dicttoxml
class Result: class Result:
"""This class stores and process/store the results of a simulation.""" """This class stores and process/store the results of a simulation."""
def __init__(self, shape): def __init__(self, shape, execID):
"""It initializes the instance with a specific shape.""" """It initializes the instance with a specific shape."""
self.shape = shape self.shape = shape
self.execID = execID
self.blockAvailable = -1 self.blockAvailable = -1
self.tta = -1 self.tta = -1
self.missingVector = [] self.missingVector = []
self.metrics = {}
def populate(self, shape, missingVector): def populate(self, shape, config, missingVector):
"""It populates part of the result data inside a vector.""" """It populates part of the result data inside a vector."""
self.shape = shape self.shape = shape
self.missingVector = missingVector self.missingVector = missingVector
missingSamples = missingVector[-1] v = self.metrics["progress"]["validators ready"]
if missingSamples == 0: tta = bisect.bisect(v, config.successCondition)
if v[-1] >= config.successCondition:
self.blockAvailable = 1 self.blockAvailable = 1
self.tta = len(missingVector) self.tta = tta * (config.stepDuration)
else: else:
self.blockAvailable = 0 self.blockAvailable = 0
self.tta = -1 self.tta = -1
def dump(self, execID): def addMetric(self, name, metric):
"""Generic function to add a metric to the results."""
self.metrics[name] = metric
def dump(self):
"""It dumps the results of the simulation in an XML file.""" """It dumps the results of the simulation in an XML file."""
if not os.path.exists("results"): if not os.path.exists("results"):
os.makedirs("results") os.makedirs("results")
if not os.path.exists("results/"+execID): if not os.path.exists("results/"+self.execID):
os.makedirs("results/"+execID) os.makedirs("results/"+self.execID)
resd1 = self.shape.__dict__ resd1 = self.shape.__dict__
resd2 = self.__dict__.copy() resd2 = self.__dict__.copy()
resd2.pop("shape") resd2.pop("shape")
@ -39,6 +47,6 @@ class Result:
resXml = dicttoxml(resd1) resXml = dicttoxml(resd1)
xmlstr = minidom.parseString(resXml) xmlstr = minidom.parseString(resXml)
xmlPretty = xmlstr.toprettyxml() xmlPretty = xmlstr.toprettyxml()
filePath = "results/"+execID+"/"+str(self.shape)+".xml" filePath = "results/"+self.execID+"/"+str(self.shape)+".xml"
with open(filePath, "w") as f: with open(filePath, "w") as f:
f.write(xmlPretty) f.write(xmlPretty)

View File

@ -2,8 +2,9 @@
import networkx as nx import networkx as nx
import logging, random import logging, random
import pandas as pd
from functools import partial, partialmethod
from datetime import datetime from datetime import datetime
from statistics import mean
from DAS.tools import * from DAS.tools import *
from DAS.results import * from DAS.results import *
from DAS.observer import * from DAS.observer import *
@ -12,33 +13,54 @@ from DAS.validator import *
class Simulator: class Simulator:
"""This class implements the main DAS simulator.""" """This class implements the main DAS simulator."""
def __init__(self, shape, config): def __init__(self, shape, config, execID):
"""It initializes the simulation with a set of parameters (shape).""" """It initializes the simulation with a set of parameters (shape)."""
self.shape = shape self.shape = shape
self.config = config self.config = config
self.format = {"entity": "Simulator"} self.format = {"entity": "Simulator"}
self.result = Result(self.shape) self.execID = execID
self.result = Result(self.shape, self.execID)
self.validators = [] self.validators = []
self.logger = [] self.logger = []
self.logLevel = config.logLevel self.logLevel = config.logLevel
self.proposerID = 0 self.proposerID = 0
self.glob = [] self.glob = []
self.execID = execID
# In GossipSub the initiator might push messages without participating in the mesh.
# proposerPublishOnly regulates this behavior. If set to true, the proposer is not
# part of the p2p distribution graph, only pushes segments to it. If false, the proposer
# might get back segments from other peers since links are symmetric.
self.proposerPublishOnly = True
# If proposerPublishOnly == True, this regulates how many copies of each segment are
# pushed out by the proposer.
# 1: the data is sent out exactly once on rows and once on columns (2 copies in total)
# self.shape.netDegree: default behavior similar (but not same) to previous code
self.proposerPublishTo = self.shape.netDegree
def initValidators(self): def initValidators(self):
"""It initializes all the validators in the network.""" """It initializes all the validators in the network."""
self.glob = Observer(self.logger, self.shape) self.glob = Observer(self.logger, self.shape)
self.glob.reset()
self.validators = [] self.validators = []
if self.config.evenLineDistribution: if self.config.evenLineDistribution:
lightVal = int(self.shape.numberNodes * self.shape.class1ratio * self.shape.vpn1) lightVal = int(self.shape.numberNodes * self.shape.class1ratio * self.shape.vpn1)
heavyVal = int(self.shape.numberNodes * (1-self.shape.class1ratio) * self.shape.vpn2) heavyVal = int(self.shape.numberNodes * (1-self.shape.class1ratio) * self.shape.vpn2)
totalValidators = lightVal + heavyVal totalValidators = lightVal + heavyVal
rows = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1) totalRows = totalValidators * self.shape.chi
columns = list(range(self.shape.blockSize)) * (int(totalValidators/self.shape.blockSize)+1) rows = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1)
columns = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1)
offset = heavyVal*self.shape.chi offset = heavyVal*self.shape.chi
random.shuffle(rows) random.shuffle(rows)
random.shuffle(columns) random.shuffle(columns)
self.logger.debug("There is a total of %d validators" % totalValidators, extra=self.format)
self.logger.debug("Shuffling a total of %d rows/columns" % len(rows), extra=self.format)
self.logger.debug("Shuffled rows: %s" % str(rows), extra=self.format)
self.logger.debug("Shuffled columns: %s" % str(columns), extra=self.format)
assignedRows = []
assignedCols = []
for i in range(self.shape.numberNodes): for i in range(self.shape.numberNodes):
if self.config.evenLineDistribution: if self.config.evenLineDistribution:
if i < int(heavyVal/self.shape.vpn2): # First start with the heavy nodes if i < int(heavyVal/self.shape.vpn2): # First start with the heavy nodes
@ -48,17 +70,26 @@ class Simulator:
j = i - int(heavyVal/self.shape.vpn2) j = i - int(heavyVal/self.shape.vpn2)
start = offset+( j *self.shape.chi) start = offset+( j *self.shape.chi)
end = offset+((j+1)*self.shape.chi) end = offset+((j+1)*self.shape.chi)
r = rows[start:end] r = set(rows[start:end])
c = columns[start:end] c = set(columns[start:end])
val = Validator(i, int(not i!=0), self.logger, self.shape, r, c) val = Validator(i, int(not i!=0), self.logger, self.shape, r, c)
self.logger.debug("Validators %d row IDs: %s" % (val.ID, val.rowIDs), extra=self.format)
self.logger.debug("Validators %d column IDs: %s" % (val.ID, val.columnIDs), extra=self.format)
assignedRows = assignedRows + list(r)
assignedCols = assignedCols + list(c)
else: else:
val = Validator(i, int(not i!=0), self.logger, self.shape) val = Validator(i, int(not i!=0), self.logger, self.shape)
if i == self.proposerID: if i == self.proposerID:
val.initBlock() val.initBlock()
self.glob.setGoldenData(val.block)
else: else:
val.logIDs() val.logIDs()
self.validators.append(val) self.validators.append(val)
assignedRows.sort()
assignedCols.sort()
self.logger.debug("Rows assigned: %s" % str(assignedRows), extra=self.format)
self.logger.debug("Columns assigned: %s" % str(assignedCols), extra=self.format)
self.logger.debug("Validators initialized.", extra=self.format) self.logger.debug("Validators initialized.", extra=self.format)
def initNetwork(self): def initNetwork(self):
@ -73,12 +104,14 @@ class Simulator:
columnChannels[id].append(v) columnChannels[id].append(v)
# Check rows/columns distribution # Check rows/columns distribution
#totalR = 0 distR = []
#totalC = 0 distC = []
#for r in rowChannels: for r in rowChannels:
# totalR += len(r) distR.append(len(r))
#for c in columnChannels: for c in columnChannels:
# totalC += len(c) distC.append(len(c))
self.logger.debug("Number of validators per row; Min: %d, Max: %d" % (min(distR), max(distR)), extra=self.format)
self.logger.debug("Number of validators per column; Min: %d, Max: %d" % (min(distC), max(distC)), extra=self.format)
for id in range(self.shape.blockSize): for id in range(self.shape.blockSize):
@ -137,6 +170,11 @@ class Simulator:
def initLogger(self): def initLogger(self):
"""It initializes the logger.""" """It initializes the logger."""
logging.TRACE = 5
logging.addLevelName(logging.TRACE, 'TRACE')
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
logging.trace = partial(logging.log, logging.TRACE)
logger = logging.getLogger("DAS") logger = logging.getLogger("DAS")
if len(logger.handlers) == 0: if len(logger.handlers) == 0:
logger.setLevel(self.logLevel) logger.setLevel(self.logLevel)
@ -146,37 +184,37 @@ class Simulator:
logger.addHandler(ch) logger.addHandler(ch)
self.logger = logger self.logger = logger
def printDiagnostics(self):
def resetShape(self, shape): """Print all required diagnostics to check when a block does not become available"""
"""It resets the parameters of the simulation."""
self.shape = shape
self.result = Result(self.shape)
for val in self.validators: for val in self.validators:
val.shape.failureRate = shape.failureRate (a, e) = val.checkStatus()
val.shape.chi = shape.chi if e-a > 0 and val.ID != 0:
val.shape.vpn1 = shape.vpn1 self.logger.warning("Node %d is missing %d samples" % (val.ID, e-a), extra=self.format)
val.shape.vpn2 = shape.vpn2 for r in val.rowIDs:
row = val.getRow(r)
# In GossipSub the initiator might push messages without participating in the mesh. if row.count() < len(row):
# proposerPublishOnly regulates this behavior. If set to true, the proposer is not self.logger.debug("Row %d: %s" % (r, str(row)), extra=self.format)
# part of the p2p distribution graph, only pushes segments to it. If false, the proposer neiR = val.rowNeighbors[r]
# might get back segments from other peers since links are symmetric. for nr in neiR:
self.proposerPublishOnly = True self.logger.debug("Row %d, Neighbor %d sent: %s" % (r, val.rowNeighbors[r][nr].node.ID, val.rowNeighbors[r][nr].received), extra=self.format)
self.logger.debug("Row %d, Neighbor %d has: %s" % (r, val.rowNeighbors[r][nr].node.ID, self.validators[val.rowNeighbors[r][nr].node.ID].getRow(r)), extra=self.format)
# If proposerPublishOnly == True, this regulates how many copies of each segment are for c in val.columnIDs:
# pushed out by the proposer. col = val.getColumn(c)
# 1: the data is sent out exactly once on rows and once on columns (2 copies in total) if col.count() < len(col):
# self.shape.netDegree: default behavior similar (but not same) to previous code self.logger.debug("Column %d: %s" % (c, str(col)), extra=self.format)
self.proposerPublishTo = self.shape.netDegree neiC = val.columnNeighbors[c]
for nc in neiC:
self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format)
self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format)
def run(self): def run(self):
"""It runs the main simulation until the block is available or it gets stucked.""" """It runs the main simulation until the block is available or it gets stucked."""
self.glob.checkRowsColumns(self.validators) self.glob.checkRowsColumns(self.validators)
self.validators[self.proposerID].broadcastBlock() arrived, expected, ready, validated = self.glob.checkStatus(self.validators)
arrived, expected = self.glob.checkStatus(self.validators)
missingSamples = expected - arrived missingSamples = expected - arrived
missingVector = [] missingVector = []
progressVector = []
trafficStatsVector = []
steps = 0 steps = 0
while(True): while(True):
missingVector.append(missingSamples) missingVector.append(missingSamples)
@ -197,30 +235,58 @@ class Simulator:
self.validators[i].logColumns() self.validators[i].logColumns()
# log TX and RX statistics # log TX and RX statistics
statsTxInSlot = [v.statsTxInSlot for v in self.validators] trafficStats = self.glob.getTrafficStats(self.validators)
statsRxInSlot = [v.statsRxInSlot for v in self.validators] self.logger.debug("step %d: %s" %
self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % (steps, trafficStats), extra=self.format)
(steps, statsTxInSlot[0], statsRxInSlot[0],
mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]),
mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format)
for i in range(0,self.shape.numberNodes): for i in range(0,self.shape.numberNodes):
self.validators[i].updateStats() self.validators[i].updateStats()
trafficStatsVector.append(trafficStats)
missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators)
self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%"
% (steps, sampleProgress*100, nodeProgress*100, validatorProgress*100), extra=self.format)
cnS = "samples received"
cnN = "nodes ready"
cnV = "validators ready"
cnT0 = "TX builder mean"
cnT1 = "TX class1 mean"
cnT2 = "TX class2 mean"
cnR1 = "RX class1 mean"
cnR2 = "RX class2 mean"
cnD1 = "Dup class1 mean"
cnD2 = "Dup class2 mean"
progressVector.append({
cnS:sampleProgress,
cnN:nodeProgress,
cnV:validatorProgress,
cnT0: trafficStats[0]["Tx"]["mean"],
cnT1: trafficStats[1]["Tx"]["mean"],
cnT2: trafficStats[2]["Tx"]["mean"],
cnR1: trafficStats[1]["Rx"]["mean"],
cnR2: trafficStats[2]["Rx"]["mean"],
cnD1: trafficStats[1]["RxDup"]["mean"],
cnD2: trafficStats[2]["RxDup"]["mean"],
})
arrived, expected = self.glob.checkStatus(self.validators)
missingSamples = expected - arrived
missingRate = missingSamples*100/expected
self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format)
if missingSamples == oldMissingSamples: if missingSamples == oldMissingSamples:
self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) if len(missingVector) > self.config.steps4StopCondition:
if missingSamples == missingVector[-self.config.steps4StopCondition]:
self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format)
if self.config.diagnostics:
self.printDiagnostics()
break
missingVector.append(missingSamples) missingVector.append(missingSamples)
break
elif missingSamples == 0: elif missingSamples == 0:
#self.logger.info("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format)
missingVector.append(missingSamples) missingVector.append(missingSamples)
break break
else: steps += 1
steps += 1
self.result.populate(self.shape, missingVector) progress = pd.DataFrame(progressVector)
if self.config.saveProgress:
self.result.addMetric("progress", progress.to_dict(orient='list'))
self.result.populate(self.shape, self.config, missingVector)
return self.result return self.result

View File

@ -61,14 +61,16 @@ class Validator:
self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format) self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format)
else: else:
if amIproposer: if amIproposer:
self.nodeClass = 0
self.rowIDs = range(shape.blockSize) self.rowIDs = range(shape.blockSize)
self.columnIDs = range(shape.blockSize) self.columnIDs = range(shape.blockSize)
else: else:
#if shape.deterministic: #if shape.deterministic:
# random.seed(self.ID) # random.seed(self.ID)
vpn = self.shape.vpn1 if (self.ID <= shape.numberNodes * shape.class1ratio) else self.shape.vpn2 self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2
self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn) self.vpn = self.shape.vpn1 if (self.nodeClass == 1) else self.shape.vpn2
self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn) self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn)
self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn)
self.rowNeighbors = collections.defaultdict(dict) self.rowNeighbors = collections.defaultdict(dict)
self.columnNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict)
@ -77,13 +79,15 @@ class Validator:
self.statsTxPerSlot = [] self.statsTxPerSlot = []
self.statsRxInSlot = 0 self.statsRxInSlot = 0
self.statsRxPerSlot = [] self.statsRxPerSlot = []
self.statsRxDupInSlot = 0
self.statsRxDupPerSlot = []
# Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?)
# 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11
# TODO: this should be a parameter # TODO: this should be a parameter
if self.amIproposer: if self.amIproposer:
self.bwUplink = shape.bwUplinkProd self.bwUplink = shape.bwUplinkProd
elif self.ID <= shape.numberNodes * shape.class1ratio: elif self.nodeClass == 1:
self.bwUplink = shape.bwUplink1 self.bwUplink = shape.bwUplink1
else: else:
self.bwUplink = shape.bwUplink2 self.bwUplink = shape.bwUplink2
@ -109,33 +113,18 @@ class Validator:
def initBlock(self): def initBlock(self):
"""It initializes the block for the proposer.""" """It initializes the block for the proposer."""
if self.amIproposer == 1:
self.logger.debug("I am a block proposer.", extra=self.format)
self.block = Block(self.shape.blockSize)
self.block.fill()
#self.block.print()
else:
self.logger.warning("I am not a block proposer."% self.ID)
def broadcastBlock(self):
"""The block proposer broadcasts the block to all validators."""
if self.amIproposer == 0: if self.amIproposer == 0:
self.logger.warning("I am not a block proposer", extra=self.format) self.logger.warning("I am not a block proposer", extra=self.format)
else: else:
self.logger.debug("Broadcasting my block...", extra=self.format) self.logger.debug("Creating block...", extra=self.format)
order = [i for i in range(self.shape.blockSize * self.shape.blockSize)] order = [i for i in range(self.shape.blockSize * self.shape.blockSize)]
random.shuffle(order) order = random.sample(order, int((1 - self.shape.failureRate/100) * len(order)))
while(order): for i in order:
i = order.pop() self.block.data[i] = 1
if (random.randint(0,99) >= self.shape.failureRate):
self.block.data[i] = 1
else:
self.block.data[i] = 0
nbFailures = self.block.data.count(0) nbFailures = self.block.data.count(0)
measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
#broadcasted.print()
def getColumn(self, index): def getColumn(self, index):
"""It returns a given column.""" """It returns a given column."""
@ -155,13 +144,13 @@ class Validator:
if src in self.columnNeighbors[cID]: if src in self.columnNeighbors[cID]:
self.columnNeighbors[cID][src].receiving[rID] = 1 self.columnNeighbors[cID][src].receiving[rID] = 1
if not self.receivedBlock.getSegment(rID, cID): if not self.receivedBlock.getSegment(rID, cID):
self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
self.receivedBlock.setSegment(rID, cID) self.receivedBlock.setSegment(rID, cID)
if self.perNodeQueue or self.perNeighborQueue: if self.perNodeQueue or self.perNeighborQueue:
self.receivedQueue.append((rID, cID)) self.receivedQueue.append((rID, cID))
else: else:
self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
# self.statsRxDuplicateInSlot += 1 self.statsRxDupInSlot += 1
self.statsRxInSlot += 1 self.statsRxInSlot += 1
def addToSendQueue(self, rID, cID): def addToSendQueue(self, rID, cID):
@ -183,7 +172,7 @@ class Validator:
if self.amIproposer == 1: if self.amIproposer == 1:
self.logger.error("I am a block proposer", extra=self.format) self.logger.error("I am a block proposer", extra=self.format)
else: else:
self.logger.debug("Receiving the data...", extra=self.format) self.logger.trace("Receiving the data...", extra=self.format)
#self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format) #self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format)
self.block.merge(self.receivedBlock) self.block.merge(self.receivedBlock)
@ -203,8 +192,10 @@ class Validator:
"""It updates the stats related to sent and received data.""" """It updates the stats related to sent and received data."""
self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format) self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format)
self.statsRxPerSlot.append(self.statsRxInSlot) self.statsRxPerSlot.append(self.statsRxInSlot)
self.statsRxDupPerSlot.append(self.statsRxDupInSlot)
self.statsTxPerSlot.append(self.statsTxInSlot) self.statsTxPerSlot.append(self.statsTxInSlot)
self.statsRxInSlot = 0 self.statsRxInSlot = 0
self.statsRxDupInSlot = 0
self.statsTxInSlot = 0 self.statsTxInSlot = 0
def checkSegmentToNeigh(self, rID, cID, neigh): def checkSegmentToNeigh(self, rID, cID, neigh):
@ -219,7 +210,7 @@ class Validator:
def sendSegmentToNeigh(self, rID, cID, neigh): def sendSegmentToNeigh(self, rID, cID, neigh):
"""Send segment to a neighbor (without checks).""" """Send segment to a neighbor (without checks)."""
self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) self.logger.trace("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format)
i = rID if neigh.dim else cID i = rID if neigh.dim else cID
neigh.sent[i] = 1 neigh.sent[i] = 1
neigh.node.receiveSegment(rID, cID, self.ID) neigh.node.receiveSegment(rID, cID, self.ID)
@ -454,7 +445,7 @@ class Validator:
# be queued after successful repair. # be queued after successful repair.
for i in range(len(rep)): for i in range(len(rep)):
if rep[i]: if rep[i]:
self.logger.debug("Rep: %d,%d", id, i, extra=self.format) self.logger.trace("Rep: %d,%d", id, i, extra=self.format)
self.addToSendQueue(id, i) self.addToSendQueue(id, i)
# self.statsRepairInSlot += rep.count(1) # self.statsRepairInSlot += rep.count(1)
@ -472,7 +463,7 @@ class Validator:
# be queued after successful repair. # be queued after successful repair.
for i in range(len(rep)): for i in range(len(rep)):
if rep[i]: if rep[i]:
self.logger.debug("Rep: %d,%d", i, id, extra=self.format) self.logger.trace("Rep: %d,%d", i, id, extra=self.format)
self.addToSendQueue(i, id) self.addToSendQueue(i, id)
# self.statsRepairInSlot += rep.count(1) # self.statsRepairInSlot += rep.count(1)

View File

@ -18,7 +18,7 @@ class Visualizer:
self.folderPath = "results/"+self.execID self.folderPath = "results/"+self.execID
self.parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree', 'chi', 'vpn1', 'vpn2', 'class1ratio', 'bwUplinkProd', 'bwUplink1', 'bwUplink2'] self.parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree', 'chi', 'vpn1', 'vpn2', 'class1ratio', 'bwUplinkProd', 'bwUplink1', 'bwUplink2']
self.minimumDataPoints = 2 self.minimumDataPoints = 2
self.maxTTA = 50 self.maxTTA = 11000
def plottingData(self): def plottingData(self):
"""Store data with a unique key for each params combination""" """Store data with a unique key for each params combination"""
@ -43,7 +43,7 @@ class Visualizer:
bwUplinkProd = int(root.find('bwUplinkProd').text) bwUplinkProd = int(root.find('bwUplinkProd').text)
bwUplink1 = int(root.find('bwUplink1').text) bwUplink1 = int(root.find('bwUplink1').text)
bwUplink2 = int(root.find('bwUplink2').text) bwUplink2 = int(root.find('bwUplink2').text)
tta = int(root.find('tta').text) tta = float(root.find('tta').text)
"""Store BW""" """Store BW"""
bw.append(bwUplinkProd) bw.append(bwUplinkProd)
@ -182,7 +182,7 @@ class Visualizer:
if(len(self.config.runs) > 1): if(len(self.config.runs) > 1):
data = self.averageRuns(data, len(self.config.runs)) data = self.averageRuns(data, len(self.config.runs))
filteredKeys = self.similarKeys(data) filteredKeys = self.similarKeys(data)
vmin, vmax = 0, self.maxTTA+10 vmin, vmax = 0, self.maxTTA+1000
print("Plotting heatmaps...") print("Plotting heatmaps...")
"""Create the directory if it doesn't exist already""" """Create the directory if it doesn't exist already"""
@ -200,7 +200,7 @@ class Visualizer:
hist, xedges, yedges = np.histogram2d(data[key][labels[0]], data[key][labels[1]], bins=(len(xlabels), len(ylabels)), weights=data[key]['ttas']) hist, xedges, yedges = np.histogram2d(data[key][labels[0]], data[key][labels[1]], bins=(len(xlabels), len(ylabels)), weights=data[key]['ttas'])
hist = hist.T hist = hist.T
fig, ax = plt.subplots(figsize=(10, 6)) fig, ax = plt.subplots(figsize=(10, 6))
sns.heatmap(hist, xticklabels=xlabels, yticklabels=ylabels, cmap='hot_r', cbar_kws={'label': 'Time to block availability'}, linecolor='black', linewidths=0.3, annot=True, fmt=".2f", ax=ax, vmin=vmin, vmax=vmax) sns.heatmap(hist, xticklabels=xlabels, yticklabels=ylabels, cmap='hot_r', cbar_kws={'label': 'Time to block availability (ms)'}, linecolor='black', linewidths=0.3, annot=True, fmt=".2f", ax=ax, vmin=vmin, vmax=vmax)
plt.xlabel(self.formatLabel(labels[0])) plt.xlabel(self.formatLabel(labels[0]))
plt.ylabel(self.formatLabel(labels[1])) plt.ylabel(self.formatLabel(labels[1]))
filename = "" filename = ""
@ -209,10 +209,12 @@ class Visualizer:
for param in self.parameters: for param in self.parameters:
if param != labels[0] and param != labels[1] and param != 'run': if param != labels[0] and param != labels[1] and param != 'run':
filename += f"{key[paramValueCnt]}" filename += f"{key[paramValueCnt]}"
#formattedTitle = self.formatTitle(key[paramValueCnt]) formattedTitle = self.formatTitle(key[paramValueCnt])
formattedTitle = "Time to block availability"
title += formattedTitle title += formattedTitle
if (paramValueCnt+1) % 5 == 0:
title += "\n"
paramValueCnt += 1 paramValueCnt += 1
title = "Time to Block Availability (ms)"
title_obj = plt.title(title) title_obj = plt.title(title)
font_size = 16 * fig.get_size_inches()[0] / 10 font_size = 16 * fig.get_size_inches()[0] / 10
title_obj.set_fontsize(font_size) title_obj.set_fontsize(font_size)

View File

@ -18,6 +18,7 @@ import itertools
import numpy as np import numpy as np
from DAS.shape import Shape from DAS.shape import Shape
# Dump results into XML files
dumpXML = 1 dumpXML = 1
# save progress vectors to XML # save progress vectors to XML
@ -26,7 +27,13 @@ saveProgress = 1
# plot progress for each run to PNG # plot progress for each run to PNG
plotProgress = 1 plotProgress = 1
# Save row and column distributions
saveRCdist = 1
# Plot all figures
visualization = 1 visualization = 1
# Verbosity level
logLevel = logging.INFO logLevel = logging.INFO
# number of parallel workers. -1: all cores; 1: sequential # number of parallel workers. -1: all cores; 1: sequential
@ -59,8 +66,8 @@ chis = range(2, 3, 2)
class1ratios = [0.8] class1ratios = [0.8]
# Number of validators per beacon node # Number of validators per beacon node
validatorsPerNode1 = [2] validatorsPerNode1 = [1]
validatorsPerNode2 = [4] validatorsPerNode2 = [500]
# Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?)
# 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11
@ -68,26 +75,27 @@ bwUplinksProd = [2200]
bwUplinks1 = [110] bwUplinks1 = [110]
bwUplinks2 = [2200] bwUplinks2 = [2200]
# Step duration in miliseconds (Classic RTT is about 100ms)
stepDuration = 50
# Set to True if you want your run to be deterministic, False if not # Set to True if you want your run to be deterministic, False if not
deterministic = True deterministic = True
# If your run is deterministic you can decide the random seed. This is ignore otherwise. # If your run is deterministic you can decide the random seed. This is ignore otherwise.
randomSeed = "DAS" randomSeed = "DAS"
saveProgress = 1 # Number of steps without progress to stop simulation
saveRCdist = 1 steps4StopCondition = 7
# Number of validators ready to asume block is available
successCondition = 0.9
# If True, print diagnostics when the block is not available # If True, print diagnostics when the block is not available
diagnostics = False diagnostics = False
# Number of steps without progress to stop simulation # True to save git diff and git commit
steps4StopCondition = 7
saveGit = False saveGit = False
successCondition = 0.9
stepDuration = 50
def nextShape(): def nextShape():
for run, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( for run, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product(
runs, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): runs, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2):

View File

@ -2,6 +2,7 @@
import time, sys, random, copy import time, sys, random, copy
import importlib import importlib
import subprocess
from joblib import Parallel, delayed from joblib import Parallel, delayed
from DAS import * from DAS import *
@ -12,17 +13,32 @@ from DAS import *
# https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls # https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls
# and https://github.com/joblib/joblib/issues/1017 # and https://github.com/joblib/joblib/issues/1017
def runOnce(sim, config, shape): def initLogger(config):
"""It initializes the logger."""
logger = logging.getLogger("Study")
logger.setLevel(config.logLevel)
ch = logging.StreamHandler()
ch.setLevel(config.logLevel)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
return logger
def runOnce(config, shape, execID):
if config.deterministic: if config.deterministic:
shape.setSeed(config.randomSeed+"-"+str(shape)) shape.setSeed(config.randomSeed+"-"+str(shape))
random.seed(shape.randomSeed) random.seed(shape.randomSeed)
sim = Simulator(shape, config, execID)
sim.initLogger() sim.initLogger()
sim.resetShape(shape)
sim.initValidators() sim.initValidators()
sim.initNetwork() sim.initNetwork()
result = sim.run() result = sim.run()
sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
if config.dumpXML:
result.dump()
return result return result
def study(): def study():
@ -40,29 +56,36 @@ def study():
print("You need to pass a configuration file in parameter") print("You need to pass a configuration file in parameter")
exit(1) exit(1)
shape = Shape(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) logger = initLogger(config)
sim = Simulator(shape, config) format = {"entity": "Study"}
sim.initLogger()
results = [] results = []
now = datetime.now() now = datetime.now()
execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999)) execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999))
sim.logger.info("Starting simulations:", extra=sim.format) # save config and code state for reproducibility
start = time.time() if not os.path.exists("results"):
results = Parallel(config.numJobs)(delayed(runOnce)(sim, config, shape) for shape in config.nextShape()) os.makedirs("results")
end = time.time() dir = "results/"+execID
sim.logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=sim.format) if not os.path.exists(dir):
os.makedirs(dir)
if config.saveGit:
with open(dir+"/git.diff", 'w') as f:
subprocess.run(["git", "diff"], stdout=f)
with open(dir+"/git.describe", 'w') as f:
subprocess.run(["git", "describe", "--always"], stdout=f)
subprocess.run(["cp", sys.argv[1], dir+"/"])
if config.dumpXML: logger.info("Starting simulations:", extra=format)
for res in results: start = time.time()
res.dump(execID) results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape())
sim.logger.info("Results dumped into results/%s/" % (execID), extra=sim.format) end = time.time()
logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)
if config.visualization: if config.visualization:
vis = Visualizer(execID, config) vis = Visualizer(execID, config)
vis.plotHeatmaps() vis.plotHeatmaps()
if __name__ == "__main__":
study() study()