Merge pull request #32 from status-im/measure-progress

Measure and save progress and traffic statistics.
This commit is contained in:
Leo 2023-03-30 11:18:19 +02:00 committed by GitHub
commit 57b0ee89f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 100 additions and 14 deletions

View File

@ -1,5 +1,6 @@
#!/bin/python3 #!/bin/python3
import numpy as np
from DAS.block import * from DAS.block import *
class Observer: class Observer:
@ -44,9 +45,55 @@ class Observer:
"""It checks the status of how many expected and arrived samples globally.""" """It checks the status of how many expected and arrived samples globally."""
arrived = 0 arrived = 0
expected = 0 expected = 0
ready = 0
validated = 0
for val in validators: for val in validators:
if val.amIproposer == 0: if val.amIproposer == 0:
(a, e) = val.checkStatus() (a, e) = val.checkStatus()
arrived += a arrived += a
expected += e expected += e
return (arrived, expected) if a == e:
ready += 1
validated += val.vpn
return (arrived, expected, ready, validated)
def getProgress(self, validators):
"""Calculate current simulation progress with different metrics.
Returns:
- missingSamples: overall number of sample instances missing in nodes.
Sample are counted on both rows and columns, so intersections of interest are counted twice.
- sampleProgress: previous expressed as progress ratio
- nodeProgress: ratio of nodes having all segments interested in
- validatorProgress: same as above, but vpn weighted average. I.e. it counts per validator,
but counts a validator only if its support node's all validators see all interesting segments
TODO: add real per validator progress counter
"""
arrived, expected, ready, validated = self.checkStatus(validators)
missingSamples = expected - arrived
sampleProgress = arrived / expected
nodeProgress = ready / (len(validators)-1)
validatorCnt = sum([v.vpn for v in validators[1:]])
validatorProgress = validated / validatorCnt
return missingSamples, sampleProgress, nodeProgress, validatorProgress
def getTrafficStats(self, validators):
"""Summary statistics of traffic measurements in a timestep."""
def maxOrNan(l):
return np.max(l) if l else np.NaN
def meanOrNan(l):
return np.mean(l) if l else np.NaN
trafficStats = {}
for cl in range(0,3):
Tx = [v.statsTxInSlot for v in validators if v.nodeClass == cl]
Rx = [v.statsRxInSlot for v in validators if v.nodeClass == cl]
RxDup = [v.statsRxDupInSlot for v in validators if v.nodeClass == cl]
trafficStats[cl] = {
"Tx": {"mean": meanOrNan(Tx), "max": maxOrNan(Tx)},
"Rx": {"mean": meanOrNan(Rx), "max": maxOrNan(Rx)},
"RxDup": {"mean": meanOrNan(RxDup), "max": maxOrNan(RxDup)},
}
return trafficStats

View File

@ -13,6 +13,7 @@ class Result:
self.blockAvailable = -1 self.blockAvailable = -1
self.tta = -1 self.tta = -1
self.missingVector = [] self.missingVector = []
self.metrics = {}
def populate(self, shape, missingVector): def populate(self, shape, missingVector):
"""It populates part of the result data inside a vector.""" """It populates part of the result data inside a vector."""
@ -26,6 +27,10 @@ class Result:
self.blockAvailable = 0 self.blockAvailable = 0
self.tta = -1 self.tta = -1
def addMetric(self, name, metric):
"""Generic function to add a metric to the results."""
self.metrics[name] = metric
def dump(self, execID): def dump(self, execID):
"""It dumps the results of the simulation in an XML file.""" """It dumps the results of the simulation in an XML file."""
if not os.path.exists("results"): if not os.path.exists("results"):

View File

