WIP more sampling options

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2024-02-22 11:00:41 +01:00
parent 878b0a1b94
commit 5dc9ea0e59
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
5 changed files with 65 additions and 21 deletions

View File

@@ -171,22 +171,51 @@ class Node:
nn.add(n.peer.node)
return nn
def initDAS(self):
""" Minimal sample init function"""
self.DASample = [(random.randrange(self.shape.blockSizeC), random.randrange(self.shape.blockSizeR))
self.DASample = [(random.randrange(self.shape.blockSizeC), random.randrange(self.shape.blockSizeR), [])
for i in range(self.DASampleSize)]
# check which neighbor is supposed to custody which segment
for neigh in self.getNeighbors():
for rid, cid, sources in self.DASample:
if rid in neigh.rowIDs or cid in neigh.columnIDs:
sources.append(neigh)
# random shuffle order, so that we can use it as a priority order
for rid, cid, sources in self.DASample:
random.shuffle(sources)
self.logger.info("Potential sources for (%d,%d): %d", rid, cid, len(sources), extra=self.format)
def checkDAS(self):
""" Dumb check in all neighbors without messaging """
block = Block.copy(self.block)
for neigh in self.getNeighbors():
block.merge(neigh.block)
found = 0
for rid, cid in self.DASample:
for rid, cid, sources in self.DASample:
found += block.getSegment(rid, cid)
self.logger.debug("DAS found: %d / %d", found, self.DASampleSize, extra=self.format)
return (found, self.DASampleSize)
def checkDAS3(self, check=3):
""" Check some neighbors without messaging """
block = Block.copy(self.block)
for neigh in self.getNeighbors():
block.merge(neigh.block)
found = [0] * (check+1)
for rid, cid, sources in self.DASample:
if self.block.getSegment(rid, cid):
found[0] += 1
continue
for i in range(min(check, len(sources))):
if sources[i].block.getSegment(rid, cid):
found[i+1] += 1
break
self.logger.debug("DAS found: %d / %d", found, self.DASampleSize, extra=self.format)
return (found, self.DASampleSize)
def logIDs(self):
"""It logs the assigned rows and columns."""
if self.amIproposer == 1:

View File

@@ -91,16 +91,21 @@ class Observer:
expected = 0
ready = 0
nodes = 0
f1 = f2 = f3 = 0
for val in validators:
if val.amIproposer == 0:
(a, e) = val.checkDAS()
(f, e) = val.checkDAS3()
f1 += f[0] + f[1]
f2 += f[0] + f[1] + f[2]
f3 += f[0] + f[1] + f[2] + f[3]
arrived += a
expected += e
if a == e:
ready += 1
nodes += 1
return (arrived / expected, ready / nodes)
return (arrived / expected, f3/expected, f2/expected, f1/expected, ready / nodes)
def getTrafficStats(self, validators):
"""Summary statistics of traffic measurements in a timestep."""

View File

@@ -257,7 +257,7 @@ class Simulator:
self.validators[i].send()
self.logger.debug("PHASE PROGRESS STATS %d" % steps, extra=self.format)
missingSamples, sampleProgress, nodeProgress, validatorAllProgress, validatorProgress = self.glob.getProgress(self.validators)
samplingProgress, _ = self.glob.getSamplingProgress(self.validators)
samplingProgressAll, samplingProgress3, samplingProgress2, samplingProgress1, _ = self.glob.getSamplingProgress(self.validators)
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
self.validators[i].receiveRowsColumns()
@@ -282,12 +282,15 @@
self.logger.info("step %d, arrived %0.02f %%, ready %0.02f %%, validatedall %0.02f %%, validated %0.02f %%, sampled %0.02f %%"
% (steps, sampleProgress*100, nodeProgress*100, validatorAllProgress*100, validatorProgress*100,
samplingProgress*100), extra=self.format)
samplingProgressAll*100), extra=self.format)
cnS = "samples received"
cnN = "nodes ready"
cnV = "validators ready"
cnDAS = "DASampling ready"
cnDASall = "DASampling ready (query all)"
cnDAS3 = "DASampling ready (query 3)"
cnDAS2 = "DASampling ready (query 2)"
cnDAS1 = "DASampling ready (query 1)"
cnT0 = "TX builder mean"
cnT1 = "TX class1 mean"
cnT2 = "TX class2 mean"
@@ -300,7 +303,10 @@
cnS:sampleProgress,
cnN:nodeProgress,
cnV:validatorProgress,
cnDAS:samplingProgress,
cnDASall:samplingProgressAll,
cnDAS3:samplingProgress3,
cnDAS2:samplingProgress2,
cnDAS1:samplingProgress1,
cnT0: trafficStats[0]["Tx"]["mean"],
cnT1: trafficStats[1]["Tx"]["mean"],
cnT2: trafficStats[2]["Tx"]["mean"],

