From 5dc9ea0e593050b573e4c093127457b499058173 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 22 Feb 2024 11:00:41 +0100 Subject: [PATCH] WIP more sampling options Signed-off-by: Csaba Kiraly --- DAS/node.py | 33 +++++++++++++++++++++++++++++++-- DAS/observer.py | 7 ++++++- DAS/simulator.py | 14 ++++++++++---- DAS/visualizor.py | 14 +++++++++----- smallConf.py | 18 +++++++++--------- 5 files changed, 65 insertions(+), 21 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index 1ebbdb6..405f08c 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -171,22 +171,51 @@ class Node: nn.add(n.peer.node) return nn + def initDAS(self): """ Minimal sample init function""" - self.DASample = [(random.randrange(self.shape.blockSizeC), random.randrange(self.shape.blockSizeR)) + self.DASample = [(random.randrange(self.shape.blockSizeC), random.randrange(self.shape.blockSizeR), []) for i in range(self.DASampleSize)] + # check which neighbor is supposed to custody which segment + for neigh in self.getNeighbors(): + for rid, cid, sources in self.DASample: + if rid in neigh.rowIDs or cid in neigh.columnIDs: + sources.append(neigh) + + # random shuffle order, so that we can use it as a priority order + for rid, cid, sources in self.DASample: + random.shuffle(sources) + self.logger.info("Potential sources for (%d,%d): %d", rid, cid, len(sources), extra=self.format) + def checkDAS(self): """ Dumb check in all neighbors without messaging """ block = Block.copy(self.block) for neigh in self.getNeighbors(): block.merge(neigh.block) found = 0 - for rid, cid in self.DASample: + for rid, cid, sources in self.DASample: found += block.getSegment(rid, cid) self.logger.debug("DAS found: %d / %d", found, self.DASampleSize, extra=self.format) return (found, self.DASampleSize) + def checkDAS3(self, check=3): + """ Check some neighbors without messaging """ + block = Block.copy(self.block) + for neigh in self.getNeighbors(): + block.merge(neigh.block) + found = [0] * (check+1) + for rid, cid, sources in self.DASample: + if self.block.getSegment(rid, cid): + found[0] += 1 + continue + for i in range(min(check, len(sources))): + if sources[i].block.getSegment(rid, cid): + found[i+1] += 1 + break + self.logger.debug("DAS found: %d / %d", found, self.DASampleSize, extra=self.format) + return (found, self.DASampleSize) + def logIDs(self): """It logs the assigned rows and columns.""" if self.amIproposer == 1: diff --git a/DAS/observer.py b/DAS/observer.py index 173a0d9..da912b0 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -91,16 +91,21 @@ class Observer: expected = 0 ready = 0 nodes = 0 + f1 = f2 = f3 = 0 for val in validators: if val.amIproposer == 0: (a, e) = val.checkDAS() + (f, e) = val.checkDAS3() + f1 += f[0] + f[1] + f2 += f[0] + f[1] + f[2] + f3 += f[0] + f[1] + f[2] + f[3] arrived += a expected += e if a == e: ready += 1 nodes += 1 - return (arrived / expected, ready / nodes) + return (arrived / expected, f3/expected, f2/expected, f1/expected, ready / nodes) def getTrafficStats(self, validators): """Summary statistics of traffic measurements in a timestep.""" diff --git a/DAS/simulator.py b/DAS/simulator.py index 4a33ce1..6123338 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -257,7 +257,7 @@ class Simulator: self.validators[i].send() self.logger.debug("PHASE PROGRESS STATS %d" % steps, extra=self.format) missingSamples, sampleProgress, nodeProgress, validatorAllProgress, validatorProgress = self.glob.getProgress(self.validators) - samplingProgress, _ = self.glob.getSamplingProgress(self.validators) + samplingProgressAll, samplingProgress3, samplingProgress2, samplingProgress1, _ = self.glob.getSamplingProgress(self.validators) self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberNodes): self.validators[i].receiveRowsColumns() @@ -282,12 +282,15 @@ class Simulator: self.logger.info("step %d, arrived %0.02f %%, ready %0.02f %%, validatedall %0.02f %%, validated %0.02f %%, sampled %0.02f %%" % (steps, sampleProgress*100, nodeProgress*100, validatorAllProgress*100, validatorProgress*100, - samplingProgress*100), extra=self.format) + samplingProgressAll*100), extra=self.format) cnS = "samples received" cnN = "nodes ready" cnV = "validators ready" - cnDAS = "DASampling ready" + cnDASall = "DASampling ready (query all)" + cnDAS3 = "DASampling ready (query 3)" + cnDAS2 = "DASampling ready (query 2)" + cnDAS1 = "DASampling ready (query 1)" cnT0 = "TX builder mean" cnT1 = "TX class1 mean" cnT2 = "TX class2 mean" @@ -300,7 +303,10 @@ class Simulator: cnS:sampleProgress, cnN:nodeProgress, cnV:validatorProgress, - cnDAS:samplingProgress, + cnDASall:samplingProgressAll, + cnDAS3:samplingProgress3, + cnDAS2:samplingProgress2, + cnDAS1:samplingProgress1, cnT0: trafficStats[0]["Tx"]["mean"], cnT1: trafficStats[1]["Tx"]["mean"], cnT2: trafficStats[2]["Tx"]["mean"], diff --git a/DAS/visualizor.py b/DAS/visualizor.py index 58576d5..bcb4898 100644 --- a/DAS/visualizor.py +++ b/DAS/visualizor.py @@ -112,7 +112,10 @@ class Visualizor: vector1 = result.metrics["progress"]["nodes ready"] vector2 = result.metrics["progress"]["validators ready"] vector3 = result.metrics["progress"]["samples received"] - vector4 = result.metrics["progress"]["DASampling ready"] + vector4 = result.metrics["progress"]["DASampling ready (query all)"] + vector5 = result.metrics["progress"]["DASampling ready (query 3)"] + vector6 = result.metrics["progress"]["DASampling ready (query 2)"] + vector7 = result.metrics["progress"]["DASampling ready (query 1)"] conf = {} attrbs = self.__get_attrbs__(result) conf["textBox"] = "Block Size R: "+attrbs['bsrn']+"\nBlock Size C: "+attrbs['bscn']\ @@ -121,11 +124,12 @@ class Visualizor: conf["type"] = "plot" conf["legLoc"] = 2 conf["desLoc"] = 2 - conf["colors"] = ["g-", "b-", "r-", "m-"] - conf["labels"] = ["Nodes", "Validators", "Samples", "DASampling"] + conf["colors"] = ["g-", "b-", "r-", "m-", "m--", "m-.", "m:"] + conf["labels"] = ["Nodes", "Validators", "Custody samples", + "DASampling (query all)", "DASampling (query 3)", "DASampling (query 2)", "DASampling (query 1)"] conf["xlabel"] = "Time (ms)" - conf["ylabel"] = "Percentage (%)" - conf["data"] = [vector1, vector2, vector3, vector4] + conf["ylabel"] = "Ratio of all (0..1)" + conf["data"] = [vector1, vector2, vector3, vector4, vector5, vector6, vector7] conf["xdots"] = [x*self.config.stepDuration for x in range(len(vector1))] conf["path"] = plotPath+"/nodesReady.png" maxi = 0 diff --git a/smallConf.py b/smallConf.py index b1434af..12284ae 100644 --- a/smallConf.py +++ b/smallConf.py @@ -38,39 +38,39 @@ logLevel = logging.INFO # number of parallel workers. -1: all cores; 1: sequential # for more details, see joblib.Parallel -numJobs = -1 +numJobs = 1 # distribute rows/columns evenly between validators (True) # or generate it using local randomness (False) evenLineDistribution = False # Number of simulation runs with the same parameters for statistical relevance -runs = range(3) +runs = range(1) # Number of validators -numberNodes = range(128, 513, 128) +numberNodes = [4000] # select failure model between: "random, sequential, MEP, MEP+1, DEP, DEP+1, MREP, MREP-1" failureModels = ["random"] # Percentage of block not released by producer -failureRates = range(40, 81, 20) +failureRates = [0] # Block size in one dimension in segments. Block is blockSizes * blockSizes segments. -blockSizes = range(64, 113, 128) +blockSizes = [256] # Per-topic mesh neighborhood size -netDegrees = range(8, 9, 2) +netDegrees = [6] # number of rows and columns a validator is interested in -chis = range(2, 3, 2) +chis = [2] # ratio of class1 nodes (see below for parameters per class) class1ratios = [0.8] # Number of validators per beacon node validatorsPerNode1 = [1] -validatorsPerNode2 = [500] +validatorsPerNode2 = [5] # Set uplink bandwidth in megabits/second bwUplinksProd = [200] @@ -99,7 +99,7 @@ successCondition = 0.9 diagnostics = False # True to save git diff and git commit -saveGit = False +saveGit = True def nextShape(): for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product(