@ -2,9 +2,9 @@
import networkx as nx import networkx as nx
import logging, random import logging, random
import pandas as pd
from functools import partial, partialmethod from functools import partial, partialmethod
from datetime import datetime from datetime import datetime
from statistics import mean
from DAS.tools import * from DAS.tools import *
from DAS.results import * from DAS.results import *
from DAS.observer import * from DAS.observer import *
@ -166,9 +166,11 @@ class Simulator:
"""It runs the main simulation until the block is available or it gets stucked.""" """It runs the main simulation until the block is available or it gets stucked."""
self.glob.checkRowsColumns(self.validators) self.glob.checkRowsColumns(self.validators)
self.validators[self.proposerID].broadcastBlock() self.validators[self.proposerID].broadcastBlock()
arrived, expected = self.glob.checkStatus(self.validators) arrived, expected, ready, validated = self.glob.checkStatus(self.validators)
missingSamples = expected - arrived missingSamples = expected - arrived
missingVector = [] missingVector = []
progressVector = []
trafficStatsVector = []
steps = 0 steps = 0
while(True): while(True):
missingVector.append(missingSamples) missingVector.append(missingSamples)
@ -189,19 +191,41 @@ class Simulator:
self.validators[i].logColumns() self.validators[i].logColumns()
# log TX and RX statistics # log TX and RX statistics
statsTxInSlot = [v.statsTxInSlot for v in self.validators] trafficStats = self.glob.getTrafficStats(self.validators)
statsRxInSlot = [v.statsRxInSlot for v in self.validators] self.logger.debug("step %d: %s" %
self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % (steps, trafficStats), extra=self.format)
(steps, statsTxInSlot[0], statsRxInSlot[0],
mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]),
mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format)
for i in range(0,self.shape.numberNodes): for i in range(0,self.shape.numberNodes):
self.validators[i].updateStats() self.validators[i].updateStats()
trafficStatsVector.append(trafficStats)
missingSamples, sampleProgress, nodeProgress, validatorProgress = self.glob.getProgress(self.validators)
self.logger.debug("step %d, arrived %0.02f %%, ready %0.02f %%, validated %0.02f %%"
% (steps, sampleProgress*100, nodeProgress*100, validatorProgress*100), extra=self.format)
cnS = "samples received"
cnN = "nodes ready"
cnV = "validators ready"
cnT0 = "TX builder mean"
cnT1 = "TX class1 mean"
cnT2 = "TX class2 mean"
cnR1 = "RX class1 mean"
cnR2 = "RX class2 mean"
cnD1 = "Dup class1 mean"
cnD2 = "Dup class2 mean"
progressVector.append({
cnS:sampleProgress,
cnN:nodeProgress,
cnV:validatorProgress,
cnT0: trafficStats[0]["Tx"]["mean"],
cnT1: trafficStats[1]["Tx"]["mean"],
cnT2: trafficStats[2]["Tx"]["mean"],
cnR1: trafficStats[1]["Rx"]["mean"],
cnR2: trafficStats[2]["Rx"]["mean"],
cnD1: trafficStats[1]["RxDup"]["mean"],
cnD2: trafficStats[2]["RxDup"]["mean"],
})
arrived, expected = self.glob.checkStatus(self.validators)
missingSamples = expected - arrived
missingRate = missingSamples*100/expected
self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format)
if missingSamples == oldMissingSamples: if missingSamples == oldMissingSamples:
self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format)
missingVector.append(missingSamples) missingVector.append(missingSamples)
@ -213,6 +237,9 @@ class Simulator:
else: else:
steps += 1 steps += 1
progress = pd.DataFrame(progressVector)
if self.config.saveProgress:
self.result.addMetric("progress", progress.to_dict(orient='list'))
self.result.populate(self.shape, missingVector) self.result.populate(self.shape, missingVector)
return self.result return self.result

View File

@ -79,6 +79,8 @@ class Validator:
self.statsTxPerSlot = [] self.statsTxPerSlot = []
self.statsRxInSlot = 0 self.statsRxInSlot = 0
self.statsRxPerSlot = [] self.statsRxPerSlot = []
self.statsRxDupInSlot = 0
self.statsRxDupPerSlot = []
# Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?)
# 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11
@ -163,7 +165,7 @@ class Validator:
self.receivedQueue.append((rID, cID)) self.receivedQueue.append((rID, cID))
else: else:
self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
# self.statsRxDuplicateInSlot += 1 self.statsRxDupInSlot += 1
self.statsRxInSlot += 1 self.statsRxInSlot += 1
def addToSendQueue(self, rID, cID): def addToSendQueue(self, rID, cID):
@ -205,8 +207,10 @@ class Validator:
"""It updates the stats related to sent and received data.""" """It updates the stats related to sent and received data."""
self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format) self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format)
self.statsRxPerSlot.append(self.statsRxInSlot) self.statsRxPerSlot.append(self.statsRxInSlot)
self.statsRxDupPerSlot.append(self.statsRxDupInSlot)
self.statsTxPerSlot.append(self.statsTxInSlot) self.statsTxPerSlot.append(self.statsTxInSlot)
self.statsRxInSlot = 0 self.statsRxInSlot = 0
self.statsRxDupInSlot = 0
self.statsTxInSlot = 0 self.statsTxInSlot = 0
def checkSegmentToNeigh(self, rID, cID, neigh): def checkSegmentToNeigh(self, rID, cID, neigh):

View File

@ -19,6 +19,9 @@ import numpy as np
from DAS.shape import Shape from DAS.shape import Shape
dumpXML = 1 dumpXML = 1
# save progress vectors to XML
saveProgress = 1
visualization = 1 visualization = 1
logLevel = logging.INFO logLevel = logging.INFO