View File

@@ -112,7 +112,10 @@ class Visualizor:
vector1 = result.metrics["progress"]["nodes ready"]
vector2 = result.metrics["progress"]["validators ready"]
vector3 = result.metrics["progress"]["samples received"]
vector4 = result.metrics["progress"]["DASampling ready"]
vector4 = result.metrics["progress"]["DASampling ready (query all)"]
vector5 = result.metrics["progress"]["DASampling ready (query 3)"]
vector6 = result.metrics["progress"]["DASampling ready (query 2)"]
vector7 = result.metrics["progress"]["DASampling ready (query 1)"]
conf = {}
attrbs = self.__get_attrbs__(result)
conf["textBox"] = "Block Size R: "+attrbs['bsrn']+"\nBlock Size C: "+attrbs['bscn']\
@@ -121,11 +124,12 @@
conf["type"] = "plot"
conf["legLoc"] = 2
conf["desLoc"] = 2
conf["colors"] = ["g-", "b-", "r-", "m-"]
conf["labels"] = ["Nodes", "Validators", "Samples", "DASampling"]
conf["colors"] = ["g-", "b-", "r-", "m-", "m--", "m-.", "m:"]
conf["labels"] = ["Nodes", "Validators", "Custody samples",
"DASampling (query all)", "DASampling (query 3)", "DASampling (query 2)", "DASampling (query 1)"]
conf["xlabel"] = "Time (ms)"
conf["ylabel"] = "Percentage (%)"
conf["data"] = [vector1, vector2, vector3, vector4]
conf["ylabel"] = "Ratio of all (0..1)"
conf["data"] = [vector1, vector2, vector3, vector4, vector5, vector6, vector7]
conf["xdots"] = [x*self.config.stepDuration for x in range(len(vector1))]
conf["path"] = plotPath+"/nodesReady.png"
maxi = 0

View File

@@ -38,39 +38,39 @@ logLevel = logging.INFO
# number of parallel workers. -1: all cores; 1: sequential
# for more details, see joblib.Parallel
numJobs = -1
numJobs = 1
# distribute rows/columns evenly between validators (True)
# or generate it using local randomness (False)
evenLineDistribution = False
# Number of simulation runs with the same parameters for statistical relevance
runs = range(3)
runs = range(1)
# Number of validators
numberNodes = range(128, 513, 128)
numberNodes = [4000]
# select failure model between: "random, sequential, MEP, MEP+1, DEP, DEP+1, MREP, MREP-1"
failureModels = ["random"]
# Percentage of block not released by producer
failureRates = range(40, 81, 20)
failureRates = [0]
# Block size in one dimension in segments. Block is blockSizes * blockSizes segments.
blockSizes = range(64, 113, 128)
blockSizes = [256]
# Per-topic mesh neighborhood size
netDegrees = range(8, 9, 2)
netDegrees = [6]
# number of rows and columns a validator is interested in
chis = range(2, 3, 2)
chis = [2]
# ratio of class1 nodes (see below for parameters per class)
class1ratios = [0.8]
# Number of validators per beacon node
validatorsPerNode1 = [1]
validatorsPerNode2 = [500]
validatorsPerNode2 = [5]
# Set uplink bandwidth in megabits/second
bwUplinksProd = [200]
@@ -99,7 +99,7 @@ successCondition = 0.9
diagnostics = False
# True to save git diff and git commit
saveGit = False
saveGit = True
def nextShape():
for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product(