diff --git a/DAS/observer.py b/DAS/observer.py index 88fc8c4..235ed60 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -1,5 +1,6 @@ #!/bin/python3 +import numpy as np from DAS.block import * class Observer: @@ -44,9 +45,55 @@ class Observer: """It checks the status of how many expected and arrived samples globally.""" arrived = 0 expected = 0 + ready = 0 + validated = 0 for val in validators: if val.amIproposer == 0: (a, e) = val.checkStatus() arrived += a expected += e - return (arrived, expected) + if a == e: + ready += 1 + validated += val.vpn + return (arrived, expected, ready, validated) + + def getProgress(self, validators): + """Calculate current simulation progress with different metrics. + + Returns: + - missingSamples: overall number of sample instances missing in nodes. + Sample are counted on both rows and columns, so intersections of interest are counted twice. + - sampleProgress: previous expressed as progress ratio + - nodeProgress: ratio of nodes having all segments interested in + - validatorProgress: same as above, but vpn weighted average. I.e. it counts per validator, + but counts a validator only if its support node's all validators see all interesting segments + TODO: add real per validator progress counter + """ + arrived, expected, ready, validated = self.checkStatus(validators) + missingSamples = expected - arrived + sampleProgress = arrived / expected + nodeProgress = ready / (len(validators)-1) + validatorCnt = sum([v.vpn for v in validators[1:]]) + validatorProgress = validated / validatorCnt + + return missingSamples, sampleProgress, nodeProgress, validatorProgress + + def getTrafficStats(self, validators): + """Summary statistics of traffic measurements in a timestep.""" + def maxOrNan(l): + return np.max(l) if l else np.NaN + def meanOrNan(l): + return np.mean(l) if l else np.NaN + + trafficStats = {} + for cl in range(0,3): + Tx = [v.statsTxInSlot for v in validators if v.nodeClass == cl] + Rx = [v.statsRxInSlot for v in validators if v.nodeClass == cl] + RxDup = [v.statsRxDupInSlot for v in validators if v.nodeClass == cl] + trafficStats[cl] = { + "Tx": {"mean": meanOrNan(Tx), "max": maxOrNan(Tx)}, + "Rx": {"mean": meanOrNan(Rx), "max": maxOrNan(Rx)}, + "RxDup": {"mean": meanOrNan(RxDup), "max": maxOrNan(RxDup)}, + } + + return trafficStats diff --git a/DAS/results.py b/DAS/results.py index 0efe26d..c5687a5 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -13,6 +13,7 @@ class Result: self.blockAvailable = -1 self.tta = -1 self.missingVector = [] + self.metrics = {} def populate(self, shape, missingVector): """It populates part of the result data inside a vector.""" @@ -26,6 +27,10 @@ class Result: self.blockAvailable = 0 self.tta = -1 + def addMetric(self, name, metric): + """Generic function to add a metric to the results.""" + self.metrics[name] = metric + def dump(self, execID): """It dumps the results of the simulation in an XML file.""" if not os.path.exists("results"): diff --git a/DAS/simulator.py b/DAS/simulator.py index a6788b8..677a260 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -2,9 +2,9 @@ import networkx as nx import logging, random +import pandas as pd from functools import partial, partialmethod from datetime import datetime -from statistics import mean from DAS.tools import * from DAS.results import * from DAS.observer import * @@ -166,9 +166,11 @@ class Simulator: """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) self.validators[self.proposerID].broadcastBlock() - arrived, expected = self.glob.checkStatus(self.validators) + arrived, expected, ready, validated = self.glob.checkStatus(self.validators) missingSamples = expected - arrived missingVector = [] + progressVector = [] + trafficStatsVector = [] steps = 0 while(True): missingVector.append(missingSamples) @@ -189,19 +191,41 @@ class Simulator: self.validators[i].logColumns() # log TX and RX statistics - statsTxInSlot = [v.statsTxInSlot for v in self.validators] - statsRxInSlot = [v.statsRxInSlot for v in self.validators] - self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % - (steps, statsTxInSlot[0], statsRxInSlot[0], - mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]), - mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format) + trafficStats = self.glob.getTrafficStats(self.validators) + self.logger.debug("step %d: %s" % + (steps, trafficStats), extra=self.format) for i in range(0,self.shape.numberNodes): self.validators[i].updateStats() + trafficStatsVector.append(trafficStats) + + missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators) + self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%" + % (steps, sampleProgress*100, nodeProgress*100, validatorProgress*100), extra=self.format) + + cnS = "samples received" + cnN = "nodes ready" + cnV = "validators ready" + cnT0 = "TX builder mean" + cnT1 = "TX class1 mean" + cnT2 = "TX class2 mean" + cnR1 = "RX class1 mean" + cnR2 = "RX class2 mean" + cnD1 = "Dup class1 mean" + cnD2 = "Dup class2 mean" + + progressVector.append({ + cnS:sampleProgress, + cnN:nodeProgress, + cnV:validatorProgress, + cnT0: trafficStats[0]["Tx"]["mean"], + cnT1: trafficStats[1]["Tx"]["mean"], + cnT2: trafficStats[2]["Tx"]["mean"], + cnR1: trafficStats[1]["Rx"]["mean"], + cnR2: trafficStats[2]["Rx"]["mean"], + cnD1: trafficStats[1]["RxDup"]["mean"], + cnD2: trafficStats[2]["RxDup"]["mean"], + }) - arrived, expected = self.glob.checkStatus(self.validators) - missingSamples = expected - arrived - missingRate = missingSamples*100/expected - self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format) if missingSamples == oldMissingSamples: self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) missingVector.append(missingSamples) @@ -213,6 +237,9 @@ class Simulator: else: steps += 1 + progress = pd.DataFrame(progressVector) + if self.config.saveProgress: + self.result.addMetric("progress", progress.to_dict(orient='list')) self.result.populate(self.shape, missingVector) return self.result diff --git a/DAS/validator.py b/DAS/validator.py index 49165a0..cfa7fac 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -79,6 +79,8 @@ class Validator: self.statsTxPerSlot = [] self.statsRxInSlot = 0 self.statsRxPerSlot = [] + self.statsRxDupInSlot = 0 + self.statsRxDupPerSlot = [] # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 @@ -163,7 +165,7 @@ class Validator: self.receivedQueue.append((rID, cID)) else: self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) - # self.statsRxDuplicateInSlot += 1 + self.statsRxDupInSlot += 1 self.statsRxInSlot += 1 def addToSendQueue(self, rID, cID): @@ -205,8 +207,10 @@ class Validator: """It updates the stats related to sent and received data.""" self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format) self.statsRxPerSlot.append(self.statsRxInSlot) + self.statsRxDupPerSlot.append(self.statsRxDupInSlot) self.statsTxPerSlot.append(self.statsTxInSlot) self.statsRxInSlot = 0 + self.statsRxDupInSlot = 0 self.statsTxInSlot = 0 def checkSegmentToNeigh(self, rID, cID, neigh): diff --git a/config_example.py b/config_example.py index af55fc2..5b1a396 100644 --- a/config_example.py +++ b/config_example.py @@ -19,6 +19,9 @@ import numpy as np from DAS.shape import Shape dumpXML = 1 + +# save progress vectors to XML +saveProgress = 1 visualization = 1 logLevel = logging.INFO