Merge pull request #55 from codex-storage/node-and-validator

Node and validator
This commit is contained in:
Csaba Kiraly 2024-03-01 12:34:37 +01:00 committed by GitHub
commit dc56ba0c44
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 167 additions and 126 deletions

View File

@ -32,52 +32,69 @@ class Neighbor:
class Validator: class Validator:
"""This class implements a validator/node in the network.""" def __init__(self, rowIDs, columnIDs):
self.rowIDs = rowIDs
self.columnIDs = columnIDs
def initValidator(nbRows, custodyRows, nbCols, custodyCols):
rowIDs = set(random.sample(range(nbRows), custodyRows))
columnIDs = set(random.sample(range(nbCols), custodyCols))
return Validator(rowIDs, columnIDs)
class Node:
"""This class implements a node in the network."""
def __repr__(self): def __repr__(self):
"""It returns the validator ID.""" """It returns the node ID."""
return str(self.ID) return str(self.ID)
def __init__(self, ID, amIproposer, logger, shape, config, rows = None, columns = None): def __init__(self, ID, amIproposer, logger, shape, config,
"""It initializes the validator with the logger shape and rows/columns. validators, rows = set(), columns = set()):
"""It initializes the node, and eventual validators, following the simulation configuration in shape and config.
If rows/columns are specified these are observed, otherwise (default) If rows/columns are specified these are observed, otherwise (default)
chiR rows and chiC columns are selected randomly. custodyRows rows and custodyCols columns are selected randomly.
""" """
self.shape = shape self.shape = shape
FORMAT = "%(levelname)s : %(entity)s : %(message)s" FORMAT = "%(levelname)s : %(entity)s : %(message)s"
self.ID = ID self.ID = ID
self.format = {"entity": "Val "+str(self.ID)} self.format = {"entity": "Val "+str(self.ID)}
self.block = Block(self.shape.blockSizeR, self.shape.blockSizeRK, self.shape.blockSizeC, self.shape.blockSizeCK) self.block = Block(self.shape.nbCols, self.shape.nbColsK, self.shape.nbRows, self.shape.nbRowsK)
self.receivedBlock = Block(self.shape.blockSizeR, self.shape.blockSizeRK, self.shape.blockSizeC, self.shape.blockSizeCK) self.receivedBlock = Block(self.shape.nbCols, self.shape.nbColsK, self.shape.nbRows, self.shape.nbRowsK)
self.receivedQueue = deque() self.receivedQueue = deque()
self.sendQueue = deque() self.sendQueue = deque()
self.amIproposer = amIproposer self.amIproposer = amIproposer
self.logger = logger self.logger = logger
if self.shape.chiR < 1 and self.shape.chiC < 1: self.validators = validators
self.logger.error("Chi has to be greater than 0", extra=self.format)
elif self.shape.chiC > self.shape.blockSizeR: if amIproposer:
self.logger.error("ChiC has to be smaller than %d" % self.shape.blockSizeR, extra=self.format) self.nodeClass = 0
elif self.shape.chiR > self.shape.blockSizeC: self.rowIDs = range(shape.nbRows)
self.logger.error("ChiR has to be smaller than %d" % self.shape.blockSizeC, extra=self.format) self.columnIDs = range(shape.nbCols)
else: else:
if amIproposer: self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2
self.nodeClass = 0 self.vpn = len(validators) #TODO: needed by old code, change to fn
self.rowIDs = range(shape.blockSizeC)
self.columnIDs = range(shape.blockSizeR) self.rowIDs = set(rows)
self.columnIDs = set(columns)
if config.validatorBasedCustody:
for v in validators:
self.rowIDs = self.rowIDs.union(v.rowIDs)
self.columnIDs = self.columnIDs.union(v.columnIDs)
else: else:
#if shape.deterministic: if (self.vpn * self.shape.custodyRows) > self.shape.nbRows:
# random.seed(self.ID) self.logger.warning("Row custody (*vpn) larger than number of rows!", extra=self.format)
self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2 self.rowIDs = range(self.shape.nbRows)
self.vpn = self.shape.vpn1 if (self.nodeClass == 1) else self.shape.vpn2 else:
self.vRowIDs = [] self.rowIDs = set(random.sample(range(self.shape.nbRows), self.vpn*self.shape.custodyRows))
self.vColumnIDs = []
for i in range(self.vpn): if (self.vpn * self.shape.custodyCols) > self.shape.nbCols:
self.vRowIDs.append(set(rows[i*self.shape.chiR:(i+1)*self.shape.chiR]) if rows else set(random.sample(range(self.shape.blockSizeC), self.shape.chiR))) self.logger.warning("Column custody (*vpn) larger than number of columns!", extra=self.format)
self.vColumnIDs.append(set(columns[i*self.shape.chiC:(i+1)*self.shape.chiC]) if columns else set(random.sample(range(self.shape.blockSizeR), self.shape.chiC))) self.columnIDs = range(self.shape.nbCols)
self.rowIDs = set.union(*self.vRowIDs) else:
self.columnIDs = set.union(*self.vColumnIDs) self.columnIDs = set(random.sample(range(self.shape.nbCols), self.vpn*self.shape.custodyCols))
self.rowNeighbors = collections.defaultdict(dict) self.rowNeighbors = collections.defaultdict(dict)
self.columnNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict)
@ -89,7 +106,7 @@ class Validator:
self.statsRxDupInSlot = 0 self.statsRxDupInSlot = 0
self.statsRxDupPerSlot = [] self.statsRxDupPerSlot = []
# Set uplink bandwidth. # Set uplink bandwidth.
# Assuming segments of ~560 bytes and timesteps of 50ms, we get # Assuming segments of ~560 bytes and timesteps of 50ms, we get
# 1 Mbps ~= 1e6 mbps * 0.050 s / (560*8) bits ~= 11 segments/timestep # 1 Mbps ~= 1e6 mbps * 0.050 s / (560*8) bits ~= 11 segments/timestep
if self.amIproposer: if self.amIproposer:
@ -101,8 +118,8 @@ class Validator:
self.bwUplink *= 1e3 / 8 * config.stepDuration / config.segmentSize self.bwUplink *= 1e3 / 8 * config.stepDuration / config.segmentSize
self.repairOnTheFly = True self.repairOnTheFly = True
self.sendLineUntilR = self.shape.blockSizeRK # stop sending on a p2p link if at least this amount of samples passed self.sendLineUntilR = self.shape.nbColsK # stop sending on a p2p link if at least this amount of samples passed
self.sendLineUntilC = self.shape.blockSizeCK # stop sending on a p2p link if at least this amount of samples passed self.sendLineUntilC = self.shape.nbRowsK # stop sending on a p2p link if at least this amount of samples passed
self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node
self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
@ -115,7 +132,7 @@ class Validator:
def logIDs(self): def logIDs(self):
"""It logs the assigned rows and columns.""" """It logs the assigned rows and columns."""
if self.amIproposer == 1: if self.amIproposer == 1:
self.logger.warning("I am a block proposer."% self.ID) self.logger.warning("I am a block proposer.", extra=self.format)
else: else:
self.logger.debug("Selected rows: "+str(self.rowIDs), extra=self.format) self.logger.debug("Selected rows: "+str(self.rowIDs), extra=self.format)
self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format) self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format)
@ -127,53 +144,53 @@ class Validator:
else: else:
self.logger.debug("Creating block...", extra=self.format) self.logger.debug("Creating block...", extra=self.format)
if self.shape.failureModel == "random": if self.shape.failureModel == "random":
order = [i for i in range(self.shape.blockSizeR * self.shape.blockSizeC)] order = [i for i in range(self.shape.nbCols * self.shape.nbRows)]
order = random.sample(order, int((1 - self.shape.failureRate/100) * len(order))) order = random.sample(order, int((1 - self.shape.failureRate/100) * len(order)))
for i in order: for i in order:
self.block.data[i] = 1 self.block.data[i] = 1
elif self.shape.failureModel == "sequential": elif self.shape.failureModel == "sequential":
order = [i for i in range(self.shape.blockSizeR * self.shape.blockSizeC)] order = [i for i in range(self.shape.nbCols * self.shape.nbRows)]
order = order[:int((1 - self.shape.failureRate/100) * len(order))] order = order[:int((1 - self.shape.failureRate/100) * len(order))]
for i in order: for i in order:
self.block.data[i] = 1 self.block.data[i] = 1
elif self.shape.failureModel == "MEP": # Minimal size non-recoverable Erasure Pattern elif self.shape.failureModel == "MEP": # Minimal size non-recoverable Erasure Pattern
for r in range(self.shape.blockSizeR): for r in range(self.shape.nbCols):
for c in range(self.shape.blockSizeC): for c in range(self.shape.nbRows):
if r > self.shape.blockSizeRK or c > self.shape.blockSizeCK: if r > self.shape.nbColsK or c > self.shape.nbRowsK:
self.block.setSegment(r,c) self.block.setSegment(r,c)
elif self.shape.failureModel == "MEP+1": # MEP +1 segment to make it recoverable elif self.shape.failureModel == "MEP+1": # MEP +1 segment to make it recoverable
for r in range(self.shape.blockSizeR): for r in range(self.shape.nbCols):
for c in range(self.shape.blockSizeC): for c in range(self.shape.nbRows):
if r > self.shape.blockSizeRK or c > self.shape.blockSizeCK: if r > self.shape.nbColsK or c > self.shape.nbRowsK:
self.block.setSegment(r,c) self.block.setSegment(r,c)
self.block.setSegment(0, 0) self.block.setSegment(0, 0)
elif self.shape.failureModel == "DEP": elif self.shape.failureModel == "DEP":
assert(self.shape.blockSizeR == self.shape.blockSizeC and self.shape.blockSizeRK == self.shape.blockSizeCK) assert(self.shape.nbCols == self.shape.nbRows and self.shape.nbColsK == self.shape.nbRowsK)
for r in range(self.shape.blockSizeR): for r in range(self.shape.nbCols):
for c in range(self.shape.blockSizeC): for c in range(self.shape.nbRows):
if (r+c) % self.shape.blockSizeR > self.shape.blockSizeRK: if (r+c) % self.shape.nbCols > self.shape.nbColsK:
self.block.setSegment(r,c) self.block.setSegment(r,c)
elif self.shape.failureModel == "DEP+1": elif self.shape.failureModel == "DEP+1":
assert(self.shape.blockSizeR == self.shape.blockSizeC and self.shape.blockSizeRK == self.shape.blockSizeCK) assert(self.shape.nbCols == self.shape.nbRows and self.shape.nbColsK == self.shape.nbRowsK)
for r in range(self.shape.blockSizeR): for r in range(self.shape.nbCols):
for c in range(self.shape.blockSizeC): for c in range(self.shape.nbRows):
if (r+c) % self.shape.blockSizeR > self.shape.blockSizeRK: if (r+c) % self.shape.nbCols > self.shape.nbColsK:
self.block.setSegment(r,c) self.block.setSegment(r,c)
self.block.setSegment(0, 0) self.block.setSegment(0, 0)
elif self.shape.failureModel == "MREP": # Minimum size Recoverable Erasure Pattern elif self.shape.failureModel == "MREP": # Minimum size Recoverable Erasure Pattern
for r in range(self.shape.blockSizeR): for r in range(self.shape.nbCols):
for c in range(self.shape.blockSizeC): for c in range(self.shape.nbRows):
if r < self.shape.blockSizeRK or c < self.shape.blockSizeCK: if r < self.shape.nbColsK or c < self.shape.nbRowsK:
self.block.setSegment(r,c) self.block.setSegment(r,c)
elif self.shape.failureModel == "MREP-1": # make MREP non-recoverable elif self.shape.failureModel == "MREP-1": # make MREP non-recoverable
for r in range(self.shape.blockSizeR): for r in range(self.shape.nbCols):
for c in range(self.shape.blockSizeC): for c in range(self.shape.nbRows):
if r < self.shape.blockSizeRK or c < self.shape.blockSizeCK: if r < self.shape.nbColsK or c < self.shape.nbRowsK:
self.block.setSegment(r,c) self.block.setSegment(r,c)
self.block.setSegment(0, 0, 0) self.block.setSegment(0, 0, 0)
nbFailures = self.block.data.count(0) nbFailures = self.block.data.count(0)
measuredFailureRate = nbFailures * 100 / (self.shape.blockSizeR * self.shape.blockSizeC) measuredFailureRate = nbFailures * 100 / (self.shape.nbCols * self.shape.nbRows)
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
def getColumn(self, index): def getColumn(self, index):
@ -347,7 +364,7 @@ class Validator:
segmentsToSend = [] segmentsToSend = []
for rID, neighs in self.rowNeighbors.items(): for rID, neighs in self.rowNeighbors.items():
line = self.getRow(rID) line = self.getRow(rID)
needed = zeros(self.shape.blockSizeR) needed = zeros(self.shape.nbCols)
for neigh in neighs.values(): for neigh in neighs.values():
sentOrReceived = neigh.received | neigh.sent sentOrReceived = neigh.received | neigh.sent
if sentOrReceived.count(1) < self.sendLineUntilR: if sentOrReceived.count(1) < self.sendLineUntilR:
@ -360,7 +377,7 @@ class Validator:
for cID, neighs in self.columnNeighbors.items(): for cID, neighs in self.columnNeighbors.items():
line = self.getColumn(cID) line = self.getColumn(cID)
needed = zeros(self.shape.blockSizeC) needed = zeros(self.shape.nbRows)
for neigh in neighs.values(): for neigh in neighs.values():
sentOrReceived = neigh.received | neigh.sent sentOrReceived = neigh.received | neigh.sent
if sentOrReceived.count(1) < self.sendLineUntilC: if sentOrReceived.count(1) < self.sendLineUntilC:
@ -422,7 +439,7 @@ class Validator:
while t: while t:
if self.rowIDs: if self.rowIDs:
rID = random.choice(self.rowIDs) rID = random.choice(self.rowIDs)
cID = random.randrange(0, self.shape.blockSizeR) cID = random.randrange(0, self.shape.nbCols)
if self.block.getSegment(rID, cID) : if self.block.getSegment(rID, cID) :
neigh = random.choice(list(self.rowNeighbors[rID].values())) neigh = random.choice(list(self.rowNeighbors[rID].values()))
if self.checkSegmentToNeigh(rID, cID, neigh): if self.checkSegmentToNeigh(rID, cID, neigh):
@ -430,7 +447,7 @@ class Validator:
t = tries t = tries
if self.columnIDs: if self.columnIDs:
cID = random.choice(self.columnIDs) cID = random.choice(self.columnIDs)
rID = random.randrange(0, self.shape.blockSizeC) rID = random.randrange(0, self.shape.nbRows)
if self.block.getSegment(rID, cID) : if self.block.getSegment(rID, cID) :
neigh = random.choice(list(self.columnNeighbors[cID].values())) neigh = random.choice(list(self.columnNeighbors[cID].values()))
if self.checkSegmentToNeigh(rID, cID, neigh): if self.checkSegmentToNeigh(rID, cID, neigh):
@ -537,8 +554,8 @@ class Validator:
self.logger.debug("status: %d / %d", arrived, expected, extra=self.format) self.logger.debug("status: %d / %d", arrived, expected, extra=self.format)
validated = 0 validated = 0
for i in range(self.vpn): for v in self.validators:
a, e = checkStatus(self.vColumnIDs[i], self.vRowIDs[i]) a, e = checkStatus(v.columnIDs, v.rowIDs)
if a == e: if a == e:
validated+=1 validated+=1

