383 lines
20 KiB
383 lines
20 KiB
import networkx as nx
import logging, random
import pandas as pd
from functools import partial, partialmethod
from datetime import datetime
from DAS.tools import *
from DAS.results import *
from DAS.observer import *
from DAS.node import *
class Simulator:
"""This class implements the main DAS simulator."""
def __init__(self, shape, config, execID):
"""It initializes the simulation with a set of parameters (shape)."""
self.shape = shape
self.config = config
self.format = {"entity": "Simulator"}
self.execID = execID
self.result = Result(self.shape, self.execID)
self.validators = []
self.logger = []
self.logLevel = config.logLevel
self.proposerID = 0
self.glob = []
self.execID = execID
self.distR = []
self.distC = []
self.nodeRows = []
self.nodeColumns = []
# In GossipSub the initiator might push messages without participating in the mesh.
# proposerPublishOnly regulates this behavior. If set to true, the proposer is not
# part of the p2p distribution graph, only pushes segments to it. If false, the proposer
# might get back segments from other peers since links are symmetric.
self.proposerPublishOnly = True
# If proposerPublishOnly == True, this regulates how many copies of each segment are
# pushed out by the proposer.
# 1: the data is sent out exactly once on rows and once on columns (2 copies in total)
# self.shape.netDegree: default behavior similar (but not same) to previous code
self.proposerPublishToR = config.evalConf(self, config.proposerPublishToR, shape)
self.proposerPublishToC = config.evalConf(self, config.proposerPublishToR, shape)
def initValidators(self):
"""It initializes all the validators in the network."""
self.glob = Observer(self.logger, self.shape)
self.validators = []
evenLineDistribution = False
if evenLineDistribution:
lightNodes = int(self.shape.numberNodes * self.shape.class1ratio)
heavyNodes = self.shape.numberNodes - lightNodes
lightVal = lightNodes * self.shape.vpn1
heavyVal = heavyNodes * self.shape.vpn2
totalValidators = lightVal + heavyVal
totalRows = totalValidators * self.shape.custodyRows
totalColumns = totalValidators * self.shape.custodyCols
rows = list(range(self.shape.nbRows)) * (int(totalRows/self.shape.nbRows)+1)
columns = list(range(self.shape.nbCols)) * (int(totalColumns/self.shape.nbCols)+1)
rows = rows[0:totalRows]
columns = columns[0:totalColumns]
offsetR = lightVal*self.shape.custodyRows
offsetC = lightVal*self.shape.custodyCols
self.logger.debug("There is a total of %d nodes, %d light and %d heavy." % (self.shape.numberNodes, lightNodes, heavyNodes), extra=self.format)
self.logger.debug("There is a total of %d validators, %d in light nodes and %d in heavy nodes" % (totalValidators, lightVal, heavyVal), extra=self.format)
self.logger.debug("Shuffling a total of %d rows to be assigned (X=%d)" % (len(rows), self.shape.custodyRows), extra=self.format)
self.logger.debug("Shuffling a total of %d columns to be assigned (X=%d)" % (len(columns), self.shape.custodyCols), extra=self.format)
self.logger.debug("Shuffled rows: %s" % str(rows), extra=self.format)
self.logger.debug("Shuffled columns: %s" % str(columns), extra=self.format)
assignedRows = []
assignedCols = []
maliciousNodesCount = int((self.shape.maliciousNodes / 100) * self.shape.numberNodes)
remainingMaliciousNodes = maliciousNodesCount
for i in range(self.shape.numberNodes):
if i == 0:
amImalicious_value = 0
if not self.config.randomizeMaliciousNodes:
# Assign based on predefined pattern when randomization is turned off
if i < maliciousNodesCount + 1:
amImalicious_value = 1
amImalicious_value = 0
# Randomly assign amImalicious_value when randomization is turned on
if remainingMaliciousNodes > 0 and random.random() < (self.shape.maliciousNodes / 100):
amImalicious_value = 1
remainingMaliciousNodes -= 1
amImalicious_value = 0
if evenLineDistribution:
if i < int(lightVal/self.shape.vpn1): # First start with the light nodes
startR = i *self.shape.custodyRows*self.shape.vpn1
endR = (i+1)*self.shape.custodyRows*self.shape.vpn1
startC = i *self.shape.custodyCols*self.shape.vpn1
endC = (i+1)*self.shape.custodyCols*self.shape.vpn1
j = i - int(lightVal/self.shape.vpn1)
startR = offsetR+( j *self.shape.custodyRows*self.shape.vpn2)
endR = offsetR+((j+1)*self.shape.custodyRows*self.shape.vpn2)
startC = offsetC+( j *self.shape.custodyCols*self.shape.vpn2)
endC = offsetC+((j+1)*self.shape.custodyCols*self.shape.vpn2)
r = rows[startR:endR]
c = columns[startC:endC]
val = Node(i, int(not i!=0), amImalicious_value, self.logger, self.shape, self.config, r, c)
self.logger.debug("Node %d has row IDs: %s" % (val.ID, val.rowIDs), extra=self.format)
self.logger.debug("Node %d has column IDs: %s" % (val.ID, val.columnIDs), extra=self.format)
assignedRows = assignedRows + list(r)
assignedCols = assignedCols + list(c)
if self.shape.custodyCols > self.shape.nbCols:
self.logger.error("custodyCols has to be smaller than %d" % self.shape.nbCols)
elif self.shape.custodyRows > self.shape.nbRows:
self.logger.error("custodyRows has to be smaller than %d" % self.shape.nbRows)
vs = []
nodeClass = 1 if (i <= self.shape.numberNodes * self.shape.class1ratio) else 2
vpn = self.shape.vpn1 if (nodeClass == 1) else self.shape.vpn2
for v in range(vpn):
vs.append(initValidator(self.shape.nbRows, self.shape.custodyRows, self.shape.nbCols, self.shape.custodyCols))
val = Node(i, int(not i!=0), amImalicious_value, self.logger, self.shape, self.config, vs)
if i == self.proposerID:
self.logger.debug("Rows assigned: %s" % str(assignedRows), extra=self.format)
self.logger.debug("Columns assigned: %s" % str(assignedCols), extra=self.format)
self.logger.debug("Validators initialized.", extra=self.format)
def initNetwork(self):
"""It initializes the simulated network."""
rowChannels = [[] for i in range(self.shape.nbRows)]
columnChannels = [[] for i in range(self.shape.nbCols)]
for v in self.validators:
if not (self.proposerPublishOnly and v.amIproposer):
for id in v.rowIDs:
for id in v.columnIDs:
# Check rows/columns distribution
for r in rowChannels:
for c in columnChannels:
self.logger.debug("Number of validators per row; Min: %d, Max: %d" % (min(self.distR), max(self.distR)), extra=self.format)
self.logger.debug("Number of validators per column; Min: %d, Max: %d" % (min(self.distC), max(self.distC)), extra=self.format)
for id in range(self.shape.nbRows):
# If the number of nodes in a channel is smaller or equal to the
# requested degree, a fully connected graph is used. For n>d, a random
# d-regular graph is set up. (For n=d+1, the two are the same.)
if not rowChannels[id]:
self.logger.error("No nodes for row %d !" % id, extra=self.format)
elif (len(rowChannels[id]) <= self.shape.netDegree):
self.logger.debug("Graph fully connected with degree %d !" % (len(rowChannels[id]) - 1), extra=self.format)
G = nx.complete_graph(len(rowChannels[id]))
G = nx.random_regular_graph(self.shape.netDegree, len(rowChannels[id]))
if not nx.is_connected(G):
self.logger.error("Graph not connected for row %d !" % id, extra=self.format)
for u, v in G.edges:
val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.nbCols)})
val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.nbCols)})
for id in range(self.shape.nbCols):
if not columnChannels[id]:
self.logger.error("No nodes for column %d !" % id, extra=self.format)
elif (len(columnChannels[id]) <= self.shape.netDegree):
self.logger.debug("Graph fully connected with degree %d !" % (len(columnChannels[id]) - 1), extra=self.format)
G = nx.complete_graph(len(columnChannels[id]))
G = nx.random_regular_graph(self.shape.netDegree, len(columnChannels[id]))
if not nx.is_connected(G):
self.logger.error("Graph not connected for column %d !" % id, extra=self.format)
for u, v in G.edges:
val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.nbRows)})
val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.nbRows)})
for v in self.validators:
if (self.proposerPublishOnly and v.amIproposer):
for id in v.rowIDs:
count = min(self.proposerPublishToR, len(rowChannels[id]))
publishTo = random.sample(rowChannels[id], count)
for vi in publishTo:
v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.nbCols)})
for id in v.columnIDs:
count = min(self.proposerPublishToC, len(columnChannels[id]))
publishTo = random.sample(columnChannels[id], count)
for vi in publishTo:
v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.nbRows)})
if self.logger.isEnabledFor(logging.DEBUG):
for i in range(0, self.shape.numberNodes):
self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format)
self.logger.debug("Val %d : colN %s", i, self.validators[i].columnNeighbors, extra=self.format)
def initLogger(self):
"""It initializes the logger."""
logging.TRACE = 5
logging.addLevelName(logging.TRACE, 'TRACE')
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
logging.trace = partial(logging.log, logging.TRACE)
logger = logging.getLogger("DAS")
if len(logger.handlers) == 0:
ch = logging.StreamHandler()
self.logger = logger
def printDiagnostics(self):
"""Print all required diagnostics to check when a block does not become available"""
for val in self.validators:
(a, e) = val.checkStatus()
if e-a > 0 and val.ID != 0:
self.logger.warning("Node %d is missing %d samples" % (val.ID, e-a), extra=self.format)
for r in val.rowIDs:
row = val.getRow(r)
if row.count() < len(row):
self.logger.debug("Row %d: %s" % (r, str(row)), extra=self.format)
neiR = val.rowNeighbors[r]
for nr in neiR:
self.logger.debug("Row %d, Neighbor %d sent: %s" % (r, val.rowNeighbors[r][nr].node.ID, val.rowNeighbors[r][nr].received), extra=self.format)
self.logger.debug("Row %d, Neighbor %d has: %s" % (r, val.rowNeighbors[r][nr].node.ID, self.validators[val.rowNeighbors[r][nr].node.ID].getRow(r)), extra=self.format)
for c in val.columnIDs:
col = val.getColumn(c)
if col.count() < len(col):
self.logger.debug("Column %d: %s" % (c, str(col)), extra=self.format)
neiC = val.columnNeighbors[c]
for nc in neiC:
self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format)
self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format)
def run(self):
"""It runs the main simulation until the block is available or it gets stucked."""
for i in range(0,self.shape.numberNodes):
if i == self.proposerID:
# self.validators[i].initBlock()
self.logger.warning("I am a block proposer.", extra=self.format)
arrived, expected, ready, validatedall, validated = self.glob.checkStatus(self.validators)
missingSamples = expected - arrived
missingVector = []
progressVector = []
trafficStatsVector = []
malicious_nodes_not_added_count = 0
steps = 0
self.logger.debug("Expected Samples: %d" % expected, extra=self.format)
self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format)
oldMissingSamples = missingSamples
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
for i in range(0,self.shape.numberNodes):
if not self.validators[i].amImalicious:
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
self.logger.debug("PHASE RESTORE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
self.logger.debug("PHASE LOG %d" % steps, extra=self.format)
for i in range(0,self.shape.numberNodes):
# log TX and RX statistics
trafficStats = self.glob.getTrafficStats(self.validators)
self.logger.debug("step %d: %s" %
(steps, trafficStats), extra=self.format)
for i in range(0,self.shape.numberNodes):
missingSamples, sampleProgress, nodeProgress, validatorAllProgress, validatorProgress = self.glob.getProgress(self.validators)
self.logger.info("step %d, arrived %0.02f %%, ready %0.02f %%, validatedall %0.02f %%, , validated %0.02f %%"
% (steps, sampleProgress*100, nodeProgress*100, validatorAllProgress*100, validatorProgress*100), extra=self.format)
cnS = "samples received"
cnN = "nodes ready"
cnV = "validators ready"
cnT0 = "TX builder mean"
cnT1 = "TX class1 mean"
cnT2 = "TX class2 mean"
cnR1 = "RX class1 mean"
cnR2 = "RX class2 mean"
cnD1 = "Dup class1 mean"
cnD2 = "Dup class2 mean"
# if custody is based on the requirements of underlying individual
# validators, we can get detailed data on how many validated.
# Otherwise, we can only use the weighted average.
if self.config.validatorBasedCustody:
cnVv = validatorProgress
cnVv = validatorAllProgress
cnT0: trafficStats[0]["Tx"]["mean"],
cnT1: trafficStats[1]["Tx"]["mean"],
cnT2: trafficStats[2]["Tx"]["mean"],
cnR1: trafficStats[1]["Rx"]["mean"],
cnR2: trafficStats[2]["Rx"]["mean"],
cnD1: trafficStats[1]["RxDup"]["mean"],
cnD2: trafficStats[2]["RxDup"]["mean"],
if missingSamples == oldMissingSamples:
if len(missingVector) > self.config.steps4StopCondition:
if missingSamples == missingVector[-self.config.steps4StopCondition]:
self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format)
if self.config.diagnostics:
elif missingSamples == 0:
self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format)
steps += 1
for i in range(0,self.shape.numberNodes):
if not self.validators[i].amIaddedToQueue :
malicious_nodes_not_added_count += 1
for i in range(0,self.shape.numberNodes):
column_ids = []
row_ids = []
for rID in self.validators[i].rowIDs:
for cID in self.validators[i].columnIDs:
self.logger.debug("List of columnIDs for %d node: %s", i, column_ids, extra=self.format)
self.logger.debug("List of rowIDs for %d node: %s", i, row_ids, extra=self.format)
self.logger.debug("Number of malicious nodes not added to the send queue: %d" % malicious_nodes_not_added_count, extra=self.format)
malicious_nodes_not_added_percentage = (malicious_nodes_not_added_count * 100)/(self.shape.numberNodes)
self.logger.debug("Percentage of malicious nodes not added to the send queue: %d" % malicious_nodes_not_added_percentage, extra=self.format)
progress = pd.DataFrame(progressVector)
if self.config.saveRCdist:
self.result.addMetric("rowDist", self.distR)
self.result.addMetric("columnDist", self.distC)
if self.config.saveProgress:
self.result.addMetric("progress", progress.to_dict(orient='list'))
self.result.populate(self.shape, self.config, missingVector)
return self.result