From 037c4cd67a516646e7636d961d179d978eec8385 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Mar 2023 12:32:39 +0100 Subject: [PATCH 1/8] count number of validators having all rows/columns Signed-off-by: Csaba Kiraly --- DAS/observer.py | 5 ++++- DAS/simulator.py | 8 +++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index 88fc8c4..ce435dd 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -44,9 +44,12 @@ class Observer: """It checks the status of how many expected and arrived samples globally.""" arrived = 0 expected = 0 + validated = 0 for val in validators: if val.amIproposer == 0: (a, e) = val.checkStatus() arrived += a expected += e - return (arrived, expected) + if a == e: + validated += 1 + return (arrived, expected, validated) diff --git a/DAS/simulator.py b/DAS/simulator.py index a6788b8..9582492 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -166,7 +166,7 @@ class Simulator: """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) self.validators[self.proposerID].broadcastBlock() - arrived, expected = self.glob.checkStatus(self.validators) + arrived, expected, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingVector = [] steps = 0 @@ -198,10 +198,12 @@ class Simulator: for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() - arrived, expected = self.glob.checkStatus(self.validators) + arrived, expected, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingRate = missingSamples*100/expected - self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format) + self.logger.debug("step %d, missing %d of %d (%0.02f %%), validated (%0.02f %%)" + % (steps, missingSamples, expected, missingRate, + validated/(len(self.validators)-1)*100), extra=self.format) if missingSamples == oldMissingSamples: self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) missingVector.append(missingSamples) From 119777787eb6c3c381ef69c6ef0f8a2d41ebd5d8 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 20 Mar 2023 12:04:11 +0100 Subject: [PATCH 2/8] add progress meters to observer Signed-off-by: Csaba Kiraly --- DAS/observer.py | 16 ++++++++++++++-- DAS/simulator.py | 11 ++++------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index ce435dd..0b80638 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -44,6 +44,7 @@ class Observer: """It checks the status of how many expected and arrived samples globally.""" arrived = 0 expected = 0 + ready = 0 validated = 0 for val in validators: if val.amIproposer == 0: @@ -51,5 +52,16 @@ class Observer: arrived += a expected += e if a == e: - validated += 1 - return (arrived, expected, validated) + ready += 1 + validated += val.vpn + return (arrived, expected, ready, validated) + + def getProgress(self, validators): + arrived, expected, ready, validated = self.checkStatus(validators) + missingSamples = expected - arrived + sampleProgress = arrived / expected + nodeProgress = ready / (len(validators)-1) + validatorCnt = sum([v.vpn for v in validators[1:]]) + validatorProgress = validated / validatorCnt + + return missingSamples, sampleProgress, nodeProgress, validatorProgress diff --git a/DAS/simulator.py b/DAS/simulator.py index 9582492..1744bf8 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -166,7 +166,7 @@ class Simulator: """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) self.validators[self.proposerID].broadcastBlock() - arrived, expected, validated = self.glob.checkStatus(self.validators) + arrived, expected, ready, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingVector = [] steps = 0 @@ -198,12 +198,9 @@ class Simulator: for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() - arrived, expected, validated = self.glob.checkStatus(self.validators) - missingSamples = expected - arrived - missingRate = missingSamples*100/expected - self.logger.debug("step %d, missing %d of %d (%0.02f %%), validated (%0.02f %%)" - % (steps, missingSamples, expected, missingRate, - validated/(len(self.validators)-1)*100), extra=self.format) + missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators) + self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" + % (steps, sampleProgress*100, nodeProgress*100, validatorProgress*100), extra=self.format) if missingSamples == oldMissingSamples: self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) missingVector.append(missingSamples) From 6616cc799f016a0ead7d1a3b703a414d45938193 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 20 Mar 2023 14:26:42 +0100 Subject: [PATCH 3/8] move traffic stats calculation to observer Signed-off-by: Csaba Kiraly --- DAS/observer.py | 13 +++++++++++++ DAS/simulator.py | 10 +++------- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index 0b80638..26b0632 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -1,5 +1,6 @@ #!/bin/python3 +from statistics import mean from DAS.block import * class Observer: @@ -65,3 +66,15 @@ class Observer: validatorProgress = validated / validatorCnt return missingSamples, sampleProgress, nodeProgress, validatorProgress + + def getTrafficStats(self, validators): + statsTxInSlot = [v.statsTxInSlot for v in validators] + statsRxInSlot = [v.statsRxInSlot for v in validators] + TX_prod = statsTxInSlot[0] + RX_prod = statsRxInSlot[0] + TX_avg = mean(statsTxInSlot[1:]) + TX_max = max(statsTxInSlot[1:]) + Rx_avg = mean(statsRxInSlot[1:]) + Rx_max = max(statsRxInSlot[1:]) + + return (TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max) diff --git a/DAS/simulator.py b/DAS/simulator.py index 1744bf8..82d9594 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -4,7 +4,6 @@ import networkx as nx import logging, random from functools import partial, partialmethod from datetime import datetime -from statistics import mean from DAS.tools import * from DAS.results import * from DAS.observer import * @@ -189,12 +188,9 @@ class Simulator: self.validators[i].logColumns() # log TX and RX statistics - statsTxInSlot = [v.statsTxInSlot for v in self.validators] - statsRxInSlot = [v.statsRxInSlot for v in self.validators] - self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % - (steps, statsTxInSlot[0], statsRxInSlot[0], - mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]), - mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format) + TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max = self.glob.getTrafficStats(self.validators) + self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % + (steps, TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max), extra=self.format) for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() From 7e4074938a48e5181efcabf27e5f5a6704521e90 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 20 Mar 2023 14:33:35 +0100 Subject: [PATCH 4/8] add duplicate statistics Signed-off-by: Csaba Kiraly --- DAS/observer.py | 5 ++++- DAS/simulator.py | 6 +++--- DAS/validator.py | 6 +++++- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index 26b0632..6af4507 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -70,11 +70,14 @@ class Observer: def getTrafficStats(self, validators): statsTxInSlot = [v.statsTxInSlot for v in validators] statsRxInSlot = [v.statsRxInSlot for v in validators] + statsRxDupInSlot = [v.statsRxDupInSlot for v in validators] TX_prod = statsTxInSlot[0] RX_prod = statsRxInSlot[0] TX_avg = mean(statsTxInSlot[1:]) TX_max = max(statsTxInSlot[1:]) Rx_avg = mean(statsRxInSlot[1:]) Rx_max = max(statsRxInSlot[1:]) + RxDup_avg = mean(statsRxDupInSlot[1:]) + RxDup_max = max(statsRxDupInSlot[1:]) - return (TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max) + return (TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max, RxDup_avg, RxDup_max) diff --git a/DAS/simulator.py b/DAS/simulator.py index 82d9594..e860d17 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -188,9 +188,9 @@ class Simulator: self.validators[i].logColumns() # log TX and RX statistics - TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max = self.glob.getTrafficStats(self.validators) - self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % - (steps, TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max), extra=self.format) + TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max, RxDup_avg, RxDup_max = self.glob.getTrafficStats(self.validators) + self.logger.info("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f, RxDup_avg=%.1f, RxDup_max=%.1f" % + (steps, TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max ,RxDup_avg, RxDup_max), extra=self.format) for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() diff --git a/DAS/validator.py b/DAS/validator.py index 49165a0..cfa7fac 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -79,6 +79,8 @@ class Validator: self.statsTxPerSlot = [] self.statsRxInSlot = 0 self.statsRxPerSlot = [] + self.statsRxDupInSlot = 0 + self.statsRxDupPerSlot = [] # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 @@ -163,7 +165,7 @@ class Validator: self.receivedQueue.append((rID, cID)) else: self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) - # self.statsRxDuplicateInSlot += 1 + self.statsRxDupInSlot += 1 self.statsRxInSlot += 1 def addToSendQueue(self, rID, cID): @@ -205,8 +207,10 @@ class Validator: """It updates the stats related to sent and received data.""" self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format) self.statsRxPerSlot.append(self.statsRxInSlot) + self.statsRxDupPerSlot.append(self.statsRxDupInSlot) self.statsTxPerSlot.append(self.statsTxInSlot) self.statsRxInSlot = 0 + self.statsRxDupInSlot = 0 self.statsTxInSlot = 0 def checkSegmentToNeigh(self, rID, cID, neigh): From 23af30e381b7be97e7d44cf75cc2cc53849313a6 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 21 Mar 2023 08:34:28 +0100 Subject: [PATCH 5/8] add generalized metrics collection Signed-off-by: Csaba Kiraly --- DAS/observer.py | 28 +++++++++++++++------------- DAS/results.py | 4 ++++ DAS/simulator.py | 9 ++++++--- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index 6af4507..1bdf4d0 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -1,6 +1,6 @@ #!/bin/python3 -from statistics import mean +import numpy as np from DAS.block import * class Observer: @@ -68,16 +68,18 @@ class Observer: return missingSamples, sampleProgress, nodeProgress, validatorProgress def getTrafficStats(self, validators): - statsTxInSlot = [v.statsTxInSlot for v in validators] - statsRxInSlot = [v.statsRxInSlot for v in validators] - statsRxDupInSlot = [v.statsRxDupInSlot for v in validators] - TX_prod = statsTxInSlot[0] - RX_prod = statsRxInSlot[0] - TX_avg = mean(statsTxInSlot[1:]) - TX_max = max(statsTxInSlot[1:]) - Rx_avg = mean(statsRxInSlot[1:]) - Rx_max = max(statsRxInSlot[1:]) - RxDup_avg = mean(statsRxDupInSlot[1:]) - RxDup_max = max(statsRxDupInSlot[1:]) + def maxOrNan(l): + return np.max(l) if l else np.NaN - return (TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max, RxDup_avg, RxDup_max) + trafficStats = {} + for cl in range(0,3): + Tx = [v.statsTxInSlot for v in validators if v.nodeClass == cl] + Rx = [v.statsRxInSlot for v in validators if v.nodeClass == cl] + RxDup = [v.statsRxDupInSlot for v in validators if v.nodeClass == cl] + trafficStats[cl] = { + "Tx": {"mean": np.mean(Tx), "max": maxOrNan(Tx)}, + "Rx": {"mean": np.mean(Rx), "max": maxOrNan(Rx)}, + "RxDup": {"mean": np.mean(RxDup), "max": maxOrNan(RxDup)}, + } + + return trafficStats diff --git a/DAS/results.py b/DAS/results.py index 0efe26d..61f4f04 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -13,6 +13,7 @@ class Result: self.blockAvailable = -1 self.tta = -1 self.missingVector = [] + self.metrics = {} def populate(self, shape, missingVector): """It populates part of the result data inside a vector.""" @@ -26,6 +27,9 @@ class Result: self.blockAvailable = 0 self.tta = -1 + def addMetric(self, name, metric): + self.metrics[name] = metric + def dump(self, execID): """It dumps the results of the simulation in an XML file.""" if not os.path.exists("results"): diff --git a/DAS/simulator.py b/DAS/simulator.py index e860d17..4bd0242 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -168,6 +168,7 @@ class Simulator: arrived, expected, ready, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingVector = [] + trafficStatsVector = [] steps = 0 while(True): missingVector.append(missingSamples) @@ -188,11 +189,12 @@ class Simulator: self.validators[i].logColumns() # log TX and RX statistics - TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max, RxDup_avg, RxDup_max = self.glob.getTrafficStats(self.validators) - self.logger.info("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f, RxDup_avg=%.1f, RxDup_max=%.1f" % - (steps, TX_prod, RX_prod, TX_avg, TX_max, Rx_avg, Rx_max ,RxDup_avg, RxDup_max), extra=self.format) + trafficStats = self.glob.getTrafficStats(self.validators) + self.logger.debug("step %d: %s" % + (steps, trafficStats), extra=self.format) for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() + trafficStatsVector.append(trafficStats) missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators) self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" @@ -209,5 +211,6 @@ class Simulator: steps += 1 self.result.populate(self.shape, missingVector) + self.result.addMetric("trafficStats", trafficStatsVector) return self.result From eb4f451303e3fc14bbf6aa3f0c6de4ea5039d63f Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 27 Mar 2023 23:11:40 +0200 Subject: [PATCH 6/8] save progress and traffic statistics to XML Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 31 ++++++++++++++++++++++++++++++- config_example.py | 3 +++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 4bd0242..677a260 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -2,6 +2,7 @@ import networkx as nx import logging, random +import pandas as pd from functools import partial, partialmethod from datetime import datetime from DAS.tools import * @@ -168,6 +169,7 @@ class Simulator: arrived, expected, ready, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingVector = [] + progressVector = [] trafficStatsVector = [] steps = 0 while(True): @@ -199,6 +201,31 @@ class Simulator: missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators) self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" % (steps, sampleProgress*100, nodeProgress*100, validatorProgress*100), extra=self.format) + + cnS = "samples received" + cnN = "nodes ready" + cnV = "validators ready" + cnT0 = "TX builder mean" + cnT1 = "TX class1 mean" + cnT2 = "TX class2 mean" + cnR1 = "RX class1 mean" + cnR2 = "RX class2 mean" + cnD1 = "Dup class1 mean" + cnD2 = "Dup class2 mean" + + progressVector.append({ + cnS:sampleProgress, + cnN:nodeProgress, + cnV:validatorProgress, + cnT0: trafficStats[0]["Tx"]["mean"], + cnT1: trafficStats[1]["Tx"]["mean"], + cnT2: trafficStats[2]["Tx"]["mean"], + cnR1: trafficStats[1]["Rx"]["mean"], + cnR2: trafficStats[2]["Rx"]["mean"], + cnD1: trafficStats[1]["RxDup"]["mean"], + cnD2: trafficStats[2]["RxDup"]["mean"], + }) + if missingSamples == oldMissingSamples: self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) missingVector.append(missingSamples) @@ -210,7 +237,9 @@ class Simulator: else: steps += 1 + progress = pd.DataFrame(progressVector) + if self.config.saveProgress: + self.result.addMetric("progress", progress.to_dict(orient='list')) self.result.populate(self.shape, missingVector) - self.result.addMetric("trafficStats", trafficStatsVector) return self.result diff --git a/config_example.py b/config_example.py index af55fc2..5b1a396 100644 --- a/config_example.py +++ b/config_example.py @@ -19,6 +19,9 @@ import numpy as np from DAS.shape import Shape dumpXML = 1 + +# save progress vectors to XML +saveProgress = 1 visualization = 1 logLevel = logging.INFO From cb0a3ea1bac38810d065a6d5e6346ffa6ca71671 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 27 Mar 2023 23:27:49 +0200 Subject: [PATCH 7/8] fixup: avoid warning on mean if empty Signed-off-by: Csaba Kiraly --- DAS/observer.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index 1bdf4d0..18c0461 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -70,6 +70,8 @@ class Observer: def getTrafficStats(self, validators): def maxOrNan(l): return np.max(l) if l else np.NaN + def meanOrNan(l): + return np.mean(l) if l else np.NaN trafficStats = {} for cl in range(0,3): @@ -77,9 +79,9 @@ class Observer: Rx = [v.statsRxInSlot for v in validators if v.nodeClass == cl] RxDup = [v.statsRxDupInSlot for v in validators if v.nodeClass == cl] trafficStats[cl] = { - "Tx": {"mean": np.mean(Tx), "max": maxOrNan(Tx)}, - "Rx": {"mean": np.mean(Rx), "max": maxOrNan(Rx)}, - "RxDup": {"mean": np.mean(RxDup), "max": maxOrNan(RxDup)}, + "Tx": {"mean": meanOrNan(Tx), "max": maxOrNan(Tx)}, + "Rx": {"mean": meanOrNan(Rx), "max": maxOrNan(Rx)}, + "RxDup": {"mean": meanOrNan(RxDup), "max": maxOrNan(RxDup)}, } return trafficStats From 98db10f7a6a095036416df4f991bbdd816127e17 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 30 Mar 2023 00:01:06 +0200 Subject: [PATCH 8/8] Add more documentation Signed-off-by: Csaba Kiraly --- DAS/observer.py | 12 ++++++++++++ DAS/results.py | 1 + 2 files changed, 13 insertions(+) diff --git a/DAS/observer.py b/DAS/observer.py index 18c0461..235ed60 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -58,6 +58,17 @@ class Observer: return (arrived, expected, ready, validated) def getProgress(self, validators): + """Calculate current simulation progress with different metrics. + + Returns: + - missingSamples: overall number of sample instances missing in nodes. + Sample are counted on both rows and columns, so intersections of interest are counted twice. + - sampleProgress: previous expressed as progress ratio + - nodeProgress: ratio of nodes having all segments interested in + - validatorProgress: same as above, but vpn weighted average. I.e. it counts per validator, + but counts a validator only if its support node's all validators see all interesting segments + TODO: add real per validator progress counter + """ arrived, expected, ready, validated = self.checkStatus(validators) missingSamples = expected - arrived sampleProgress = arrived / expected @@ -68,6 +79,7 @@ class Observer: return missingSamples, sampleProgress, nodeProgress, validatorProgress def getTrafficStats(self, validators): + """Summary statistics of traffic measurements in a timestep.""" def maxOrNan(l): return np.max(l) if l else np.NaN def meanOrNan(l): diff --git a/DAS/results.py b/DAS/results.py index 61f4f04..c5687a5 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -28,6 +28,7 @@ class Result: self.tta = -1 def addMetric(self, name, metric): + """Generic function to add a metric to the results.""" self.metrics[name] = metric def dump(self, execID):