View File

@ -11,11 +11,11 @@ class Observer:
self.config = config self.config = config
self.format = {"entity": "Observer"} self.format = {"entity": "Observer"}
self.logger = logger self.logger = logger
self.block = [0] * self.config.blockSizeR * self.config.blockSizeC self.block = [0] * self.config.nbCols * self.config.nbRows
self.rows = [0] * self.config.blockSizeC self.rows = [0] * self.config.nbRows
self.columns = [0] * self.config.blockSizeR self.columns = [0] * self.config.nbCols
self.broadcasted = Block(self.config.blockSizeR, self.config.blockSizeRK, self.broadcasted = Block(self.config.nbCols, self.config.nbColsK,
self.config.blockSizeC, self.config.blockSizeCK) self.config.nbRows, self.config.nbRowsK)
def checkRowsColumns(self, validators): def checkRowsColumns(self, validators):
@ -27,7 +27,7 @@ class Observer:
for c in val.columnIDs: for c in val.columnIDs:
self.columns[c] += 1 self.columns[c] += 1
for i in range(self.config.blockSizeC): for i in range(self.config.nbRows):
self.logger.debug("Row/Column %d have %d and %d validators assigned." % (i, self.rows[i], self.columns[i]), extra=self.format) self.logger.debug("Row/Column %d have %d and %d validators assigned." % (i, self.rows[i], self.columns[i]), extra=self.format)
if self.rows[i] == 0 or self.columns[i] == 0: if self.rows[i] == 0 or self.columns[i] == 0:
self.logger.warning("There is a row/column that has not been assigned", extra=self.format) self.logger.warning("There is a row/column that has not been assigned", extra=self.format)
@ -35,7 +35,7 @@ class Observer:
def checkBroadcasted(self): def checkBroadcasted(self):
"""It checks how many broadcasted samples are still missing in the network.""" """It checks how many broadcasted samples are still missing in the network."""
zeros = 0 zeros = 0
for i in range(self.blockSizeR * self.blockSizeC): for i in range(self.nbCols * self.nbRows):
if self.broadcasted.data[i] == 0: if self.broadcasted.data[i] == 0:
zeros += 1 zeros += 1
if zeros > 0: if zeros > 0:

View File

@ -3,21 +3,21 @@
class Shape: class Shape:
"""This class represents a set of parameters for a specific simulation.""" """This class represents a set of parameters for a specific simulation."""
def __init__(self, blockSizeR, blockSizeRK, blockSizeC, blockSizeCK, def __init__(self, nbCols, nbColsK, nbRows, nbRowsK,
numberNodes, failureModel, failureRate, class1ratio, chiR, chiC, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run): numberNodes, failureModel, failureRate, class1ratio, custodyRows, custodyCols, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run):
"""Initializes the shape with the parameters passed in argument.""" """Initializes the shape with the parameters passed in argument."""
self.run = run self.run = run
self.numberNodes = numberNodes self.numberNodes = numberNodes
self.blockSizeR = blockSizeR self.nbCols = nbCols
self.blockSizeRK = blockSizeRK self.nbColsK = nbColsK
self.blockSizeC = blockSizeC self.nbRows = nbRows
self.blockSizeCK = blockSizeCK self.nbRowsK = nbRowsK
self.failureModel = failureModel self.failureModel = failureModel
self.failureRate = failureRate self.failureRate = failureRate
self.netDegree = netDegree self.netDegree = netDegree
self.class1ratio = class1ratio self.class1ratio = class1ratio
self.chiR = chiR self.custodyRows = custodyRows
self.chiC = chiC self.custodyCols = custodyCols
self.vpn1 = vpn1 self.vpn1 = vpn1
self.vpn2 = vpn2 self.vpn2 = vpn2
self.bwUplinkProd = bwUplinkProd self.bwUplinkProd = bwUplinkProd
@ -28,16 +28,16 @@ class Shape:
def __repr__(self): def __repr__(self):
"""Returns a printable representation of the shape""" """Returns a printable representation of the shape"""
shastr = "" shastr = ""
shastr += "bsrn-"+str(self.blockSizeR) shastr += "bsrn-"+str(self.nbCols)
shastr += "-bsrk-"+str(self.blockSizeRK) shastr += "-bsrk-"+str(self.nbColsK)
shastr += "-bscn-"+str(self.blockSizeC) shastr += "-bscn-"+str(self.nbRows)
shastr += "-bsck-"+str(self.blockSizeCK) shastr += "-bsck-"+str(self.nbRowsK)
shastr += "-nn-"+str(self.numberNodes) shastr += "-nn-"+str(self.numberNodes)
shastr += "-fm-"+str(self.failureModel) shastr += "-fm-"+str(self.failureModel)
shastr += "-fr-"+str(self.failureRate) shastr += "-fr-"+str(self.failureRate)
shastr += "-c1r-"+str(self.class1ratio) shastr += "-c1r-"+str(self.class1ratio)
shastr += "-chir-"+str(self.chiR) shastr += "-cusr-"+str(self.custodyRows)
shastr += "-chic-"+str(self.chiC) shastr += "-cusc-"+str(self.custodyCols)
shastr += "-vpn1-"+str(self.vpn1) shastr += "-vpn1-"+str(self.vpn1)
shastr += "-vpn2-"+str(self.vpn2) shastr += "-vpn2-"+str(self.vpn2)
shastr += "-bwupprod-"+str(self.bwUplinkProd) shastr += "-bwupprod-"+str(self.bwUplinkProd)

View File

@ -8,7 +8,7 @@ from datetime import datetime
from DAS.tools import * from DAS.tools import *
from DAS.results import * from DAS.results import *
from DAS.observer import * from DAS.observer import *
from DAS.validator import * from DAS.node import *
class Simulator: class Simulator:
"""This class implements the main DAS simulator.""" """This class implements the main DAS simulator."""
@ -54,20 +54,20 @@ class Simulator:
lightVal = lightNodes * self.shape.vpn1 lightVal = lightNodes * self.shape.vpn1
heavyVal = heavyNodes * self.shape.vpn2 heavyVal = heavyNodes * self.shape.vpn2
totalValidators = lightVal + heavyVal totalValidators = lightVal + heavyVal
totalRows = totalValidators * self.shape.chiR totalRows = totalValidators * self.shape.custodyRows
totalColumns = totalValidators * self.shape.chiC totalColumns = totalValidators * self.shape.custodyCols
rows = list(range(self.shape.blockSizeC)) * (int(totalRows/self.shape.blockSizeC)+1) rows = list(range(self.shape.nbRows)) * (int(totalRows/self.shape.nbRows)+1)
columns = list(range(self.shape.blockSizeR)) * (int(totalColumns/self.shape.blockSizeR)+1) columns = list(range(self.shape.nbCols)) * (int(totalColumns/self.shape.nbCols)+1)
rows = rows[0:totalRows] rows = rows[0:totalRows]
columns = columns[0:totalRows] columns = columns[0:totalRows]
random.shuffle(rows) random.shuffle(rows)
random.shuffle(columns) random.shuffle(columns)
offsetR = lightVal*self.shape.chiR offsetR = lightVal*self.shape.custodyRows
offsetC = lightVal*self.shape.chiC offsetC = lightVal*self.shape.custodyCols
self.logger.debug("There is a total of %d nodes, %d light and %d heavy." % (self.shape.numberNodes, lightNodes, heavyNodes), extra=self.format) self.logger.debug("There is a total of %d nodes, %d light and %d heavy." % (self.shape.numberNodes, lightNodes, heavyNodes), extra=self.format)
self.logger.debug("There is a total of %d validators, %d in light nodes and %d in heavy nodes" % (totalValidators, lightVal, heavyVal), extra=self.format) self.logger.debug("There is a total of %d validators, %d in light nodes and %d in heavy nodes" % (totalValidators, lightVal, heavyVal), extra=self.format)
self.logger.debug("Shuffling a total of %d rows to be assigned (X=%d)" % (len(rows), self.shape.chiR), extra=self.format) self.logger.debug("Shuffling a total of %d rows to be assigned (X=%d)" % (len(rows), self.shape.custodyRows), extra=self.format)
self.logger.debug("Shuffling a total of %d columns to be assigned (X=%d)" % (len(columns), self.shape.chiC), extra=self.format) self.logger.debug("Shuffling a total of %d columns to be assigned (X=%d)" % (len(columns), self.shape.custodyCols), extra=self.format)
self.logger.debug("Shuffled rows: %s" % str(rows), extra=self.format) self.logger.debug("Shuffled rows: %s" % str(rows), extra=self.format)
self.logger.debug("Shuffled columns: %s" % str(columns), extra=self.format) self.logger.debug("Shuffled columns: %s" % str(columns), extra=self.format)
@ -76,19 +76,19 @@ class Simulator:
for i in range(self.shape.numberNodes): for i in range(self.shape.numberNodes):
if self.config.evenLineDistribution: if self.config.evenLineDistribution:
if i < int(lightVal/self.shape.vpn1): # First start with the light nodes if i < int(lightVal/self.shape.vpn1): # First start with the light nodes
startR = i *self.shape.chiR*self.shape.vpn1 startR = i *self.shape.custodyRows*self.shape.vpn1
endR = (i+1)*self.shape.chiR*self.shape.vpn1 endR = (i+1)*self.shape.custodyRows*self.shape.vpn1
startC = i *self.shape.chiC*self.shape.vpn1 startC = i *self.shape.custodyCols*self.shape.vpn1
endC = (i+1)*self.shape.chiC*self.shape.vpn1 endC = (i+1)*self.shape.custodyCols*self.shape.vpn1
else: else:
j = i - int(lightVal/self.shape.vpn1) j = i - int(lightVal/self.shape.vpn1)
startR = offsetR+( j *self.shape.chiR*self.shape.vpn2) startR = offsetR+( j *self.shape.custodyRows*self.shape.vpn2)
endR = offsetR+((j+1)*self.shape.chiR*self.shape.vpn2) endR = offsetR+((j+1)*self.shape.custodyRows*self.shape.vpn2)
startC = offsetC+( j *self.shape.chiC*self.shape.vpn2) startC = offsetC+( j *self.shape.custodyCols*self.shape.vpn2)
endC = offsetC+((j+1)*self.shape.chiC*self.shape.vpn2) endC = offsetC+((j+1)*self.shape.custodyCols*self.shape.vpn2)
r = rows[startR:endR] r = rows[startR:endR]
c = columns[startC:endC] c = columns[startC:endC]
val = Validator(i, int(not i!=0), self.logger, self.shape, self.config, r, c) val = Node(i, int(not i!=0), self.logger, self.shape, self.config, r, c)
self.logger.debug("Node %d has row IDs: %s" % (val.ID, val.rowIDs), extra=self.format) self.logger.debug("Node %d has row IDs: %s" % (val.ID, val.rowIDs), extra=self.format)
self.logger.debug("Node %d has column IDs: %s" % (val.ID, val.columnIDs), extra=self.format) self.logger.debug("Node %d has column IDs: %s" % (val.ID, val.columnIDs), extra=self.format)
assignedRows = assignedRows + list(r) assignedRows = assignedRows + list(r)
@ -97,7 +97,17 @@ class Simulator:
self.nodeColumns.append(val.columnIDs) self.nodeColumns.append(val.columnIDs)
else: else:
val = Validator(i, int(not i!=0), self.logger, self.shape, self.config) if self.shape.custodyCols > self.shape.nbCols:
self.logger.error("custodyCols has to be smaller than %d" % self.shape.nbCols)
elif self.shape.custodyRows > self.shape.nbRows:
self.logger.error("custodyRows has to be smaller than %d" % self.shape.nbRows)
vs = []
nodeClass = 1 if (i <= self.shape.numberNodes * self.shape.class1ratio) else 2
vpn = self.shape.vpn1 if (nodeClass == 1) else self.shape.vpn2
for v in range(vpn):
vs.append(initValidator(self.shape.nbRows, self.shape.custodyRows, self.shape.nbCols, self.shape.custodyCols))
val = Node(i, int(not i!=0), self.logger, self.shape, self.config, vs)
if i == self.proposerID: if i == self.proposerID:
val.initBlock() val.initBlock()
else: else:
@ -112,8 +122,8 @@ class Simulator:
def initNetwork(self): def initNetwork(self):
"""It initializes the simulated network.""" """It initializes the simulated network."""
rowChannels = [[] for i in range(self.shape.blockSizeC)] rowChannels = [[] for i in range(self.shape.nbRows)]
columnChannels = [[] for i in range(self.shape.blockSizeR)] columnChannels = [[] for i in range(self.shape.nbCols)]
for v in self.validators: for v in self.validators:
if not (self.proposerPublishOnly and v.amIproposer): if not (self.proposerPublishOnly and v.amIproposer):
for id in v.rowIDs: for id in v.rowIDs:
@ -129,7 +139,7 @@ class Simulator:
self.logger.debug("Number of validators per row; Min: %d, Max: %d" % (min(self.distR), max(self.distR)), extra=self.format) self.logger.debug("Number of validators per row; Min: %d, Max: %d" % (min(self.distR), max(self.distR)), extra=self.format)
self.logger.debug("Number of validators per column; Min: %d, Max: %d" % (min(self.distC), max(self.distC)), extra=self.format) self.logger.debug("Number of validators per column; Min: %d, Max: %d" % (min(self.distC), max(self.distC)), extra=self.format)
for id in range(self.shape.blockSizeC): for id in range(self.shape.nbRows):
# If the number of nodes in a channel is smaller or equal to the # If the number of nodes in a channel is smaller or equal to the
# requested degree, a fully connected graph is used. For n>d, a random # requested degree, a fully connected graph is used. For n>d, a random
@ -147,10 +157,10 @@ class Simulator:
for u, v in G.edges: for u, v in G.edges:
val1=rowChannels[id][u] val1=rowChannels[id][u]
val2=rowChannels[id][v] val2=rowChannels[id][v]
val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSizeR)}) val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.nbCols)})
val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSizeR)}) val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.nbCols)})
for id in range(self.shape.blockSizeR): for id in range(self.shape.nbCols):
if not columnChannels[id]: if not columnChannels[id]:
self.logger.error("No nodes for column %d !" % id, extra=self.format) self.logger.error("No nodes for column %d !" % id, extra=self.format)
@ -165,8 +175,8 @@ class Simulator:
for u, v in G.edges: for u, v in G.edges:
val1=columnChannels[id][u] val1=columnChannels[id][u]
val2=columnChannels[id][v] val2=columnChannels[id][v]
val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSizeC)}) val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.nbRows)})
val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSizeC)}) val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.nbRows)})
for v in self.validators: for v in self.validators:
if (self.proposerPublishOnly and v.amIproposer): if (self.proposerPublishOnly and v.amIproposer):
@ -174,12 +184,12 @@ class Simulator:
count = min(self.proposerPublishTo, len(rowChannels[id])) count = min(self.proposerPublishTo, len(rowChannels[id]))
publishTo = random.sample(rowChannels[id], count) publishTo = random.sample(rowChannels[id], count)
for vi in publishTo: for vi in publishTo:
v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSizeR)}) v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.nbCols)})
for id in v.columnIDs: for id in v.columnIDs:
count = min(self.proposerPublishTo, len(columnChannels[id])) count = min(self.proposerPublishTo, len(columnChannels[id]))
publishTo = random.sample(columnChannels[id], count) publishTo = random.sample(columnChannels[id], count)
for vi in publishTo: for vi in publishTo:
v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSizeC)}) v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.nbRows)})
if self.logger.isEnabledFor(logging.DEBUG): if self.logger.isEnabledFor(logging.DEBUG):
for i in range(0, self.shape.numberNodes): for i in range(0, self.shape.numberNodes):
@ -280,10 +290,18 @@ class Simulator:
cnD1 = "Dup class1 mean" cnD1 = "Dup class1 mean"
cnD2 = "Dup class2 mean" cnD2 = "Dup class2 mean"
# if custody is based on the requirements of underlying individual
# validators, we can get detailed data on how many validated.
# Otherwise, we can only use the weighted average.
if self.config.validatorBasedCustody:
cnVv = validatorProgress
else:
cnVv = validatorAllProgress
progressVector.append({ progressVector.append({
cnS:sampleProgress, cnS:sampleProgress,
cnN:nodeProgress, cnN:nodeProgress,
cnV:validatorProgress, cnV:cnVv,
cnT0: trafficStats[0]["Tx"]["mean"], cnT0: trafficStats[0]["Tx"]["mean"],
cnT1: trafficStats[1]["Tx"]["mean"], cnT1: trafficStats[1]["Tx"]["mean"],
cnT2: trafficStats[2]["Tx"]["mean"], cnT2: trafficStats[2]["Tx"]["mean"],

View File

@ -16,7 +16,7 @@ class Visualizer:
self.execID = execID self.execID = execID
self.config = config self.config = config
self.folderPath = "results/"+self.execID self.folderPath = "results/"+self.execID
self.parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree', 'chi', 'vpn1', 'vpn2', 'class1ratio', 'bwUplinkProd', 'bwUplink1', 'bwUplink2'] self.parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree', 'cus', 'vpn1', 'vpn2', 'class1ratio', 'bwUplinkProd', 'bwUplink1', 'bwUplink2']
self.minimumDataPoints = 2 self.minimumDataPoints = 2
self.maxTTA = 11000 self.maxTTA = 11000
@ -32,12 +32,12 @@ class Visualizer:
tree = ET.parse(os.path.join(self.folderPath, filename)) tree = ET.parse(os.path.join(self.folderPath, filename))
root = tree.getroot() root = tree.getroot()
run = int(root.find('run').text) run = int(root.find('run').text)
blockSize = int(root.find('blockSizeR').text) # TODO: maybe we want both dimensions blockSize = int(root.find('nbCols').text) # TODO: maybe we want both dimensions
failureRate = int(root.find('failureRate').text) failureRate = int(root.find('failureRate').text)
numberNodes = int(root.find('numberNodes').text) numberNodes = int(root.find('numberNodes').text)
class1ratio = float(root.find('class1ratio').text) class1ratio = float(root.find('class1ratio').text)
netDegree = int(root.find('netDegree').text) netDegree = int(root.find('netDegree').text)
chi = int(root.find('chiR').text) # TODO: maybe we want both dimensions custodyRows = int(root.find('custodyRows').text) # TODO: maybe we want both dimensions
vpn1 = int(root.find('vpn1').text) vpn1 = int(root.find('vpn1').text)
vpn2 = int(root.find('vpn2').text) vpn2 = int(root.find('vpn2').text)
bwUplinkProd = int(root.find('bwUplinkProd').text) bwUplinkProd = int(root.find('bwUplinkProd').text)
@ -53,7 +53,7 @@ class Visualizer:
# Get the indices and values of the parameters in the combination # Get the indices and values of the parameters in the combination
indices = [self.parameters.index(element) for element in combination] indices = [self.parameters.index(element) for element in combination]
selectedValues = [run, blockSize, failureRate, numberNodes, netDegree, chi, vpn1, vpn2, class1ratio, bwUplinkProd, bwUplink1, bwUplink2] selectedValues = [run, blockSize, failureRate, numberNodes, netDegree, custodyRows, vpn1, vpn2, class1ratio, bwUplinkProd, bwUplink1, bwUplink2]
values = [selectedValues[index] for index in indices] values = [selectedValues[index] for index in indices]
names = [self.parameters[i] for i in indices] names = [self.parameters[i] for i in indices]
keyComponents = [f"{name}_{value}" for name, value in zip(names, values)] keyComponents = [f"{name}_{value}" for name, value in zip(names, values)]

View File

@ -42,7 +42,7 @@ numJobs = -1
# distribute rows/columns evenly between validators (True) # distribute rows/columns evenly between validators (True)
# or generate it using local randomness (False) # or generate it using local randomness (False)
evenLineDistribution = True evenLineDistribution = False
# Number of simulation runs with the same parameters for statistical relevance # Number of simulation runs with the same parameters for statistical relevance
runs = range(3) runs = range(3)
@ -62,15 +62,21 @@ blockSizes = range(64, 113, 128)
# Per-topic mesh neighborhood size # Per-topic mesh neighborhood size
netDegrees = range(8, 9, 2) netDegrees = range(8, 9, 2)
# number of rows and columns a validator is interested in # the overall number of row/columns taken into custody by a node is determined by
chis = range(2, 3, 2) # a base number (custody) and a class specific multiplier (validatorsPerNode).
# We support two models:
# - validatorsBasedCustody: each validator has a unique subset of size custody,
# and custody is the union of these. I.e. VPN is a "probabilistic multiplier"
# - !validatorsBasedCustody: VPN is interpreted as a simple custody multiplier
validatorBasedCustody = False
custody = [2]
# ratio of class1 nodes (see below for parameters per class) # ratio of class1 nodes (see below for parameters per class)
class1ratios = [0.8] class1ratios = [0.8]
# Number of validators per beacon node # Number of validators per beacon node
validatorsPerNode1 = [1] validatorsPerNode1 = [1]
validatorsPerNode2 = [500] validatorsPerNode2 = [5]
# Set uplink bandwidth in megabits/second # Set uplink bandwidth in megabits/second
bwUplinksProd = [200] bwUplinksProd = [200]
@ -102,12 +108,12 @@ diagnostics = False
saveGit = False saveGit = False
def nextShape(): def nextShape():
for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( for run, fm, fr, class1ratio, cust, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product(
runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): runs, failureModels, failureRates, class1ratios, custody, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2):
# Network Degree has to be an even number # Network Degree has to be an even number
if netDegree % 2 == 0: if netDegree % 2 == 0:
blockSizeR = blockSizeC = blockSize nbCols = nbRows = blockSize
blockSizeRK = blockSizeCK = blockSize // 2 nbColsK = nbRowsK = blockSize // 2
chiR = chiC = chi custodyRows = custodyCols = cust
shape = Shape(blockSizeR, blockSizeRK, blockSizeC, blockSizeCK, nn, fm, fr, class1ratio, chiR, chiC, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run) shape = Shape(nbCols, nbColsK, nbRows, nbRowsK, nn, fm, fr, class1ratio, custodyRows, custodyCols, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run)
yield shape yield shape